"""FastAPI entrypoint for hmrc-sync. Endpoints: - GET /authorize — redirect to HMRC OAuth, primes refresh_token - GET /callback — OAuth callback; exchange code, persist token - POST /callback-metadata — browser-side session attributes (fraud headers) - POST /sync — pull latest HMRC figures for a given tax year - GET /healthz — readiness + queue depth """ from __future__ import annotations import logging import os import secrets import urllib.parse from contextlib import asynccontextmanager from typing import Any from fastapi import FastAPI, HTTPException, Request from fastapi.responses import HTMLResponse, RedirectResponse from prometheus_fastapi_instrumentator import Instrumentator from hmrc_sync import oauth from hmrc_sync.fraud_headers import SessionContext log = logging.getLogger(__name__) REQUIRED_ENV = [ "HMRC_PROD_CLIENT_ID", "HMRC_PROD_CLIENT_SECRET", "HMRC_PROD_REDIRECT_URI", "DB_CONNECTION_STRING", ] def _verify_env() -> None: missing = [k for k in REQUIRED_ENV if not os.environ.get(k)] if missing: raise RuntimeError(f"Missing required env vars: {', '.join(missing)}") @asynccontextmanager async def lifespan(app: FastAPI): # type: ignore[no-untyped-def] _verify_env() app.state.session_context = SessionContext( device_id=os.environ.get("HMRC_DEVICE_ID", ""), public_ip=os.environ.get("HMRC_VENDOR_PUBLIC_IP", ""), ) app.state.oauth_states = {} # anti-CSRF state → expires_at yield app = FastAPI(title="HMRC Sync", lifespan=lifespan) Instrumentator().instrument(app).expose(app, endpoint="/metrics") @app.get("/healthz") async def healthz() -> dict[str, Any]: return {"status": "ok"} @app.get("/authorize") async def authorize() -> RedirectResponse: creds = oauth.load_creds_from_env() state = secrets.token_urlsafe(24) app.state.oauth_states[state] = True params = urllib.parse.urlencode({ "response_type": "code", "client_id": creds.client_id, "scope": "read:self-assessment", "redirect_uri": creds.redirect_uri, "state": state, }) return RedirectResponse(f"{oauth.PROD_BASE}/oauth/authorize?{params}") @app.get("/callback", response_class=HTMLResponse) async def callback(code: str, state: str) -> HTMLResponse: if state not in app.state.oauth_states: raise HTTPException(status_code=400, detail="unknown state (CSRF)") del app.state.oauth_states[state] creds = oauth.load_creds_from_env() token = await oauth.exchange_code(creds, code) oauth.persist_to_vault(token) # Serve a 1-page form that POSTs browser attributes to /callback-metadata # so we capture the per-session values HMRC wants in fraud headers. return HTMLResponse(_metadata_capture_html()) @app.post("/callback-metadata") async def callback_metadata(request: Request) -> dict[str, str]: body = await request.json() session: SessionContext = app.state.session_context session.user_agent = str(body.get("user_agent", "") or "") session.screen_width = int(body.get("screen_width", 0) or 0) session.screen_height = int(body.get("screen_height", 0) or 0) session.screen_colour_depth = int(body.get("screen_colour_depth", 0) or 0) session.window_width = int(body.get("window_width", 0) or 0) session.window_height = int(body.get("window_height", 0) or 0) session.timezone_offset = int(body.get("timezone_offset", 0) or 0) return {"status": "captured"} @app.post("/sync") async def sync(tax_year: str | None = None) -> dict[str, Any]: """Pull latest HMRC figures for `tax_year` (default: current fiscal year).""" raise HTTPException(status_code=501, detail="Sync not yet implemented — awaiting HMRC prod approval") def _metadata_capture_html() -> str: return """