130 lines
4.4 KiB
Python
130 lines
4.4 KiB
Python
|
|
"""FastAPI entrypoint for hmrc-sync.
|
||
|
|
|
||
|
|
Endpoints:
|
||
|
|
- GET /authorize — redirect to HMRC OAuth, primes refresh_token
|
||
|
|
- GET /callback — OAuth callback; exchange code, persist token
|
||
|
|
- POST /callback-metadata — browser-side session attributes (fraud headers)
|
||
|
|
- POST /sync — pull latest HMRC figures for a given tax year (not yet implemented; currently always responds 501)
|
||
|
|
- GET /healthz — readiness (currently reports a static status only; queue depth is not yet included)
|
||
|
|
"""
|
||
|
|
from __future__ import annotations

import logging
import os
import secrets
import time
import urllib.parse
from contextlib import asynccontextmanager
from typing import Any

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from prometheus_fastapi_instrumentator import Instrumentator

from hmrc_sync import oauth
from hmrc_sync.fraud_headers import SessionContext
|
||
|
|
|
||
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Environment variables that must be present and non-empty at startup;
# _verify_env() raises if any of them is missing.
REQUIRED_ENV: list[str] = [
    # HMRC production OAuth client settings
    "HMRC_PROD_CLIENT_ID",
    "HMRC_PROD_CLIENT_SECRET",
    "HMRC_PROD_REDIRECT_URI",
    # database connection
    "DB_CONNECTION_STRING",
]
|
||
|
|
|
||
|
|
|
||
|
|
def _verify_env() -> None:
    """Fail fast when a required environment variable is unset or empty.

    Raises:
        RuntimeError: naming every entry of ``REQUIRED_ENV`` that has no
            non-empty value in ``os.environ``.
    """
    absent = []
    for name in REQUIRED_ENV:
        # An empty string counts as missing, same as an unset variable.
        if os.environ.get(name):
            continue
        absent.append(name)

    if not absent:
        return

    listing = ", ".join(absent)
    raise RuntimeError(f"Missing required env vars: {listing}")
|
||
|
|
|
||
|
|
|
||
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):  # type: ignore[no-untyped-def]
    """Set up application state before the app starts serving.

    Checks the required environment first, then attaches to ``app.state``:

    - ``session_context``: a ``SessionContext`` seeded from ``HMRC_DEVICE_ID``
      and ``HMRC_VENDOR_PUBLIC_IP`` (empty string when a variable is unset);
    - ``oauth_states``: an empty dict used as the OAuth state store.

    Nothing is torn down after the ``yield``.
    """
    _verify_env()

    env = os.environ
    device_id = env.get("HMRC_DEVICE_ID", "")
    public_ip = env.get("HMRC_VENDOR_PUBLIC_IP", "")
    app.state.session_context = SessionContext(
        device_id=device_id,
        public_ip=public_ip,
    )

    # anti-CSRF state → expires_at
    app.state.oauth_states = {}

    yield
|
||
|
|
|
||
|
|
|
||
|
|
# The ASGI application; startup work is done by `lifespan`.
app = FastAPI(lifespan=lifespan, title="HMRC Sync")

# Attach prometheus_fastapi_instrumentator to the app and expose its
# endpoint at /metrics.
(
    Instrumentator()
    .instrument(app)
    .expose(app, endpoint="/metrics")
)
|
||
|
|
|
||
|
|
|
||
|
|
@app.get("/healthz")
async def healthz() -> dict[str, Any]:
    """Return a static ``{"status": "ok"}`` body to show the service is up."""
    payload: dict[str, Any] = {"status": "ok"}
    return payload
|
||
|
|
|
||
|
|
|
||
|
|
# How long an issued OAuth ``state`` token remains acceptable, in seconds.
_OAUTH_STATE_TTL_SECONDS = 600


@app.get("/authorize")
async def authorize() -> RedirectResponse:
    """Start the HMRC OAuth flow by redirecting the browser to HMRC.

    A fresh random ``state`` token is recorded in ``app.state.oauth_states``
    mapped to its expiry (a ``time.monotonic()`` deadline). Tokens whose
    deadline has passed are purged on every call, so flows that were started
    but never completed cannot accumulate in memory indefinitely.

    Returns:
        A redirect to ``{oauth.PROD_BASE}/oauth/authorize`` carrying the
        client id, requested scope, redirect URI and the new ``state``.
    """
    creds = oauth.load_creds_from_env()
    pending: dict[str, float] = app.state.oauth_states
    now = time.monotonic()

    # Collect the stale keys first: a dict must not be resized while it is
    # being iterated.
    stale_states = [
        token for token, expires_at in pending.items() if expires_at <= now
    ]
    for token in stale_states:
        del pending[token]

    state = secrets.token_urlsafe(24)
    pending[state] = now + _OAUTH_STATE_TTL_SECONDS

    params = urllib.parse.urlencode({
        "response_type": "code",
        "client_id": creds.client_id,
        "scope": "read:self-assessment",
        "redirect_uri": creds.redirect_uri,
        "state": state,
    })
    return RedirectResponse(f"{oauth.PROD_BASE}/oauth/authorize?{params}")


