Initial extraction from monorepo
This commit is contained in:
commit
5c7baa8acc
20 changed files with 1974 additions and 0 deletions
296
headless_auth.py
Normal file
296
headless_auth.py
Normal file
|
|
@ -0,0 +1,296 @@
|
|||
"""Headless HMRC sandbox OAuth — drives Chromium via Playwright.
|
||||
|
||||
Logs in as a sandbox test user without needing a human in the loop,
|
||||
captures the authorization code from the localhost callback (the
|
||||
callback URL is never actually fetched — we abort the navigation and
|
||||
read the URL), exchanges for tokens, saves them to a cache file, then
|
||||
optionally calls an API endpoint.
|
||||
|
||||
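Client credentials come from HMRC_CLIENT_ID / HMRC_CLIENT_SECRET or, if
those are unset, from Vault; the test user's ID and password are passed
on the command line (see Usage). The token cache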
|
||||
lives at ~/.cache/hmrc-sync/tokens.json and can be reused across runs
|
||||
until the refresh_token expires (18 months).
|
||||
|
||||
Usage:
|
||||
python3 headless_auth.py login --user-id 228488477217 --password VLAFXYsz4Uqk
|
||||
python3 headless_auth.py call --utr 2762163393 --tax-year 2015-16
|
||||
python3 headless_auth.py refresh
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import contextlib
|
||||
import json
|
||||
import os
|
||||
import secrets
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import urllib.parse
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
import httpx
|
||||
from playwright.sync_api import sync_playwright
|
||||
|
||||
# Base URL of the HMRC sandbox API; every path below is appended to it.
SANDBOX_BASE = "https://test-api.service.hmrc.gov.uk"
# OAuth endpoints: the browser-facing authorize page and the token endpoint.
AUTH_PATH = "/oauth/authorize"
TOKEN_PATH = "/oauth/token"
# Path template for the income request; filled in with str.format(utr=..., tax_year=...).
INCOME_PATH = "/individual-income/sa/{utr}/annual-summary/{tax_year}"
# Accept header sent with the income request.
INCOME_ACCEPT = "application/vnd.hmrc.1.2+json"

# Sent as redirect_uri. headless_get_code() aborts browser requests to
# localhost:8080 and reads the code from the URL, so nothing listens here.
REDIRECT_URI = "http://localhost:8080/oauth/callback"
# OAuth scope requested in the authorize URL.
SCOPE = "read:individual-income"

# Location of the on-disk token cache, under the current user's home directory.
CACHE_DIR = Path.home() / ".cache" / "hmrc-sync"
TOKEN_CACHE = CACHE_DIR / "tokens.json"
|
||||
|
||||
|
||||
@dataclass
class Creds:
    """OAuth client credentials for the HMRC application.

    Attributes:
        client_id: OAuth client identifier.
        client_secret: OAuth client secret. Masked in ``repr()`` so it does
            not leak into logs or tracebacks.
    """

    client_id: str
    client_secret: str

    def __repr__(self) -> str:
        # Hand-written on purpose: the generated dataclass repr would print
        # the secret. @dataclass leaves an explicitly defined __repr__ alone.
        return f"{type(self).__name__}(client_id={self.client_id!r}, client_secret='***')"
|
||||
|
||||
|
||||
def load_creds() -> Creds:
    """Return the OAuth client credentials.

    ``HMRC_CLIENT_ID`` and ``HMRC_CLIENT_SECRET`` are used when both are set
    and non-empty. Otherwise both values are fetched with ``vault kv get``
    from ``secret/viktor`` (client id first, then client secret).

    Raises:
        Whatever ``subprocess.check_output`` raises when the ``vault``
        command cannot be run or exits with a non-zero status.
    """
    client_id = os.environ.get("HMRC_CLIENT_ID")
    client_secret = os.environ.get("HMRC_CLIENT_SECRET")

    if not (client_id and client_secret):
        # Environment is incomplete: read both fields from Vault instead.
        client_id, client_secret = (
            subprocess.check_output(
                ["vault", "kv", "get", field_flag, "secret/viktor"],
                text=True,
            ).strip()
            for field_flag in (
                "-field=hmrc_mtd_sandbox_client_id",
                "-field=hmrc_mtd_sandbox_client_secret",
            )
        )

    return Creds(client_id, client_secret)
|
||||
|
||||
|
||||
def save_tokens(tok: dict) -> None:
|
||||
CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
tok_with_meta = dict(tok)
|
||||
tok_with_meta["_cached_at"] = int(time.time())
|
||||
TOKEN_CACHE.write_text(json.dumps(tok_with_meta, indent=2))
|
||||
TOKEN_CACHE.chmod(0o600)
|
||||
|
||||
|
||||
def load_tokens() -> dict | None:
    """Return the parsed contents of ``TOKEN_CACHE``, or ``None`` if the file is absent."""
    cache_present = TOKEN_CACHE.exists()
    return json.loads(TOKEN_CACHE.read_text()) if cache_present else None
|
||||
|
||||
|
||||
def authorize_url(creds: Creds, state: str) -> str:
    """Build the authorization URL the browser is pointed at.

    Args:
        creds: supplies the ``client_id`` query parameter.
        state: value placed in the ``state`` query parameter.

    Returns:
        ``SANDBOX_BASE + AUTH_PATH`` followed by the URL-encoded query string.
    """
    params = {
        "response_type": "code",
        "client_id": creds.client_id,
        "scope": SCOPE,
        "redirect_uri": REDIRECT_URI,
        "state": state,
    }
    query = urllib.parse.urlencode(params)
    return f"{SANDBOX_BASE}{AUTH_PATH}?" + query
|
||||
|
||||
|
||||
def headless_get_code(auth_url: str, user_id: str, password: str, state: str) -> str:
    """Drive Chromium through HMRC sandbox login and extract the auth code.

    Every browser request is routed through an interceptor. A request whose
    host:port matches ``REDIRECT_URI`` is treated as the OAuth callback: its
    ``code`` and ``state`` query parameters are recorded and the request is
    aborted, so no server has to listen on the callback address.

    Args:
        auth_url: authorization URL to open.
        user_id: test user ID typed into the login form.
        password: test user password typed into the login form.
        state: value the callback's ``state`` parameter must equal.

    Returns:
        The non-empty ``code`` query parameter captured from the callback.

    Raises:
        SystemExit: if no code was captured, or if the callback's ``state``
            differs from ``state``.
    """
    # Match the callback on the parsed host:port instead of a substring of the
    # whole URL, so a URL that merely mentions the callback address in its
    # query string is not mistaken for (and aborted as) the callback.
    callback_netloc = urllib.parse.urlparse(REDIRECT_URI).netloc
    captured_code: dict[str, str] = {}

    def _intercept(route):
        parsed = urllib.parse.urlparse(route.request.url)
        if parsed.netloc == callback_netloc:
            qs = urllib.parse.parse_qs(parsed.query)
            captured_code["code"] = qs.get("code", [""])[0]
            captured_code["state"] = qs.get("state", [""])[0]
            route.abort()
        else:
            route.continue_()

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        try:
            ctx = browser.new_context()
            ctx.route("**/*", _intercept)
            page = ctx.new_page()
            page.set_default_timeout(30000)

            page.goto(auth_url)
            page.wait_for_load_state("networkidle")

            # Step 1 — cookie banner ("Reject additional cookies").
            # Best-effort: the banner may not be shown at all.
            with contextlib.suppress(Exception):
                page.get_by_role("button", name="Reject additional cookies").click(timeout=3000)
                page.wait_for_load_state("networkidle")
            with contextlib.suppress(Exception):
                page.get_by_role("button", name="Hide cookie message").click(timeout=2000)

            # Step 2 — intro page ("Allow your software to connect with HMRC" → Continue)
            with contextlib.suppress(Exception):
                page.get_by_role("button", name="Continue").first.click(timeout=5000)
                page.wait_for_load_state("networkidle")

            # Step 3 — login form. Each field tries several candidate
            # selectors and stops at the first one that works.
            for sel in ["input[name='userId']", "input#userId", "input[name='user_id']", "#user_id"]:
                try:
                    page.fill(sel, user_id, timeout=2000)
                    break
                except Exception:
                    continue
            for sel in ["input[name='password']", "input#password"]:
                try:
                    page.fill(sel, password, timeout=2000)
                    break
                except Exception:
                    continue
            for sel in ["button[type='submit']", "button:has-text('Sign in')", "input[type='submit']"]:
                try:
                    page.click(sel, timeout=2000)
                    page.wait_for_load_state("networkidle")
                    break
                except Exception:
                    continue

            # Step 4 — consent screen ("Grant authority"). Keep clicking
            # candidate buttons until the callback is seen or 20 s elapse.
            # A monotonic clock keeps the deadline immune to wall-clock jumps.
            deadline = time.monotonic() + 20
            while time.monotonic() < deadline and "code" not in captured_code:
                for sel in [
                    "button:has-text('Grant authority')",
                    "button:has-text('Continue')",
                    "button:has-text('Accept and continue')",
                    "#submit",
                ]:
                    try:
                        page.click(sel, timeout=1500)
                        break
                    except Exception:
                        continue
                time.sleep(0.5)
        finally:
            # Close the browser even when a step above raised (e.g. a
            # navigation timeout), so the Chromium process is not left behind.
            browser.close()

    if "code" not in captured_code or not captured_code["code"]:
        raise SystemExit(f"Headless login failed to capture code. captured={captured_code}")
    if captured_code.get("state") != state:
        raise SystemExit(f"CSRF state mismatch: got {captured_code.get('state')!r}, want {state!r}")
    return captured_code["code"]
|
||||
|
||||
|
||||
def exchange_code(creds: Creds, code: str) -> dict:
    """Exchange an authorization code for tokens at the token endpoint.

    Args:
        creds: client id and secret sent in the form body.
        code: authorization code obtained from the callback.

    Returns:
        The decoded JSON response body.

    Raises:
        The error from ``raise_for_status()`` when the response has an
        error status.
    """
    token_url = f"{SANDBOX_BASE}{TOKEN_PATH}"
    form = {
        "grant_type": "authorization_code",
        "client_id": creds.client_id,
        "client_secret": creds.client_secret,
        "redirect_uri": REDIRECT_URI,
        "code": code,
    }
    accept = {"Accept": "application/vnd.hmrc.1.0+json"}

    response = httpx.post(token_url, data=form, headers=accept, timeout=30)
    response.raise_for_status()
    return response.json()
|
||||
|
||||
|
||||
def refresh_tokens(creds: Creds, refresh_token: str) -> dict:
    """Obtain a new token set from the token endpoint using a refresh token.

    Args:
        creds: client id and secret sent in the form body.
        refresh_token: refresh token from a previous token response.

    Returns:
        The decoded JSON response body.

    Raises:
        The error from ``raise_for_status()`` when the response has an
        error status.
    """
    token_url = f"{SANDBOX_BASE}{TOKEN_PATH}"
    form = {
        "grant_type": "refresh_token",
        "client_id": creds.client_id,
        "client_secret": creds.client_secret,
        "refresh_token": refresh_token,
    }
    accept = {"Accept": "application/vnd.hmrc.1.0+json"}

    response = httpx.post(token_url, data=form, headers=accept, timeout=30)
    response.raise_for_status()
    return response.json()
|
||||
|
||||
|
||||
def get_access_or_die() -> str:
    """Return an access token from the cache, refreshing it when stale.

    The cached token is reused while its age is more than 300 seconds short
    of ``expires_in`` (taken as 14400 when the cache has no such key).
    Otherwise the refresh token is exchanged, the new tokens are saved, and
    the new access token is returned.

    Raises:
        SystemExit: if there are no cached tokens.
    """
    cached = load_tokens()
    if not cached:
        raise SystemExit("No cached tokens. Run: headless_auth.py login --user-id ... --password ...")

    seconds_old = int(time.time()) - cached.get("_cached_at", 0)
    lifetime = cached.get("expires_in", 14400)
    # Keep a 5-minute safety margin before the nominal expiry.
    needs_refresh = not (seconds_old < lifetime - 300)

    if needs_refresh:
        refreshed = refresh_tokens(load_creds(), cached["refresh_token"])
        save_tokens(refreshed)
        return refreshed["access_token"]

    return cached["access_token"]
|
||||
|
||||
|
||||
def call_income(utr: str, tax_year: str) -> int:
    """GET the annual-summary endpoint and print the status and body.

    The body is pretty-printed as JSON when it parses, otherwise printed raw.

    Args:
        utr: value substituted for ``{utr}`` in ``INCOME_PATH``.
        tax_year: value substituted for ``{tax_year}`` in ``INCOME_PATH``.

    Returns:
        0 when the HTTP status is below 400, otherwise 2.
    """
    token = get_access_or_die()
    endpoint = SANDBOX_BASE + INCOME_PATH.format(utr=utr, tax_year=tax_year)
    request_headers = {"Accept": INCOME_ACCEPT, "Authorization": f"Bearer {token}"}

    resp = httpx.get(endpoint, headers=request_headers, timeout=30)
    print(f"GET /individual-income/sa/{utr}/annual-summary/{tax_year} -> HTTP {resp.status_code}")

    try:
        print(json.dumps(resp.json(), indent=2))
    except Exception:
        # Body was not printable as JSON; show it verbatim.
        print(resp.text)

    if resp.status_code >= 400:
        return 2
    return 0
|
||||
|
||||
|
||||
def cmd_login(args) -> int:
    """Handle the ``login`` subcommand: headless login, code exchange, cache write.

    Args:
        args: parsed arguments providing ``user_id`` and ``password``.

    Returns:
        0 on completion.
    """
    client = load_creds()
    csrf_state = secrets.token_urlsafe(24)
    login_url = authorize_url(client, csrf_state)

    print(f"Headless login → {SANDBOX_BASE}{AUTH_PATH} ...")
    auth_code = headless_get_code(login_url, args.user_id, args.password, csrf_state)
    # Only a prefix of the code is echoed.
    print(f"Got code: {auth_code[:12]}...")

    tokens = exchange_code(client, auth_code)
    save_tokens(tokens)
    print(f"Saved tokens to {TOKEN_CACHE}. expires_in={tokens.get('expires_in')}s")
    return 0
|
||||
|
||||
|
||||
def cmd_refresh(_args) -> int:
    """Handle the ``refresh`` subcommand: refresh the cached tokens and re-save them.

    Returns:
        0 on completion.

    Raises:
        SystemExit: if there are no cached tokens.
    """
    cached = load_tokens()
    if cached:
        client = load_creds()
        refreshed = refresh_tokens(client, cached["refresh_token"])
        save_tokens(refreshed)
        print(f"Refreshed. new expires_in={refreshed.get('expires_in')}s")
        return 0
    raise SystemExit("No tokens to refresh.")
|
||||
|
||||
|
||||
def cmd_call(args) -> int:
    """Handle the ``call`` subcommand by delegating to ``call_income``.

    Args:
        args: parsed arguments providing ``utr`` and ``tax_year``.

    Returns:
        The exit status returned by ``call_income``.
    """
    utr, tax_year = args.utr, args.tax_year
    return call_income(utr, tax_year)
|
||||
|
||||
|
||||
def main() -> int:
    """Parse the command line and dispatch to the chosen subcommand handler.

    Subcommands: ``login`` (requires ``--user-id`` and ``--password``),
    ``refresh`` (no options) and ``call`` (requires ``--utr``; ``--tax-year``
    defaults to ``2015-16``).

    Returns:
        The integer returned by the selected handler.
    """
    parser = argparse.ArgumentParser()
    commands = parser.add_subparsers(dest="cmd", required=True)

    login = commands.add_parser("login")
    for flag in ("--user-id", "--password"):
        login.add_argument(flag, required=True)
    login.set_defaults(func=cmd_login)

    commands.add_parser("refresh").set_defaults(func=cmd_refresh)

    call = commands.add_parser("call")
    call.add_argument("--utr", required=True)
    call.add_argument("--tax-year", default="2015-16")
    call.set_defaults(func=cmd_call)

    parsed = parser.parse_args()
    handler = parsed.func
    return handler(parsed)
|
||||
|
||||
|
||||
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
||||
Loading…
Add table
Add a link
Reference in a new issue