"""Build HMRC MTD fraud-prevention headers (Gov-Client-* / Gov-Vendor-*).

HMRC's BATCH_PROCESS_DIRECT connection method (what our CronJob uses) mandates
11 headers on every MTD API call; WEB_APP_VIA_SERVER adds a handful of
browser-derived fields. Shipping without these risks fines and API-access
revocation per the HMRC fraud-prevention guide.

Layout:

- **Static** — vendor-constant across runs (product name/version, hashed
  license id).
- **Runtime** — collected at module load from the pod's own network stack +
  OS: MAC addresses, local IPs, OS family/version, device model.
- **Per-request** — built at call time (timestamps, request ids).
- **Per-session** — captured from the browser on `/callback-metadata`
  (screen dimensions, public IP, MFA timestamp). Only WEB_APP_VIA_SERVER.

The public entry point is `build_headers(session, connection_method)`.

Run `tests/test_fraud_headers.py::test_headers_pass_hmrc_validator` with
`HMRC_VALIDATOR=1` to verify against the HMRC sandbox validator.

Spec references:
    https://developer.service.hmrc.gov.uk/guides/fraud-prevention/
    https://developer.service.hmrc.gov.uk/guides/fraud-prevention/connection-method/batch-process-direct/
    https://developer.service.hmrc.gov.uk/api-documentation/docs/api/service/txm-fph-validator-api/1.0
"""

from __future__ import annotations

import getpass
import hashlib
import logging
import os
import platform
import secrets
import socket
import time
import urllib.parse
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

log = logging.getLogger(__name__)

VENDOR_PRODUCT_NAME = "hmrc-sync"
VENDOR_PRODUCT_VERSION = "0.1.0"
# Self-assigned for a personal single-user tool. HMRC permits arbitrary
# vendor strings; the header value is then SHA-256-hashed per spec
# (`Gov-Vendor-License-IDs: <product-name>=<sha256-hex>`).
VENDOR_LICENSE_ID = os.environ.get(
    "HMRC_VENDOR_LICENSE_ID", "hmrc-sync-private-single-user"
)
VENDOR_PUBLIC_IP = os.environ.get("HMRC_VENDOR_PUBLIC_IP", "")

# Connection-method values understood by `build_headers`.
CONNECTION_METHOD_BATCH = "BATCH_PROCESS_DIRECT"
CONNECTION_METHOD_WEB_APP = "WEB_APP_VIA_SERVER"
# NOTE(review): "AUTH_USING_MFA" does not appear to be one of the connection
# methods listed in HMRC's fraud-prevention guide — confirm before sending it
# as `Gov-Client-Connection-Method`. Kept because callers may reference it.
CONNECTION_METHOD_MFA = "AUTH_USING_MFA"

_NET_CLASS = Path("/sys/class/net")
_EMPTY_MAC = "00:00:00:00:00:00"


@dataclass
class SessionContext:
    """Browser-side attributes captured on the `/callback-metadata` POST.

    Only relevant for WEB_APP_VIA_SERVER flows (browser-initiated OAuth +
    server-side API calls). BATCH_PROCESS_DIRECT flows derive their context
    from `RuntimeContext` without touching these. Every field defaults to an
    empty/zero value, which the header builders treat as "not captured".
    """

    user_agent: str = ""
    screen_width: int = 0
    screen_height: int = 0
    screen_colour_depth: int = 0
    window_width: int = 0
    window_height: int = 0
    timezone_offset: int = 0
    device_id: str = ""
    mfa_timestamp: str = ""
    public_ip: str = ""
    public_port: int = 0


@dataclass
class RuntimeContext:
    """Pod-side environment values required on every API call.

    Collected once at module load (cheap — all local syscalls). If any field
    is empty, the header emitters fall back to safe defaults so the call never
    goes out with an empty mandatory header.
    """

    mac_addresses: list[str] = field(default_factory=list)
    local_ips: list[str] = field(default_factory=list)
    os_family: str = ""
    os_version: str = ""
    device_manufacturer: str = "Kubernetes"
    device_model: str = ""
    os_user: str = ""


