wrongmove/celery_app.py
Viktor Barzin 67d4ab3821
Fix celery metrics not reaching Prometheus and update Grafana dashboard
Init OTel metrics at module level in celery_app.py so prefork child
processes inherit the MeterProvider and PrometheusMetricReader from
the parent.  Previously, worker_process_init created a separate
MeterProvider in each child, disconnected from the HTTP server in
the main process — so all scrape/celery/OCR metrics were silently
lost.

Update Grafana dashboard with API Performance and Frontend Performance
sections, synced from the live cluster dashboard.
2026-02-22 17:58:20 +00:00

86 lines
2.8 KiB
Python

import sys
import time
from celery import Celery
from celery.signals import worker_ready, task_prerun, task_postrun
from dotenv import load_dotenv
import os
from logging_config import configure_logging
# Load environment variables via python-dotenv first, so the os.getenv()
# lookups below can see values supplied that way.
load_dotenv()
# Configure logging through the project helper, passing the service name
# (SERVICE_NAME, defaulting to "celery-worker").
configure_logging(os.getenv("SERVICE_NAME", "celery-worker"))

# The Celery application. Broker and result-backend URLs are read from the
# environment and fall back to a Redis instance on localhost (database 0 for
# the broker, database 1 for results).
app = Celery(
    "celery_app",
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"),
    # Task modules handed to Celery via `include` so their tasks are loaded.
    include=["tasks.listing_tasks", "tasks.poi_tasks"],
)

# Accept and emit JSON only for task messages and results; run on UTC.
app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
)
# ---------------------------------------------------------------------------
# Celery metrics (OTel instruments, served over HTTP by prometheus_client)
# ---------------------------------------------------------------------------

# Port for the metrics HTTP server started in the worker_ready handler.
# Read from CELERY_METRICS_PORT (default 9090); int() raises ValueError at
# import time if the variable is set to a non-integer value.
CELERY_METRICS_PORT = int(os.getenv("CELERY_METRICS_PORT", "9090"))

# Track task start times for duration measurement: maps task_id to the
# time.monotonic() reading stored by the task_prerun handler. The
# task_postrun handler pops the entry to compute the elapsed time.
_task_start_times: dict[str, float] = {}

# Initialise OTel metrics at module level so prefork children inherit the
# MeterProvider and PrometheusMetricReader. The prometheus_client collectors
# are registered in the default registry before fork, so child-process
# recordings are visible to the HTTP server started in the main process.
# The import is intentionally placed here rather than at the top of the file
# (hence the E402 suppression).
from api.metrics import init_metrics as _init_metrics # noqa: E402
_init_metrics(os.getenv("SERVICE_NAME", "celery-worker"))
@worker_ready.connect
def _start_metrics_server(**kwargs: object) -> None:
    """Expose the Prometheus scrape endpoint once the worker is ready.

    Connected to Celery's ``worker_ready`` signal. Starts the
    ``prometheus_client`` HTTP server on ``CELERY_METRICS_PORT`` in the main
    worker process.

    Args:
        **kwargs: Signal arguments supplied by Celery; unused.
    """
    # Imported lazily so prometheus_client is only loaded when the signal fires.
    import prometheus_client

    prometheus_client.start_http_server(CELERY_METRICS_PORT)
@task_prerun.connect
def _on_task_prerun(task_id: str, task: object, **kwargs: object) -> None:
    """Mark a task as started for metrics purposes.

    Connected to Celery's ``task_prerun`` signal. Adds 1 to
    ``celery_tasks_active`` for the task's name, then stores a monotonic
    timestamp under ``task_id`` in ``_task_start_times``.

    Args:
        task_id: Identifier of the task about to run.
        task: The task object; its ``name`` attribute labels the metric
            ("unknown" when the attribute is missing).
        **kwargs: Remaining signal arguments; unused.
    """
    import api.metrics as metrics

    labels = {"task_name": getattr(task, "name", "unknown")}
    metrics.celery_tasks_active.add(1, labels)

    # Monotonic clock: the stored value is only ever used to compute a delta.
    _task_start_times[task_id] = time.monotonic()
@task_postrun.connect
def _on_task_postrun(
    task_id: str, task: object, state: str | None = None, **kwargs: object
) -> None:
    """Record completion metrics for a task.

    Connected to Celery's ``task_postrun`` signal. Subtracts 1 from
    ``celery_tasks_active``, adds 1 to ``celery_tasks_total`` labelled with the
    final state, and records the elapsed time in
    ``celery_task_duration_seconds`` when a start timestamp was stored for
    this ``task_id``.

    Args:
        task_id: Identifier of the task that just finished.
        task: The task object; its ``name`` attribute labels the metrics
            ("unknown" when the attribute is missing).
        state: Final task state; a missing or empty value is reported as
            "UNKNOWN".
        **kwargs: Remaining signal arguments; unused.
    """
    import api.metrics as metrics

    name = getattr(task, "name", "unknown")
    outcome = state if state else "UNKNOWN"

    metrics.celery_tasks_active.add(-1, {"task_name": name})
    metrics.celery_tasks_total.add(1, {"task_name": name, "status": outcome})

    # Remove the entry regardless, so the dict does not grow with finished tasks.
    started_at = _task_start_times.pop(task_id, None)
    if started_at is None:
        # No start time was stored for this task_id; skip the duration.
        return

    elapsed = time.monotonic() - started_at
    metrics.celery_task_duration_seconds.record(elapsed, {"task_name": name})
if __name__ == "__main__":
    # Broker connectivity check when the module is run as a script: exit
    # status 0 if a connection can be established, 1 otherwise.
    try:
        with app.connection() as broker_conn:
            # max_retries=0 is passed so the check is not retried.
            broker_conn.ensure_connection(max_retries=0)
        print("Broker connection OK")
        exit_code = 0
    except Exception as exc:
        print(f"Broker connection failed: {exc}")
        exit_code = 1
    sys.exit(exit_code)