Init OTel metrics at module level in celery_app.py so prefork child processes inherit the MeterProvider and PrometheusMetricReader from the parent. Previously, worker_process_init created a separate MeterProvider in each child, disconnected from the HTTP server in the main process — so all scrape/celery/OCR metrics were silently lost. Update Grafana dashboard with API Performance and Frontend Performance sections, synced from the live cluster dashboard.
86 lines
2.8 KiB
Python
86 lines
2.8 KiB
Python
import sys
|
|
import time
|
|
from celery import Celery
|
|
from celery.signals import worker_ready, task_prerun, task_postrun
|
|
from dotenv import load_dotenv
|
|
import os
|
|
|
|
from logging_config import configure_logging
|
|
|
|
# Load variables from a .env file (if any) into the environment before any
# configuration below is read from it.
load_dotenv()

# Logging is set up under the service name, falling back to "celery-worker"
# when SERVICE_NAME is not set.
configure_logging(os.environ.get("SERVICE_NAME", "celery-worker"))
|
|
|
|
# Celery application. The broker and result-backend URLs are taken from the
# environment and default to a Redis instance on localhost (databases 0 and 1).
# The listed task modules are passed to Celery via ``include``.
app = Celery(
    "celery_app",
    broker=os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"),
    include=["tasks.listing_tasks", "tasks.poi_tasks"],
)

# JSON is the only serialization format produced or accepted; time handling
# is pinned to UTC.
app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
)
|
|
|
|
# ---------------------------------------------------------------------------
# Celery metrics, served over HTTP by prometheus_client
# ---------------------------------------------------------------------------
# Port the metrics HTTP server listens on; override with CELERY_METRICS_PORT.
CELERY_METRICS_PORT = int(os.environ.get("CELERY_METRICS_PORT", "9090"))

# Maps task_id -> time.monotonic() reading taken when the task_prerun handler
# runs. The task_postrun handler pops the entry to compute the task duration.
_task_start_times: dict[str, float] = {}
|
|
|
|
# OTel metrics are initialised here, at import time, rather than in a
# per-child hook. The intent is that prefork children inherit the
# MeterProvider and PrometheusMetricReader, with the prometheus_client
# collectors already registered in the default registry before the fork.
# NOTE(review): a forked child works on its own copy of process memory, so
# confirm that recordings made in a child really reach the HTTP server running
# in the main process (prometheus_client documents a dedicated multiprocess
# mode for this situation).
from api.metrics import init_metrics as _init_metrics  # noqa: E402

_init_metrics(os.environ.get("SERVICE_NAME", "celery-worker"))
|
|
|
|
|
|
@worker_ready.connect
def _start_metrics_server(**kwargs: object) -> None:
    """Expose Prometheus metrics over HTTP once the worker signals it is ready.

    Connected to Celery's ``worker_ready`` signal. The server listens on
    ``CELERY_METRICS_PORT``. Signal keyword arguments are accepted and ignored.
    """
    # Imported lazily so prometheus_client is only needed when a worker starts.
    import prometheus_client

    prometheus_client.start_http_server(CELERY_METRICS_PORT)
|
|
|
|
|
|
@task_prerun.connect
def _on_task_prerun(task_id: str, task: object, **kwargs: object) -> None:
    """Count a task as active and remember when it started.

    Connected to Celery's ``task_prerun`` signal.

    Args:
        task_id: Id of the task about to run; used as the key for its start time.
        task: The task object; its ``name`` attribute labels the metric
            ("unknown" when the attribute is missing).
        **kwargs: Remaining signal arguments, ignored.
    """
    import api.metrics as otel_metrics

    labels = {"task_name": getattr(task, "name", "unknown")}
    otel_metrics.celery_tasks_active.add(1, labels)

    # Monotonic clock: only used to compute a duration in the postrun handler.
    _task_start_times[task_id] = time.monotonic()
|
|
|
|
|
|
@task_postrun.connect
def _on_task_postrun(
    task_id: str, task: object, state: str | None = None, **kwargs: object
) -> None:
    """Record completion metrics for a task that has finished running.

    Connected to Celery's ``task_postrun`` signal. Decrements the active-task
    counter, increments the per-status total, and, when a start time was
    stored for ``task_id``, records the elapsed time.

    Args:
        task_id: Id of the finished task; its stored start time is removed.
        task: The task object; its ``name`` attribute labels the metrics
            ("unknown" when the attribute is missing).
        state: Final task state used as the ``status`` label; "UNKNOWN" when
            it is ``None`` or empty.
        **kwargs: Remaining signal arguments, ignored.
    """
    import api.metrics as otel_metrics

    name = getattr(task, "name", "unknown")
    outcome = state if state else "UNKNOWN"

    otel_metrics.celery_tasks_active.add(-1, {"task_name": name})
    otel_metrics.celery_tasks_total.add(1, {"task_name": name, "status": outcome})

    # No stored start time means there is no duration to report for this task.
    started_at = _task_start_times.pop(task_id, None)
    if started_at is None:
        return

    otel_metrics.celery_task_duration_seconds.record(
        time.monotonic() - started_at, {"task_name": name}
    )
|
|
|
|
|
|
if __name__ == "__main__":
    # Running the module as a script acts as a broker connectivity check:
    # exit status 0 when a connection can be established, 1 when it cannot.
    try:
        with app.connection() as broker_conn:
            broker_conn.ensure_connection(max_retries=0)
            print("Broker connection OK")
            # SystemExit is not an Exception subclass, so the handler below
            # does not intercept the success exit.
            sys.exit(0)
    except Exception as exc:
        print(f"Broker connection failed: {exc}")
        sys.exit(1)
|