def _collect_mac_addresses() -> list[str]:
    """Read every non-loopback interface MAC from `/sys/class/net/*/address`.

    Colons are kept raw; `_format_mac_list` percent-encodes on output.

    Returns:
        MAC strings ordered by interface name. Empty when the sysfs tree is
        missing or unreadable (non-Linux host, locked-down container).
    """
    try:
        interfaces = sorted(_NET_CLASS.iterdir())
    except OSError as exc:
        # Covers "directory does not exist" as well as permission errors, so a
        # restricted sandbox cannot crash the module import.
        log.debug("cannot list %s: %s", _NET_CLASS, exc)
        return []

    macs: list[str] = []
    for iface in interfaces:
        if iface.name == "lo":
            continue
        try:
            mac = (iface / "address").read_text().strip()
        except OSError:
            # No `address` entry, or the entry is not a readable file.
            continue
        if mac and mac != _EMPTY_MAC:
            macs.append(mac)
    return macs


def _collect_local_ips() -> list[str]:
    """Return every IP bound to this host — IPv4 + IPv6, loopback excluded.

    Two best-effort sources are merged: the addresses the hostname resolves
    to, and the source address the kernel would pick for outbound traffic.
    Failure of either source is logged at debug level and otherwise ignored.

    Returns:
        Sorted, de-duplicated address strings (IPv6 zone ids stripped).
    """
    ips: set[str] = set()

    try:
        infos = socket.getaddrinfo(socket.gethostname(), None)
    except OSError as exc:  # socket.gaierror is a subclass of OSError
        log.debug("hostname lookup failed while collecting local IPs: %s", exc)
        infos = []

    for family, _, _, _, sockaddr in infos:
        raw = sockaddr[0]
        if not isinstance(raw, str):
            continue
        if family == socket.AF_INET:
            if not raw.startswith("127."):
                ips.add(raw)
        elif family == socket.AF_INET6:
            addr = raw.split("%")[0]  # strip zone id
            if addr != "::1":
                ips.add(addr)

    # Also grab the primary outbound IP — `getaddrinfo(hostname)` can miss
    # it inside containers whose hostname has no DNS entry. Connecting a UDP
    # socket sends no packet; it only makes the kernel choose a source address.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("10.255.255.255", 1))
            ips.add(probe.getsockname()[0])
    except OSError as exc:
        log.debug("outbound-IP probe failed: %s", exc)

    return sorted(ips)


def _detect_runtime_context() -> RuntimeContext:
    """Snapshot the host's network + OS identity into a `RuntimeContext`.

    Blank `uname` fields are replaced with placeholder values so the result
    never carries an empty OS family, OS version or device model.
    """
    uname = platform.uname()
    return RuntimeContext(
        mac_addresses=_collect_mac_addresses(),
        local_ips=_collect_local_ips(),
        os_family=uname.system or "Linux",
        os_version=uname.release or "unknown",
        device_manufacturer="Kubernetes",
        device_model=uname.node or socket.gethostname() or "pod",
        os_user=_safe_getuser(),
    )


def _safe_getuser() -> str:
    """Return the OS login name, or `$USER` / "unknown" when it is unavailable.

    `getpass.getuser()` can fail for a UID with no passwd entry (common in
    containers); before Python 3.13 that surfaces as `KeyError`, or as
    `ImportError` on hosts without the `pwd` module.
    """
    try:
        return getpass.getuser()
    except (KeyError, OSError, ImportError):
        return os.environ.get("USER", "unknown")


RUNTIME_CONTEXT: RuntimeContext = _detect_runtime_context()


def build_headers(
    session: SessionContext | None = None,
    connection_method: str = CONNECTION_METHOD_BATCH,
    runtime: RuntimeContext | None = None,
) -> dict[str, str]:
    """Return the full header dict to attach to every HMRC API call.

    Defaults to BATCH_PROCESS_DIRECT — the mode the CronJob uses. Pass a
    populated `SessionContext` + `connection_method=WEB_APP_VIA_SERVER` for
    browser-initiated flows; the browser-only fields layer on top.

    Args:
        session: Browser-captured values; an empty context is used when None.
        connection_method: Value sent as `Gov-Client-Connection-Method`; also
            selects which optional header groups are added.
        runtime: Host values to use instead of the module-level
            `RUNTIME_CONTEXT` (useful in tests). It is applied to every
            runtime-derived header, including the vendor-IP fallback.

    Returns:
        A new dict mapping header name to header value.
    """
    session = session if session is not None else SessionContext()
    rt = runtime if runtime is not None else RUNTIME_CONTEXT

    headers: dict[str, str] = {}
    headers.update(_static_headers())
    headers.update(_per_request_headers())
    headers.update(_mandatory_runtime_headers(rt, session, connection_method))

    if connection_method == CONNECTION_METHOD_WEB_APP:
        headers.update(_web_app_session_headers(session, rt))

    # NOTE(review): HMRC's guide names the MFA header `Gov-Client-Multi-Factor`
    # (a key-value structure), not `Gov-Client-MFA-Timestamp`, and this branch
    # is gated on a connection method the guide does not appear to define —
    # confirm against the spec. Behaviour is left as-is.
    if connection_method == CONNECTION_METHOD_MFA and session.mfa_timestamp:
        headers["Gov-Client-MFA-Timestamp"] = session.mfa_timestamp

    return headers


