infra/stacks/openclaw/files/exporter.py
Viktor Barzin ae70faf8be openclaw: realtime usage dashboard via Prometheus exporter sidecar
Stdlib-only Python exporter (files/exporter.py) reads ~/.openclaw/agents/*/sessions/*.jsonl
(assistant messages with usage) plus auth-profiles.json (OAuth expiry,
Plus-tier label) and exposes Prometheus text format on :9099/metrics.
Container is python:3.12-slim; pod template gets prometheus.io/scrape
annotations so the existing kubernetes-pods job picks it up — no
ServiceMonitor needed.

Metrics exported:
  openclaw_codex_messages_total{provider,model,session_kind}    counter
  openclaw_codex_input/output/cache_read/cache_write_tokens_total
  openclaw_codex_message_errors_total{reason}
  openclaw_codex_active_sessions{kind}                          gauge
  openclaw_codex_oauth_expiry_seconds{provider,account,plan}    gauge
  openclaw_codex_last_run_timestamp                             gauge

Grafana dashboard "OpenClaw — Codex Usage" (Applications folder, 30s
refresh): messages/5h vs Plus rate-card, % of 1,200 floor, tokens/5h,
cache hit %, OAuth expiry days, active sessions, last-turn age, errors,
plus per-model timeseries + bar gauge + error table.

Plus rate-card thresholds in the gauge are conservative (1,200/5h floor;
real cap is dynamic 1,200–7,000). Re-baseline if throttling shows up
below 80%.
2026-05-07 09:04:25 +00:00

264 lines
10 KiB
Python

#!/usr/bin/env python3
"""OpenClaw / Codex usage exporter.
Reads ~/.openclaw/agents/*/sessions/*.jsonl (assistant messages with usage)
and ~/.openclaw/agents/*/agent/auth-profiles.json (OAuth profiles), then exposes
Prometheus text-format metrics on :9099/metrics. Stdlib only — no pip install
needed at startup.
Metrics (all cumulative-since-session-start; use Prometheus increase()/rate()
for windowed views):
openclaw_codex_messages_total{provider,model,session_kind} counter
openclaw_codex_input_tokens_total{provider,model} counter
openclaw_codex_output_tokens_total{provider,model} counter
openclaw_codex_cache_read_tokens_total{provider,model} counter
openclaw_codex_cache_write_tokens_total{provider,model} counter
openclaw_codex_message_errors_total{provider,model,reason} counter
openclaw_codex_active_sessions{kind} gauge
openclaw_codex_oauth_expiry_seconds{provider,account,plan} gauge
openclaw_codex_last_run_timestamp gauge
openclaw_codex_exporter_scrape_duration_ms gauge
"""
import glob
import json
import os
import re
import time
from datetime import datetime
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Lock
# Root of the OpenClaw state tree; overridable through $OPENCLAW_HOME.
OPENCLAW_HOME = os.getenv("OPENCLAW_HOME", "/home/node/.openclaw")

# TCP port the HTTP endpoint binds to; overridable through $METRICS_PORT.
PORT = int(os.getenv("METRICS_PORT", "9099"))

# Age in seconds after which the cached exposition text is rebuilt.
CACHE_SEC = float(os.getenv("CACHE_SEC", "5"))

# Session files whose basename contains any of these fragments are skipped.
SKIP_FRAGMENTS = (".broken.", ".reset.", ".deleted.", ".bak.")

# Session transcript basename: a 36-character hex/dash id followed by ".jsonl".
SESSION_RE = re.compile(r"^([0-9a-f-]{36})\.jsonl$")

# _lock serialises access to _cache, which holds the last rendered exposition
# text together with the time.time() value at which it was produced.
_lock = Lock()
_cache = dict(text="", ts=0.0)
# Single-pass translation table for the three characters that must be escaped
# inside a double-quoted label value: backslash, double quote and newline.
_LABEL_ESCAPES = str.maketrans({
    "\\": "\\\\",
    '"': '\\"',
    "\n": "\\n",
})


def _esc(value: str) -> str:
    """Return ``str(value)`` escaped for use as a quoted label value.

    Backslashes, double quotes and newlines are replaced by their
    backslash-escaped forms; every other character passes through unchanged.
    """
    return str(value).translate(_LABEL_ESCAPES)
def _line(name: str, labels: dict, value) -> str:
    """Render one sample line: ``name{k="v",...} value`` or ``name value``.

    Labels are emitted sorted by key, with each value passed through
    ``_esc``. A falsy ``labels`` produces the brace-less form.
    """
    if not labels:
        return f"{name} {value}"
    pairs = []
    for key in sorted(labels):
        pairs.append(f'{key}="{_esc(labels[key])}"')
    joined = ",".join(pairs)
    return f"{name}{{{joined}}} {value}"
# Session-key prefixes mapped to the kind they denote. Order matters: the more
# specific "agent:main:cron:" must be tested before the generic "agent:main:".
_KIND_PREFIXES = (
    ("agent:main:cron:", "cron"),
    ("telegram:slash:", "telegram-slash"),
    ("agent:main:", "main"),
)


def _kind_for(session_id: str, sessions_index: dict) -> str:
    """Classify a session id using the merged sessions index.

    The first index entry whose ``sessionId`` equals *session_id* decides the
    result: a known key prefix wins, then the entry's ``origin.surface`` if
    set, then the part of the key before the first ``:``.

    Args:
        session_id: Session id taken from the transcript file name.
        sessions_index: Mapping of session key to session entry.

    Returns:
        The session kind, or ``"unknown"`` when no entry matches.
    """
    for key, entry in sessions_index.items():
        # Entries that are not JSON objects cannot carry a sessionId; skip
        # them instead of raising, so one bad entry cannot fail the caller.
        if not isinstance(entry, dict) or entry.get("sessionId") != session_id:
            continue
        for prefix, kind in _KIND_PREFIXES:
            if key.startswith(prefix):
                return kind
        origin = entry.get("origin")
        surface = origin.get("surface") if isinstance(origin, dict) else None
        if surface:
            return surface
        return key.split(":", 1)[0]
    return "unknown"
def _parse_ts(value):
    """Convert a transcript timestamp to a float.

    Numbers are returned as ``float(value)``. Strings are parsed with
    ``datetime.fromisoformat`` after rewriting ``Z`` to ``+00:00``. Any other
    type, or a string that fails to parse, yields ``0.0``.
    """
    if isinstance(value, str):
        try:
            moment = datetime.fromisoformat(value.replace("Z", "+00:00"))
            return moment.timestamp()
        except ValueError:
            return 0.0
    if isinstance(value, (int, float)):
        return float(value)
    return 0.0
# (usage key in a transcript record, metric name, HELP text) for each
# per-(provider, model) token counter, in the order they are emitted.
_TOKEN_METRICS = (
    ("input", "openclaw_codex_input_tokens_total", "Cumulative input tokens"),
    ("output", "openclaw_codex_output_tokens_total", "Cumulative output tokens"),
    ("cacheRead", "openclaw_codex_cache_read_tokens_total", "Cumulative cache-read tokens"),
    ("cacheWrite", "openclaw_codex_cache_write_tokens_total", "Cumulative cache-write tokens"),
)


def _read_json(path: str):
    """Return the parsed JSON document at *path*, or None if it cannot be read."""
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError, RecursionError):
        # Best-effort: a missing, half-written or corrupt file must not fail
        # the scrape. JSON and Unicode decode errors are ValueError subclasses.
        return None


def _session_key_kind(key: str) -> str:
    """Map a sessions.json key to the kind used by the active-sessions gauge."""
    if key.startswith("agent:main:cron:"):
        return "cron"
    if key.startswith("telegram:slash:"):
        return "telegram-slash"
    if key.startswith("agent:main:"):
        return "main"
    return key.split(":", 1)[0]


def _token_count(usage: dict, field: str):
    """Return ``usage[field]`` when it is a real number, otherwise 0."""
    value = usage.get(field)
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return value
    return 0


def _tally_record(record, kind: str, acc: dict) -> None:
    """Fold one parsed transcript line into the accumulators in *acc*.

    Only objects with ``type == "message"`` whose ``message.role`` is
    ``"assistant"`` are counted; anything else is ignored.
    """
    if not isinstance(record, dict) or record.get("type") != "message":
        return
    msg = record.get("message")
    if not isinstance(msg, dict) or msg.get("role") != "assistant":
        return
    # str() keeps the accumulator keys hashable even for odd label values.
    provider = str(msg.get("provider") or "unknown")
    model = str(msg.get("model") or "unknown")
    usage = msg.get("usage")
    if not isinstance(usage, dict):
        usage = {}

    # Error turns also advance the "newest assistant message" timestamp.
    ts = _parse_ts(record.get("timestamp"))
    if ts > acc["latest_ts"]:
        acc["latest_ts"] = ts

    if msg.get("stopReason") == "error":
        # Truncated to 80 characters before use as the `reason` label.
        reason = str(msg.get("errorMessage") or "unknown")[:80]
        errors = acc["errors"]
        err_key = (provider, model, reason)
        errors[err_key] = errors.get(err_key, 0) + 1
        return

    messages = acc["messages"]
    msg_key = (provider, model, kind)
    messages[msg_key] = messages.get(msg_key, 0) + 1
    tok_key = (provider, model)
    for field, _name, _help in _TOKEN_METRICS:
        bucket = acc["tokens"][field]
        bucket[tok_key] = bucket.get(tok_key, 0) + _token_count(usage, field)


def _scan_session_file(path: str, kind: str, acc: dict) -> None:
    """Tally every assistant message found in one session ``.jsonl`` file."""
    try:
        with open(path, encoding="utf-8", errors="replace") as f:
            for raw in f:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    _tally_record(json.loads(raw), kind, acc)
                except Exception:  # noqa: BLE001
                    # Deliberate best-effort, scoped to ONE line: a malformed
                    # record is skipped without dropping the rest of the file.
                    continue
    except OSError:
        # Unreadable file (removed or rotated mid-scrape): keep what we have.
        return


def _oauth_expiry_lines(now: float) -> list:
    """Return one sample line per OAuth profile that has a numeric ``expires``."""
    lines = []
    pattern = os.path.join(OPENCLAW_HOME, "agents/*/agent/auth-profiles.json")
    for path in glob.glob(pattern):
        data = _read_json(path)
        # Schema: {"version": 1, "profiles": {"<id>": {...}}}.
        # `expires` is Unix milliseconds.
        profiles = data.get("profiles") if isinstance(data, dict) else None
        if not isinstance(profiles, dict):
            continue
        for profile in profiles.values():
            if not isinstance(profile, dict):
                continue
            exp_ms = profile.get("expires")
            if not isinstance(exp_ms, (int, float)):
                continue
            exp_ts = exp_ms / 1000.0
            lines.append(_line(
                "openclaw_codex_oauth_expiry_seconds",
                {
                    "provider": profile.get("provider") or "unknown",
                    "account": profile.get("email") or profile.get("account") or "unknown",
                    "plan": profile.get("chatgptPlanType") or "unknown",
                },
                max(0, exp_ts - now),  # clamp: already-expired tokens report 0
            ))
    return lines


def _build_text() -> str:
    """Scan the OpenClaw state tree and render the exposition text.

    Reads every ``agents/*/sessions/sessions.json`` (session index), every
    ``agents/*/sessions/<id>.jsonl`` transcript not matched by
    ``SKIP_FRAGMENTS``, and every ``agents/*/agent/auth-profiles.json`` below
    ``OPENCLAW_HOME``. All totals are recomputed from the files on each call.

    Unreadable files and malformed records are skipped rather than raised, so
    a single bad line costs only that line.

    Returns:
        The metrics document as a newline-terminated string.
    """
    start = time.monotonic()

    # Merge the per-agent session indexes; later files win on key clashes.
    sessions_index: dict = {}
    for path in glob.glob(os.path.join(OPENCLAW_HOME, "agents/*/sessions/sessions.json")):
        loaded = _read_json(path)
        if isinstance(loaded, dict):
            sessions_index.update(loaded)

    acc = {
        "messages": {},  # (provider, model, kind) -> count
        "tokens": {field: {} for field, _name, _help in _TOKEN_METRICS},
        "errors": {},  # (provider, model, reason) -> count
        "latest_ts": 0.0,
    }
    for path in glob.glob(os.path.join(OPENCLAW_HOME, "agents/*/sessions/*.jsonl")):
        base = os.path.basename(path)
        if any(fragment in base for fragment in SKIP_FRAGMENTS):
            continue
        match = SESSION_RE.match(base)
        if not match:
            continue
        kind = str(_kind_for(match.group(1), sessions_index))
        _scan_session_file(path, kind, acc)

    out = [
        "# HELP openclaw_codex_messages_total Cumulative assistant messages",
        "# TYPE openclaw_codex_messages_total counter",
    ]
    for (provider, model, kind), count in acc["messages"].items():
        out.append(_line("openclaw_codex_messages_total",
                         {"provider": provider, "model": model, "session_kind": kind},
                         count))

    for field, name, help_text in _TOKEN_METRICS:
        out.append(f"# HELP {name} {help_text}")
        out.append(f"# TYPE {name} counter")
        for (provider, model), count in acc["tokens"][field].items():
            out.append(_line(name, {"provider": provider, "model": model}, count))

    out.append("# HELP openclaw_codex_message_errors_total Cumulative assistant errors")
    out.append("# TYPE openclaw_codex_message_errors_total counter")
    for (provider, model, reason), count in acc["errors"].items():
        out.append(_line("openclaw_codex_message_errors_total",
                         {"provider": provider, "model": model, "reason": reason},
                         count))

    out.append("# HELP openclaw_codex_active_sessions Active sessions in sessions.json")
    out.append("# TYPE openclaw_codex_active_sessions gauge")
    active: dict = {}
    for key in sessions_index:
        kind = _session_key_kind(key)
        active[kind] = active.get(kind, 0) + 1
    for kind, count in active.items():
        out.append(_line("openclaw_codex_active_sessions", {"kind": kind}, count))

    # Omitted entirely until at least one assistant message has a timestamp.
    if acc["latest_ts"]:
        out.append("# HELP openclaw_codex_last_run_timestamp Unix ts of newest assistant message")
        out.append("# TYPE openclaw_codex_last_run_timestamp gauge")
        out.append(_line("openclaw_codex_last_run_timestamp", {}, acc["latest_ts"]))

    out.append("# HELP openclaw_codex_oauth_expiry_seconds Seconds until OAuth token expires")
    out.append("# TYPE openclaw_codex_oauth_expiry_seconds gauge")
    out.extend(_oauth_expiry_lines(time.time()))

    out.append("# HELP openclaw_codex_exporter_scrape_duration_ms Last scrape duration ms")
    out.append("# TYPE openclaw_codex_exporter_scrape_duration_ms gauge")
    out.append(_line("openclaw_codex_exporter_scrape_duration_ms", {},
                     (time.monotonic() - start) * 1000))
    return "\n".join(out) + "\n"
