broker-sync/broker_sync/metrics.py

51 lines
1.9 KiB
Python

"""Pushgateway client for broker-sync providers.
One function: push a list of (metric, labels, value) tuples to Prometheus
Pushgateway under a given job name. Used by providers to surface per-run
drift / staleness / row counts that Prometheus can alert on.
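In-cluster URL: http://prometheus-prometheus-pushgateway.monitoring:9091/metrics
The job path (``/job/<job>``) is appended to this URL, so it must already
include the ``/metrics`` path.
Pass via the ``pushgateway_url`` argument or the ``PUSHGATEWAY_URL`` env var.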
"""
from __future__ import annotations

import base64
import logging
import os
from collections.abc import Iterable
from urllib.parse import quote

import httpx

log = logging.getLogger(__name__)
def _format_metric(name: str, labels: dict[str, str], value: float) -> str:
    """Render one sample as a line of Prometheus text exposition format.

    Labels are emitted sorted by label name so the output is deterministic.
    Label values are escaped as the text format requires (backslash, double
    quote and line feed), so an arbitrary string cannot terminate the quoted
    value early or inject an extra sample line.

    Args:
        name: Metric name, written verbatim.
        labels: Mapping of label name to label value. When empty, the
            ``{...}`` section is omitted entirely.
        value: Sample value, rendered with Python's default formatting.

    Returns:
        A single newline-terminated line, e.g. ``rows{provider="x"} 3.0\\n``.
    """
    if not labels:
        return f"{name} {value}\n"

    def _escape(raw: object) -> str:
        # Backslash must be handled first so the escapes added for quotes and
        # newlines are not themselves doubled.
        return (
            str(raw)
            .replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("\n", "\\n")
        )

    pairs = ",".join(
        f'{key}="{_escape(val)}"' for key, val in sorted(labels.items())
    )
    return f"{name}{{{pairs}}} {value}\n"
async def push_pushgateway(
    job: str,
    metrics: Iterable[tuple[str, dict[str, str], float]],
    pushgateway_url: str | None = None,
    transport: httpx.AsyncBaseTransport | None = None,
) -> None:
    """POST text-format metrics to Pushgateway under ``job``.

    Args:
        job: Pushgateway job name. It is encoded into the URL path: names
            containing ``/`` use the ``job@base64/<value>`` form, all other
            names are percent-encoded.
        metrics: ``(metric_name, labels, value)`` tuples; each is rendered as
            one text-format line. The iterable is consumed exactly once.
        pushgateway_url: Base URL to push to. Falls back to the env var
            ``PUSHGATEWAY_URL`` when omitted or empty. ``/job/<job>`` is
            appended to it (after stripping trailing slashes).
        transport: Optional httpx transport handed to ``httpx.AsyncClient``.

    Raises:
        RuntimeError: If the URL is unset, or the POST returns a status
            code of 300 or above.
    """
    url = pushgateway_url or os.environ.get("PUSHGATEWAY_URL")
    if not url:
        raise RuntimeError("PUSHGATEWAY_URL not set and no override provided")

    # Render once and keep the lines: the count logged below is then the
    # number of samples, independent of any line-breaking characters that may
    # appear inside a rendered line.
    lines = [_format_metric(name, labels, value) for name, labels, value in metrics]
    body = "".join(lines)

    # The job name is a URL path segment. Pushgateway decodes the path before
    # routing, so "/" cannot be percent-escaped; its documented alternative is
    # the "@base64" suffix with a URL-safe base64 value. Everything else is
    # percent-encoded so characters such as "?", "#" or "%" stay in the job.
    if "/" in job:
        encoded_job = base64.urlsafe_b64encode(job.encode("utf-8")).decode("ascii")
        job_path = f"job@base64/{encoded_job}"
    else:
        job_path = f"job/{quote(job, safe='')}"
    target = f"{url.rstrip('/')}/{job_path}"

    async with httpx.AsyncClient(transport=transport, timeout=15.0) as client:
        resp = await client.post(
            target, content=body, headers={"Content-Type": "text/plain"}
        )
        if resp.status_code >= 300:
            # Truncate the response body so a large error page does not
            # flood the exception message.
            raise RuntimeError(
                f"pushgateway POST {target} returned HTTP {resp.status_code}: "
                f"{resp.text[:200]}"
            )
    log.info("pushgateway: pushed %d metrics to job=%s", len(lines), job)