def _static_headers() -> dict[str, str]:
    """Vendor-constant identity headers that apply to every connection method.

    Product-Name is percent-encoded; the License-IDs value is the SHA-256 hex
    digest of `VENDOR_LICENSE_ID` keyed by product name; Version is a
    `<product-name>=<version>` pair.

    Gov-Vendor-Public-IP and Gov-Vendor-Forwarded are NOT emitted here — the
    HMRC validator rejects them for BATCH_PROCESS_DIRECT (where no vendor
    server sits between the client and the HMRC API). They're added in
    `_web_app_session_headers` for the WEB_APP_VIA_SERVER path only.
    """
    license_hash = hashlib.sha256(VENDOR_LICENSE_ID.encode()).hexdigest()
    return {
        "Gov-Vendor-Product-Name": _pct(VENDOR_PRODUCT_NAME),
        "Gov-Vendor-Version": f"{VENDOR_PRODUCT_NAME}={VENDOR_PRODUCT_VERSION}",
        "Gov-Vendor-License-IDs": f"{VENDOR_PRODUCT_NAME}={license_hash}",
    }


def _per_request_headers() -> dict[str, str]:
    """Per-call trace + timestamp headers.

    Local-IPs-Timestamp uses HMRC's exact format `yyyy-MM-ddThh:mm:ss.sssZ` —
    always UTC, always millis. Timezone is hard-coded to UTC; the session's
    `timezone_offset` is not consulted here.
    """
    now_ms = int(time.time() * 1000)
    # Split into whole seconds (formatted by strftime) and the millisecond
    # remainder, both derived from the same reading so they cannot disagree.
    iso_seconds = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(now_ms / 1000))
    now_iso = f"{iso_seconds}.{now_ms % 1000:03d}Z"
    return {
        "Gov-Client-Timezone": "UTC+00:00",
        "Gov-Client-Local-IPs-Timestamp": now_iso,
        "x-correlation-id": str(uuid.uuid4()),
        "x-request-id": secrets.token_hex(16),
    }


def _mandatory_runtime_headers(
    rt: RuntimeContext,
    session: SessionContext,
    connection_method: str,
) -> dict[str, str]:
    """Host-derived headers that are mandatory for BATCH_PROCESS_DIRECT.

    Emits Connection-Method, Device-ID, User-IDs, User-Agent, Local-IPs and
    MAC-Addresses. Timezone and Local-IPs-Timestamp complete the mandatory
    set but live in `_per_request_headers`.
    """
    return {
        "Gov-Client-Connection-Method": connection_method,
        "Gov-Client-Device-ID": session.device_id or _fallback_device_id(),
        "Gov-Client-User-IDs": _user_ids(rt, session),
        "Gov-Client-User-Agent": _user_agent(rt, session),
        "Gov-Client-Local-IPs": _format_ip_list(rt.local_ips),
        "Gov-Client-MAC-Addresses": _format_mac_list(rt.mac_addresses),
    }


