"""Audit logging middleware for API requests.""" from __future__ import annotations import logging import time import jwt from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.responses import Response audit_logger = logging.getLogger("uvicorn.audit") def _extract_identity(request: Request) -> str: """Extract user email from JWT for audit logging.""" auth_header = request.headers.get("authorization", "") if not auth_header.startswith("Bearer "): return "anonymous" token = auth_header[7:] try: payload = jwt.decode(token, options={"verify_signature": False, "verify_exp": False}) return payload.get("email", "unknown") except jwt.PyJWTError: return "invalid-token" def _client_ip(request: Request) -> str: """Best-effort client IP.""" forwarded = request.headers.get("x-forwarded-for") if forwarded: return forwarded.split(",")[0].strip() client = request.client return client.host if client else "unknown" class AuditLogMiddleware(BaseHTTPMiddleware): """Logs all /api/ requests with method, path, user, IP, status, and duration.""" async def dispatch(self, request: Request, call_next) -> Response: # type: ignore[no-untyped-def] path = request.url.path if not path.startswith("/api/"): return await call_next(request) start = time.monotonic() identity = _extract_identity(request) ip = _client_ip(request) query = str(request.query_params) if request.query_params else "" response = await call_next(request) duration_ms = (time.monotonic() - start) * 1000 audit_logger.info( "API request", extra={ "method": request.method, "path": path, "query": query, "user": identity, "ip": ip, "status": response.status_code, "duration_ms": round(duration_ms, 1), }, ) return response