hmrc-sync/headless_auth.py
2026-05-07 17:06:11 +00:00

296 lines
9.5 KiB
Python

"""Headless HMRC sandbox OAuth — drives Chromium via Playwright.
Logs in as a sandbox test user without needing a human in the loop,
captures the authorization code from the localhost callback (the
callback URL is never actually fetched — we abort the navigation and
read the URL), exchanges for tokens, saves them to a cache file, then
optionally calls an API endpoint.
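Client credentials are read from the HMRC_CLIENT_ID / HMRC_CLIENT_SECRET
environment variables, falling back to Vault; the sandbox test user's
credentials are passed on the command line. The token cache lives at
~/.cache/hmrc-sync/tokens.json and can be reused across runs until the
refresh_token expires (18 months).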
Usage:
python3 headless_auth.py login --user-id 228488477217 --password VLAFXYsz4Uqk
python3 headless_auth.py call --utr 2762163393 --tax-year 2015-16
python3 headless_auth.py refresh
"""
from __future__ import annotations
import argparse
import contextlib
import json
import os
import secrets
import subprocess
import sys
import time
import urllib.parse
from dataclasses import dataclass
from pathlib import Path
import httpx
from playwright.sync_api import sync_playwright
# HMRC sandbox host and the OAuth endpoints appended to it.
SANDBOX_BASE = "https://test-api.service.hmrc.gov.uk"
AUTH_PATH = "/oauth/authorize"
TOKEN_PATH = "/oauth/token"

# Resource fetched by the "call" command; {utr} and {tax_year} are filled
# in with str.format, and INCOME_ACCEPT is sent as the Accept header.
INCOME_PATH = "/individual-income/sa/{utr}/annual-summary/{tax_year}"
INCOME_ACCEPT = "application/vnd.hmrc.1.2+json"

# OAuth request parameters. The redirect URI is never actually served:
# the headless browser intercepts the navigation and reads the URL.
REDIRECT_URI = "http://localhost:8080/oauth/callback"
SCOPE = "read:individual-income"

# On-disk token cache location.
CACHE_DIR = Path.home().joinpath(".cache", "hmrc-sync")
TOKEN_CACHE = CACHE_DIR.joinpath("tokens.json")
@dataclass
class Creds:
    """OAuth client credentials for the HMRC sandbox application.

    Construction and equality are the dataclass defaults; only the repr
    is customised so the secret never appears in logs or tracebacks.
    """

    client_id: str
    # Sensitive value: deliberately masked by __repr__ below.
    client_secret: str

    def __repr__(self) -> str:
        """Return a repr that shows the client id but masks the secret."""
        return (
            f"{type(self).__name__}"
            f"(client_id={self.client_id!r}, client_secret='***')"
        )
def load_creds() -> Creds:
    """Return the OAuth client credentials.

    The HMRC_CLIENT_ID / HMRC_CLIENT_SECRET environment variables win when
    both are set and non-empty. Otherwise each value is read from the
    ``secret/viktor`` Vault path by shelling out to the ``vault`` CLI
    (client id first, then client secret).
    """
    from_env = (
        os.environ.get("HMRC_CLIENT_ID"),
        os.environ.get("HMRC_CLIENT_SECRET"),
    )
    if all(from_env):
        return Creds(*from_env)

    def _vault_field(field_name: str) -> str:
        # check_output raises if the vault CLI exits non-zero.
        command = ["vault", "kv", "get", f"-field={field_name}", "secret/viktor"]
        return subprocess.check_output(command, text=True).strip()

    return Creds(
        _vault_field("hmrc_mtd_sandbox_client_id"),
        _vault_field("hmrc_mtd_sandbox_client_secret"),
    )
def save_tokens(tok: dict) -> None:
    """Write a token response to the cache file, owner-readable only.

    A copy of ``tok`` is stored with an added ``_cached_at`` key holding
    the current Unix time in whole seconds; ``tok`` itself is not mutated.

    The data is written to a sibling temporary file that is created with
    mode 0600 and then moved over the cache with ``os.replace``. The
    tokens are therefore never on disk with wider permissions, and an
    interrupted write cannot leave a truncated cache behind.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)

    payload = dict(tok)
    payload["_cached_at"] = int(time.time())

    tmp_path = TOKEN_CACHE.with_name(TOKEN_CACHE.name + ".tmp")
    fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        # The mode passed to os.open only applies when the file is created;
        # tighten a stale temp file left by an earlier run before writing.
        tmp_path.chmod(0o600)
        fh.write(json.dumps(payload, indent=2))
    os.replace(tmp_path, TOKEN_CACHE)
def load_tokens() -> dict | None:
    """Return the cached token dict, or None when no usable cache exists.

    A missing cache file yields None. A cache that is not valid JSON, or
    whose top-level value is not an object, is reported on stderr and also
    treated as missing so callers fall through to their "log in again"
    path instead of crashing with a traceback.
    """
    try:
        raw = TOKEN_CACHE.read_text()
    except FileNotFoundError:
        # Read-and-handle instead of exists()-then-read, so the file cannot
        # vanish between the check and the read.
        return None

    try:
        cached = json.loads(raw)
    except json.JSONDecodeError:
        cached = None

    if not isinstance(cached, dict):
        print(f"warning: ignoring unreadable token cache {TOKEN_CACHE}", file=sys.stderr)
        return None
    return cached
def authorize_url(creds: Creds, state: str) -> str:
    """Build the sandbox authorization URL for an authorization-code request.

    ``state`` is passed through as the OAuth ``state`` parameter so the
    caller can verify it when the callback arrives.
    """
    query = urllib.parse.urlencode(
        {
            "response_type": "code",
            "client_id": creds.client_id,
            "scope": SCOPE,
            "redirect_uri": REDIRECT_URI,
            "state": state,
        }
    )
    endpoint = SANDBOX_BASE + AUTH_PATH
    return f"{endpoint}?{query}"
def _try_selectors(action, selectors) -> bool:
    """Call ``action(selector)`` for each selector until one succeeds.

    Returns True if some selector worked, False if every one failed.
    Failures are swallowed on purpose: the selector lists are alternative
    guesses at the page markup, and a miss just means "try the next one".
    """
    for selector in selectors:
        try:
            action(selector)
        except Exception:  # deliberate best-effort, see docstring
            continue
        return True
    return False


def headless_get_code(auth_url: str, user_id: str, password: str, state: str) -> str:
    """Drive Chromium through HMRC sandbox login and extract the auth code.

    Args:
        auth_url: Authorization URL to open.
        user_id: Sandbox test user id typed into the login form.
        password: Sandbox test user password typed into the login form.
        state: The OAuth ``state`` value sent in ``auth_url``; the value on
            the callback must match it.

    Returns:
        The authorization code taken from the intercepted callback URL.

    Raises:
        SystemExit: If no code was captured (including when the callback
            carried an OAuth ``error``), or if the returned state does not
            match ``state``.
    """
    callback = urllib.parse.urlparse(REDIRECT_URI)
    captured: dict[str, str] = {}

    def _intercept(route):
        # Compare the parsed host:port rather than searching the whole URL,
        # so a URL that merely mentions the callback host in its query
        # string is not mistaken for the callback itself.
        target = urllib.parse.urlparse(route.request.url)
        if target.netloc != callback.netloc:
            route.continue_()
            return
        # Nothing listens on the callback address: record the parameters
        # and abort. Once a non-empty code is held, later requests to the
        # same host must not overwrite it.
        if not captured.get("code"):
            params = urllib.parse.parse_qs(target.query)
            captured["code"] = params.get("code", [""])[0]
            captured["state"] = params.get("state", [""])[0]
            for key in ("error", "error_description"):
                if key in params:
                    captured[key] = params[key][0]
        route.abort()

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        try:
            ctx = browser.new_context()
            ctx.route("**/*", _intercept)
            page = ctx.new_page()
            page.set_default_timeout(30000)
            page.goto(auth_url)
            page.wait_for_load_state("networkidle")

            # Step 1 — cookie banner ("Reject additional cookies")
            with contextlib.suppress(Exception):
                page.get_by_role("button", name="Reject additional cookies").click(timeout=3000)
                page.wait_for_load_state("networkidle")
            with contextlib.suppress(Exception):
                page.get_by_role("button", name="Hide cookie message").click(timeout=2000)

            # Step 2 — intro page ("Allow your software to connect with HMRC" → Continue)
            with contextlib.suppress(Exception):
                page.get_by_role("button", name="Continue").first.click(timeout=5000)
                page.wait_for_load_state("networkidle")

            # Step 3 — login form
            _try_selectors(
                lambda sel: page.fill(sel, user_id, timeout=2000),
                ["input[name='userId']", "input#userId", "input[name='user_id']", "#user_id"],
            )
            _try_selectors(
                lambda sel: page.fill(sel, password, timeout=2000),
                ["input[name='password']", "input#password"],
            )

            def _submit(sel):
                page.click(sel, timeout=2000)
                page.wait_for_load_state("networkidle")

            _try_selectors(
                _submit,
                ["button[type='submit']", "button:has-text('Sign in')", "input[type='submit']"],
            )

            # Step 4 — consent screen ("Grant authority"). Keep clicking
            # until the callback is intercepted or 20 seconds have passed.
            consent_selectors = [
                "button:has-text('Grant authority')",
                "button:has-text('Continue')",
                "button:has-text('Accept and continue')",
                "#submit",
            ]
            deadline = time.monotonic() + 20
            while time.monotonic() < deadline and "code" not in captured:
                _try_selectors(lambda sel: page.click(sel, timeout=1500), consent_selectors)
                # Playwright's docs recommend wait_for_timeout over
                # time.sleep in the sync API, so that pending events (such
                # as the route handler above) are still processed.
                page.wait_for_timeout(500)
        finally:
            # Close the browser even when a step above raises.
            browser.close()

    if not captured.get("code"):
        raise SystemExit(f"Headless login failed to capture code. captured={captured}")
    if captured.get("state") != state:
        raise SystemExit(f"CSRF state mismatch: got {captured.get('state')!r}, want {state!r}")
    return captured["code"]
def exchange_code(creds: Creds, code: str) -> dict:
    """Exchange an authorization code for tokens at the sandbox token endpoint.

    Returns the decoded JSON response body. A non-2xx response raises via
    ``raise_for_status``.
    """
    form = {
        "grant_type": "authorization_code",
        "client_id": creds.client_id,
        "client_secret": creds.client_secret,
        "redirect_uri": REDIRECT_URI,
        "code": code,
    }
    response = httpx.post(
        SANDBOX_BASE + TOKEN_PATH,
        data=form,
        headers={"Accept": "application/vnd.hmrc.1.0+json"},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
def refresh_tokens(creds: Creds, refresh_token: str) -> dict:
    """Request a new token set from the sandbox using a refresh token.

    Returns the decoded JSON response body. A non-2xx response raises via
    ``raise_for_status``.
    """
    form = {
        "grant_type": "refresh_token",
        "client_id": creds.client_id,
        "client_secret": creds.client_secret,
        "refresh_token": refresh_token,
    }
    response = httpx.post(
        SANDBOX_BASE + TOKEN_PATH,
        data=form,
        headers={"Accept": "application/vnd.hmrc.1.0+json"},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
def get_access_or_die() -> str:
    """Return a usable access token, refreshing the cached one if needed.

    The cached access token is returned while its age (measured from the
    ``_cached_at`` stamp) is more than 300 seconds short of ``expires_in``
    (assumed to be 14400 seconds when absent). Otherwise the refresh token
    is exchanged for a new token set, which is cached and returned.

    Raises:
        SystemExit: If there is no token cache, or the cache holds no
            refresh token when a refresh is required.
    """
    tok = load_tokens()
    if not tok:
        raise SystemExit("No cached tokens. Run: headless_auth.py login --user-id ... --password ...")

    age = int(time.time()) - tok.get("_cached_at", 0)
    if age < tok.get("expires_in", 14400) - 300:
        return tok["access_token"]

    # Refresh path.
    current_refresh = tok.get("refresh_token")
    if not current_refresh:
        raise SystemExit(
            "Cached tokens have no refresh_token. "
            "Run: headless_auth.py login --user-id ... --password ..."
        )
    creds = load_creds()
    new_tok = refresh_tokens(creds, current_refresh)
    # A token endpoint may omit refresh_token from a refresh response, in
    # which case the old one stays valid; keep it rather than caching a
    # token set that can never be refreshed again.
    new_tok.setdefault("refresh_token", current_refresh)
    save_tokens(new_tok)
    return new_tok["access_token"]
def call_income(utr: str, tax_year: str) -> int:
    """GET the annual-summary resource and print the response.

    Prints the request line with the HTTP status, then the body as
    pretty-printed JSON when it decodes as JSON, or as raw text otherwise.

    Args:
        utr: Value substituted for ``{utr}`` in the resource path.
        tax_year: Value substituted for ``{tax_year}`` in the resource path.

    Returns:
        0 when the HTTP status is below 400, otherwise 2.
    """
    access = get_access_or_die()
    # Percent-encode the caller-supplied segments so characters such as
    # "/", "?" or "#" cannot change which resource is requested.
    path = INCOME_PATH.format(
        utr=urllib.parse.quote(utr, safe=""),
        tax_year=urllib.parse.quote(tax_year, safe=""),
    )
    r = httpx.get(
        f"{SANDBOX_BASE}{path}",
        headers={"Accept": INCOME_ACCEPT, "Authorization": f"Bearer {access}"},
        timeout=30,
    )
    print(f"GET {path} -> HTTP {r.status_code}")
    try:
        body = r.json()
    except ValueError:
        # Body is not JSON (JSONDecodeError is a ValueError): show it raw.
        print(r.text)
    else:
        print(json.dumps(body, indent=2))
    return 0 if r.status_code < 400 else 2
def cmd_login(args) -> int:
    """Handle ``login``: run the headless flow and cache the resulting tokens.

    Reads ``args.user_id`` and ``args.password``. Returns 0 on success.
    """
    creds = load_creds()
    # Fresh random state per login; checked against the callback.
    csrf_state = secrets.token_urlsafe(24)
    login_url = authorize_url(creds, csrf_state)

    print(f"Headless login → {SANDBOX_BASE}{AUTH_PATH} ...")
    auth_code = headless_get_code(login_url, args.user_id, args.password, csrf_state)
    print(f"Got code: {auth_code[:12]}...")

    token_set = exchange_code(creds, auth_code)
    save_tokens(token_set)
    print(f"Saved tokens to {TOKEN_CACHE}. expires_in={token_set.get('expires_in')}s")
    return 0
def cmd_refresh(_args) -> int:
    """Handle ``refresh``: force a token refresh and update the cache.

    Returns 0 on success.

    Raises:
        SystemExit: If there is no token cache, or the cache holds no
            refresh token.
    """
    tok = load_tokens()
    if not tok:
        raise SystemExit("No tokens to refresh.")

    current_refresh = tok.get("refresh_token")
    if not current_refresh:
        raise SystemExit("Cached tokens have no refresh_token; run the login command again.")

    creds = load_creds()
    new_tok = refresh_tokens(creds, current_refresh)
    # A token endpoint may omit refresh_token from a refresh response, in
    # which case the old one stays valid; keep it rather than caching a
    # token set that can never be refreshed again.
    new_tok.setdefault("refresh_token", current_refresh)
    save_tokens(new_tok)
    print(f"Refreshed. new expires_in={new_tok.get('expires_in')}s")
    return 0
def cmd_call(args) -> int:
    """Handle ``call``: fetch the annual summary and return its exit status."""
    exit_status = call_income(utr=args.utr, tax_year=args.tax_year)
    return exit_status
def main() -> int:
    """Parse the command line and run the chosen sub-command.

    Sub-commands are ``login``, ``refresh`` and ``call``; one is required.
    Returns the handler's exit status.
    """
    parser = argparse.ArgumentParser()
    commands = parser.add_subparsers(dest="cmd", required=True)

    login = commands.add_parser("login")
    login.add_argument("--user-id", required=True)
    login.add_argument("--password", required=True)
    login.set_defaults(func=cmd_login)

    commands.add_parser("refresh").set_defaults(func=cmd_refresh)

    call = commands.add_parser("call")
    call.add_argument("--utr", required=True)
    call.add_argument("--tax-year", default="2015-16")
    call.set_defaults(func=cmd_call)

    namespace = parser.parse_args()
    handler = namespace.func
    return handler(namespace)
# Script entry point: main()'s return value becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())