"""Headless HMRC sandbox OAuth — drives Chromium via Playwright. Logs in as a sandbox test user without needing a human in the loop, captures the authorization code from the localhost callback (the callback URL is never actually fetched — we abort the navigation and read the URL), exchanges for tokens, saves them to a cache file, then optionally calls an API endpoint. Creds + test user credentials are read from Vault. The token cache lives at ~/.cache/hmrc-sync/tokens.json and can be reused across runs until the refresh_token expires (18 months). Usage: python3 headless_auth.py login --user-id 228488477217 --password VLAFXYsz4Uqk python3 headless_auth.py call --utr 2762163393 --tax-year 2015-16 python3 headless_auth.py refresh """ from __future__ import annotations import argparse import contextlib import json import os import secrets import subprocess import sys import time import urllib.parse from dataclasses import dataclass from pathlib import Path import httpx from playwright.sync_api import sync_playwright SANDBOX_BASE = "https://test-api.service.hmrc.gov.uk" AUTH_PATH = "/oauth/authorize" TOKEN_PATH = "/oauth/token" INCOME_PATH = "/individual-income/sa/{utr}/annual-summary/{tax_year}" INCOME_ACCEPT = "application/vnd.hmrc.1.2+json" REDIRECT_URI = "http://localhost:8080/oauth/callback" SCOPE = "read:individual-income" CACHE_DIR = Path.home() / ".cache" / "hmrc-sync" TOKEN_CACHE = CACHE_DIR / "tokens.json" @dataclass class Creds: client_id: str client_secret: str def load_creds() -> Creds: env_id = os.environ.get("HMRC_CLIENT_ID") env_secret = os.environ.get("HMRC_CLIENT_SECRET") if env_id and env_secret: return Creds(env_id, env_secret) cid = subprocess.check_output( ["vault", "kv", "get", "-field=hmrc_mtd_sandbox_client_id", "secret/viktor"], text=True, ).strip() csec = subprocess.check_output( ["vault", "kv", "get", "-field=hmrc_mtd_sandbox_client_secret", "secret/viktor"], text=True, ).strip() return Creds(cid, csec) def save_tokens(tok: dict) -> None: CACHE_DIR.mkdir(parents=True, exist_ok=True) tok_with_meta = dict(tok) tok_with_meta["_cached_at"] = int(time.time()) TOKEN_CACHE.write_text(json.dumps(tok_with_meta, indent=2)) TOKEN_CACHE.chmod(0o600) def load_tokens() -> dict | None: if not TOKEN_CACHE.exists(): return None return json.loads(TOKEN_CACHE.read_text()) def authorize_url(creds: Creds, state: str) -> str: return ( f"{SANDBOX_BASE}{AUTH_PATH}?" + urllib.parse.urlencode({ "response_type": "code", "client_id": creds.client_id, "scope": SCOPE, "redirect_uri": REDIRECT_URI, "state": state, }) ) def headless_get_code(auth_url: str, user_id: str, password: str, state: str) -> str: """Drive Chromium through HMRC sandbox login and extract the auth code.""" with sync_playwright() as p: browser = p.chromium.launch(headless=True) ctx = browser.new_context() captured_code: dict[str, str] = {} # Abort any attempt to hit localhost:8080 and capture the URL that # triggered it — that's the callback with ?code=... def _intercept(route): if "localhost:8080" in route.request.url: parsed = urllib.parse.urlparse(route.request.url) qs = urllib.parse.parse_qs(parsed.query) captured_code["code"] = qs.get("code", [""])[0] captured_code["state"] = qs.get("state", [""])[0] route.abort() else: route.continue_() ctx.route("**/*", _intercept) page = ctx.new_page() page.set_default_timeout(30000) page.goto(auth_url) page.wait_for_load_state("networkidle") # Step 1 — cookie banner ("Reject additional cookies") with contextlib.suppress(Exception): page.get_by_role("button", name="Reject additional cookies").click(timeout=3000) page.wait_for_load_state("networkidle") with contextlib.suppress(Exception): page.get_by_role("button", name="Hide cookie message").click(timeout=2000) # Step 2 — intro page ("Allow your software to connect with HMRC" → Continue) with contextlib.suppress(Exception): page.get_by_role("button", name="Continue").first.click(timeout=5000) page.wait_for_load_state("networkidle") # Step 3 — login form for sel in ["input[name='userId']", "input#userId", "input[name='user_id']", "#user_id"]: try: page.fill(sel, user_id, timeout=2000) break except Exception: continue for sel in ["input[name='password']", "input#password"]: try: page.fill(sel, password, timeout=2000) break except Exception: continue for sel in ["button[type='submit']", "button:has-text('Sign in')", "input[type='submit']"]: try: page.click(sel, timeout=2000) page.wait_for_load_state("networkidle") break except Exception: continue # Step 4 — consent screen ("Grant authority") deadline = time.time() + 20 while time.time() < deadline and "code" not in captured_code: for sel in [ "button:has-text('Grant authority')", "button:has-text('Continue')", "button:has-text('Accept and continue')", "#submit", ]: try: page.click(sel, timeout=1500) break except Exception: continue time.sleep(0.5) browser.close() if "code" not in captured_code or not captured_code["code"]: raise SystemExit(f"Headless login failed to capture code. captured={captured_code}") if captured_code.get("state") != state: raise SystemExit(f"CSRF state mismatch: got {captured_code.get('state')!r}, want {state!r}") return captured_code["code"] def exchange_code(creds: Creds, code: str) -> dict: r = httpx.post( f"{SANDBOX_BASE}{TOKEN_PATH}", data={ "grant_type": "authorization_code", "client_id": creds.client_id, "client_secret": creds.client_secret, "redirect_uri": REDIRECT_URI, "code": code, }, headers={"Accept": "application/vnd.hmrc.1.0+json"}, timeout=30, ) r.raise_for_status() return r.json() def refresh_tokens(creds: Creds, refresh_token: str) -> dict: r = httpx.post( f"{SANDBOX_BASE}{TOKEN_PATH}", data={ "grant_type": "refresh_token", "client_id": creds.client_id, "client_secret": creds.client_secret, "refresh_token": refresh_token, }, headers={"Accept": "application/vnd.hmrc.1.0+json"}, timeout=30, ) r.raise_for_status() return r.json() def get_access_or_die() -> str: tok = load_tokens() if not tok: raise SystemExit("No cached tokens. Run: headless_auth.py login --user-id ... --password ...") age = int(time.time()) - tok.get("_cached_at", 0) if age < tok.get("expires_in", 14400) - 300: return tok["access_token"] # refresh creds = load_creds() new_tok = refresh_tokens(creds, tok["refresh_token"]) save_tokens(new_tok) return new_tok["access_token"] def call_income(utr: str, tax_year: str) -> int: access = get_access_or_die() url = f"{SANDBOX_BASE}{INCOME_PATH.format(utr=utr, tax_year=tax_year)}" r = httpx.get( url, headers={"Accept": INCOME_ACCEPT, "Authorization": f"Bearer {access}"}, timeout=30, ) print(f"GET /individual-income/sa/{utr}/annual-summary/{tax_year} -> HTTP {r.status_code}") try: print(json.dumps(r.json(), indent=2)) except Exception: print(r.text) return 0 if r.status_code < 400 else 2 def cmd_login(args) -> int: creds = load_creds() state = secrets.token_urlsafe(24) url = authorize_url(creds, state) print(f"Headless login → {SANDBOX_BASE}{AUTH_PATH} ...") code = headless_get_code(url, args.user_id, args.password, state) print(f"Got code: {code[:12]}...") tok = exchange_code(creds, code) save_tokens(tok) print(f"Saved tokens to {TOKEN_CACHE}. expires_in={tok.get('expires_in')}s") return 0 def cmd_refresh(_args) -> int: tok = load_tokens() if not tok: raise SystemExit("No tokens to refresh.") creds = load_creds() new_tok = refresh_tokens(creds, tok["refresh_token"]) save_tokens(new_tok) print(f"Refreshed. new expires_in={new_tok.get('expires_in')}s") return 0 def cmd_call(args) -> int: return call_income(args.utr, args.tax_year) def main() -> int: p = argparse.ArgumentParser() sub = p.add_subparsers(dest="cmd", required=True) pl = sub.add_parser("login") pl.add_argument("--user-id", required=True) pl.add_argument("--password", required=True) pl.set_defaults(func=cmd_login) pr = sub.add_parser("refresh") pr.set_defaults(func=cmd_refresh) pc = sub.add_parser("call") pc.add_argument("--utr", required=True) pc.add_argument("--tax-year", default="2015-16") pc.set_defaults(func=cmd_call) args = p.parse_args() return args.func(args) if __name__ == "__main__": sys.exit(main())