wrongmove/celery_app.py

87 lines
2.8 KiB
Python
Raw Normal View History

2025-06-22 21:18:52 +00:00
import sys
import time
2025-06-22 21:18:52 +00:00
from celery import Celery
from celery.signals import worker_ready, task_prerun, task_postrun
2025-06-22 21:18:52 +00:00
from dotenv import load_dotenv
import os
from logging_config import configure_logging
2025-06-22 21:18:52 +00:00
# Populate os.environ from a local .env file first, so that every os.getenv()
# call below (service name, broker/backend URLs, metrics port) can see it.
load_dotenv()
# Logging is configured before the Celery app is built; the service name
# falls back to "celery-worker" when SERVICE_NAME is unset.
configure_logging(os.getenv("SERVICE_NAME", "celery-worker"))
2025-06-22 21:18:52 +00:00
# The Celery application. `include` lists the task modules the worker imports
# on startup; broker and result backend come from the environment and fall
# back to a local Redis instance (DB 0 for the broker, DB 1 for results).
app = Celery(
    "celery_app",
    include=["tasks.listing_tasks", "tasks.poi_tasks"],
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"),
)

# Only JSON is produced or accepted for task messages and results, and all
# scheduling/timestamps use UTC.
app.conf.update(
    {
        "accept_content": ["json"],
        "task_serializer": "json",
        "result_serializer": "json",
        "enable_utc": True,
        "timezone": "UTC",
    }
)
# ---------------------------------------------------------------------------
# Celery metrics via prometheus_client
# ---------------------------------------------------------------------------
# Port for the Prometheus HTTP endpoint started by _start_metrics_server.
try:
    CELERY_METRICS_PORT = int(os.getenv("CELERY_METRICS_PORT", "9090"))
except ValueError as err:
    # A bare int() failure only shows the bad value; name the variable too.
    raise ValueError(
        "CELERY_METRICS_PORT must be an integer, got "
        f"{os.getenv('CELERY_METRICS_PORT')!r}"
    ) from err

# task_id -> time.monotonic() captured at task_prerun; popped at task_postrun
# to compute the task duration. With the prefork pool, each child process
# holds its own copy of this dict.
_task_start_times: dict[str, float] = {}

# Initialise OTel metrics at module level so the MeterProvider and
# PrometheusMetricReader already exist when the worker forks its pool.
# NOTE(review): a forked child gets its own copy of the parent's memory, so
# values recorded inside prefork children are not automatically visible to
# the HTTP server started in the main process. Unless api.metrics sets up a
# cross-process mechanism (e.g. prometheus_client multiprocess mode via
# PROMETHEUS_MULTIPROC_DIR), task metrics from prefork children may never be
# scraped — confirm.
from api.metrics import init_metrics as _init_metrics  # noqa: E402

_init_metrics(os.getenv("SERVICE_NAME", "celery-worker"))
@worker_ready.connect
def _start_metrics_server(**kwargs: object) -> None:
    """Serve Prometheus metrics over HTTP once the worker reports ready.

    Connected to Celery's ``worker_ready`` signal, which fires in the main
    worker process. Listens on ``CELERY_METRICS_PORT``; all signal
    arguments are accepted and ignored.
    """
    import prometheus_client

    listen_port = CELERY_METRICS_PORT
    prometheus_client.start_http_server(listen_port)
@task_prerun.connect
def _on_task_prerun(task_id: str, task: object, **kwargs: object) -> None:
    """Mark a task as active and remember when it started.

    Connected to Celery's ``task_prerun`` signal. Adds 1 to
    ``celery_tasks_active`` for the task's name (``"unknown"`` if the task
    has no ``name``), then stores a ``time.monotonic()`` timestamp under
    ``task_id`` for ``_on_task_postrun`` to consume.
    """
    import api.metrics as metrics_mod

    label_set = {"task_name": getattr(task, "name", "unknown")}
    metrics_mod.celery_tasks_active.add(1, label_set)
    _task_start_times[task_id] = time.monotonic()
@task_postrun.connect
def _on_task_postrun(
    task_id: str, task: object, state: str | None = None, **kwargs: object
) -> None:
    """Record completion metrics for a task that just finished.

    Connected to Celery's ``task_postrun`` signal. Adds -1 to
    ``celery_tasks_active`` and 1 to ``celery_tasks_total`` (labelled with
    ``state``, or ``"UNKNOWN"`` when no state is given). If
    ``_on_task_prerun`` stored a start time for ``task_id``, the elapsed
    ``time.monotonic()`` seconds are recorded in
    ``celery_task_duration_seconds``.
    """
    import api.metrics as metrics_mod

    name = getattr(task, "name", "unknown")
    final_state = state if state else "UNKNOWN"
    metrics_mod.celery_tasks_active.add(-1, {"task_name": name})
    metrics_mod.celery_tasks_total.add(
        1, {"task_name": name, "status": final_state}
    )

    started_at = _task_start_times.pop(task_id, None)
    if started_at is None:
        # No matching prerun timestamp in this process: nothing to time.
        return
    elapsed = time.monotonic() - started_at
    metrics_mod.celery_task_duration_seconds.record(
        elapsed, {"task_name": name}
    )
2025-06-22 21:18:52 +00:00
if __name__ == "__main__":
    # Standalone broker check: exit 0 if a connection to the configured
    # broker succeeds, 1 otherwise.
    try:
        with app.connection() as conn:
            # max_retries=0: do not retry after the first failed attempt
            # (kombu ensure_connection semantics).
            conn.ensure_connection(max_retries=0)
    except Exception as e:  # top-level boundary: any failure means "down"
        # Diagnostics go to stderr; the exit code carries the result.
        print(f"Broker connection failed: {e}", file=sys.stderr)
        sys.exit(1)
    print("Broker connection OK")
    sys.exit(0)