Structured logging via JsonFormatter replaces uvicorn's default format so Loki can parse timestamps and fields. 14 business metrics (scrape stats, throttle events, circuit breaker state, cache hit rate, OCR success rate, Celery task lifecycle) are defined in a shared metrics module and instrumented across the scraper pipeline, API, and workers. Celery workers expose a Prometheus HTTP endpoint on configurable ports.
65 lines
2 KiB
Python
65 lines
2 KiB
Python
"""Audit logging middleware for API requests."""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import time
|
|
|
|
import jwt
|
|
from starlette.middleware.base import BaseHTTPMiddleware
|
|
from starlette.requests import Request
|
|
from starlette.responses import Response
|
|
|
|
# Dedicated logger for audit records; its dotted name makes it a child of the "uvicorn" logger.
audit_logger = logging.getLogger("uvicorn.audit")
|
|
|
|
|
|
def _extract_identity(request: Request) -> str:
    """Extract the caller's claimed email from a bearer JWT for audit logging.

    The token is decoded WITHOUT verifying its signature or expiry, so the
    result is only a *claimed* identity: suitable for labelling a log line,
    never for making an authorization decision.

    Args:
        request: Incoming request whose ``Authorization`` header is inspected.

    Returns:
        ``"anonymous"`` when no bearer credential is present,
        ``"invalid-token"`` when the token cannot be decoded,
        ``"unknown"`` when the payload carries no string ``email`` claim,
        otherwise the value of the ``email`` claim.
    """
    auth_header = request.headers.get("authorization", "")
    scheme, separator, credentials = auth_header.partition(" ")
    # Authentication scheme names are case-insensitive, so "bearer <jwt>"
    # must be attributed to its user rather than logged as anonymous.
    if not separator or scheme.lower() != "bearer":
        return "anonymous"

    try:
        payload = jwt.decode(
            credentials.strip(),
            options={"verify_signature": False, "verify_exp": False},
        )
    except jwt.PyJWTError:
        return "invalid-token"

    # The claim is attacker-controlled JSON; only pass through real strings
    # so the declared ``str`` return type always holds.
    email = payload.get("email")
    return email if isinstance(email, str) else "unknown"
|
|
|
|
|
|
def _client_ip(request: Request) -> str:
    """Return a best-effort client IP for the audit record.

    Prefers the first (left-most) entry of ``X-Forwarded-For``. When the
    header is absent, or its first entry is blank, falls back to the peer
    address of the connection, and finally to ``"unknown"``.

    NOTE: ``X-Forwarded-For`` is supplied by the client unless a trusted
    proxy overwrites it, so the value may be spoofed.

    Args:
        request: Incoming request to inspect.

    Returns:
        The client address as a non-empty string, or ``"unknown"``.
    """
    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        first_hop = forwarded.split(",", 1)[0].strip()
        # A malformed header such as "," or " , 10.0.0.1" has an empty first
        # entry; fall through instead of logging "" as the IP.
        if first_hop:
            return first_hop

    client = request.client
    return client.host if client else "unknown"
|
|
|
|
|
|
class AuditLogMiddleware(BaseHTTPMiddleware):
    """Logs all /api/ requests with method, path, user, IP, status, and duration.

    Requests outside ``/api/`` pass through untouched. For audited requests
    one ``"API request"`` record is emitted on ``audit_logger`` with the
    fields attached via ``extra``. A request whose handler raises is still
    recorded (with status 500) before the exception is re-raised.
    """

    async def dispatch(self, request: Request, call_next) -> Response:  # type: ignore[no-untyped-def]
        """Forward the request and emit one audit record for ``/api/`` paths.

        Args:
            request: The incoming request.
            call_next: Callable that forwards the request down the stack and
                returns the response.

        Returns:
            The downstream response, unmodified.

        Raises:
            Exception: Whatever the downstream application raised; it is
                re-raised after the audit record has been written.
        """
        path = request.url.path
        if not path.startswith("/api/"):
            return await call_next(request)

        start = time.monotonic()
        # Capture request-derived fields before handing the request downstream.
        fields = {
            "method": request.method,
            "path": path,
            "query": str(request.query_params) if request.query_params else "",
            "user": _extract_identity(request),
            "ip": _client_ip(request),
        }

        try:
            response = await call_next(request)
        except Exception:
            # An exception that reaches this middleware was not converted to
            # a response downstream; record it as a 500 so the failing
            # request is not missing from the audit trail.
            self._emit(fields, 500, start)
            raise

        self._emit(fields, response.status_code, start)
        return response

    @staticmethod
    def _emit(fields: dict, status: int, start: float) -> None:
        """Write one audit record with the outcome and elapsed milliseconds."""
        duration_ms = (time.monotonic() - start) * 1000
        audit_logger.info(
            "API request",
            extra={
                **fields,
                "status": status,
                "duration_ms": round(duration_ms, 1),
            },
        )
|