commit 5c7baa8acc897eb2b56c5d34ebaf7b640b7e06bb Author: Viktor Barzin Date: Thu May 7 17:06:11 2026 +0000 Initial extraction from monorepo diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..241d831 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +__pycache__/ +*.pyc +.venv/ +.mypy_cache/ +.pytest_cache/ +.ruff_cache/ +.hypothesis/ +*.egg-info/ diff --git a/.woodpecker.yml b/.woodpecker.yml new file mode 100644 index 0000000..38c16ab --- /dev/null +++ b/.woodpecker.yml @@ -0,0 +1,34 @@ +when: + event: push + +clone: + git: + image: woodpeckerci/plugin-git + settings: + attempts: 5 + backoff: 10s + +steps: + - name: build-and-push + image: woodpeckerci/plugin-docker-buildx + settings: + # Dual-push during the Forgejo registry consolidation bake. After + # ≥14 days clean, registry.viktorbarzin.me drops out (Phase 4). + repo: + - registry.viktorbarzin.me/hmrc-sync + - forgejo.viktorbarzin.me/viktor/hmrc-sync + logins: + - registry: registry.viktorbarzin.me + username: viktorbarzin + password: + from_secret: registry-password + - registry: forgejo.viktorbarzin.me + username: + from_secret: forgejo_user + password: + from_secret: forgejo_push_token + dockerfile: Dockerfile + context: . 
+ auto_tag: true + platforms: + - linux/amd64 diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..8aebf10 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,33 @@ +FROM python:3.12-slim AS builder + +ENV POETRY_VERSION=1.8.4 \ + POETRY_VIRTUALENVS_IN_PROJECT=true \ + PIP_NO_CACHE_DIR=1 + +RUN pip install --no-cache-dir "poetry==${POETRY_VERSION}" + +WORKDIR /app +COPY pyproject.toml poetry.lock* README.md ./ +RUN poetry install --only main --no-root + +COPY hmrc_sync ./hmrc_sync +COPY alembic ./alembic +COPY alembic.ini ./alembic.ini +RUN poetry install --only main + + +FROM python:3.12-slim + +WORKDIR /app + +RUN useradd --system --uid 10002 --home /app --shell /usr/sbin/nologin hmrc + +COPY --from=builder --chown=hmrc:hmrc /app /app + +ENV PATH="/app/.venv/bin:${PATH}" \ + PYTHONUNBUFFERED=1 + +EXPOSE 8080 +USER hmrc +ENTRYPOINT ["python", "-m", "hmrc_sync"] +CMD ["serve"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..ffbf6bd --- /dev/null +++ b/README.md @@ -0,0 +1,90 @@ +# hmrc-sync + +Pulls annual PAYE/NI figures from **HMRC Individual Tax API v1.1** to +reconcile against the monthly payslip data captured by `payslip-ingest/`. + +## Phase 1 — sandbox OAuth smoke test (shipped) + +Scripts live at the repo root next to this README: + +- `oauth_dance.py` — interactive browser OAuth flow against + `test-api.service.hmrc.gov.uk`, captures the callback on + `localhost:8080/oauth/callback`, exchanges for tokens, hits + `/individual-income/sa/{utr}/annual-summary/{tax_year}`. +- `headless_auth.py` — same flow but driven by Chromium via Playwright. + Useful for CI smoke tests. + +See the inline module docstrings for usage. 
+ +## Phase 2 — production service (scaffolded, awaiting HMRC approval) + +Directory layout matches `payslip-ingest/`: + +``` +hmrc-sync/ +├── hmrc_sync/ +│ ├── __init__.py +│ ├── __main__.py # click CLI: serve / sync / migrate +│ ├── app.py # FastAPI (authorize, callback, sync, healthz) +│ ├── client.py # HmrcClient — wraps Individual Tax API v1.1 +│ ├── db.py # SQLAlchemy models (tax_year_snapshot, fetch_log) +│ ├── fraud_headers.py # build Gov-Client-/Gov-Vendor- headers +│ └── oauth.py # Vault-backed refresh_token storage +├── alembic/ +│ ├── env.py +│ └── versions/0001_initial.py +├── tests/ +│ └── test_fraud_headers.py # CI-gated shape tests + sandbox validator smoke +├── Dockerfile +├── alembic.ini +└── pyproject.toml +``` + +### Critical path to prod + +1. **HMRC Dev Hub** (user action, ~10 min): + - Subscribe to *Individual Tax API v1.1*. + - Add prod redirect URI: `https://hmrc-oauth.viktorbarzin.me/callback`. + - Submit Production Access application — 2 questionnaires, frame as + "single-user PAYE reconciliation, not redistributed". + - Review takes ~10 working days. +2. **File HMRC SDST support ticket** up-front asking (a) is MTD ITSA + signup required for Individual Tax API prod access, and (b) can a + PAYE-only individual voluntarily enroll without self-employment + income. Proceed with app submission in parallel. +3. **Fraud-header validator sweep** (local — blocking): + ``` + HMRC_VALIDATOR=1 pytest tests/test_fraud_headers.py + ``` + Must be green before prod deploy. +4. **After HMRC approval arrives**: + - Seed Vault keys: `hmrc_prod_client_id`, `hmrc_prod_client_secret`, + `hmrc_sync_webhook_token`, `hmrc_device_id` at `secret/viktor/`. 
+ - Create `infra/stacks/hmrc-sync/` Terraform stack (clone from + `infra/stacks/payslip-ingest/`): Deployment, Service, Ingress via + `ingress_factory` (protected=false for HMRC callback), ESO for + Vault→K8s Secret, Grafana datasource ConfigMap, CronJob at 06:00 + UTC daily running `python -m hmrc_sync sync --tax-year current`. + - Deploy stack. + - Visit `https://hmrc-oauth.viktorbarzin.me/authorize` once in a + browser to seed the refresh_token. CronJob takes over thereafter. + +### Dashboard Panel 10 + +`infra/stacks/monitoring/modules/monitoring/dashboards/uk-payslip.json` +already carries Panel 10 ("HMRC Tax Year Reconciliation — Individual +Tax API"). It queries `hmrc_sync.tax_year_snapshot` which doesn't +exist yet on the monitoring DB — the panel renders empty until +hmrc-sync is deployed and the Alembic migration runs. + +### Risks / mitigations + +- **MTD pilot gate blocks API** — SDST ticket resolves; fallback is + payslip-ingest P60 reconciliation (already shipped). +- **Prod approval denied on "personal use"** — reframe + appeal; else + permanent P60-only reconciliation. +- **Fraud-header audit fails** — validator API gates deploy. +- **Refresh token expires (18 months)** — alert on `expires_in` < 30 + days; manual re-auth via `/authorize`. + +Tracked as beads `code-74j`. 
def do_run_migrations(connection: Connection) -> None:
    """Run the Alembic migrations synchronously on an open connection.

    The target schema is created first (if absent), then Alembic is
    configured to keep its version table in that same schema and to
    consider schema-qualified objects.

    Args:
        connection: A live SQLAlchemy connection to run the migrations on.
    """
    create_schema_sql = f'CREATE SCHEMA IF NOT EXISTS "{SCHEMA_NAME}"'
    connection.exec_driver_sql(create_schema_sql)

    alembic_options = {
        "connection": connection,
        "target_metadata": target_metadata,
        "version_table_schema": SCHEMA_NAME,
        "include_schemas": True,
    }
    context.configure(**alembic_options)

    with context.begin_transaction():
        context.run_migrations()
def run_migrations_offline() -> None:
    """Run migrations in Alembic "offline" mode, configured from a URL only.

    The URL is read from the `sqlalchemy.url` main option and no connection
    object is passed to Alembic. Parameters are rendered as literals
    (`literal_binds=True`).
    """
    database_url = config.get_main_option("sqlalchemy.url")
    offline_options = {
        "url": database_url,
        "target_metadata": target_metadata,
        "literal_binds": True,
        "version_table_schema": SCHEMA_NAME,
        "include_schemas": True,
        "dialect_opts": {"paramstyle": "named"},
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()
def upgrade() -> None:
    """Create `tax_year_snapshot` (with its tax_year index) and `fetch_log`.

    Both tables are created inside the schema named by `SCHEMA`. Column
    builders are local helpers because a `sa.Column` instance cannot be
    shared between two tables — each call returns a fresh object.
    """

    def _pk() -> sa.Column:
        # Auto-incrementing integer surrogate key.
        return sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True)

    def _required_text(name: str) -> sa.Column:
        return sa.Column(name, sa.String(), nullable=False)

    def _optional_text(name: str) -> sa.Column:
        return sa.Column(name, sa.String(), nullable=True)

    def _money(name: str) -> sa.Column:
        return sa.Column(name, sa.Numeric(12, 2), nullable=False)

    def _json(name: str) -> sa.Column:
        # JSONB on PostgreSQL; falls back to generic JSON on SQLite.
        json_type = postgresql.JSONB().with_variant(sa.JSON(), "sqlite")
        return sa.Column(name, json_type, nullable=False)

    def _fetched_at() -> sa.Column:
        # Defaults to the database clock at insert time.
        return sa.Column(
            "fetched_at",
            sa.TIMESTAMP(timezone=True),
            nullable=False,
            server_default=sa.text("now()"),
        )

    snapshot_columns = [
        _pk(),
        _required_text("tax_year"),
        _required_text("employer_paye_ref"),
        sa.Column("snapshot_date", sa.TIMESTAMP(timezone=True), nullable=False),
        _money("gross_pay"),
        _money("income_tax"),
        _money("ni_contributions"),
        sa.Column("source", sa.String(), nullable=False, server_default="hmrc-held"),
        _json("raw_response"),
        _fetched_at(),
    ]
    # One snapshot row per (tax_year, employer, snapshot timestamp).
    snapshot_unique = sa.UniqueConstraint(
        "tax_year",
        "employer_paye_ref",
        "snapshot_date",
        name="uq_tax_year_snapshot",
    )
    op.create_table(
        "tax_year_snapshot",
        *snapshot_columns,
        snapshot_unique,
        schema=SCHEMA,
    )
    op.create_index(
        "ix_tax_year_snapshot_tax_year",
        "tax_year_snapshot",
        ["tax_year"],
        schema=SCHEMA,
    )

    fetch_log_columns = [
        _pk(),
        _required_text("endpoint"),
        sa.Column("status_code", sa.Integer(), nullable=False),
        _optional_text("request_id"),
        _optional_text("correlation_id"),
        _json("fraud_headers_sent"),
        _optional_text("response_snippet"),
        sa.Column("duration_ms", sa.Integer(), nullable=False),
        _fetched_at(),
    ]
    op.create_table("fetch_log", *fetch_log_columns, schema=SCHEMA)
@dataclass
class Creds:
    """OAuth client credentials for the HMRC sandbox application.

    The `__repr__` is overridden so the client secret never ends up in
    logs, tracebacks or debug prints; equality and construction are the
    dataclass-generated ones.
    """

    # OAuth client id issued for the application.
    client_id: str
    # OAuth client secret; kept out of repr() output.
    client_secret: str

    def __repr__(self) -> str:
        """Return a debug representation with the secret redacted."""
        return f"{type(self).__name__}(client_id={self.client_id!r}, client_secret='***')"
def save_tokens(tok: dict) -> None:
    """Persist the token response to the on-disk cache, owner-readable only.

    A copy of `tok` is written as indented JSON to `TOKEN_CACHE`, with an
    extra `_cached_at` key holding the current Unix time in whole seconds.
    The caller's dict is not modified.

    Args:
        tok: Token response to cache.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)

    payload = dict(tok)
    payload["_cached_at"] = int(time.time())

    # Create the file with mode 0600 from the outset. Writing first and
    # chmod-ing afterwards leaves a window in which the tokens sit on disk
    # with the default (umask-derived) permissions.
    fd = os.open(TOKEN_CACHE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as handle:
        handle.write(json.dumps(payload, indent=2))

    # O_CREAT does not alter the mode of a file that already existed, so
    # still tighten permissions for caches created by an earlier run.
    TOKEN_CACHE.chmod(0o600)
+ def _intercept(route): + if "localhost:8080" in route.request.url: + parsed = urllib.parse.urlparse(route.request.url) + qs = urllib.parse.parse_qs(parsed.query) + captured_code["code"] = qs.get("code", [""])[0] + captured_code["state"] = qs.get("state", [""])[0] + route.abort() + else: + route.continue_() + + ctx.route("**/*", _intercept) + page = ctx.new_page() + page.set_default_timeout(30000) + + page.goto(auth_url) + page.wait_for_load_state("networkidle") + + # Step 1 — cookie banner ("Reject additional cookies") + with contextlib.suppress(Exception): + page.get_by_role("button", name="Reject additional cookies").click(timeout=3000) + page.wait_for_load_state("networkidle") + with contextlib.suppress(Exception): + page.get_by_role("button", name="Hide cookie message").click(timeout=2000) + + # Step 2 — intro page ("Allow your software to connect with HMRC" → Continue) + with contextlib.suppress(Exception): + page.get_by_role("button", name="Continue").first.click(timeout=5000) + page.wait_for_load_state("networkidle") + + # Step 3 — login form + for sel in ["input[name='userId']", "input#userId", "input[name='user_id']", "#user_id"]: + try: + page.fill(sel, user_id, timeout=2000) + break + except Exception: + continue + for sel in ["input[name='password']", "input#password"]: + try: + page.fill(sel, password, timeout=2000) + break + except Exception: + continue + for sel in ["button[type='submit']", "button:has-text('Sign in')", "input[type='submit']"]: + try: + page.click(sel, timeout=2000) + page.wait_for_load_state("networkidle") + break + except Exception: + continue + + # Step 4 — consent screen ("Grant authority") + deadline = time.time() + 20 + while time.time() < deadline and "code" not in captured_code: + for sel in [ + "button:has-text('Grant authority')", + "button:has-text('Continue')", + "button:has-text('Accept and continue')", + "#submit", + ]: + try: + page.click(sel, timeout=1500) + break + except Exception: + continue + time.sleep(0.5) + + 
def exchange_code(creds: Creds, code: str) -> dict:
    """Trade an OAuth authorization code for a token response.

    Posts an `authorization_code` grant to the sandbox token endpoint.

    Args:
        creds: Client id/secret sent with the grant.
        code: Authorization code captured from the redirect.

    Returns:
        The decoded JSON body of the token response.

    Raises:
        httpx.HTTPStatusError: If the endpoint answers with a 4xx/5xx status.
    """
    token_url = f"{SANDBOX_BASE}{TOKEN_PATH}"
    form_fields = {
        "grant_type": "authorization_code",
        "client_id": creds.client_id,
        "client_secret": creds.client_secret,
        "redirect_uri": REDIRECT_URI,
        "code": code,
    }
    accept_header = {"Accept": "application/vnd.hmrc.1.0+json"}

    response = httpx.post(token_url, data=form_fields, headers=accept_header, timeout=30)
    response.raise_for_status()
    return response.json()
def call_income(utr: str, tax_year: str) -> int:
    """Call the sandbox annual-summary endpoint and print the result.

    Prints the request line with the HTTP status, then the body: as
    indented JSON when it parses, otherwise as raw text.

    Args:
        utr: Value substituted for `{utr}` in the endpoint path.
        tax_year: Value substituted for `{tax_year}` in the endpoint path.

    Returns:
        0 when the HTTP status is below 400, otherwise 2 (used as the
        process exit code by the CLI).
    """
    access = get_access_or_die()
    path = INCOME_PATH.format(utr=utr, tax_year=tax_year)
    r = httpx.get(
        f"{SANDBOX_BASE}{path}",
        headers={"Accept": INCOME_ACCEPT, "Authorization": f"Bearer {access}"},
        timeout=30,
    )
    print(f"GET {path} -> HTTP {r.status_code}")

    # Only a body that is not valid JSON should fall back to raw text;
    # decode failures surface as ValueError (JSONDecodeError subclasses it).
    # Anything else is a real error and must not be silently swallowed.
    try:
        payload = r.json()
    except ValueError:
        print(r.text)
    else:
        print(json.dumps(payload, indent=2))

    return 0 if r.status_code < 400 else 2
def main() -> int:
    """Parse the command line and dispatch to the chosen sub-command.

    Sub-commands: `login` (requires --user-id and --password), `refresh`,
    and `call` (requires --utr; --tax-year defaults to "2015-16").

    Returns:
        Whatever the selected sub-command handler returns.
    """
    parser = argparse.ArgumentParser()
    commands = parser.add_subparsers(dest="cmd", required=True)

    login_parser = commands.add_parser("login")
    for flag in ("--user-id", "--password"):
        login_parser.add_argument(flag, required=True)
    login_parser.set_defaults(func=cmd_login)

    refresh_parser = commands.add_parser("refresh")
    refresh_parser.set_defaults(func=cmd_refresh)

    call_parser = commands.add_parser("call")
    call_parser.add_argument("--utr", required=True)
    call_parser.add_argument("--tax-year", default="2015-16")
    call_parser.set_defaults(func=cmd_call)

    parsed = parser.parse_args()
    handler = parsed.func
    return handler(parsed)
def _verify_env() -> None:
    """Fail fast when a required environment variable is unset or empty.

    Raises:
        RuntimeError: Naming every entry of `REQUIRED_ENV` that is missing
            or set to an empty string.
    """
    absent: list[str] = []
    for name in REQUIRED_ENV:
        # An empty string counts as missing, same as an unset variable.
        if not os.environ.get(name):
            absent.append(name)

    if not absent:
        return

    raise RuntimeError("Missing required env vars: " + ", ".join(absent))
@app.get("/authorize")
async def authorize() -> RedirectResponse:
    """Start the OAuth flow by redirecting the browser to HMRC.

    A fresh anti-CSRF `state` token is registered in
    `app.state.oauth_states` and sent along in the authorize URL.

    Each state is stored with an expiry timestamp, and expired entries
    are evicted on every call. States were previously stored as `True`
    and removed only when a callback consumed them, so abandoned flows
    (or repeated hits on this endpoint) grew the dict without bound.

    Returns:
        A redirect to `{oauth.PROD_BASE}/oauth/authorize` with the query
        parameters for an authorization-code grant.
    """
    import time  # local import: this module has no top-level `time` import

    state_ttl_seconds = 600  # how long a pending authorization is remembered

    creds = oauth.load_creds_from_env()

    pending = app.state.oauth_states
    now = time.time()
    # Collect the keys first — the dict must not change size mid-iteration.
    expired = [key for key, expires_at in pending.items() if expires_at <= now]
    for key in expired:
        del pending[key]

    state = secrets.token_urlsafe(24)
    pending[state] = now + state_ttl_seconds

    params = urllib.parse.urlencode({
        "response_type": "code",
        "client_id": creds.client_id,
        "scope": "read:self-assessment",
        "redirect_uri": creds.redirect_uri,
        "state": state,
    })
    return RedirectResponse(f"{oauth.PROD_BASE}/oauth/authorize?{params}")
@app.post("/callback-metadata")
async def callback_metadata(request: Request) -> dict[str, str]:
    """Store browser-reported attributes on the shared session context.

    Expects a JSON object; missing or falsy fields fall back to "" / 0.
    The body comes from a browser, so it is treated as untrusted. All
    fields are validated before any is stored, so a malformed request
    cannot leave the session context half-updated.

    Args:
        request: Incoming request whose body carries the attributes.

    Returns:
        `{"status": "captured"}` on success.

    Raises:
        HTTPException: 400 when the body is not a JSON object or a numeric
            field cannot be converted to an integer.
    """
    try:
        body = await request.json()
    except ValueError as exc:  # JSONDecodeError is a ValueError subclass
        raise HTTPException(status_code=400, detail="body must be valid JSON") from exc
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="body must be a JSON object")

    int_fields = (
        "screen_width",
        "screen_height",
        "screen_colour_depth",
        "window_width",
        "window_height",
        "timezone_offset",
    )
    parsed: dict[str, int] = {}
    for name in int_fields:
        try:
            parsed[name] = int(body.get(name, 0) or 0)
        except (TypeError, ValueError, OverflowError) as exc:
            raise HTTPException(status_code=400, detail=f"{name} must be an integer") from exc

    session: SessionContext = app.state.session_context
    session.user_agent = str(body.get("user_agent", "") or "")
    for name, value in parsed.items():
        setattr(session, name, value)
    return {"status": "captured"}

Capturing session attributes for HMRC fraud headers...

@dataclass
class HmrcResponse:
    """Outcome of a single HMRC API call, as returned by `HmrcClient`."""

    # HTTP status code of the response.
    status_code: int
    # Decoded JSON body; `{}` for an empty response, or `{"raw": ...}` with
    # truncated text when the body is not valid JSON.
    body: dict[str, Any]
    # Wall-clock duration of the request in milliseconds.
    duration_ms: int
    # Value of the `x-request-id` response header, if present.
    request_id: str | None
    # Value of the `x-correlation-id` response header, if present.
    correlation_id: str | None
    # The fraud-prevention headers that were attached to the request.
    fraud_headers_sent: dict[str, str]
# Needed for the composite unique constraint below; imported next to its only use.
from sqlalchemy import UniqueConstraint


class TaxYearSnapshot(Base):
    """One row per (tax_year, employer_paye_ref, snapshot_date).

    HMRC returns the `hmrc-held` view of annual PAYE/NI for a given
    employment. Taking a daily snapshot lets us see HMRC's figures evolve
    as late RTI filings land, and lets the dashboard always show the
    latest value by snapshot_date.

    The "one row per" rule is declared here as the `uq_tax_year_snapshot`
    unique constraint, using the same name and columns as the initial
    Alembic migration, so `Base.metadata` matches the migrated schema.
    """
    __tablename__ = "tax_year_snapshot"
    __table_args__ = (  # noqa: RUF012
        UniqueConstraint(
            "tax_year",
            "employer_paye_ref",
            "snapshot_date",
            name="uq_tax_year_snapshot",
        ),
        # SQLAlchemy requires the options dict to be the last tuple element.
        {"schema": SCHEMA_NAME},
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    # Indexed: the migration creates ix_tax_year_snapshot_tax_year for it.
    tax_year: Mapped[str] = mapped_column(String, nullable=False, index=True)
    employer_paye_ref: Mapped[str] = mapped_column(String, nullable=False)
    snapshot_date: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), nullable=False)
    gross_pay: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
    income_tax: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
    ni_contributions: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
    source: Mapped[str] = mapped_column(String, nullable=False, server_default="hmrc-held")
    # The API payload stored as JSON (JSONB on PostgreSQL, JSON on SQLite).
    raw_response: Mapped[dict[str, Any]] = mapped_column(JSON_TYPE, nullable=False)
    # Set by the database clock when the row is inserted.
    fetched_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
                                                 nullable=False,
                                                 server_default=text("now()"))
def make_session_factory(engine: AsyncEngine) -> async_sessionmaker[Any]:
    """Build an async session factory bound to `engine`.

    Sessions are created with `expire_on_commit=False`.

    Args:
        engine: The async engine the sessions will use.

    Returns:
        An `async_sessionmaker` configured as described above.
    """
    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    return session_factory
+ +Spec references: + https://developer.service.hmrc.gov.uk/guides/fraud-prevention/ + https://developer.service.hmrc.gov.uk/guides/fraud-prevention/connection-method/batch-process-direct/ + https://developer.service.hmrc.gov.uk/api-documentation/docs/api/service/txm-fph-validator-api/1.0 +""" +from __future__ import annotations + +import getpass +import hashlib +import logging +import os +import platform +import secrets +import socket +import time +import urllib.parse +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +log = logging.getLogger(__name__) + +VENDOR_PRODUCT_NAME = "hmrc-sync" +VENDOR_PRODUCT_VERSION = "0.1.0" +# Self-assigned for a personal single-user tool. HMRC permits arbitrary +# vendor strings; the header value is then SHA-256-hashed per spec +# (`Gov-Vendor-License-IDs: =`). +VENDOR_LICENSE_ID = os.environ.get("HMRC_VENDOR_LICENSE_ID", + "hmrc-sync-private-single-user") +VENDOR_PUBLIC_IP = os.environ.get("HMRC_VENDOR_PUBLIC_IP", "") + +# Valid HMRC connection-method enum values. +CONNECTION_METHOD_BATCH = "BATCH_PROCESS_DIRECT" +CONNECTION_METHOD_WEB_APP = "WEB_APP_VIA_SERVER" +CONNECTION_METHOD_MFA = "AUTH_USING_MFA" + +_NET_CLASS = Path("/sys/class/net") +_EMPTY_MAC = "00:00:00:00:00:00" + + +@dataclass +class SessionContext: + """Browser-side attributes captured on the `/callback-metadata` POST. + + Only relevant for WEB_APP_VIA_SERVER flows (browser-initiated OAuth + + server-side API calls). BATCH_PROCESS_DIRECT flows derive their + context from `RuntimeContext` (see below) without touching these. + """ + user_agent: str = "" + screen_width: int = 0 + screen_height: int = 0 + screen_colour_depth: int = 0 + window_width: int = 0 + window_height: int = 0 + timezone_offset: int = 0 + device_id: str = "" + mfa_timestamp: str = "" + public_ip: str = "" + public_port: int = 0 + + +@dataclass +class RuntimeContext: + """Pod-side environment values required on every API call. 
+ + Collected once at module load (cheap — all local syscalls). If any + field is empty, the header emitter falls back to safe defaults so + the call never goes out with an empty mandatory header. + """ + mac_addresses: list[str] = field(default_factory=list) + local_ips: list[str] = field(default_factory=list) + os_family: str = "" + os_version: str = "" + device_manufacturer: str = "Kubernetes" + device_model: str = "" + os_user: str = "" + + +def _collect_mac_addresses() -> list[str]: + """Read every non-loopback interface MAC from `/sys/class/net/*/address`. + + Colons are kept raw; `_format_mac_list` percent-encodes on output per spec. + """ + out: list[str] = [] + if not _NET_CLASS.exists(): + return out + for iface in sorted(_NET_CLASS.iterdir()): + if iface.name == "lo": + continue + addr_file = iface / "address" + if not addr_file.exists(): + continue + try: + mac = addr_file.read_text().strip() + except OSError: + continue + if mac and mac != _EMPTY_MAC: + out.append(mac) + return out + + +def _collect_local_ips() -> list[str]: + """Every IP bound to this host — IPv4 + IPv6, loopback excluded.""" + ips: set[str] = set() + try: + hostname = socket.gethostname() + for family, _, _, _, sockaddr in socket.getaddrinfo(hostname, None): + raw = sockaddr[0] + if not isinstance(raw, str): + continue + if family == socket.AF_INET and not raw.startswith("127."): + ips.add(raw) + elif family == socket.AF_INET6 and not raw.startswith("::1"): + ips.add(raw.split("%")[0]) # strip zone id + except (socket.gaierror, OSError): + pass + # Also grab the primary outbound IP — `getaddrinfo(hostname)` can miss + # it inside containers whose hostname has no DNS entry. 
+ try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(("10.255.255.255", 1)) + ips.add(s.getsockname()[0]) + except OSError: + pass + return sorted(ips) + + +def _detect_runtime_context() -> RuntimeContext: + uname = platform.uname() + return RuntimeContext( + mac_addresses=_collect_mac_addresses(), + local_ips=_collect_local_ips(), + os_family=uname.system or "Linux", + os_version=uname.release or "unknown", + device_manufacturer="Kubernetes", + device_model=uname.node or socket.gethostname() or "pod", + os_user=_safe_getuser(), + ) + + +def _safe_getuser() -> str: + try: + return getpass.getuser() + except (KeyError, OSError): + return os.environ.get("USER", "unknown") + + +RUNTIME_CONTEXT: RuntimeContext = _detect_runtime_context() + + +def build_headers(session: SessionContext | None = None, + connection_method: str = CONNECTION_METHOD_BATCH, + runtime: RuntimeContext | None = None) -> dict[str, str]: + """Return the full header dict to attach to every HMRC API call. + + Defaults to BATCH_PROCESS_DIRECT — the mode the CronJob uses. Pass + a populated `SessionContext` + `connection_method=WEB_APP_VIA_SERVER` + for browser-initiated flows; the browser-only fields layer on top. + """ + session = session or SessionContext() + rt = runtime or RUNTIME_CONTEXT + headers: dict[str, str] = {} + headers.update(_static_headers()) + headers.update(_per_request_headers()) + headers.update(_mandatory_runtime_headers(rt, session, connection_method)) + if connection_method == CONNECTION_METHOD_WEB_APP: + headers.update(_web_app_session_headers(session)) + if connection_method == CONNECTION_METHOD_MFA and session.mfa_timestamp: + headers["Gov-Client-MFA-Timestamp"] = session.mfa_timestamp + return headers + + +def _static_headers() -> dict[str, str]: + """Vendor-constant identity headers that apply to every connection method. 
+ + Product-Name is percent-encoded per spec; License-IDs value is SHA-256- + hashed per spec; Version is a key-value pair of `=`. + + Gov-Vendor-Public-IP and Gov-Vendor-Forwarded are NOT emitted here — the + HMRC validator rejects them for BATCH_PROCESS_DIRECT (where no vendor + server sits between the client and the HMRC API). They're added in + `_web_app_session_headers` for the WEB_APP_VIA_SERVER path only. + """ + license_hash = hashlib.sha256(VENDOR_LICENSE_ID.encode()).hexdigest() + return { + "Gov-Vendor-Product-Name": _pct(VENDOR_PRODUCT_NAME), + "Gov-Vendor-Version": f"{VENDOR_PRODUCT_NAME}={VENDOR_PRODUCT_VERSION}", + "Gov-Vendor-License-IDs": f"{VENDOR_PRODUCT_NAME}={license_hash}", + } + + +def _per_request_headers() -> dict[str, str]: + """Per-call trace + timestamp headers. Local-IPs-Timestamp uses HMRC's + exact format `yyyy-MM-ddThh:mm:ss.sssZ` — always UTC, always millis.""" + now_ms = int(time.time() * 1000) + iso_ms = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(now_ms / 1000)) + now_iso = f"{iso_ms}.{now_ms % 1000:03d}Z" + return { + "Gov-Client-Timezone": "UTC+00:00", + "Gov-Client-Local-IPs-Timestamp": now_iso, + "x-correlation-id": str(uuid.uuid4()), + "x-request-id": secrets.token_hex(16), + } + + +def _mandatory_runtime_headers(rt: RuntimeContext, session: SessionContext, + connection_method: str) -> dict[str, str]: + """The 8 headers mandatory for BATCH_PROCESS_DIRECT that come from the + host — Connection-Method, Device-ID, User-IDs, User-Agent, Local-IPs, + MAC-Addresses (+ Timezone and Local-IPs-Timestamp live in + `_per_request_headers`).""" + return { + "Gov-Client-Connection-Method": connection_method, + "Gov-Client-Device-ID": session.device_id or _fallback_device_id(), + "Gov-Client-User-IDs": _user_ids(rt, session), + "Gov-Client-User-Agent": _user_agent(rt, session), + "Gov-Client-Local-IPs": _format_ip_list(rt.local_ips), + "Gov-Client-MAC-Addresses": _format_mac_list(rt.mac_addresses), + } + + +def 
_web_app_session_headers(session: SessionContext) -> dict[str, str]: + """WEB_APP_VIA_SERVER-only headers — browser context + vendor hop trail. + + Gov-Vendor-Public-IP and Gov-Vendor-Forwarded describe the vendor server + that sits between the user's browser and HMRC — only meaningful for + WEB_APP_VIA_SERVER. BATCH_PROCESS_DIRECT must omit them (validator + rejects them there). + """ + out: dict[str, str] = {} + if session.screen_width and session.screen_height: + out["Gov-Client-Screens"] = ( + f"width={session.screen_width}&height={session.screen_height}" + f"&scaling-factor=1&colour-depth={session.screen_colour_depth}") + if session.window_width and session.window_height: + out["Gov-Client-Window-Size"] = (f"width={session.window_width}&" + f"height={session.window_height}") + if session.public_ip: + out["Gov-Client-Public-IP"] = session.public_ip + if session.public_port: + out["Gov-Client-Public-Port"] = str(session.public_port) + vendor_ip = VENDOR_PUBLIC_IP or (RUNTIME_CONTEXT.local_ips[0] if RUNTIME_CONTEXT.local_ips + else "") + if vendor_ip: + out["Gov-Vendor-Public-IP"] = vendor_ip + out["Gov-Vendor-Forwarded"] = f"by={vendor_ip}&for={vendor_ip}" + return out + + +def _user_ids(rt: RuntimeContext, session: SessionContext) -> str: + """Per spec: `os=&=`. The `os=` field is + mandatory; we additionally tag our application with the OAuth user + so HMRC can correlate activity in a breach investigation. + """ + os_user = rt.os_user or "unknown" + pairs = [f"os={_pct(os_user)}"] + oauth_user = os.environ.get("HMRC_OAUTH_USER", "viktor") + pairs.append(f"hmrc-sync={_pct(oauth_user)}") + _ = session # reserved for future per-session identity extension + return "&".join(pairs) + + +def _user_agent(rt: RuntimeContext, session: SessionContext) -> str: + """Per spec: `os-family=…&os-version=…&device-manufacturer=…&device-model=…`. 
+ + For WEB_APP_VIA_SERVER with a captured browser UA, the browser string + is encoded under `device-model` with the rest of the fields defaulting + to our pod's values — HMRC's validator accepts this hybrid shape. + """ + model = session.user_agent or rt.device_model or "pod" + pairs = [ + f"os-family={_pct(rt.os_family)}", + f"os-version={_pct(rt.os_version)}", + f"device-manufacturer={_pct(rt.device_manufacturer)}", + f"device-model={_pct(model)}", + ] + return "&".join(pairs) + + +def _format_ip_list(ips: list[str]) -> str: + """IPv6 addresses percent-encoded; IPv4 passes through. Joined with ','. + + HMRC's validator accepts an empty header only if the request truly + has no IPs; on a live pod we always have at least one — if the list + comes back empty we fall back to the loopback so the header is + syntactically valid. + """ + if not ips: + return "127.0.0.1" + out = [] + for ip in ips: + out.append(_pct(ip) if ":" in ip else ip) + return ",".join(out) + + +def _format_mac_list(macs: list[str]) -> str: + """Each MAC percent-encoded (colons → %3A), comma-joined. + + Empty list → single dummy MAC so we never ship a blank header; + HMRC's validator treats blank as a violation. + """ + if not macs: + return _pct("02:00:00:00:00:00") + return ",".join(_pct(m) for m in macs) + + +def _fallback_device_id() -> str: + """Deterministic UUID derived from hostname when no Vault-backed + Device-ID is seeded. 
Stable across restarts on the same node.""" + return str(uuid.uuid5(uuid.NAMESPACE_DNS, f"hmrc-sync-{platform.node()}")) + + +def _pct(s: str) -> str: + return urllib.parse.quote(s, safe="") + + +def as_validator_payload(headers: dict[str, str]) -> dict[str, Any]: + """Reshape headers for the HMRC fraud-header validator API body.""" + return {"headers": [{"name": k, "value": v} for k, v in headers.items()]} diff --git a/hmrc_sync/oauth.py b/hmrc_sync/oauth.py new file mode 100644 index 0000000..cc509d4 --- /dev/null +++ b/hmrc_sync/oauth.py @@ -0,0 +1,125 @@ +"""HMRC OAuth token persistence — Vault-backed refresh-token store. + +The refresh_token is long-lived (HMRC grants 18 months). We keep it in +Vault at `secret/viktor/hmrc_refresh_token` and let ESO sync it to a K8s +Secret the pod mounts as an env var. On every refresh, we write the new +token back to Vault so the next pod restart picks it up. + +Writing back requires Vault write access — the pod uses a short-lived +K8s-auth Vault token with a narrow policy that only allows writing +`secret/viktor/hmrc_refresh_token`. 
+""" +from __future__ import annotations + +import logging +import os +from dataclasses import dataclass + +import httpx + +log = logging.getLogger(__name__) + +VAULT_KEY = "secret/viktor/hmrc_refresh_token" +PROD_BASE = "https://api.service.hmrc.gov.uk" +TOKEN_PATH = "/oauth/token" + + +@dataclass(frozen=True) +class OAuthCreds: + client_id: str + client_secret: str + redirect_uri: str + + +@dataclass +class TokenBundle: + access_token: str + refresh_token: str + expires_in: int + scope: str + + @classmethod + def from_json(cls, data: dict[str, object]) -> TokenBundle: + return cls( + access_token=str(data["access_token"]), + refresh_token=str(data["refresh_token"]), + expires_in=int(data["expires_in"]), # type: ignore[arg-type] + scope=str(data.get("scope", "")), + ) + + +def load_creds_from_env() -> OAuthCreds: + return OAuthCreds( + client_id=os.environ["HMRC_PROD_CLIENT_ID"], + client_secret=os.environ["HMRC_PROD_CLIENT_SECRET"], + redirect_uri=os.environ["HMRC_PROD_REDIRECT_URI"], + ) + + +async def exchange_code(creds: OAuthCreds, code: str) -> TokenBundle: + """Swap a fresh authorization_code for an access+refresh token pair.""" + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post( + f"{PROD_BASE}{TOKEN_PATH}", + data={ + "grant_type": "authorization_code", + "client_id": creds.client_id, + "client_secret": creds.client_secret, + "redirect_uri": creds.redirect_uri, + "code": code, + }, + headers={"Accept": "application/vnd.hmrc.1.0+json"}, + ) + resp.raise_for_status() + return TokenBundle.from_json(resp.json()) + + +async def refresh(creds: OAuthCreds, refresh_token: str) -> TokenBundle: + """Exchange an old refresh_token for a fresh access+refresh pair. + + HMRC rotates the refresh_token on every refresh — the old one becomes + invalid immediately after this call returns. Persist the new one to + Vault atomically; a failure between the refresh and the Vault write + leaves us stranded. 
+ """ + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post( + f"{PROD_BASE}{TOKEN_PATH}", + data={ + "grant_type": "refresh_token", + "client_id": creds.client_id, + "client_secret": creds.client_secret, + "refresh_token": refresh_token, + }, + headers={"Accept": "application/vnd.hmrc.1.0+json"}, + ) + resp.raise_for_status() + return TokenBundle.from_json(resp.json()) + + +def persist_to_vault(token: TokenBundle) -> None: + """Write the new refresh_token back to Vault. + + Uses the hvac client with K8s-auth — the pod's service-account token + at /var/run/secrets/kubernetes.io/serviceaccount/token logs into + Vault's kubernetes auth method and receives a short-lived Vault token + with write access to `secret/viktor/hmrc_refresh_token` only. + """ + import hvac + + addr = os.environ.get("VAULT_ADDR", "https://vault.viktorbarzin.me") + role = os.environ.get("VAULT_K8S_ROLE", "hmrc-sync") + jwt_path = "/var/run/secrets/kubernetes.io/serviceaccount/token" + with open(jwt_path, encoding="utf-8") as fh: + jwt = fh.read() + client = hvac.Client(url=addr) + client.auth.kubernetes.login(role=role, jwt=jwt) + client.secrets.kv.v2.create_or_update_secret( + path="viktor/hmrc_refresh_token", + secret={ + "refresh_token": token.refresh_token, + "expires_in": token.expires_in, + "scope": token.scope, + }, + ) + log.info("Rotated HMRC refresh_token persisted to Vault") diff --git a/oauth_dance.py b/oauth_dance.py new file mode 100644 index 0000000..5b323e3 --- /dev/null +++ b/oauth_dance.py @@ -0,0 +1,178 @@ +"""Phase-1 HMRC MTD OAuth sandbox smoke test. + +Runs the authorization_code flow against HMRC's test environment, captures +the callback on localhost:8080, exchanges for tokens, then calls +/individuals/income-received/employments/{nino}/{taxYear} for a test user. + +Prerequisites (do once in the HMRC dev hub for the app): + 1. Add http://localhost:8080/oauth/callback as a Redirect URI. + 2. 
+ Subscribe to "Individual Income API" (and accept terms). + 3. Create a sandbox test user (Individuals → Create Test User) and note + the SA UTR + Government Gateway user ID + password. + +Credentials are read from Vault (secret/viktor/hmrc_mtd_sandbox_client_{id,secret}) +with env-var fallback for portability. + +Run: + python3 oauth_dance.py --utr 2762163393 --tax-year 2015-16 +""" + +from __future__ import annotations + +import argparse +import http.server +import json +import os +import secrets +import socketserver +import sys +import threading +import urllib.parse +import webbrowser +from dataclasses import dataclass + +import httpx + +SANDBOX_BASE = "https://test-api.service.hmrc.gov.uk" +AUTH_PATH = "/oauth/authorize" +TOKEN_PATH = "/oauth/token" +# Legacy "Individual Income API" v1.2 — annual SA summary. Path uses +# the 10-digit Self-Assessment UTR, NOT the NINO. MTD +# "Individuals Income Received API" would be richer (in-year YTD) but +# isn't available to this app's subscription list. 
+INCOME_PATH = "/individual-income/sa/{utr}/annual-summary/{tax_year}" +INCOME_ACCEPT = "application/vnd.hmrc.1.2+json" + +REDIRECT_URI = "http://localhost:8080/oauth/callback" +CALLBACK_PORT = 8080 +SCOPE = "read:individual-income" + + +@dataclass +class Creds: + client_id: str + client_secret: str + + +def load_creds() -> Creds: + env_id = os.environ.get("HMRC_CLIENT_ID") + env_secret = os.environ.get("HMRC_CLIENT_SECRET") + if env_id and env_secret: + return Creds(env_id, env_secret) + import subprocess + cid = subprocess.check_output( + ["vault", "kv", "get", "-field=hmrc_mtd_sandbox_client_id", "secret/viktor"], + text=True, + ).strip() + csec = subprocess.check_output( + ["vault", "kv", "get", "-field=hmrc_mtd_sandbox_client_secret", "secret/viktor"], + text=True, + ).strip() + return Creds(cid, csec) + + +class _CallbackHandler(http.server.BaseHTTPRequestHandler): + captured: dict[str, str] = {} + + def do_GET(self) -> None: + parsed = urllib.parse.urlparse(self.path) + if parsed.path != "/oauth/callback": + self.send_response(404) + self.end_headers() + return + qs = urllib.parse.parse_qs(parsed.query) + _CallbackHandler.captured.update({k: v[0] for k, v in qs.items()}) + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.end_headers() + body = b"

HMRC auth received. You can close this tab.

" + self.wfile.write(body) + + def log_message(self, *_args) -> None: # silence default stderr spam + pass + + +def run_callback_server_until_code(expected_state: str) -> dict[str, str]: + with socketserver.TCPServer(("127.0.0.1", CALLBACK_PORT), _CallbackHandler) as srv: + t = threading.Thread(target=srv.serve_forever, daemon=True) + t.start() + while "code" not in _CallbackHandler.captured and "error" not in _CallbackHandler.captured: + threading.Event().wait(0.25) + srv.shutdown() + got = dict(_CallbackHandler.captured) + if got.get("state") != expected_state: + raise SystemExit(f"CSRF: state mismatch (got {got.get('state')!r}, want {expected_state!r})") + if "error" in got: + raise SystemExit(f"HMRC returned error: {got}") + return got + + +def exchange_code(creds: Creds, code: str) -> dict: + r = httpx.post( + f"{SANDBOX_BASE}{TOKEN_PATH}", + data={ + "grant_type": "authorization_code", + "client_id": creds.client_id, + "client_secret": creds.client_secret, + "redirect_uri": REDIRECT_URI, + "code": code, + }, + headers={"Accept": "application/vnd.hmrc.1.0+json"}, + timeout=30, + ) + r.raise_for_status() + return r.json() + + +def call_income_received(access_token: str, utr: str, tax_year: str) -> httpx.Response: + """tax_year is '2015-16' style (legacy Individual Income API).""" + url = f"{SANDBOX_BASE}{INCOME_PATH.format(utr=utr, tax_year=tax_year)}" + return httpx.get( + url, + headers={ + "Accept": INCOME_ACCEPT, + "Authorization": f"Bearer {access_token}", + }, + timeout=30, + ) + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--utr", required=True, help="Sandbox test-user 10-digit SA UTR, e.g. 2762163393") + parser.add_argument("--tax-year", default="2015-16", help="Format 2015-16. Sandbox may only have canned data for certain years.") + args = parser.parse_args() + + creds = load_creds() + state = secrets.token_urlsafe(24) + auth_url = ( + f"{SANDBOX_BASE}{AUTH_PATH}?" 
+ + urllib.parse.urlencode({ + "response_type": "code", + "client_id": creds.client_id, + "scope": SCOPE, + "redirect_uri": REDIRECT_URI, + "state": state, + }) + ) + print(f"Opening browser to HMRC sandbox login...\n {auth_url}\n") + webbrowser.open(auth_url) + captured = run_callback_server_until_code(expected_state=state) + print(f"Got auth code (truncated): {captured['code'][:12]}...") + + tokens = exchange_code(creds, captured["code"]) + access = tokens["access_token"] + print(f"Got access_token (exp {tokens.get('expires_in')}s), refresh_token present={('refresh_token' in tokens)}") + + resp = call_income_received(access, args.utr, args.tax_year) + print(f"\nGET /individual-income/sa/{args.utr}/annual-summary/{args.tax_year} → HTTP {resp.status_code}") + try: + print(json.dumps(resp.json(), indent=2)) + except Exception: + print(resp.text) + + return 0 if resp.status_code < 400 else 2 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..772aa6e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,53 @@ +[tool.poetry] +name = "hmrc-sync" +version = "0.1.0" +description = "Pulls annual PAYE/NI from HMRC Individual Tax API v1.1 to reconcile against payslip-ingest" +authors = ["Viktor Barzin "] +readme = "README.md" +packages = [{ include = "hmrc_sync" }] + +[tool.poetry.dependencies] +python = ">=3.12,<3.13" +fastapi = "^0.115" +uvicorn = "^0.32" +httpx = "^0.27" +pydantic = "^2.9" +sqlalchemy = { extras = ["asyncio"], version = "^2.0" } +asyncpg = "^0.29" +alembic = "^1.13" +click = "^8.1" +prometheus-fastapi-instrumentator = "^7.0" +hvac = "^2.3" + +[tool.poetry.group.dev.dependencies] +pytest = "^8.3" +pytest-asyncio = "^0.23" +mypy = "^1.11" +ruff = "^0.6" +yapf = "^0.43" +respx = "^0.21" +aiosqlite = "^0.20" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] + +[tool.mypy] 
+python_version = "3.12" +strict = true +files = ["hmrc_sync", "tests"] + +[tool.ruff] +line-length = 100 +target-version = "py312" + +[tool.ruff.lint] +select = ["E", "F", "W", "I", "UP", "B", "SIM", "RUF"] + +[tool.yapf] +based_on_style = "pep8" +column_limit = 100 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_fraud_headers.py b/tests/test_fraud_headers.py new file mode 100644 index 0000000..304161c --- /dev/null +++ b/tests/test_fraud_headers.py @@ -0,0 +1,292 @@ +"""Fraud-header compliance checks. + +Two layers: + +1. **Local shape assertions** — pure-python checks that every mandatory + Gov-Client-*/Gov-Vendor-* header is present and shaped per HMRC spec. + Runs in every CI build. + +2. **HMRC validator API smoke test** (`test_headers_pass_hmrc_validator`): + Sends the generated header set on a GET to the HMRC sandbox validator and + asserts a clean 200 with no rejected headers. Gated on the + `HMRC_VALIDATOR` and `HMRC_SANDBOX_TOKEN` env vars so `pytest` still runs fine offline. + +HMRC audits fraud headers during production-access review — a failing +validator smoke test MUST block deploy. + +Spec references (primary): + https://developer.service.hmrc.gov.uk/guides/fraud-prevention/connection-method/batch-process-direct/ + https://developer.service.hmrc.gov.uk/api-documentation/docs/api/service/txm-fph-validator-api/1.0 +""" +from __future__ import annotations + +import hashlib +import os +import re + +import httpx +import pytest + +from hmrc_sync.fraud_headers import ( + CONNECTION_METHOD_BATCH, + CONNECTION_METHOD_WEB_APP, + RUNTIME_CONTEXT, + VENDOR_LICENSE_ID, + VENDOR_PRODUCT_NAME, + RuntimeContext, + SessionContext, + as_validator_payload, + build_headers, +) + +VALIDATOR_URL = ( + "https://test-api.service.hmrc.gov.uk/test/fraud-prevention-headers/validate") + +# Per HMRC BATCH_PROCESS_DIRECT spec (11 mandatory headers). 
+BATCH_MANDATORY = { + "Gov-Client-Connection-Method", + "Gov-Client-Device-ID", + "Gov-Client-Local-IPs", + "Gov-Client-Local-IPs-Timestamp", + "Gov-Client-MAC-Addresses", + "Gov-Client-Timezone", + "Gov-Client-User-Agent", + "Gov-Client-User-IDs", + "Gov-Vendor-License-IDs", + "Gov-Vendor-Product-Name", + "Gov-Vendor-Version", +} + +# WEB_APP_VIA_SERVER adds browser-origin context on top of the batch set. +WEB_APP_EXTRAS = { + "Gov-Client-Screens", + "Gov-Client-Window-Size", + "Gov-Client-Public-IP", + "Gov-Client-Public-Port", +} + + +def _full_session() -> SessionContext: + return SessionContext( + user_agent="Mozilla/5.0 (X11; Linux x86_64) hmrc-sync-test", + screen_width=1920, + screen_height=1080, + screen_colour_depth=24, + window_width=1600, + window_height=900, + timezone_offset=0, + device_id="6c3a9f60-1111-2222-3333-abcdef012345", + public_ip="203.0.113.5", + public_port=443, + ) + + +# -------------------------------------------------------------------------- +# BATCH_PROCESS_DIRECT — the CronJob path. All 11 headers must be present. +# -------------------------------------------------------------------------- + + +def test_batch_process_includes_all_11_mandatory_headers() -> None: + hdrs = build_headers(connection_method=CONNECTION_METHOD_BATCH) + missing = BATCH_MANDATORY - hdrs.keys() + assert not missing, f"BATCH_PROCESS_DIRECT missing mandatory headers: {missing}" + + +def test_batch_process_omits_browser_only_headers() -> None: + hdrs = build_headers(connection_method=CONNECTION_METHOD_BATCH) + # Screens / Window-Size are browser-origin; Public-IP/Port route via a + # client-facing IP which doesn't apply to a batch job. 
+ for h in ("Gov-Client-Screens", "Gov-Client-Window-Size", + "Gov-Client-Public-IP", "Gov-Client-Public-Port"): + assert h not in hdrs, f"BATCH emitted browser-only header: {h}" + + +def test_batch_process_connection_method_value() -> None: + hdrs = build_headers(connection_method=CONNECTION_METHOD_BATCH) + assert hdrs["Gov-Client-Connection-Method"] == "BATCH_PROCESS_DIRECT" + + +# -------------------------------------------------------------------------- +# Header-value shape assertions (per HMRC spec). +# -------------------------------------------------------------------------- + + +def test_user_ids_starts_with_os_field() -> None: + """Per spec: `os=&=`. `os=` is mandatory.""" + hdrs = build_headers(connection_method=CONNECTION_METHOD_BATCH) + value = hdrs["Gov-Client-User-IDs"] + assert value.startswith("os="), f"User-IDs missing os= prefix: {value!r}" + # Key-value pairs separated by & — at least one beyond `os=`. + pairs = value.split("&") + assert len(pairs) >= 2, f"User-IDs should have app identifier too: {value!r}" + + +def test_user_agent_has_all_four_spec_fields() -> None: + """Spec: `os-family=…&os-version=…&device-manufacturer=…&device-model=…`.""" + hdrs = build_headers(connection_method=CONNECTION_METHOD_BATCH) + value = hdrs["Gov-Client-User-Agent"] + for key in ("os-family=", "os-version=", "device-manufacturer=", "device-model="): + assert key in value, f"User-Agent missing {key!r}: {value!r}" + + +def test_mac_addresses_percent_encoded() -> None: + """Spec: colons in MACs must be percent-encoded (%3A).""" + hdrs = build_headers(connection_method=CONNECTION_METHOD_BATCH) + value = hdrs["Gov-Client-MAC-Addresses"] + assert value, "MAC-Addresses must never be empty" + assert ":" not in value, f"MAC-Addresses contains raw colons: {value!r}" + assert "%3A" in value, f"MAC-Addresses must use %3A: {value!r}" + + +def test_local_ips_ipv6_percent_encoded() -> None: + """IPv6 entries percent-encoded; IPv4 passes through.""" + hdrs = build_headers( + 
connection_method=CONNECTION_METHOD_BATCH, + runtime=_runtime_with_ips(["10.0.0.4", "fe80::1"]), + ) + value = hdrs["Gov-Client-Local-IPs"] + assert "10.0.0.4" in value + assert "fe80::1" not in value # raw v6 forbidden + assert "fe80%3A%3A1" in value, f"IPv6 not encoded: {value!r}" + + +def test_vendor_license_id_is_sha256_hashed() -> None: + """Spec: `Gov-Vendor-License-IDs: =`.""" + hdrs = build_headers(connection_method=CONNECTION_METHOD_BATCH) + value = hdrs["Gov-Vendor-License-IDs"] + expected_hash = hashlib.sha256(VENDOR_LICENSE_ID.encode()).hexdigest() + assert value == f"{VENDOR_PRODUCT_NAME}={expected_hash}", value + # Hash must be 64 hex chars — catches accidental plaintext leakage. + assert re.fullmatch(r"[a-z0-9-]+=[0-9a-f]{64}", value), value + + +def test_vendor_product_name_percent_encoded() -> None: + hdrs = build_headers(connection_method=CONNECTION_METHOD_BATCH) + assert hdrs["Gov-Vendor-Product-Name"] == "hmrc-sync" # no reserved chars in name + + +def test_vendor_version_format() -> None: + hdrs = build_headers(connection_method=CONNECTION_METHOD_BATCH) + value = hdrs["Gov-Vendor-Version"] + assert re.fullmatch(r"[a-z0-9-]+=\d+\.\d+\.\d+", value), value + + +def test_local_ips_timestamp_spec_format() -> None: + """Spec: `yyyy-MM-ddThh:mm:ss.sssZ` — 24-hour, UTC, 3-digit millis.""" + hdrs = build_headers(connection_method=CONNECTION_METHOD_BATCH) + value = hdrs["Gov-Client-Local-IPs-Timestamp"] + assert re.fullmatch(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z", value), value + + +def test_timezone_utc_offset_format() -> None: + hdrs = build_headers(connection_method=CONNECTION_METHOD_BATCH) + assert re.fullmatch(r"UTC[+-]\d{2}:\d{2}", hdrs["Gov-Client-Timezone"]) + + +def test_device_id_is_valid_uuid() -> None: + """UUID shape check: 8-4-4-4-12 hex — applies to fallback too.""" + hdrs = build_headers(connection_method=CONNECTION_METHOD_BATCH) + value = hdrs["Gov-Client-Device-ID"] + assert re.fullmatch( + 
r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", + value, + ), value + + +# -------------------------------------------------------------------------- +# MFA gating + per-call variance. +# -------------------------------------------------------------------------- + + +def test_mfa_timestamp_only_emitted_for_mfa_method() -> None: + """Gov-Client-MFA-Timestamp is for AUTH_USING_MFA; batch must not emit it.""" + batch = build_headers(connection_method=CONNECTION_METHOD_BATCH) + assert "Gov-Client-MFA-Timestamp" not in batch + + session = _full_session() + session.mfa_timestamp = "2026-04-19T21:30:00.000Z" + mfa = build_headers(session, connection_method="AUTH_USING_MFA") + assert mfa.get("Gov-Client-MFA-Timestamp") == "2026-04-19T21:30:00.000Z" + + +def test_correlation_id_differs_per_call() -> None: + a = build_headers(connection_method=CONNECTION_METHOD_BATCH) + b = build_headers(connection_method=CONNECTION_METHOD_BATCH) + assert a["x-correlation-id"] != b["x-correlation-id"] + + +# -------------------------------------------------------------------------- +# WEB_APP_VIA_SERVER — batch set + browser extras. +# -------------------------------------------------------------------------- + + +def test_web_app_includes_batch_mandatory_plus_browser_extras() -> None: + hdrs = build_headers(_full_session(), connection_method=CONNECTION_METHOD_WEB_APP) + missing = (BATCH_MANDATORY | WEB_APP_EXTRAS) - hdrs.keys() + assert not missing, f"WEB_APP missing headers: {missing}" + + +# -------------------------------------------------------------------------- +# Payload reshape (used by the validator smoke test + CI self-tests). 
+# -------------------------------------------------------------------------- + + +def test_as_validator_payload_reshape() -> None: + hdrs = {"Gov-Client-Connection-Method": "X", "Gov-Vendor-Product-Name": "y"} + payload = as_validator_payload(hdrs) + assert payload["headers"] == [ + {"name": "Gov-Client-Connection-Method", "value": "X"}, + {"name": "Gov-Vendor-Product-Name", "value": "y"}, + ] + + +# -------------------------------------------------------------------------- +# HMRC sandbox validator smoke test — set HMRC_VALIDATOR=1 to enable. +# -------------------------------------------------------------------------- + + +@pytest.mark.skipif( + not (os.environ.get("HMRC_VALIDATOR") + and os.environ.get("HMRC_SANDBOX_TOKEN")), + reason=("HMRC sandbox validator smoke test — set HMRC_VALIDATOR=1 AND " + "HMRC_SANDBOX_TOKEN=. Dev Hub app must be subscribed " + "to txm-fph-validator-api/1.0 (application-restricted)."), +) +def test_headers_pass_hmrc_validator() -> None: + """GET /test/fraud-prevention-headers/validate with BATCH headers. + + Per the OAS spec the validator is a GET endpoint — headers go in the + actual HTTP request, not a JSON body. Auth is application-restricted + (client_credentials bearer). A successful response has code=VALID_HEADERS; + POTENTIALLY_INVALID_HEADERS emits warnings but still passes; only + INVALID_HEADERS is a hard fail. 
+ """ + hdrs = build_headers(connection_method=CONNECTION_METHOD_BATCH) + request_headers = { + **hdrs, + "Accept": "application/vnd.hmrc.1.0+json", + "Authorization": f"Bearer {os.environ['HMRC_SANDBOX_TOKEN']}", + } + resp = httpx.get(VALIDATOR_URL, headers=request_headers, timeout=30.0) + assert resp.status_code == 200, ( + f"validator refused: {resp.status_code} {resp.text[:500]}") + body = resp.json() + code = body.get("code") + assert code != "INVALID_HEADERS", f"validator rejected: {body}" + # POTENTIALLY_INVALID_HEADERS is allowed — HMRC surfaces them as warnings; + # log for visibility but don't fail the build on them. + if code == "POTENTIALLY_INVALID_HEADERS": + print(f"validator warnings: {body.get('warnings')}") + + +def _runtime_with_ips(ips: list[str]) -> RuntimeContext: + """Build a RuntimeContext override with caller-specified local_ips.""" + return RuntimeContext( + mac_addresses=RUNTIME_CONTEXT.mac_addresses, + local_ips=ips, + os_family=RUNTIME_CONTEXT.os_family, + os_version=RUNTIME_CONTEXT.os_version, + device_manufacturer=RUNTIME_CONTEXT.device_manufacturer, + device_model=RUNTIME_CONTEXT.device_model, + os_user=RUNTIME_CONTEXT.os_user, + )