wrongmove/api/audit_middleware.py
Viktor Barzin d6edb747d2
Add structured JSON logging, OTel business metrics, and Grafana dashboard
Structured logging via JsonFormatter replaces uvicorn's default format so
Loki can parse timestamps and fields.  14 business metrics (scrape stats,
throttle events, circuit breaker state, cache hit rate, OCR success rate,
Celery task lifecycle) are defined in a shared metrics module and
instrumented across the scraper pipeline, API, and workers.  Celery
workers expose a Prometheus HTTP endpoint on configurable ports.
2026-02-14 10:59:12 +00:00

65 lines
2 KiB
Python

"""Audit logging middleware for API requests."""
from __future__ import annotations
import logging
import time
import jwt
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
audit_logger = logging.getLogger("uvicorn.audit")
def _extract_identity(request: Request) -> str:
"""Extract user email from JWT for audit logging."""
auth_header = request.headers.get("authorization", "")
if not auth_header.startswith("Bearer "):
return "anonymous"
token = auth_header[7:]
try:
payload = jwt.decode(token, options={"verify_signature": False, "verify_exp": False})
return payload.get("email", "unknown")
except jwt.PyJWTError:
return "invalid-token"
def _client_ip(request: Request) -> str:
"""Best-effort client IP."""
forwarded = request.headers.get("x-forwarded-for")
if forwarded:
return forwarded.split(",")[0].strip()
client = request.client
return client.host if client else "unknown"
class AuditLogMiddleware(BaseHTTPMiddleware):
"""Logs all /api/ requests with method, path, user, IP, status, and duration."""
async def dispatch(self, request: Request, call_next) -> Response: # type: ignore[no-untyped-def]
path = request.url.path
if not path.startswith("/api/"):
return await call_next(request)
start = time.monotonic()
identity = _extract_identity(request)
ip = _client_ip(request)
query = str(request.query_params) if request.query_params else ""
response = await call_next(request)
duration_ms = (time.monotonic() - start) * 1000
audit_logger.info(
"API request",
extra={
"method": request.method,
"path": path,
"query": query,
"user": identity,
"ip": ip,
"status": response.status_code,
"duration_ms": round(duration_ms, 1),
},
)
return response