296 lines
9.5 KiB
Python
296 lines
9.5 KiB
Python
"""Headless HMRC sandbox OAuth — drives Chromium via Playwright.
|
|
|
|
Logs in as a sandbox test user without needing a human in the loop,
|
|
captures the authorization code from the localhost callback (the
|
|
callback URL is never actually fetched — we abort the navigation and
|
|
read the URL), exchanges for tokens, saves them to a cache file, then
|
|
optionally calls an API endpoint.
|
|
|
|
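App credentials are read from the HMRC_CLIENT_ID / HMRC_CLIENT_SECRET
environment variables or, failing that, from Vault; the sandbox test user's
credentials are passed on the command line. The token cache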
|
|
lives at ~/.cache/hmrc-sync/tokens.json and can be reused across runs
|
|
until the refresh_token expires (18 months).
|
|
|
|
Usage:
|
|
python3 headless_auth.py login --user-id 228488477217 --password VLAFXYsz4Uqk
|
|
python3 headless_auth.py call --utr 2762163393 --tax-year 2015-16
|
|
python3 headless_auth.py refresh
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import contextlib
|
|
import json
|
|
import os
|
|
import secrets
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import urllib.parse
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
# --- HMRC sandbox endpoints --------------------------------------------------
SANDBOX_BASE = "https://test-api.service.hmrc.gov.uk"
AUTH_PATH = "/oauth/authorize"
TOKEN_PATH = "/oauth/token"
# Path template for the income call; filled in with .format(utr=..., tax_year=...).
INCOME_PATH = "/individual-income/sa/{utr}/annual-summary/{tax_year}"
# Accept header sent with the income request.
INCOME_ACCEPT = "application/vnd.hmrc.1.2+json"

# --- OAuth request parameters ------------------------------------------------
# The browser's request to this address is intercepted and aborted, so no
# server has to listen on it.
REDIRECT_URI = "http://localhost:8080/oauth/callback"
SCOPE = "read:individual-income"

# --- On-disk token cache -----------------------------------------------------
CACHE_DIR = Path.home().joinpath(".cache", "hmrc-sync")
TOKEN_CACHE = CACHE_DIR.joinpath("tokens.json")
|
|
|
|
|
|
@dataclass
class Creds:
    """OAuth client credentials of the HMRC sandbox application."""

    # Sent as ``client_id`` in the authorize and token requests.
    client_id: str
    # Sent as ``client_secret`` in the token requests.
    client_secret: str
|
|
|
|
|
|
def load_creds() -> Creds:
    """Return the application's client credentials.

    ``HMRC_CLIENT_ID`` and ``HMRC_CLIENT_SECRET`` from the environment are
    used when both are set and non-empty. Otherwise each value is read from
    ``secret/viktor`` with the ``vault`` command-line tool.

    Raises:
        subprocess.CalledProcessError: if a ``vault`` invocation exits with a
            non-zero status.
    """

    def _vault_get(field_flag: str) -> str:
        # One ``vault kv get`` call per field; surrounding whitespace is dropped.
        output = subprocess.check_output(
            ["vault", "kv", "get", field_flag, "secret/viktor"],
            text=True,
        )
        return output.strip()

    from_env = (
        os.environ.get("HMRC_CLIENT_ID"),
        os.environ.get("HMRC_CLIENT_SECRET"),
    )
    if all(from_env):
        return Creds(*from_env)

    client_id = _vault_get("-field=hmrc_mtd_sandbox_client_id")
    client_secret = _vault_get("-field=hmrc_mtd_sandbox_client_secret")
    return Creds(client_id, client_secret)
|
|
|
|
|
|
def save_tokens(tok: dict) -> None:
    """Write *tok* to the token cache file, readable by its owner only.

    A ``_cached_at`` key holding the current Unix time in whole seconds is
    added to a copy of *tok*; the caller's dict is left unmodified. The cache
    directory is created if it does not exist.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    payload = {**tok, "_cached_at": int(time.time())}

    # Create the file as 0600 from the start. Writing first and chmod-ing
    # afterwards leaves a window in which the tokens are readable with
    # whatever permissions the process umask allows.
    fd = os.open(TOKEN_CACHE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        # The mode given to os.open only applies when the file is created, so
        # tighten a pre-existing file too, before any secret is written.
        TOKEN_CACHE.chmod(0o600)
        fh.write(json.dumps(payload, indent=2))
|
|
|
|
|
|
def load_tokens() -> dict | None:
    """Return the cached token dict, or ``None`` when there is no cache file.

    Raises:
        json.JSONDecodeError: if the cache file does not contain valid JSON.
    """
    # Read and handle absence instead of exists()-then-read, so a file that
    # disappears between the two steps cannot raise.
    try:
        raw = TOKEN_CACHE.read_text()
    except (FileNotFoundError, NotADirectoryError):
        return None
    return json.loads(raw)
|
|
|
|
|
|
def authorize_url(creds: Creds, state: str) -> str:
    """Build the sandbox authorization URL for an authorization-code request.

    The query string carries the client id from *creds*, the module's scope
    and redirect URI, and *state*.
    """
    params = {
        "response_type": "code",
        "client_id": creds.client_id,
        "scope": SCOPE,
        "redirect_uri": REDIRECT_URI,
        "state": state,
    }
    query = urllib.parse.urlencode(params)
    return f"{SANDBOX_BASE}{AUTH_PATH}?{query}"
|
|
|
|
|
|
def headless_get_code(auth_url: str, user_id: str, password: str, state: str) -> str:
    """Drive Chromium through HMRC sandbox login and extract the auth code.

    Args:
        auth_url: Authorization URL to open in the browser.
        user_id: Sandbox test user ID typed into the login form.
        password: Sandbox test user password typed into the login form.
        state: ``state`` value expected back on the callback request.

    Returns:
        The ``code`` query parameter of the intercepted callback request.

    Raises:
        SystemExit: if no non-empty code was captured, or if the ``state`` on
            the callback differs from *state*.
    """
    captured: dict[str, str] = {}

    def _intercept(route):
        # Compare the parsed host:port instead of searching the whole URL, so
        # a request that merely mentions localhost:8080 (for instance inside
        # its query string) is neither aborted nor mistaken for the callback.
        parts = urllib.parse.urlsplit(route.request.url)
        if parts.netloc != "localhost:8080":
            route.continue_()
            return
        query = urllib.parse.parse_qs(parts.query)
        captured["code"] = query.get("code", [""])[0]
        captured["state"] = query.get("state", [""])[0]
        # The callback is never fetched; its URL is all that is needed.
        route.abort()

    def _first_success(selectors, attempt) -> bool:
        # Run *attempt* on each selector until one does not raise. Misses are
        # expected because several alternative selectors are probed.
        for selector in selectors:
            try:
                attempt(selector)
            except Exception:
                continue
            return True
        return False

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        try:
            ctx = browser.new_context()
            ctx.route("**/*", _intercept)
            page = ctx.new_page()
            page.set_default_timeout(30000)

            page.goto(auth_url)
            page.wait_for_load_state("networkidle")

            # Step 1 — cookie banner ("Reject additional cookies")
            with contextlib.suppress(Exception):
                page.get_by_role("button", name="Reject additional cookies").click(timeout=3000)
                page.wait_for_load_state("networkidle")
            with contextlib.suppress(Exception):
                page.get_by_role("button", name="Hide cookie message").click(timeout=2000)

            # Step 2 — intro page ("Allow your software to connect with HMRC" → Continue)
            with contextlib.suppress(Exception):
                page.get_by_role("button", name="Continue").first.click(timeout=5000)
                page.wait_for_load_state("networkidle")

            # Step 3 — login form
            _first_success(
                ["input[name='userId']", "input#userId", "input[name='user_id']", "#user_id"],
                lambda sel: page.fill(sel, user_id, timeout=2000),
            )
            _first_success(
                ["input[name='password']", "input#password"],
                lambda sel: page.fill(sel, password, timeout=2000),
            )

            def _submit(sel):
                page.click(sel, timeout=2000)
                page.wait_for_load_state("networkidle")

            _first_success(
                ["button[type='submit']", "button:has-text('Sign in')", "input[type='submit']"],
                _submit,
            )

            # Step 4 — consent screen ("Grant authority")
            consent_selectors = [
                "button:has-text('Grant authority')",
                "button:has-text('Continue')",
                "button:has-text('Accept and continue')",
                "#submit",
            ]
            deadline = time.monotonic() + 20
            while time.monotonic() < deadline and "code" not in captured:
                for sel in consent_selectors:
                    if "code" in captured:
                        # The callback arrived while an earlier selector was
                        # being tried; nothing is left to click.
                        break
                    try:
                        page.click(sel, timeout=1500)
                        break
                    except Exception:
                        continue
                # Wait through Playwright rather than time.sleep(): the sync
                # API only dispatches the route handler while control is
                # inside a Playwright call.
                page.wait_for_timeout(500)
        finally:
            # Close the browser even when a step above raised.
            browser.close()

    if not captured.get("code"):
        raise SystemExit(f"Headless login failed to capture code. captured={captured}")
    if captured.get("state") != state:
        raise SystemExit(f"CSRF state mismatch: got {captured.get('state')!r}, want {state!r}")
    return captured["code"]
|
|
|
|
|
|
def exchange_code(creds: Creds, code: str) -> dict:
    """Exchange an authorization *code* for tokens at the sandbox token endpoint.

    Returns the decoded JSON body of the response. ``raise_for_status()`` is
    called on the response first, so an HTTP error status raises instead of
    returning.
    """
    token_url = f"{SANDBOX_BASE}{TOKEN_PATH}"
    form = {
        "grant_type": "authorization_code",
        "client_id": creds.client_id,
        "client_secret": creds.client_secret,
        "redirect_uri": REDIRECT_URI,
        "code": code,
    }
    response = httpx.post(
        token_url,
        data=form,
        headers={"Accept": "application/vnd.hmrc.1.0+json"},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
|
|
|
|
|
|
def refresh_tokens(creds: Creds, refresh_token: str) -> dict:
    """Request a new token set from the sandbox token endpoint using *refresh_token*.

    Returns the decoded JSON body of the response. ``raise_for_status()`` is
    called on the response first, so an HTTP error status raises instead of
    returning.
    """
    token_url = f"{SANDBOX_BASE}{TOKEN_PATH}"
    form = {
        "grant_type": "refresh_token",
        "client_id": creds.client_id,
        "client_secret": creds.client_secret,
        "refresh_token": refresh_token,
    }
    response = httpx.post(
        token_url,
        data=form,
        headers={"Accept": "application/vnd.hmrc.1.0+json"},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
|
|
|
|
|
|
def get_access_or_die() -> str:
    """Return an access token, refreshing the cached one when it is stale.

    The cached token is reused while its age (now minus ``_cached_at``) is
    more than 300 seconds short of ``expires_in``; 14400 is assumed when
    ``expires_in`` is absent. Otherwise the refresh token is exchanged for a
    new token set, which is saved to the cache.

    Raises:
        SystemExit: if there is no token cache, or a refresh is needed but the
            cache holds no refresh token.
    """
    tok = load_tokens()
    if not tok:
        raise SystemExit("No cached tokens. Run: headless_auth.py login --user-id ... --password ...")

    age = int(time.time()) - tok.get("_cached_at", 0)
    if age < tok.get("expires_in", 14400) - 300:
        return tok["access_token"]

    # Stale: refresh.
    old_refresh = tok.get("refresh_token")
    if not old_refresh:
        raise SystemExit(
            "Cached tokens have no refresh_token. "
            "Run: headless_auth.py login --user-id ... --password ..."
        )
    creds = load_creds()
    new_tok = refresh_tokens(creds, old_refresh)
    # A refresh response is not required to carry a new refresh token; keep
    # the current one so the cache stays refreshable.
    new_tok.setdefault("refresh_token", old_refresh)
    save_tokens(new_tok)
    return new_tok["access_token"]
|
|
|
|
|
|
def call_income(utr: str, tax_year: str) -> int:
    """Fetch the annual income summary for *utr* / *tax_year* and print it.

    Prints the request line with the HTTP status, then the response body:
    pretty-printed when it parses as JSON, raw text otherwise.

    Returns:
        0 when the HTTP status is below 400, otherwise 2.
    """
    token = get_access_or_die()
    path = INCOME_PATH.format(utr=utr, tax_year=tax_year)
    request_headers = {"Accept": INCOME_ACCEPT, "Authorization": f"Bearer {token}"}
    resp = httpx.get(f"{SANDBOX_BASE}{path}", headers=request_headers, timeout=30)

    print(f"GET /individual-income/sa/{utr}/annual-summary/{tax_year} -> HTTP {resp.status_code}")
    try:
        print(json.dumps(resp.json(), indent=2))
    except Exception:
        # Body is not JSON (or could not be rendered): show it verbatim.
        print(resp.text)

    if resp.status_code < 400:
        return 0
    return 2
|
|
|
|
|
|
def cmd_login(args) -> int:
    """Handle the ``login`` sub-command.

    Runs the headless browser login with ``args.user_id`` / ``args.password``,
    exchanges the captured code for tokens and saves them to the cache.
    Returns 0.
    """
    client = load_creds()
    # Fresh random state per login; checked against the callback's state.
    csrf_state = secrets.token_urlsafe(24)
    login_url = authorize_url(client, csrf_state)

    print(f"Headless login → {SANDBOX_BASE}{AUTH_PATH} ...")
    auth_code = headless_get_code(login_url, args.user_id, args.password, csrf_state)
    print(f"Got code: {auth_code[:12]}...")

    tokens = exchange_code(client, auth_code)
    save_tokens(tokens)
    print(f"Saved tokens to {TOKEN_CACHE}. expires_in={tokens.get('expires_in')}s")
    return 0
|
|
|
|
|
|
def cmd_refresh(_args) -> int:
    """Handle the ``refresh`` sub-command.

    Exchanges the cached refresh token for a new token set and saves it to
    the cache. Returns 0.

    Raises:
        SystemExit: if there is no token cache, or it holds no refresh token.
    """
    tok = load_tokens()
    if not tok:
        raise SystemExit("No tokens to refresh.")
    old_refresh = tok.get("refresh_token")
    if not old_refresh:
        raise SystemExit("Cached tokens have no refresh_token; run login again.")

    creds = load_creds()
    new_tok = refresh_tokens(creds, old_refresh)
    # A refresh response is not required to carry a new refresh token; keep
    # the current one so the cache stays refreshable.
    new_tok.setdefault("refresh_token", old_refresh)
    save_tokens(new_tok)
    print(f"Refreshed. new expires_in={new_tok.get('expires_in')}s")
    return 0
|
|
|
|
|
|
def cmd_call(args) -> int:
    """Handle the ``call`` sub-command; returns ``call_income``'s exit status."""
    exit_status = call_income(utr=args.utr, tax_year=args.tax_year)
    return exit_status
|
|
|
|
|
|
def main() -> int:
    """Parse the command line and run the chosen sub-command.

    Sub-commands are ``login``, ``refresh`` and ``call``. Returns the
    handler's return value, which the entry guard uses as the exit status.
    """
    parser = argparse.ArgumentParser()
    commands = parser.add_subparsers(dest="cmd", required=True)

    login = commands.add_parser("login")
    login.add_argument("--user-id", required=True)
    login.add_argument("--password", required=True)
    login.set_defaults(func=cmd_login)

    refresh = commands.add_parser("refresh")
    refresh.set_defaults(func=cmd_refresh)

    call = commands.add_parser("call")
    call.add_argument("--utr", required=True)
    call.add_argument("--tax-year", default="2015-16")
    call.set_defaults(func=cmd_call)

    namespace = parser.parse_args()
    # Each sub-parser stored its handler under ``func``.
    handler = namespace.func
    return handler(namespace)
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|