diff --git a/broker_sync/metrics.py b/broker_sync/metrics.py new file mode 100644 index 0000000..41566d8 --- /dev/null +++ b/broker_sync/metrics.py @@ -0,0 +1,51 @@ +"""Pushgateway client for broker-sync providers. + +One function: push a list of (metric, labels, value) tuples to Prometheus +Pushgateway under a given job name. Used by providers to surface per-run +drift / staleness / row counts that Prometheus can alert on. + +In-cluster URL: http://prometheus-prometheus-pushgateway.monitoring:9091/metrics +Pass via the ``pushgateway_url`` argument or the ``PUSHGATEWAY_URL`` env var. +""" +from __future__ import annotations + +import logging +import os +from collections.abc import Iterable + +import httpx + +log = logging.getLogger(__name__) + + +def _format_metric(name: str, labels: dict[str, str], value: float) -> str: + if labels: + body = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items())) + return f"{name}{{{body}}} {value}\n" + return f"{name} {value}\n" + + +async def push_pushgateway( + job: str, + metrics: Iterable[tuple[str, dict[str, str], float]], + pushgateway_url: str | None = None, + transport: httpx.AsyncBaseTransport | None = None, +) -> None: + """POST text-format metrics to Pushgateway under ``job``. + + ``pushgateway_url`` falls back to the env var ``PUSHGATEWAY_URL``. + Raises ``RuntimeError`` if the URL is unset or POST returns non-2xx. + """ + url = pushgateway_url or os.environ.get("PUSHGATEWAY_URL") + if not url: + raise RuntimeError("PUSHGATEWAY_URL not set and no override provided") + body = "".join(_format_metric(n, lbls, v) for n, lbls, v in metrics) + target = f"{url.rstrip('/')}/job/{job}" + async with httpx.AsyncClient(transport=transport, timeout=15.0) as c: + resp = await c.post(target, content=body, headers={"Content-Type": "text/plain"}) + if resp.status_code >= 300: + raise RuntimeError( + f"pushgateway POST {target} returned HTTP {resp.status_code}: " + f"{resp.text[:200]}" + ) + log.info("pushgateway: pushed %d metrics to job=%s", len(body.splitlines()), job) diff --git a/tests/test_metrics.py b/tests/test_metrics.py new file mode 100644 index 0000000..6a82012 --- /dev/null +++ b/tests/test_metrics.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +import httpx +import pytest + +from broker_sync.metrics import push_pushgateway + + +async def test_push_pushgateway_posts_text_format() -> None: + captured: dict[str, str] = {} + + def transport_handler(request: httpx.Request) -> httpx.Response: + captured["url"] = str(request.url) + captured["method"] = request.method + captured["body"] = request.content.decode("utf-8") + return httpx.Response(200) + + transport = httpx.MockTransport(transport_handler) + await push_pushgateway( + job="broker-sync-ibkr", + metrics=[ + ("ibkr_position_drift_shares", {"symbol": "VUAG.L"}, 0.0), + ("ibkr_sync_last_success_timestamp_seconds", {}, 1779830000.0), + ], + pushgateway_url="http://pg.example/metrics", + transport=transport, + ) + assert captured["method"] == "POST" + assert captured["url"] == "http://pg.example/metrics/job/broker-sync-ibkr" + body = captured["body"] + assert 'ibkr_position_drift_shares{symbol="VUAG.L"} 0.0' in body + assert "ibkr_sync_last_success_timestamp_seconds 1779830000.0" in body + + +async def test_push_pushgateway_raises_on_non_2xx() -> None: + transport = httpx.MockTransport(lambda r: httpx.Response(500, text="boom")) + with pytest.raises(RuntimeError, match="pushgateway.*500"): + await push_pushgateway( + job="x", + metrics=[("m", {}, 1.0)], + pushgateway_url="http://pg/metrics", + transport=transport, + ) + + +async def test_push_pushgateway_uses_env_var(monkeypatch: pytest.MonkeyPatch) -> None: + captured: dict[str, str] = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["url"] = str(request.url) + return httpx.Response(200) + + transport = httpx.MockTransport(handler) + monkeypatch.setenv("PUSHGATEWAY_URL", "http://from-env/metrics") + await push_pushgateway( + job="j", + metrics=[("m", {}, 1.0)], + transport=transport, + ) + assert captured["url"] == "http://from-env/metrics/job/j" + + +async def test_push_pushgateway_raises_when_url_missing(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("PUSHGATEWAY_URL", raising=False) + with pytest.raises(RuntimeError, match="PUSHGATEWAY_URL not set"): + await push_pushgateway(job="j", metrics=[("m", {}, 1.0)])