From d6edb747d2bd43390be1c66d818a3d9a9d3da9d0 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sat, 14 Feb 2026 10:59:12 +0000 Subject: [PATCH] Add structured JSON logging, OTel business metrics, and Grafana dashboard Structured logging via JsonFormatter replaces uvicorn's default format so Loki can parse timestamps and fields. 14 business metrics (scrape stats, throttle events, circuit breaker state, cache hit rate, OCR success rate, Celery task lifecycle) are defined in a shared metrics module and instrumented across the scraper pipeline, API, and workers. Celery workers expose a Prometheus HTTP endpoint on configurable ports. --- api/app.py | 23 +-- api/audit_middleware.py | 18 +- api/metrics.py | 157 ++++++++++++++++-- celery_app.py | 51 ++++++ docker-compose.yml | 9 + grafana/dashboard.json | 345 +++++++++++++++++++++++++++++++++++++++ listing_processor.py | 9 + logging_config.py | 80 +++++++++ rec/circuit_breaker.py | 51 ++++++ rec/throttle_detector.py | 15 ++ tasks/listing_tasks.py | 25 ++- tasks/poi_tasks.py | 8 +- 12 files changed, 742 insertions(+), 49 deletions(-) create mode 100644 grafana/dashboard.json create mode 100644 logging_config.py diff --git a/api/app.py b/api/app.py index b76d4e5..5a677ba 100644 --- a/api/app.py +++ b/api/app.py @@ -39,12 +39,13 @@ from services.listing_cache import ( from repositories.poi_repository import POIRepository from repositories.user_repository import UserRepository from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor -from api.metrics import metrics_app -from opentelemetry.metrics import get_meter +from api.metrics import init_metrics, get_metrics_asgi_app, geojson_cache_operations +from logging_config import configure_logging load_dotenv() -logger = logging.getLogger("uvicorn") +configure_logging("api") +logger = logging.getLogger(__name__) DEFAULT_BATCH_SIZE = 50 _rate_limit_config = RateLimitConfig.from_env() @@ -99,16 +100,8 @@ app = FastAPI( app.include_router(passkey_router) app.include_router(poi_router) app.include_router(ws_router) -app.mount("/metrics", metrics_app) -meter = get_meter(__name__) -request_counter = meter.create_counter( - name="custom_request_count", - description="Number of times /hello was called", -) -hist = meter.create_histogram( - name="custom_request_duration", - description="Duration of /hello requests in seconds", -) +init_metrics("realestate-crawler-api") +app.mount("/metrics", get_metrics_asgi_app()) # Allow CORS (for React frontend) @@ -146,8 +139,6 @@ async def unhandled_exception_handler(request: Request, exc: Exception) -> JSONR @app.get("/api/status") async def get_status() -> dict[str, str]: - request_counter.add(1, {"method": "GET", "path": "/status"}) - hist.record(1.5, {"method": "GET", "path": "/status"}) return {"status": "OK"} @@ -333,8 +324,10 @@ async def stream_listing_geojson( cached_count = get_cached_count(query_parameters) if cached_count is not None and cached_count > 0 and not include_poi_distances: + geojson_cache_operations.add(1, {"result": "hit"}) generator = _stream_from_cache(query_parameters, batch_size, limit) else: + geojson_cache_operations.add(1, {"result": "miss"}) generator = _stream_from_db( query_parameters, batch_size, limit, poi_distances_lookup, skip_cache=include_poi_distances, diff --git a/api/audit_middleware.py b/api/audit_middleware.py index 21f7ea9..e89f508 100644 --- a/api/audit_middleware.py +++ b/api/audit_middleware.py @@ -51,13 +51,15 @@ class AuditLogMiddleware(BaseHTTPMiddleware): duration_ms = (time.monotonic() - start) * 1000 audit_logger.info( - "method=%s path=%s query=%s user=%s ip=%s status=%d duration_ms=%.1f", - request.method, - path, - query, - identity, - ip, - response.status_code, - duration_ms, + "API request", + extra={ + "method": request.method, + "path": path, + "query": query, + "user": identity, + "ip": ip, + "status": response.status_code, + "duration_ms": round(duration_ms, 1), + }, ) return response diff --git a/api/metrics.py b/api/metrics.py index 1baae9e..51984ef 100644 --- a/api/metrics.py +++ b/api/metrics.py @@ -1,17 +1,152 @@ -# metrics.py -from opentelemetry.metrics import set_meter_provider +"""OpenTelemetry metrics with Prometheus export. + +Provides ``init_metrics()`` to lazily initialise the MeterProvider and all +business metric instruments. Safe to call from both the API and Celery +workers — the provider is created at most once per process. +""" +from __future__ import annotations + +from opentelemetry.metrics import ( + Counter, + Histogram, + Meter, + UpDownCounter, + get_meter, + set_meter_provider, +) from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.exporter.prometheus import PrometheusMetricReader from prometheus_client import make_asgi_app -# Set up Prometheus reader and meter provider -reader = PrometheusMetricReader() -provider = MeterProvider( - metric_readers=[reader], - resource=Resource.create({SERVICE_NAME: "fastapi-metrics-app"}), -) -set_meter_provider(provider) +_reader: PrometheusMetricReader | None = None +_meter: Meter | None = None -# Expose the Prometheus metrics endpoint -metrics_app = make_asgi_app() # Exposes /metrics +# --------------------------------------------------------------------------- +# Scrape metrics +# --------------------------------------------------------------------------- +scrape_listings_found: Counter +scrape_listings_processed: Counter +scrape_listings_failed: Counter +scrape_duration_seconds: Histogram +scrape_pages_fetched: Counter +scrape_subqueries_total: Counter + +# --------------------------------------------------------------------------- +# Throttle / circuit-breaker metrics +# --------------------------------------------------------------------------- +throttle_events_total: Counter +# circuit_breaker_state is registered as an ObservableGauge in circuit_breaker.py + +# --------------------------------------------------------------------------- +# API / cache metrics +# --------------------------------------------------------------------------- +geojson_cache_operations: Counter + +# --------------------------------------------------------------------------- +# OCR metrics +# --------------------------------------------------------------------------- +ocr_attempts: Counter +ocr_successes: Counter + +# --------------------------------------------------------------------------- +# Celery task metrics +# --------------------------------------------------------------------------- +celery_tasks_total: Counter +celery_task_duration_seconds: Histogram +celery_tasks_active: UpDownCounter + + +def init_metrics(service_name: str = "realestate-crawler") -> PrometheusMetricReader: + """Initialise the OTel MeterProvider and define all instruments. + + Returns the ``PrometheusMetricReader`` so the API can mount the ASGI app. + Calling this more than once is a no-op (returns the existing reader). + """ + global _reader, _meter + global scrape_listings_found, scrape_listings_processed, scrape_listings_failed + global scrape_duration_seconds, scrape_pages_fetched, scrape_subqueries_total + global throttle_events_total + global geojson_cache_operations + global ocr_attempts, ocr_successes + global celery_tasks_total, celery_task_duration_seconds, celery_tasks_active + + if _reader is not None: + return _reader + + _reader = PrometheusMetricReader() + provider = MeterProvider( + metric_readers=[_reader], + resource=Resource.create({SERVICE_NAME: service_name}), + ) + set_meter_provider(provider) + _meter = get_meter(__name__) + + # -- Scrape -- + scrape_listings_found = _meter.create_counter( + "scrape_listings_found_total", + description="Total listings discovered during scrape runs", + ) + scrape_listings_processed = _meter.create_counter( + "scrape_listings_processed_total", + description="Total listings successfully processed", + ) + scrape_listings_failed = _meter.create_counter( + "scrape_listings_failed_total", + description="Total listings that failed processing", + ) + scrape_duration_seconds = _meter.create_histogram( + "scrape_duration_seconds", + description="Duration of a full scrape run in seconds", + ) + scrape_pages_fetched = _meter.create_counter( + "scrape_pages_fetched_total", + description="Total API pages fetched during scraping", + ) + scrape_subqueries_total = _meter.create_counter( + "scrape_subqueries_total", + description="Total subqueries executed after query splitting", + ) + + # -- Throttle -- + throttle_events_total = _meter.create_counter( + "throttle_events_total", + description="Total throttling events by type", + ) + + # -- Cache -- + geojson_cache_operations = _meter.create_counter( + "geojson_cache_operations_total", + description="GeoJSON cache operations (hit/miss)", + ) + + # -- OCR -- + ocr_attempts = _meter.create_counter( + "ocr_attempts_total", + description="Total OCR detection attempts", + ) + ocr_successes = _meter.create_counter( + "ocr_successes_total", + description="Total OCR detections that found square meters", + ) + + # -- Celery -- + celery_tasks_total = _meter.create_counter( + "celery_tasks_total", + description="Total Celery tasks by name and status", + ) + celery_task_duration_seconds = _meter.create_histogram( + "celery_task_duration_seconds", + description="Duration of Celery tasks in seconds", + ) + celery_tasks_active = _meter.create_up_down_counter( + "celery_tasks_active", + description="Currently active Celery tasks", + ) + + return _reader + + +def get_metrics_asgi_app(): # type: ignore[no-untyped-def] + """Return the Prometheus ASGI app for mounting at /metrics.""" + return make_asgi_app() diff --git a/celery_app.py b/celery_app.py index 933c3cb..31099b8 100644 --- a/celery_app.py +++ b/celery_app.py @@ -1,10 +1,16 @@ import sys +import time from celery import Celery +from celery.signals import worker_ready, task_prerun, task_postrun from dotenv import load_dotenv import os +from logging_config import configure_logging + load_dotenv() +configure_logging(os.getenv("SERVICE_NAME", "celery-worker")) + app = Celery( "celery_app", broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"), @@ -20,6 +26,51 @@ app.conf.update( enable_utc=True, ) +# --------------------------------------------------------------------------- +# Celery metrics via prometheus_client +# --------------------------------------------------------------------------- +CELERY_METRICS_PORT = int(os.getenv("CELERY_METRICS_PORT", "9090")) + +# Track task start times for duration measurement +_task_start_times: dict[str, float] = {} + + +@worker_ready.connect +def _start_metrics_server(**kwargs: object) -> None: + """Start a lightweight Prometheus HTTP server in the worker process.""" + from api.metrics import init_metrics + init_metrics(os.getenv("SERVICE_NAME", "celery-worker")) + + from prometheus_client import start_http_server + start_http_server(CELERY_METRICS_PORT) + + +@task_prerun.connect +def _on_task_prerun(task_id: str, task: object, **kwargs: object) -> None: + from api.metrics import celery_tasks_active + task_name = getattr(task, "name", "unknown") + celery_tasks_active.add(1, {"task_name": task_name}) + _task_start_times[task_id] = time.monotonic() + + +@task_postrun.connect +def _on_task_postrun( + task_id: str, task: object, state: str | None = None, **kwargs: object +) -> None: + from api.metrics import celery_tasks_total, celery_task_duration_seconds, celery_tasks_active + task_name = getattr(task, "name", "unknown") + status = state or "UNKNOWN" + + celery_tasks_active.add(-1, {"task_name": task_name}) + celery_tasks_total.add(1, {"task_name": task_name, "status": status}) + + start = _task_start_times.pop(task_id, None) + if start is not None: + celery_task_duration_seconds.record( + time.monotonic() - start, {"task_name": task_name} + ) + + if __name__ == "__main__": try: with app.connection() as conn: diff --git a/docker-compose.yml b/docker-compose.yml index 98bb499..df0776d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -51,6 +51,7 @@ services: - app_venv:/app/.venv environment: - ENV=dev + - SERVICE_NAME=api - DB_CONNECTION_STRING=mysql+mysqldb://wrongmove:wrongmove@mysql:3306/wrongmove - CELERY_BROKER_URL=redis://redis:6379/0 - CELERY_RESULT_BACKEND=redis://redis:6379/0 @@ -74,11 +75,15 @@ services: context: . dockerfile: Dockerfile container_name: rec-celery + ports: + - "9090:9090" volumes: - .:/app - app_venv:/app/.venv environment: - ENV=dev + - SERVICE_NAME=celery-worker + - CELERY_METRICS_PORT=9090 - DB_CONNECTION_STRING=mysql+mysqldb://wrongmove:wrongmove@mysql:3306/wrongmove - CELERY_BROKER_URL=redis://redis:6379/0 - CELERY_RESULT_BACKEND=redis://redis:6379/0 @@ -98,11 +103,15 @@ services: context: . dockerfile: Dockerfile container_name: rec-celery-beat + ports: + - "9091:9091" volumes: - .:/app - app_venv:/app/.venv environment: - ENV=dev + - SERVICE_NAME=celery-beat + - CELERY_METRICS_PORT=9091 - DB_CONNECTION_STRING=mysql+mysqldb://wrongmove:wrongmove@mysql:3306/wrongmove - CELERY_BROKER_URL=redis://redis:6379/0 - CELERY_RESULT_BACKEND=redis://redis:6379/0 diff --git a/grafana/dashboard.json b/grafana/dashboard.json new file mode 100644 index 0000000..90ddebe --- /dev/null +++ b/grafana/dashboard.json @@ -0,0 +1,345 @@ +{ + "annotations": { + "list": [] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "id": null, + "links": [], + "panels": [ + { + "collapsed": false, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 0 }, + "id": 100, + "title": "Scrape Overview", + "type": "row" + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { + "defaults": { + "unit": "s", + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "yellow", "value": 300 }, + { "color": "red", "value": 600 } + ] + } + } + }, + "gridPos": { "h": 6, "w": 6, "x": 0, "y": 1 }, + "id": 1, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] } + }, + "title": "Last Scrape Duration", + "type": "stat", + "targets": [ + { + "expr": "histogram_quantile(0.5, rate(scrape_duration_seconds_bucket[24h]))", + "legendFormat": "p50" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "gridPos": { "h": 6, "w": 6, "x": 6, "y": 1 }, + "id": 2, + "title": "Listings Found vs Processed", + "type": "timeseries", + "targets": [ + { + "expr": "increase(scrape_listings_found_total[1h])", + "legendFormat": "Found" + }, + { + "expr": "increase(scrape_listings_processed_total[1h])", + "legendFormat": "Processed" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { + "defaults": { + "unit": "short", + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "red", "value": 1 } + ] + } + } + }, + "gridPos": { "h": 6, "w": 6, "x": 12, "y": 1 }, + "id": 3, + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] } + }, + "title": "Failed Listings (Last Scrape)", + "type": "stat", + "targets": [ + { + "expr": "increase(scrape_listings_failed_total[1h])", + "legendFormat": "Failed" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "gridPos": { "h": 6, "w": 6, "x": 18, "y": 1 }, + "id": 4, + "title": "Pages Fetched & Subqueries", + "type": "timeseries", + "targets": [ + { + "expr": "increase(scrape_pages_fetched_total[1h])", + "legendFormat": "Pages" + }, + { + "expr": "increase(scrape_subqueries_total[1h])", + "legendFormat": "Subqueries" + } + ] + }, + { + "collapsed": false, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 7 }, + "id": 101, + "title": "Throttle & Circuit Breaker", + "type": "row" + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 8 }, + "id": 5, + "title": "Throttle Events by Type", + "type": "timeseries", + "targets": [ + { + "expr": "increase(throttle_events_total[5m])", + "legendFormat": "{{ type }}" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { + "defaults": { + "mappings": [ + { "options": { "0": { "text": "CLOSED", "color": "green" } }, "type": "value" }, + { "options": { "1": { "text": "HALF_OPEN", "color": "yellow" } }, "type": "value" }, + { "options": { "2": { "text": "OPEN", "color": "red" } }, "type": "value" } + ] + } + }, + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 8 }, + "id": 6, + "title": "Circuit Breaker State", + "type": "state-timeline", + "targets": [ + { + "expr": "circuit_breaker_state", + "legendFormat": "State" + } + ] + }, + { + "collapsed": false, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 16 }, + "id": 102, + "title": "API Performance", + "type": "row" + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "reqps" } }, + "gridPos": { "h": 8, "w": 8, "x": 0, "y": 17 }, + "id": 7, + "title": "Request Rate by Endpoint", + "type": "timeseries", + "targets": [ + { + "expr": "sum(rate(http_server_duration_milliseconds_count[5m])) by (http_route)", + "legendFormat": "{{ http_route }}" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "ms" } }, + "gridPos": { "h": 8, "w": 8, "x": 8, "y": 17 }, + "id": 8, + "title": "Latency Percentiles", + "type": "timeseries", + "targets": [ + { + "expr": "histogram_quantile(0.50, sum(rate(http_server_duration_milliseconds_bucket[5m])) by (le))", + "legendFormat": "p50" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(http_server_duration_milliseconds_bucket[5m])) by (le))", + "legendFormat": "p95" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(http_server_duration_milliseconds_bucket[5m])) by (le))", + "legendFormat": "p99" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { + "defaults": { "unit": "percentunit" } + }, + "gridPos": { "h": 8, "w": 8, "x": 16, "y": 17 }, + "id": 9, + "title": "GeoJSON Cache Hit Rate", + "type": "timeseries", + "targets": [ + { + "expr": "sum(rate(geojson_cache_operations_total{result=\"hit\"}[5m])) / sum(rate(geojson_cache_operations_total[5m]))", + "legendFormat": "Hit Rate" + } + ] + }, + { + "collapsed": false, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 25 }, + "id": 103, + "title": "Celery Tasks", + "type": "row" + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "gridPos": { "h": 8, "w": 6, "x": 0, "y": 26 }, + "id": 10, + "title": "Active Tasks", + "type": "stat", + "options": { + "reduceOptions": { "calcs": ["lastNotNull"] } + }, + "targets": [ + { + "expr": "sum(celery_tasks_active)", + "legendFormat": "Active" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "gridPos": { "h": 8, "w": 6, "x": 6, "y": 26 }, + "id": 11, + "title": "Task Completion Rate", + "type": "timeseries", + "targets": [ + { + "expr": "sum(rate(celery_tasks_total[5m])) by (status)", + "legendFormat": "{{ status }}" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "s" } }, + "gridPos": { "h": 8, "w": 6, "x": 12, "y": 26 }, + "id": 12, + "title": "Task Duration (p50/p95)", + "type": "timeseries", + "targets": [ + { + "expr": "histogram_quantile(0.50, sum(rate(celery_task_duration_seconds_bucket[5m])) by (le, task_name))", + "legendFormat": "p50 {{ task_name }}" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(celery_task_duration_seconds_bucket[5m])) by (le, task_name))", + "legendFormat": "p95 {{ task_name }}" + } + ] + }, + { + "datasource": { "type": "prometheus", "uid": "${DS_PROMETHEUS}" }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "gridPos": { "h": 8, "w": 6, "x": 18, "y": 26 }, + "id": 13, + "title": "OCR Success Rate", + "type": "timeseries", + "targets": [ + { + "expr": "increase(ocr_attempts_total[1h])", + "legendFormat": "Attempts" + }, + { + "expr": "increase(ocr_successes_total[1h])", + "legendFormat": "Successes" + } + ] + }, + { + "collapsed": false, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 34 }, + "id": 104, + "title": "Logs (Loki)", + "type": "row" + }, + { + "datasource": { "type": "loki", "uid": "${DS_LOKI}" }, + "gridPos": { "h": 10, "w": 24, "x": 0, "y": 35 }, + "id": 14, + "title": "Error Logs", + "type": "logs", + "options": { + "showTime": true, + "showLabels": true, + "showCommonLabels": false, + "wrapLogMessage": true, + "prettifyLogMessage": true, + "enableLogDetails": true, + "sortOrder": "Descending", + "dedupStrategy": "none" + }, + "targets": [ + { + "expr": "{job=\"realestate-crawler\"} | json | level = \"ERROR\"", + "legendFormat": "" + } + ] + } + ], + "refresh": "30s", + "schemaVersion": 39, + "tags": ["realestate-crawler", "monitoring"], + "templating": { + "list": [ + { + "current": {}, + "hide": 0, + "name": "DS_PROMETHEUS", + "type": "datasource", + "query": "prometheus" + }, + { + "current": {}, + "hide": 0, + "name": "DS_LOKI", + "type": "datasource", + "query": "loki" + } + ] + }, + "time": { "from": "now-24h", "to": "now" }, + "timepicker": {}, + "timezone": "browser", + "title": "Realestate Crawler", + "uid": "realestate-crawler", + "version": 1 +} diff --git a/listing_processor.py b/listing_processor.py index b37e81b..a2efb7d 100644 --- a/listing_processor.py +++ b/listing_processor.py @@ -324,6 +324,15 @@ class DetectFloorplanStep(Step): listing.square_meters = max_sqm await self.listing_repository.upsert_listings([listing]) + # Record OCR metrics + try: + from api.metrics import ocr_attempts, ocr_successes + ocr_attempts.add(1) + if max_sqm > 0: + ocr_successes.add(1) + except Exception: + pass # Metrics not initialised + if max_sqm > 0: logger.info(f"[{listing_id}] OCR detected {max_sqm} sqm") else: diff --git a/logging_config.py b/logging_config.py new file mode 100644 index 0000000..6b473dd --- /dev/null +++ b/logging_config.py @@ -0,0 +1,80 @@ +"""Centralized structured JSON logging configuration.""" +from __future__ import annotations + +import json +import logging +import sys +from datetime import datetime, timezone +from typing import Any + + +class JsonFormatter(logging.Formatter): + """Outputs log records as single-line JSON for Loki ingestion.""" + + def __init__(self, service: str = "unknown") -> None: + super().__init__() + self.service = service + + def format(self, record: logging.LogRecord) -> str: + log_entry: dict[str, Any] = { + "timestamp": datetime.fromtimestamp( + record.created, tz=timezone.utc + ).isoformat(), + "level": record.levelname, + "logger": record.name, + "message": record.getMessage(), + "service": self.service, + } + + # Merge any extra fields passed via `extra={...}` on the log call. + # Standard LogRecord attributes are excluded to keep output clean. + _standard = logging.LogRecord("", 0, "", 0, "", (), None).__dict__.keys() + for key, value in record.__dict__.items(): + if key not in _standard and key not in log_entry: + log_entry[key] = value + + if record.exc_info and record.exc_info[1] is not None: + log_entry["exception"] = self.formatException(record.exc_info) + + return json.dumps(log_entry, default=str) + + +class _ServiceFilter(logging.Filter): + """Injects the ``service`` field into every log record.""" + + def __init__(self, service: str) -> None: + super().__init__() + self.service = service + + def filter(self, record: logging.LogRecord) -> bool: + record.service = self.service # type: ignore[attr-defined] + return True + + +def configure_logging(service_name: str) -> None: + """Replace all handlers on the root logger with a single JSON stdout handler. + + Uvicorn's access and error loggers are reconfigured to propagate through + the root logger so they also emit JSON. + """ + formatter = JsonFormatter(service=service_name) + + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(formatter) + + root = logging.getLogger() + root.handlers.clear() + root.addHandler(handler) + root.setLevel(logging.INFO) + root.addFilter(_ServiceFilter(service_name)) + + # Make uvicorn loggers propagate to root instead of using their own handlers + for uvicorn_logger_name in ("uvicorn", "uvicorn.error", "uvicorn.access"): + uv_logger = logging.getLogger(uvicorn_logger_name) + uv_logger.handlers.clear() + uv_logger.propagate = True + + # Same for celery task logger + celery_logger = logging.getLogger("celery.task") + celery_logger.handlers.clear() + celery_logger.propagate = True diff --git a/rec/circuit_breaker.py b/rec/circuit_breaker.py index 27bf12f..0178fbe 100644 --- a/rec/circuit_breaker.py +++ b/rec/circuit_breaker.py @@ -135,3 +135,54 @@ class CircuitBreaker: def is_half_open(self) -> bool: """Check if circuit is currently half-open.""" return self.state == CircuitState.HALF_OPEN + + @property + def state_as_int(self) -> int: + """Return the current state as an integer for metrics. + + 0 = closed, 1 = half_open, 2 = open. + """ + return { + CircuitState.CLOSED: 0, + CircuitState.HALF_OPEN: 1, + CircuitState.OPEN: 2, + }[self.state] + + +# --------------------------------------------------------------------------- +# Global circuit breaker instance used by the scraper +# --------------------------------------------------------------------------- +_global_circuit_breaker: CircuitBreaker | None = None + + +def get_circuit_breaker() -> CircuitBreaker | None: + """Return the global circuit breaker, if one has been set.""" + return _global_circuit_breaker + + +def set_global_circuit_breaker(cb: CircuitBreaker) -> None: + """Set the global circuit breaker instance (called during scraper init).""" + global _global_circuit_breaker + _global_circuit_breaker = cb + + +def register_circuit_breaker_gauge() -> None: + """Register an ObservableGauge that reports the circuit breaker state.""" + try: + from opentelemetry.metrics import get_meter + + meter = get_meter(__name__) + + def _observe_cb_state(options: object) -> list: # type: ignore[type-arg] + from opentelemetry.sdk.metrics._internal.measurement import Measurement + cb = get_circuit_breaker() + value = cb.state_as_int if cb is not None else 0 + return [Measurement(value)] + + meter.create_observable_gauge( + "circuit_breaker_state", + callbacks=[_observe_cb_state], + description="Circuit breaker state: 0=closed, 1=half_open, 2=open", + ) + except Exception: + pass # Metrics not initialised diff --git a/rec/throttle_detector.py b/rec/throttle_detector.py index dc999ed..ccebf04 100644 --- a/rec/throttle_detector.py +++ b/rec/throttle_detector.py @@ -45,14 +45,17 @@ class ThrottleMetrics: def record_rate_limit(self) -> None: """Record a rate limit error (HTTP 429).""" self.rate_limit_count += 1 + _increment_throttle_metric("rate_limit") def record_service_unavailable(self) -> None: """Record a service unavailable error (HTTP 503).""" self.service_unavailable_count += 1 + _increment_throttle_metric("service_unavailable") def record_ip_blocked(self) -> None: """Record an IP blocked error (HTTP 403).""" self.ip_blocked_count += 1 + _increment_throttle_metric("ip_blocked") def record_slow_response(self, response_time: float) -> None: """Record a slow response. @@ -63,14 +66,17 @@ class ThrottleMetrics: self.slow_response_count += 1 self.total_response_time += response_time self.total_requests += 1 + _increment_throttle_metric("slow_response") def record_empty_response(self) -> None: """Record an unexpected empty response.""" self.empty_response_count += 1 + _increment_throttle_metric("empty_response") def record_invalid_response(self) -> None: """Record an invalid or error response.""" self.invalid_response_count += 1 + _increment_throttle_metric("invalid_response") def record_request(self, response_time: float) -> None: """Record a successful request. @@ -150,6 +156,15 @@ def reset_throttle_metrics() -> None: _global_metrics = ThrottleMetrics() +def _increment_throttle_metric(event_type: str) -> None: + """Safely increment the OTel throttle counter if metrics are initialised.""" + try: + from api.metrics import throttle_events_total + throttle_events_total.add(1, {"type": event_type}) + except Exception: + pass # Metrics not yet initialised (e.g. during tests) + + def validate_response( response: aiohttp.ClientResponse, response_time: float, diff --git a/tasks/listing_tasks.py b/tasks/listing_tasks.py index 42a5760..e9b1375 100644 --- a/tasks/listing_tasks.py +++ b/tasks/listing_tasks.py @@ -23,15 +23,8 @@ from services.task_progress_publisher import publish_task_progress logger = logging.getLogger("uvicorn.error") -# Also configure a celery-specific logger that always outputs to stdout +# Central logging is now configured in celery_app.py via logging_config celery_logger = logging.getLogger("celery.task") -if not celery_logger.handlers: - handler = logging.StreamHandler() - handler.setFormatter(logging.Formatter( - "%(asctime)s [%(levelname)s] %(name)s: %(message)s" - )) - celery_logger.addHandler(handler) - celery_logger.setLevel(logging.INFO) SCRAPE_LOCK_NAME = "scrape_listings" LOG_BUFFER_MAX_LINES = 200 @@ -574,6 +567,22 @@ async def _dump_listings_full_inner( ) celery_logger.info("=" * 60) + # Record scrape metrics + from api.metrics import ( + scrape_listings_found, + scrape_listings_processed, + scrape_listings_failed, + scrape_duration_seconds, + scrape_pages_fetched, + scrape_subqueries_total as scrape_subqueries_metric, + ) + scrape_listings_found.add(state.ids_collected) + scrape_listings_processed.add(state.processed_count) + scrape_listings_failed.add(state.failed_count) + scrape_duration_seconds.record(elapsed) + scrape_pages_fetched.add(state.total_pages_fetched) + scrape_subqueries_metric.add(state.completed_subqueries) + invalidate_cache() _update_task_state(task, "Completed", { diff --git a/tasks/poi_tasks.py b/tasks/poi_tasks.py index 635f31b..7688389 100644 --- a/tasks/poi_tasks.py +++ b/tasks/poi_tasks.py @@ -14,14 +14,8 @@ from services.task_progress_publisher import publish_task_progress logger = logging.getLogger(__name__) +# Central logging is now configured in celery_app.py via logging_config celery_logger = logging.getLogger("celery.task") -if not celery_logger.handlers: - handler = logging.StreamHandler() - handler.setFormatter(logging.Formatter( - "%(asctime)s [%(levelname)s] %(name)s: %(message)s" - )) - celery_logger.addHandler(handler) - celery_logger.setLevel(logging.INFO) @app.task(