diff --git a/api/app.py b/api/app.py index b76d4e5..5a677ba 100644 --- a/api/app.py +++ b/api/app.py @@ -39,12 +39,13 @@ from services.listing_cache import ( from repositories.poi_repository import POIRepository from repositories.user_repository import UserRepository from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor -from api.metrics import metrics_app -from opentelemetry.metrics import get_meter +from api.metrics import init_metrics, get_metrics_asgi_app, geojson_cache_operations +from logging_config import configure_logging load_dotenv() -logger = logging.getLogger("uvicorn") +configure_logging("api") +logger = logging.getLogger(__name__) DEFAULT_BATCH_SIZE = 50 _rate_limit_config = RateLimitConfig.from_env() @@ -99,16 +100,8 @@ app = FastAPI( app.include_router(passkey_router) app.include_router(poi_router) app.include_router(ws_router) -app.mount("/metrics", metrics_app) -meter = get_meter(__name__) -request_counter = meter.create_counter( - name="custom_request_count", - description="Number of times /hello was called", -) -hist = meter.create_histogram( - name="custom_request_duration", - description="Duration of /hello requests in seconds", -) +init_metrics("realestate-crawler-api") +app.mount("/metrics", get_metrics_asgi_app()) # Allow CORS (for React frontend) @@ -146,8 +139,6 @@ async def unhandled_exception_handler(request: Request, exc: Exception) -> JSONR @app.get("/api/status") async def get_status() -> dict[str, str]: - request_counter.add(1, {"method": "GET", "path": "/status"}) - hist.record(1.5, {"method": "GET", "path": "/status"}) return {"status": "OK"} @@ -333,8 +324,10 @@ async def stream_listing_geojson( cached_count = get_cached_count(query_parameters) if cached_count is not None and cached_count > 0 and not include_poi_distances: + geojson_cache_operations.add(1, {"result": "hit"}) generator = _stream_from_cache(query_parameters, batch_size, limit) else: + geojson_cache_operations.add(1, {"result": "miss"}) generator = _stream_from_db( query_parameters, batch_size, limit, poi_distances_lookup, skip_cache=include_poi_distances, diff --git a/api/audit_middleware.py b/api/audit_middleware.py index 21f7ea9..e89f508 100644 --- a/api/audit_middleware.py +++ b/api/audit_middleware.py @@ -51,13 +51,15 @@ class AuditLogMiddleware(BaseHTTPMiddleware): duration_ms = (time.monotonic() - start) * 1000 audit_logger.info( - "method=%s path=%s query=%s user=%s ip=%s status=%d duration_ms=%.1f", - request.method, - path, - query, - identity, - ip, - response.status_code, - duration_ms, + "API request", + extra={ + "method": request.method, + "path": path, + "query": query, + "user": identity, + "ip": ip, + "status": response.status_code, + "duration_ms": round(duration_ms, 1), + }, ) return response diff --git a/api/metrics.py b/api/metrics.py index 1baae9e..51984ef 100644 --- a/api/metrics.py +++ b/api/metrics.py @@ -1,17 +1,152 @@ -# metrics.py -from opentelemetry.metrics import set_meter_provider +"""OpenTelemetry metrics with Prometheus export. + +Provides ``init_metrics()`` to lazily initialise the MeterProvider and all +business metric instruments. Safe to call from both the API and Celery +workers — the provider is created at most once per process. +""" +from __future__ import annotations + +from opentelemetry.metrics import ( + Counter, + Histogram, + Meter, + UpDownCounter, + get_meter, + set_meter_provider, +) from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.exporter.prometheus import PrometheusMetricReader from prometheus_client import make_asgi_app -# Set up Prometheus reader and meter provider -reader = PrometheusMetricReader() -provider = MeterProvider( - metric_readers=[reader], - resource=Resource.create({SERVICE_NAME: "fastapi-metrics-app"}), -) -set_meter_provider(provider) +_reader: PrometheusMetricReader | None = None +_meter: Meter | None = None -# Expose the Prometheus metrics endpoint -metrics_app = make_asgi_app() # Exposes /metrics +# --------------------------------------------------------------------------- +# Scrape metrics +# --------------------------------------------------------------------------- +scrape_listings_found: Counter +scrape_listings_processed: Counter +scrape_listings_failed: Counter +scrape_duration_seconds: Histogram +scrape_pages_fetched: Counter +scrape_subqueries_total: Counter + +# --------------------------------------------------------------------------- +# Throttle / circuit-breaker metrics +# --------------------------------------------------------------------------- +throttle_events_total: Counter +# circuit_breaker_state is registered as an ObservableGauge in circuit_breaker.py + +# --------------------------------------------------------------------------- +# API / cache metrics +# --------------------------------------------------------------------------- +geojson_cache_operations: Counter + +# --------------------------------------------------------------------------- +# OCR metrics +# --------------------------------------------------------------------------- +ocr_attempts: Counter +ocr_successes: Counter + +# --------------------------------------------------------------------------- +# Celery task metrics +# --------------------------------------------------------------------------- +celery_tasks_total: Counter +celery_task_duration_seconds: Histogram +celery_tasks_active: UpDownCounter + + +def init_metrics(service_name: str = "realestate-crawler") -> PrometheusMetricReader: + """Initialise the OTel MeterProvider and define all instruments. + + Returns the ``PrometheusMetricReader`` so the API can mount the ASGI app. + Calling this more than once is a no-op (returns the existing reader). + """ + global _reader, _meter + global scrape_listings_found, scrape_listings_processed, scrape_listings_failed + global scrape_duration_seconds, scrape_pages_fetched, scrape_subqueries_total + global throttle_events_total + global geojson_cache_operations + global ocr_attempts, ocr_successes + global celery_tasks_total, celery_task_duration_seconds, celery_tasks_active + + if _reader is not None: + return _reader + + _reader = PrometheusMetricReader() + provider = MeterProvider( + metric_readers=[_reader], + resource=Resource.create({SERVICE_NAME: service_name}), + ) + set_meter_provider(provider) + _meter = get_meter(__name__) + + # -- Scrape -- + scrape_listings_found = _meter.create_counter( + "scrape_listings_found_total", + description="Total listings discovered during scrape runs", + ) + scrape_listings_processed = _meter.create_counter( + "scrape_listings_processed_total", + description="Total listings successfully processed", + ) + scrape_listings_failed = _meter.create_counter( + "scrape_listings_failed_total", + description="Total listings that failed processing", + ) + scrape_duration_seconds = _meter.create_histogram( + "scrape_duration_seconds", + description="Duration of a full scrape run in seconds", + ) + scrape_pages_fetched = _meter.create_counter( + "scrape_pages_fetched_total", + description="Total API pages fetched during scraping", + ) + scrape_subqueries_total = _meter.create_counter( + "scrape_subqueries_total", + description="Total subqueries executed after query splitting", + ) + + # -- Throttle -- + throttle_events_total = _meter.create_counter( + "throttle_events_total", + description="Total throttling events by type", + ) + + # -- Cache -- + geojson_cache_operations = _meter.create_counter( + "geojson_cache_operations_total", + description="GeoJSON cache operations (hit/miss)", + ) + + # -- OCR -- + ocr_attempts = _meter.create_counter( + "ocr_attempts_total", + description="Total OCR detection attempts", + ) + ocr_successes = _meter.create_counter( + "ocr_successes_total", + description="Total OCR detections that found square meters", + ) + + # -- Celery -- + celery_tasks_total = _meter.create_counter( + "celery_tasks_total", + description="Total Celery tasks by name and status", + ) + celery_task_duration_seconds = _meter.create_histogram( + "celery_task_duration_seconds", + description="Duration of Celery tasks in seconds", + ) + celery_tasks_active = _meter.create_up_down_counter( + "celery_tasks_active", + description="Currently active Celery tasks", + ) + + return _reader + + +def get_metrics_asgi_app(): # type: ignore[no-untyped-def] + """Return the Prometheus ASGI app for mounting at /metrics.""" + return make_asgi_app() diff --git a/celery_app.py b/celery_app.py index 933c3cb..31099b8 100644 --- a/celery_app.py +++ b/celery_app.py @@ -1,10 +1,16 @@ import sys +import time from celery import Celery +from celery.signals import worker_ready, task_prerun, task_postrun from dotenv import load_dotenv import os +from logging_config import configure_logging + load_dotenv() +configure_logging(os.getenv("SERVICE_NAME", "celery-worker")) + app = Celery( "celery_app", broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"), @@ -20,6 +26,51 @@ app.conf.update( enable_utc=True, ) +# --------------------------------------------------------------------------- +# Celery metrics via prometheus_client +# --------------------------------------------------------------------------- +CELERY_METRICS_PORT = int(os.getenv("CELERY_METRICS_PORT", "9090")) + +# Track task start times for duration measurement +_task_start_times: dict[str, float] = {} + + +@worker_ready.connect +def _start_metrics_server(**kwargs: object) -> None: + """Start a lightweight Prometheus HTTP server in the worker process.""" + from api.metrics import init_metrics + init_metrics(os.getenv("SERVICE_NAME", "celery-worker")) + + from prometheus_client import start_http_server + start_http_server(CELERY_METRICS_PORT) + + +@task_prerun.connect +def _on_task_prerun(task_id: str, task: object, **kwargs: object) -> None: + from api.metrics import celery_tasks_active + task_name = getattr(task, "name", "unknown") + celery_tasks_active.add(1, {"task_name": task_name}) + _task_start_times[task_id] = time.monotonic() + + +@task_postrun.connect +def _on_task_postrun( + task_id: str, task: object, state: str | None = None, **kwargs: object +) -> None: + from api.metrics import celery_tasks_total, celery_task_duration_seconds, celery_tasks_active + task_name = getattr(task, "name", "unknown") + status = state or "UNKNOWN" + + celery_tasks_active.add(-1, {"task_name": task_name}) + celery_tasks_total.add(1, {"task_name": task_name, "status": status}) + + start = _task_start_times.pop(task_id, None) + if start is not None: + celery_task_duration_seconds.record( + time.monotonic() - start, {"task_name": task_name} + ) + + if __name__ == "__main__": try: with app.connection() as conn: diff --git a/docker-compose.yml b/docker-compose.yml index 98bb499..df0776d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -51,6 +51,7 @@ services: - app_venv:/app/.venv environment: - ENV=dev + - SERVICE_NAME=api - DB_CONNECTION_STRING=mysql+mysqldb://wrongmove:wrongmove@mysql:3306/wrongmove - CELERY_BROKER_URL=redis://redis:6379/0 - CELERY_RESULT_BACKEND=redis://redis:6379/0 @@ -74,11 +75,15 @@ services: context: . dockerfile: Dockerfile container_name: rec-celery + ports: + - "9090:9090" volumes: - .:/app - app_venv:/app/.venv environment: - ENV=dev + - SERVICE_NAME=celery-worker + - CELERY_METRICS_PORT=9090 - DB_CONNECTION_STRING=mysql+mysqldb://wrongmove:wrongmove@mysql:3306/wrongmove - CELERY_BROKER_URL=redis://redis:6379/0 - CELERY_RESULT_BACKEND=redis://redis:6379/0 @@ -98,11 +103,15 @@ services: context: . dockerfile: Dockerfile container_name: rec-celery-beat + ports: + - "9091:9091" volumes: - .:/app - app_venv:/app/.venv environment: - ENV=dev + - SERVICE_NAME=celery-beat + - CELERY_METRICS_PORT=9091 - DB_CONNECTION_STRING=mysql+mysqldb://wrongmove:wrongmove@mysql:3306/wrongmove - CELERY_BROKER_URL=redis://redis:6379/0 - CELERY_RESULT_BACKEND=redis://redis:6379/0 diff --git a/grafana/dashboard.json b/grafana/dashboard.json new file mode 100644 index 0000000..90ddebe --- /dev/null +++ b/grafana/dashboard.json @@ -0,0 +1,345 @@ +{ + "annotations": { + "list": [] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "id": null, + "links": [], + "panels": [ + { + "collapsed": false, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 0 }, + "id": 100, + "title": "Scrape Overview", + "type": "row" + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { + "defaults": { + "unit": "s", + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "yellow", "value": 300 }, + { "color": "red", "value": 600 } + ] + } + } + }, + "gridPos": { "h": 6, "w": 6, "x": 0, "y": 1 }, + "id": 1, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] } + }, + "title": "Last Scrape Duration", + "type": "stat", + "targets": [ + { + "expr": "histogram_quantile(0.5, rate(scrape_duration_seconds_bucket[24h]))", + "legendFormat": "p50" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "gridPos": { "h": 6, "w": 6, "x": 6, "y": 1 }, + "id": 2, + "title": "Listings Found vs Processed", + "type": "timeseries", + "targets": [ + { + "expr": "increase(scrape_listings_found_total[1h])", + "legendFormat": "Found" + }, + { + "expr": "increase(scrape_listings_processed_total[1h])", + "legendFormat": "Processed" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { + "defaults": { + "unit": "short", + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "red", "value": 1 } + ] + } + } + }, + "gridPos": { "h": 6, "w": 6, "x": 12, "y": 1 }, + "id": 3, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] } + }, + "title": "Failed Listings (Last Scrape)", + "type": "stat", + "targets": [ + { + "expr": "increase(scrape_listings_failed_total[1h])", + "legendFormat": "Failed" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "gridPos": { "h": 6, "w": 6, "x": 18, "y": 1 }, + "id": 4, + "title": "Pages Fetched & Subqueries", + "type": "timeseries", + "targets": [ + { + "expr": "increase(scrape_pages_fetched_total[1h])", + "legendFormat": "Pages" + }, + { + "expr": "increase(scrape_subqueries_total[1h])", + "legendFormat": "Subqueries" + } + ] + }, + { + "collapsed": false, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 7 }, + "id": 101, + "title": "Throttle & Circuit Breaker", + "type": "row" + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 8 }, + "id": 5, + "title": "Throttle Events by Type", + "type": "timeseries", + "targets": [ + { + "expr": "increase(throttle_events_total[5m])", + "legendFormat": "{{ type }}" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { + "defaults": { + "mappings": [ + { "options": { "0": { "text": "CLOSED", "color": "green" } }, "type": "value" }, + { "options": { "1": { "text": "HALF_OPEN", "color": "yellow" } }, "type": "value" }, + { "options": { "2": { "text": "OPEN", "color": "red" } }, "type": "value" } + ] + } + }, + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 8 }, + "id": 6, + "title": "Circuit Breaker State", + "type": "state-timeline", + "targets": [ + { + "expr": "circuit_breaker_state", + "legendFormat": "State" + } + ] + }, + { + "collapsed": false, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 16 }, + "id": 102, + "title": "API Performance", + "type": "row" + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "reqps" } }, + "gridPos": { "h": 8, "w": 8, "x": 0, "y": 17 }, + "id": 7, + "title": "Request Rate by Endpoint", + "type": "timeseries", + "targets": [ + { + "expr": "sum(rate(http_server_duration_milliseconds_count[5m])) by (http_route)", + "legendFormat": "{{ http_route }}" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "ms" } }, + "gridPos": { "h": 8, "w": 8, "x": 8, "y": 17 }, + "id": 8, + "title": "Latency Percentiles", + "type": "timeseries", + "targets": [ + { + "expr": "histogram_quantile(0.50, sum(rate(http_server_duration_milliseconds_bucket[5m])) by (le))", + "legendFormat": "p50" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(http_server_duration_milliseconds_bucket[5m])) by (le))", + "legendFormat": "p95" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(http_server_duration_milliseconds_bucket[5m])) by (le))", + "legendFormat": "p99" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { + "defaults": { "unit": "percentunit" } + }, + "gridPos": { "h": 8, "w": 8, "x": 16, "y": 17 }, + "id": 9, + "title": "GeoJSON Cache Hit Rate", + "type": "timeseries", + "targets": [ + { + "expr": "sum(rate(geojson_cache_operations_total{result=\"hit\"}[5m])) / sum(rate(geojson_cache_operations_total[5m]))", + "legendFormat": "Hit Rate" + } + ] + }, + { + "collapsed": false, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 25 }, + "id": 103, + "title": "Celery Tasks", + "type": "row" + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "gridPos": { "h": 8, "w": 6, "x": 0, "y": 26 }, + "id": 10, + "title": "Active Tasks", + "type": "stat", + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] } + }, + "targets": [ + { + "expr": "sum(celery_tasks_active)", + "legendFormat": "Active" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "gridPos": { "h": 8, "w": 6, "x": 6, "y": 26 }, + "id": 11, + "title": "Task Completion Rate", + "type": "timeseries", + "targets": [ + { + "expr": "sum(rate(celery_tasks_total[5m])) by (status)", + "legendFormat": "{{ status }}" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "s" } }, + "gridPos": { "h": 8, "w": 6, "x": 12, "y": 26 }, + "id": 12, + "title": "Task Duration (p50/p95)", + "type": "timeseries", + "targets": [ + { + "expr": "histogram_quantile(0.50, sum(rate(celery_task_duration_seconds_bucket[5m])) by (le, task_name))", + "legendFormat": "p50 {{ task_name }}" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(celery_task_duration_seconds_bucket[5m])) by (le, task_name))", + "legendFormat": "p95 {{ task_name }}" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "gridPos": { "h": 8, "w": 6, "x": 18, "y": 26 }, + "id": 13, + "title": "OCR Success Rate", + "type": "timeseries", + "targets": [ + { + "expr": "increase(ocr_attempts_total[1h])", + "legendFormat": "Attempts" + }, + { + "expr": "increase(ocr_successes_total[1h])", + "legendFormat": "Successes" + } + ] + }, + { + "collapsed": false, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 34 }, + "id": 104, + "title": "Logs (Loki)", + "type": "row" + }, + { + "datasource": { "type": "loki", "uid": "${DS_LOKI}" }, + "gridPos": { "h": 10, "w": 24, "x": 0, "y": 35 }, + "id": 14, + "title": "Error Logs", + "type": "logs", + "options": { + "showTime": true, + "showLabels": true, + "showCommonLabels": false, + "wrapLogMessage": true, + "prettifyLogMessage": true, + "enableLogDetails": true, + "sortOrder": "Descending", + "dedupStrategy": "none" + }, + "targets": [ + { + "expr": "{job=\"realestate-crawler\"} | json | level = \"ERROR\"", + "legendFormat": "" + } + ] + } + ], + "refresh": "30s", + "schemaVersion": 39, + "tags": ["realestate-crawler", "monitoring"], + "templating": { + "list": [ + { + "current": {}, + "hide": 0, + "name": "DS_PROMETHEUS", + "type": "datasource", + "query": "prometheus" + }, + { + "current": {}, + "hide": 0, + "name": "DS_LOKI", + "type": "datasource", + "query": "loki" + } + ] + }, + "time": { "from": "now-24h", "to": "now" }, + "timepicker": {}, + "timezone": "browser", + "title": "Realestate Crawler", + "uid": "realestate-crawler", + "version": 1 +} diff --git a/listing_processor.py b/listing_processor.py index b37e81b..a2efb7d 100644 --- a/listing_processor.py +++ b/listing_processor.py @@ -324,6 +324,15 @@ class DetectFloorplanStep(Step): listing.square_meters = max_sqm await self.listing_repository.upsert_listings([listing]) + # Record OCR metrics + try: + from api.metrics import ocr_attempts, ocr_successes + ocr_attempts.add(1) + if max_sqm > 0: + ocr_successes.add(1) + except Exception: + pass # Metrics not initialised + if max_sqm > 0: logger.info(f"[{listing_id}] OCR detected {max_sqm} sqm") else: diff --git a/logging_config.py b/logging_config.py new file mode 100644 index 0000000..6b473dd --- /dev/null +++ b/logging_config.py @@ -0,0 +1,80 @@ +"""Centralized structured JSON logging configuration.""" +from __future__ import annotations + +import json +import logging +import sys +from datetime import datetime, timezone +from typing import Any + + +class JsonFormatter(logging.Formatter): + """Outputs log records as single-line JSON for Loki ingestion.""" + + def __init__(self, service: str = "unknown") -> None: + super().__init__() + self.service = service + + def format(self, record: logging.LogRecord) -> str: + log_entry: dict[str, Any] = { + "timestamp": datetime.fromtimestamp( + record.created, tz=timezone.utc + ).isoformat(), + "level": record.levelname, + "logger": record.name, + "message": record.getMessage(), + "service": self.service, + } + + # Merge any extra fields passed via `extra={...}` on the log call. + # Standard LogRecord attributes are excluded to keep output clean. + _standard = logging.LogRecord("", 0, "", 0, "", (), None).__dict__.keys() + for key, value in record.__dict__.items(): + if key not in _standard and key not in log_entry: + log_entry[key] = value + + if record.exc_info and record.exc_info[1] is not None: + log_entry["exception"] = self.formatException(record.exc_info) + + return json.dumps(log_entry, default=str) + + +class _ServiceFilter(logging.Filter): + """Injects the ``service`` field into every log record.""" + + def __init__(self, service: str) -> None: + super().__init__() + self.service = service + + def filter(self, record: logging.LogRecord) -> bool: + record.service = self.service # type: ignore[attr-defined] + return True + + +def configure_logging(service_name: str) -> None: + """Replace all handlers on the root logger with a single JSON stdout handler. + + Uvicorn's access and error loggers are reconfigured to propagate through + the root logger so they also emit JSON. + """ + formatter = JsonFormatter(service=service_name) + + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(formatter) + + root = logging.getLogger() + root.handlers.clear() + root.addHandler(handler) + root.setLevel(logging.INFO) + root.addFilter(_ServiceFilter(service_name)) + + # Make uvicorn loggers propagate to root instead of using their own handlers + for uvicorn_logger_name in ("uvicorn", "uvicorn.error", "uvicorn.access"): + uv_logger = logging.getLogger(uvicorn_logger_name) + uv_logger.handlers.clear() + uv_logger.propagate = True + + # Same for celery task logger + celery_logger = logging.getLogger("celery.task") + celery_logger.handlers.clear() + celery_logger.propagate = True diff --git a/rec/circuit_breaker.py b/rec/circuit_breaker.py index 27bf12f..0178fbe 100644 --- a/rec/circuit_breaker.py +++ b/rec/circuit_breaker.py @@ -135,3 +135,54 @@ class CircuitBreaker: def is_half_open(self) -> bool: """Check if circuit is currently half-open.""" return self.state == CircuitState.HALF_OPEN + + @property + def state_as_int(self) -> int: + """Return the current state as an integer for metrics. + + 0 = closed, 1 = half_open, 2 = open. + """ + return { + CircuitState.CLOSED: 0, + CircuitState.HALF_OPEN: 1, + CircuitState.OPEN: 2, + }[self.state] + + +# --------------------------------------------------------------------------- +# Global circuit breaker instance used by the scraper +# --------------------------------------------------------------------------- +_global_circuit_breaker: CircuitBreaker | None = None + + +def get_circuit_breaker() -> CircuitBreaker | None: + """Return the global circuit breaker, if one has been set.""" + return _global_circuit_breaker + + +def set_global_circuit_breaker(cb: CircuitBreaker) -> None: + """Set the global circuit breaker instance (called during scraper init).""" + global _global_circuit_breaker + _global_circuit_breaker = cb + + +def register_circuit_breaker_gauge() -> None: + """Register an ObservableGauge that reports the circuit breaker state.""" + try: + from opentelemetry.metrics import get_meter + + meter = get_meter(__name__) + + def _observe_cb_state(options: object) -> list: # type: ignore[type-arg] + from opentelemetry.sdk.metrics._internal.measurement import Measurement + cb = get_circuit_breaker() + value = cb.state_as_int if cb is not None else 0 + return [Measurement(value)] + + meter.create_observable_gauge( + "circuit_breaker_state", + callbacks=[_observe_cb_state], + description="Circuit breaker state: 0=closed, 1=half_open, 2=open", + ) + except Exception: + pass # Metrics not initialised diff --git a/rec/throttle_detector.py b/rec/throttle_detector.py index dc999ed..ccebf04 100644 --- a/rec/throttle_detector.py +++ b/rec/throttle_detector.py @@ -45,14 +45,17 @@ class ThrottleMetrics: def record_rate_limit(self) -> None: """Record a rate limit error (HTTP 429).""" self.rate_limit_count += 1 + _increment_throttle_metric("rate_limit") def record_service_unavailable(self) -> None: """Record a service unavailable error (HTTP 503).""" self.service_unavailable_count += 1 + _increment_throttle_metric("service_unavailable") def record_ip_blocked(self) -> None: """Record an IP blocked error (HTTP 403).""" self.ip_blocked_count += 1 + _increment_throttle_metric("ip_blocked") def record_slow_response(self, response_time: float) -> None: """Record a slow response. @@ -63,14 +66,17 @@ class ThrottleMetrics: self.slow_response_count += 1 self.total_response_time += response_time self.total_requests += 1 + _increment_throttle_metric("slow_response") def record_empty_response(self) -> None: """Record an unexpected empty response.""" self.empty_response_count += 1 + _increment_throttle_metric("empty_response") def record_invalid_response(self) -> None: """Record an invalid or error response.""" self.invalid_response_count += 1 + _increment_throttle_metric("invalid_response") def record_request(self, response_time: float) -> None: """Record a successful request. @@ -150,6 +156,15 @@ def reset_throttle_metrics() -> None: _global_metrics = ThrottleMetrics() +def _increment_throttle_metric(event_type: str) -> None: + """Safely increment the OTel throttle counter if metrics are initialised.""" + try: + from api.metrics import throttle_events_total + throttle_events_total.add(1, {"type": event_type}) + except Exception: + pass # Metrics not yet initialised (e.g. during tests) + + def validate_response( response: aiohttp.ClientResponse, response_time: float, diff --git a/tasks/listing_tasks.py b/tasks/listing_tasks.py index 42a5760..e9b1375 100644 --- a/tasks/listing_tasks.py +++ b/tasks/listing_tasks.py @@ -23,15 +23,8 @@ from services.task_progress_publisher import publish_task_progress logger = logging.getLogger("uvicorn.error") -# Also configure a celery-specific logger that always outputs to stdout +# Central logging is now configured in celery_app.py via logging_config celery_logger = logging.getLogger("celery.task") -if not celery_logger.handlers: - handler = logging.StreamHandler() - handler.setFormatter(logging.Formatter( - "%(asctime)s [%(levelname)s] %(name)s: %(message)s" - )) - celery_logger.addHandler(handler) - celery_logger.setLevel(logging.INFO) SCRAPE_LOCK_NAME = "scrape_listings" LOG_BUFFER_MAX_LINES = 200 @@ -574,6 +567,22 @@ async def _dump_listings_full_inner( ) celery_logger.info("=" * 60) + # Record scrape metrics + from api.metrics import ( + scrape_listings_found, + scrape_listings_processed, + scrape_listings_failed, + scrape_duration_seconds, + scrape_pages_fetched, + scrape_subqueries_total as scrape_subqueries_metric, + ) + scrape_listings_found.add(state.ids_collected) + scrape_listings_processed.add(state.processed_count) + scrape_listings_failed.add(state.failed_count) + scrape_duration_seconds.record(elapsed) + scrape_pages_fetched.add(state.total_pages_fetched) + scrape_subqueries_metric.add(state.completed_subqueries) + invalidate_cache() _update_task_state(task, "Completed", { diff --git a/tasks/poi_tasks.py b/tasks/poi_tasks.py index 635f31b..7688389 100644 --- a/tasks/poi_tasks.py +++ b/tasks/poi_tasks.py @@ -14,14 +14,8 @@ from services.task_progress_publisher import publish_task_progress logger = logging.getLogger(__name__) +# Central logging is now configured in celery_app.py via logging_config celery_logger = logging.getLogger("celery.task") -if not celery_logger.handlers: - handler = logging.StreamHandler() - handler.setFormatter(logging.Formatter( - "%(asctime)s [%(levelname)s] %(name)s: %(message)s" - )) - celery_logger.addHandler(handler) - celery_logger.setLevel(logging.INFO) @app.task(