def _web_app_session_headers(
    session: SessionContext,
    rt: RuntimeContext | None = None,
) -> dict[str, str]:
    """WEB_APP_VIA_SERVER-only headers — browser context + vendor hop trail.

    Gov-Vendor-Public-IP and Gov-Vendor-Forwarded describe the vendor server
    that sits between the user's browser and HMRC — only meaningful for
    WEB_APP_VIA_SERVER. BATCH_PROCESS_DIRECT must omit them (validator rejects
    them there).

    Args:
        session: Browser-captured values; unset (zero/empty) fields are
            skipped rather than emitted blank.
        rt: Runtime whose first local IP backs the vendor IP when
            `HMRC_VENDOR_PUBLIC_IP` is unset. Defaults to `RUNTIME_CONTEXT`.

    Returns:
        Only the headers for which a value is available.
    """
    rt = rt if rt is not None else RUNTIME_CONTEXT
    out: dict[str, str] = {}

    if session.screen_width and session.screen_height:
        out["Gov-Client-Screens"] = (
            f"width={session.screen_width}&height={session.screen_height}"
            f"&scaling-factor=1&colour-depth={session.screen_colour_depth}"
        )
    if session.window_width and session.window_height:
        out["Gov-Client-Window-Size"] = (
            f"width={session.window_width}&height={session.window_height}"
        )
    if session.public_ip:
        out["Gov-Client-Public-IP"] = session.public_ip
    if session.public_port:
        out["Gov-Client-Public-Port"] = str(session.public_port)

    # NOTE(review): a pod-local address is not a public IP; the fallback keeps
    # the header present, but HMRC_VENDOR_PUBLIC_IP should be set in production.
    vendor_ip = VENDOR_PUBLIC_IP or (rt.local_ips[0] if rt.local_ips else "")
    if vendor_ip:
        out["Gov-Vendor-Public-IP"] = vendor_ip
        # One hop: `by` is the server that received the request (us), `for` is
        # where the request came from (the browser). Without a captured client
        # IP the vendor IP is reused so the pair stays well-formed. Values are
        # percent-encoded so IPv6 colons cannot corrupt the key-value syntax.
        client_ip = session.public_ip or vendor_ip
        out["Gov-Vendor-Forwarded"] = f"by={_pct(vendor_ip)}&for={_pct(client_ip)}"
    return out


def _user_ids(rt: RuntimeContext, session: SessionContext) -> str:
    """Build `Gov-Client-User-IDs` as `os=<os-user>&hmrc-sync=<oauth-user>`.

    The `os=` pair falls back to "unknown" when the runtime has no user. The
    second pair tags our application with the OAuth user, read from
    `HMRC_OAUTH_USER` at call time, so HMRC can correlate activity in a breach
    investigation. `session` is accepted but unused.
    """
    _ = session  # reserved for future per-session identity extension
    os_user = rt.os_user or "unknown"
    oauth_user = os.environ.get("HMRC_OAUTH_USER", "viktor")
    return f"os={_pct(os_user)}&hmrc-sync={_pct(oauth_user)}"


def _user_agent(rt: RuntimeContext, session: SessionContext) -> str:
    """Build `Gov-Client-User-Agent`.

    Shape: `os-family=…&os-version=…&device-manufacturer=…&device-model=…`,
    each value percent-encoded. For WEB_APP_VIA_SERVER with a captured browser
    UA, the browser string is encoded under `device-model` with the rest of
    the fields taken from the pod.

    Blank runtime fields fall back to the same defaults that
    `_detect_runtime_context` uses, so no key is ever emitted with an empty
    value (the contract documented on `RuntimeContext`).
    """
    fields = {
        "os-family": rt.os_family or "Linux",
        "os-version": rt.os_version or "unknown",
        "device-manufacturer": rt.device_manufacturer or "Kubernetes",
        "device-model": session.user_agent or rt.device_model or "pod",
    }
    return "&".join(f"{key}={_pct(value)}" for key, value in fields.items())


def _format_ip_list(ips: list[str]) -> str:
    """Join IPs with ','; IPv6 addresses percent-encoded, IPv4 passed through.

    An empty list yields the loopback address "127.0.0.1" so the header is
    never blank.
    """
    if not ips:
        return "127.0.0.1"
    return ",".join(_pct(ip) if ":" in ip else ip for ip in ips)


def _format_mac_list(macs: list[str]) -> str:
    """Percent-encode each MAC (colons → %3A) and comma-join them.

    An empty list yields a single dummy MAC so the header is never blank.
    """
    if not macs:
        return _pct("02:00:00:00:00:00")
    return ",".join(_pct(mac) for mac in macs)


def _fallback_device_id() -> str:
    """Return a deterministic UUIDv5 derived from `platform.node()`.

    Used when the session carries no Device-ID. The value is stable only for
    as long as the hostname is.
    NOTE(review): pod hostnames commonly differ between pods — confirm a
    seeded Device-ID is supplied wherever a persistent id is required.
    """
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, f"hmrc-sync-{platform.node()}"))


def _pct(s: str) -> str:
    """Percent-encode `s`, leaving no reserved character (not even '/') safe."""
    return urllib.parse.quote(s, safe="")


def as_validator_payload(headers: dict[str, str]) -> dict[str, Any]:
    """Reshape headers for the HMRC fraud-header validator API body.

    Returns:
        `{"headers": [{"name": ..., "value": ...}, ...]}` in the dict's
        iteration order.
    """
    return {
        "headers": [
            {"name": name, "value": value} for name, value in headers.items()
        ]
    }