class Handler(BaseHTTPRequestHandler):
    """Serve ``/healthz`` and the cached ``/metrics`` document; 404 otherwise."""

    def do_GET(self):
        """Dispatch a GET request on its exact path."""
        route = self.path
        if route == "/healthz":
            self.send_response(200)
            self.send_header("Content-Type", "text/plain")
            self.end_headers()
            self.wfile.write(b"ok\n")
        elif route == "/metrics":
            payload = self._metrics_text().encode()
            self.send_response(200)
            self.send_header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
            self.send_header("Content-Length", str(len(payload)))
            self.end_headers()
            self.wfile.write(payload)
        else:
            self.send_response(404)
            self.end_headers()

    def _metrics_text(self) -> str:
        """Return the exposition text, rebuilding it once older than CACHE_SEC.

        If the rebuild raises, the cache is filled with a one-sample error
        document plus a comment carrying the first 200 characters of the
        escaped exception text, and that is served until the next refresh.
        """
        with _lock:
            stamp = time.time()
            if stamp - _cache["ts"] > CACHE_SEC:
                try:
                    fresh = _build_text()
                except Exception as exc:  # noqa: BLE001
                    fresh = (
                        f'openclaw_codex_exporter_errors_total{{kind="scrape"}} 1\n'
                        f'# scrape error: {_esc(str(exc))[:200]}\n'
                    )
                _cache["text"] = fresh
                _cache["ts"] = stamp
            return _cache["text"]

    def log_message(self, *args, **kwargs):
        """Discard the base class's per-request log output."""
        return None
def main():
    """Start the metrics HTTP server on ``0.0.0.0:PORT`` and serve forever.

    A threading server is used so that one slow client or a long metrics
    rebuild cannot block ``/healthz``; the handler already serialises cache
    access with the module-level lock.
    """
    # Local import: the file's top-level import only brings in HTTPServer.
    from http.server import ThreadingHTTPServer

    print(f"openclaw exporter listening on :{PORT}", flush=True)
    with ThreadingHTTPServer(("0.0.0.0", PORT), Handler) as server:
        server.serve_forever()


if __name__ == "__main__":
    main()