"""Celery application definition and worker-side task metric hooks.

Importing this module loads environment variables, configures logging,
builds the Celery ``app`` and registers signal receivers that report task
activity through the instruments exposed by ``api.metrics``.

Running the module as a script performs a one-shot broker connectivity
check and exits with status 0 on success or 1 on failure.
"""

import os
import sys
import time

from celery import Celery
from celery.signals import worker_ready, task_prerun, task_postrun
from dotenv import load_dotenv

from logging_config import configure_logging

# Environment must be loaded before anything below reads it via os.getenv.
load_dotenv()
configure_logging(os.getenv("SERVICE_NAME", "celery-worker"))

app = Celery(
    "celery_app",
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"),
    include=["tasks.listing_tasks", "tasks.poi_tasks"],
)

# JSON-only serialization for tasks and results, with UTC timestamps.
app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
)

# ---------------------------------------------------------------------------
# Task metrics, served over HTTP by prometheus_client
# ---------------------------------------------------------------------------

# Port passed to prometheus_client.start_http_server (default 9090).
CELERY_METRICS_PORT = int(os.getenv("CELERY_METRICS_PORT", "9090"))

# task_id -> time.monotonic() reading captured at prerun; the entry is
# removed again at postrun, where it is used to compute the task duration.
_task_start_times: dict[str, float] = {}


@worker_ready.connect
def _start_metrics_server(**kwargs: object) -> None:
    """Initialise metrics and start the metrics HTTP server.

    Connected to the ``worker_ready`` signal. Signal keyword arguments are
    accepted and ignored.
    """
    # Imported lazily so that merely importing this module does not pull in
    # the metrics stack.
    from api.metrics import init_metrics

    init_metrics(os.getenv("SERVICE_NAME", "celery-worker"))

    from prometheus_client import start_http_server

    start_http_server(CELERY_METRICS_PORT)


@task_prerun.connect
def _on_task_prerun(task_id: str, task: object, **kwargs: object) -> None:
    """Mark a task as active and remember when it started.

    Args:
        task_id: Identifier of the task about to run.
        task: The task object; its ``name`` attribute labels the metric,
            falling back to ``"unknown"`` when absent.
        **kwargs: Remaining signal arguments (unused).
    """
    import api.metrics as metrics

    task_name = getattr(task, "name", "unknown")
    metrics.celery_tasks_active.add(1, {"task_name": task_name})
    _task_start_times[task_id] = time.monotonic()


@task_postrun.connect
def _on_task_postrun(
    task_id: str,
    task: object,
    state: str | None = None,
    **kwargs: object,
) -> None:
    """Record completion metrics for a finished task.

    Decrements the active-task instrument, counts the task under its final
    state, and records the elapsed time when a start time is known.

    Args:
        task_id: Identifier of the task that finished.
        task: The task object; its ``name`` attribute labels the metrics,
            falling back to ``"unknown"`` when absent.
        state: Final task state; reported as ``"UNKNOWN"`` when falsy.
        **kwargs: Remaining signal arguments (unused).
    """
    import api.metrics as metrics

    task_name = getattr(task, "name", "unknown")
    status = state if state else "UNKNOWN"

    metrics.celery_tasks_active.add(-1, {"task_name": task_name})
    metrics.celery_tasks_total.add(
        1, {"task_name": task_name, "status": status}
    )

    started_at = _task_start_times.pop(task_id, None)
    if started_at is None:
        # No matching prerun entry was stored, so there is no duration.
        return

    metrics.celery_task_duration_seconds.record(
        time.monotonic() - started_at,
        {"task_name": task_name},
    )


if __name__ == "__main__":
    # One-shot broker check: exit 0 if the broker is reachable, 1 if not.
    # sys.exit raises SystemExit, which the handler below does not catch.
    try:
        with app.connection() as broker_conn:
            broker_conn.ensure_connection(max_retries=0)
            print("Broker connection OK")
            sys.exit(0)
    except Exception as e:
        print(f"Broker connection failed: {e}")
        sys.exit(1)