@app.get("/callback", response_class=HTMLResponse)
async def callback(code: str, state: str) -> HTMLResponse:
    """Handle the OAuth redirect: validate ``state``, exchange ``code``, persist.

    Args:
        code: Authorization code supplied in the redirect query string.
        state: Anti-CSRF token that must have been issued by ``/authorize``.

    Returns:
        The one-page HTML document that POSTs browser attributes to
        ``/callback-metadata``.

    Raises:
        HTTPException: 400 when ``state`` is unknown (never issued, or
            already used) or when its expiry deadline has passed.
    """
    # pop() makes the token single-use and replaces the separate
    # membership test + delete with one lookup.
    expires_at = app.state.oauth_states.pop(state, None)
    if expires_at is None:
        raise HTTPException(status_code=400, detail="unknown state (CSRF)")
    if expires_at <= time.monotonic():
        raise HTTPException(status_code=400, detail="expired state")

    creds = oauth.load_creds_from_env()
    token = await oauth.exchange_code(creds, code)
    # NOTE(review): persist_to_vault is called synchronously inside an async
    # handler; if it performs blocking I/O it will stall the event loop —
    # confirm against its implementation.
    oauth.persist_to_vault(token)

    # Serve a 1-page form that POSTs browser attributes to /callback-metadata
    # so we capture the per-session values HMRC wants in fraud headers.
    return HTMLResponse(_metadata_capture_html())
|
||
|
|
|
||
|
|
|
||
|
|
@app.post("/callback-metadata")
async def callback_metadata(request: Request) -> dict[str, str]:
    """Record browser-reported session attributes on the shared SessionContext.

    The JSON body may carry ``user_agent`` (stored as a string) and the
    integer fields ``screen_width``, ``screen_height``,
    ``screen_colour_depth``, ``window_width``, ``window_height`` and
    ``timezone_offset``. A missing or falsy field falls back to ``""`` for
    the user agent and ``0`` for the integers.

    Every field is parsed before anything is written, so a rejected request
    leaves the session context untouched.

    Args:
        request: Incoming request whose body is expected to be a JSON object.

    Returns:
        ``{"status": "captured"}`` once all attributes have been stored.

    Raises:
        HTTPException: 400 when the body is not valid JSON, is not a JSON
            object, or an integer field cannot be converted to ``int``.
    """
    # NOTE(review): this writes to the single app-wide session context and
    # performs no caller authentication — confirm that is intended.
    try:
        body = await request.json()
    except ValueError as exc:  # JSONDecodeError / UnicodeDecodeError
        raise HTTPException(
            status_code=400, detail="request body must be valid JSON"
        ) from exc
    if not isinstance(body, dict):
        raise HTTPException(
            status_code=400, detail="request body must be a JSON object"
        )

    int_fields = (
        "screen_width",
        "screen_height",
        "screen_colour_depth",
        "window_width",
        "window_height",
        "timezone_offset",
    )
    parsed: dict[str, int] = {}
    for field in int_fields:
        try:
            parsed[field] = int(body.get(field, 0) or 0)
        except (TypeError, ValueError, OverflowError) as exc:
            raise HTTPException(
                status_code=400, detail=f"{field} must be an integer"
            ) from exc
    user_agent = str(body.get("user_agent", "") or "")

    # Only mutate shared state after the whole payload has validated.
    session: SessionContext = app.state.session_context
    session.user_agent = user_agent
    for field, value in parsed.items():
        setattr(session, field, value)
    return {"status": "captured"}
|
||
|
|
|
||
|
|
|
||
|
|
@app.post("/sync")
async def sync(tax_year: str | None = None) -> dict[str, Any]:
    """Pull latest HMRC figures for `tax_year` (default: current fiscal year).

    Placeholder: the handler does not read ``tax_year`` yet and always
    responds with HTTP 501.

    Raises:
        HTTPException: always, with status 501.
    """
    not_implemented = HTTPException(
        status_code=501,
        detail="Sync not yet implemented — awaiting HMRC prod approval",
    )
    raise not_implemented
|
||
|
|
|
||
|
|
|
||
|
|
def _metadata_capture_html() -> str:
|
||
|
|
return """<!doctype html>
|
||
|
|
<html><head><title>hmrc-sync — capturing session</title></head><body>
|
||
|
|
<h2>Capturing session attributes for HMRC fraud headers...</h2>
|
||
|
|
<script>
|
||
|
|
fetch('/callback-metadata', {
|
||
|
|
method: 'POST',
|
||
|
|
headers: {'Content-Type': 'application/json'},
|
||
|
|
body: JSON.stringify({
|
||
|
|
user_agent: navigator.userAgent,
|
||
|
|
screen_width: screen.width,
|
||
|
|
screen_height: screen.height,
|
||
|
|
screen_colour_depth: screen.colorDepth,
|
||
|
|
window_width: window.innerWidth,
|
||
|
|
window_height: window.innerHeight,
|
||
|
|
timezone_offset: -new Date().getTimezoneOffset()
|
||
|
|
})
|
||
|
|
}).then(() => document.body.innerHTML = '<h2>Done. You can close this tab.</h2>');
|
||
|
|
</script>
|
||
|
|
</body></html>"""
|