2025-06-22 21:18:52 +00:00
|
|
|
import sys
|
2026-02-14 10:59:12 +00:00
|
|
|
import time
|
2025-06-22 21:18:52 +00:00
|
|
|
from celery import Celery
|
2026-02-22 17:58:20 +00:00
|
|
|
from celery.signals import worker_ready, task_prerun, task_postrun
|
2025-06-22 21:18:52 +00:00
|
|
|
from dotenv import load_dotenv
|
|
|
|
|
import os
|
|
|
|
|
|
2026-02-14 10:59:12 +00:00
|
|
|
from logging_config import configure_logging
|
|
|
|
|
|
2025-06-22 21:18:52 +00:00
|
|
|
# Pull variables from a .env file into the process environment first, so the
# SERVICE_NAME lookup below (and every later os.getenv call) can see them.
load_dotenv()

# Logging is configured under the service name, defaulting to "celery-worker".
configure_logging(os.environ.get("SERVICE_NAME", "celery-worker"))
|
|
|
|
|
|
2025-06-22 21:18:52 +00:00
|
|
|
# Celery application object. Broker and result-backend URLs are read from the
# environment, falling back to local Redis databases 0 and 1 respectively;
# the task modules to register are listed explicitly via ``include``.
app = Celery(
    "celery_app",
    broker=os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"),
    include=["tasks.listing_tasks", "tasks.poi_tasks"],
)
|
|
|
|
|
|
|
|
|
|
# Serialisation and time settings: JSON is used for task messages and
# results (and is the only accepted content type); timestamps are UTC.
app.conf.update(
    {
        "task_serializer": "json",
        "result_serializer": "json",
        "accept_content": ["json"],
        "timezone": "UTC",
        "enable_utc": True,
    }
)
|
|
|
|
|
|
2026-02-14 10:59:12 +00:00
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Celery metrics via prometheus_client
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# TCP port passed to the Prometheus HTTP server; overridable through the
# CELERY_METRICS_PORT environment variable, 9090 when unset.
CELERY_METRICS_PORT = int(os.environ.get("CELERY_METRICS_PORT", "9090"))

# Maps a task id to the time.monotonic() reading stored by the task_prerun
# handler; the task_postrun handler pops the entry to compute the duration.
_task_start_times: dict[str, float] = {}
|
|
|
|
|
|
2026-02-22 17:58:20 +00:00
|
|
|
# OTel metrics are set up at import time, i.e. before the prefork pool is
# forked, with the intent that child processes inherit the MeterProvider and
# PrometheusMetricReader and that the prometheus_client collectors are already
# registered in the default registry when the fork happens.
# NOTE(review): the design relies on child-process recordings being visible to
# the HTTP server started in the main process -- this cannot be confirmed from
# this file alone; verify against api.metrics.
from api.metrics import init_metrics as _init_metrics  # noqa: E402

_init_metrics(os.environ.get("SERVICE_NAME", "celery-worker"))
|
2026-02-15 12:58:59 +00:00
|
|
|
|
|
|
|
|
|
2026-02-14 10:59:12 +00:00
|
|
|
@worker_ready.connect
def _start_metrics_server(**kwargs: object) -> None:
    """Serve Prometheus metrics over HTTP from the main worker process.

    Connected to Celery's ``worker_ready`` signal. The server listens on
    ``CELERY_METRICS_PORT``.

    Args:
        **kwargs: Signal arguments; accepted but not used.
    """
    # Imported lazily so prometheus_client is only loaded when the signal fires.
    import prometheus_client

    prometheus_client.start_http_server(CELERY_METRICS_PORT)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@task_prerun.connect
def _on_task_prerun(task_id: str, task: object, **kwargs: object) -> None:
    """Count a task as active and remember when it started.

    Connected to Celery's ``task_prerun`` signal.

    Args:
        task_id: Id of the task about to run; used as the key under which
            its start timestamp is stored in ``_task_start_times``.
        task: The task object. Its ``name`` attribute labels the metric,
            with ``"unknown"`` used when the attribute is missing.
        **kwargs: Remaining signal arguments; accepted but not used.
    """
    import api.metrics as metrics

    labels = {"task_name": getattr(task, "name", "unknown")}
    metrics.celery_tasks_active.add(1, labels)

    # Monotonic clock: only the difference to the postrun reading matters.
    _task_start_times[task_id] = time.monotonic()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@task_postrun.connect
def _on_task_postrun(
    task_id: str, task: object, state: str | None = None, **kwargs: object
) -> None:
    """Update task metrics once a task has finished running.

    Connected to Celery's ``task_postrun`` signal. Decrements the active-task
    counter, increments the per-status task total, and records the run
    duration when a start timestamp was stored for ``task_id``.

    Args:
        task_id: Id of the finished task; its entry is removed from
            ``_task_start_times``.
        task: The task object. Its ``name`` attribute labels the metrics,
            with ``"unknown"`` used when the attribute is missing.
        state: Final task state used as the ``status`` label; a missing or
            empty value is reported as ``"UNKNOWN"``.
        **kwargs: Remaining signal arguments; accepted but not used.
    """
    import api.metrics as metrics

    name = getattr(task, "name", "unknown")
    outcome = state if state else "UNKNOWN"

    metrics.celery_tasks_active.add(-1, {"task_name": name})
    metrics.celery_tasks_total.add(1, {"task_name": name, "status": outcome})

    started_at = _task_start_times.pop(task_id, None)
    if started_at is None:
        # No start timestamp was stored for this id: nothing to measure.
        return

    elapsed = time.monotonic() - started_at
    metrics.celery_task_duration_seconds.record(elapsed, {"task_name": name})
|
|
|
|
|
|
|
|
|
|
|
2025-06-22 21:18:52 +00:00
|
|
|
if __name__ == "__main__":
    # Broker connectivity check when run as a script: exits with status 0 if
    # a connection can be established, 1 otherwise.
    exit_code = 0
    try:
        with app.connection() as broker_conn:
            broker_conn.ensure_connection(max_retries=0)
        print("Broker connection OK")
    except Exception as e:
        print(f"Broker connection failed: {e}")
        exit_code = 1
    sys.exit(exit_code)
|