342 lines
13 KiB
Python
342 lines
13 KiB
Python
|
|
"""Build HMRC MTD fraud-prevention headers (Gov-Client-* / Gov-Vendor-*).

HMRC's BATCH_PROCESS_DIRECT connection method (what our CronJob uses)
mandates 11 headers on every MTD API call; WEB_APP_VIA_SERVER adds a
handful of browser-derived fields. Shipping without these risks fines
and API-access revocation per the HMRC fraud-prevention guide.

Layout:

- **Static** — vendor-constant across runs (product name/version,
  hashed license id).
- **Runtime** — collected at module load from the pod's own network
  stack + OS: MAC addresses, local IPs, OS family/version, device model.
- **Per-request** — built at call time (timestamps, request ids).
- **Per-session** — captured from the browser on `/callback-metadata`
  (screen dimensions, public IP, MFA timestamp). Only WEB_APP_VIA_SERVER.

The public entry point is `build_headers(session, connection_method)`.
Run `tests/test_fraud_headers.py::test_headers_pass_hmrc_validator`
with `HMRC_VALIDATOR=1` to verify against the HMRC sandbox validator.

Spec references:
  https://developer.service.hmrc.gov.uk/guides/fraud-prevention/
  https://developer.service.hmrc.gov.uk/guides/fraud-prevention/connection-method/batch-process-direct/
  https://developer.service.hmrc.gov.uk/api-documentation/docs/api/service/txm-fph-validator-api/1.0
"""
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import getpass
|
||
|
|
import hashlib
|
||
|
|
import logging
|
||
|
|
import os
|
||
|
|
import platform
|
||
|
|
import secrets
|
||
|
|
import socket
|
||
|
|
import time
|
||
|
|
import urllib.parse
|
||
|
|
import uuid
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
# Module-level logger named after this module.
log = logging.getLogger(__name__)

# Vendor identity reported in the Gov-Vendor-* headers built by
# `_static_headers`.
VENDOR_PRODUCT_NAME = "hmrc-sync"
VENDOR_PRODUCT_VERSION = "0.1.0"
# Self-assigned for a personal single-user tool. HMRC permits arbitrary
# vendor strings; the header value is then SHA-256-hashed per spec
# (`Gov-Vendor-License-IDs: <name>=<hashed-value>`).
# Can be overridden through the HMRC_VENDOR_LICENSE_ID environment variable.
VENDOR_LICENSE_ID = os.environ.get("HMRC_VENDOR_LICENSE_ID",
                                   "hmrc-sync-private-single-user")
# Vendor server IP taken from HMRC_VENDOR_PUBLIC_IP; empty string when the
# variable is unset. Read by `_web_app_session_headers`.
VENDOR_PUBLIC_IP = os.environ.get("HMRC_VENDOR_PUBLIC_IP", "")

# Valid HMRC connection-method enum values.
# NOTE(review): AUTH_USING_MFA does not appear to be one of HMRC's published
# connection methods — confirm against the fraud-prevention guide.
CONNECTION_METHOD_BATCH = "BATCH_PROCESS_DIRECT"
CONNECTION_METHOD_WEB_APP = "WEB_APP_VIA_SERVER"
CONNECTION_METHOD_MFA = "AUTH_USING_MFA"

# sysfs directory with one entry per network interface; scanned by
# `_collect_mac_addresses`.
_NET_CLASS = Path("/sys/class/net")
# All-zero MAC; interfaces reporting it are skipped by
# `_collect_mac_addresses`.
_EMPTY_MAC = "00:00:00:00:00:00"
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class SessionContext:
    """Browser-side attributes captured on the `/callback-metadata` POST.

    Only relevant for WEB_APP_VIA_SERVER flows (browser-initiated OAuth
    + server-side API calls). BATCH_PROCESS_DIRECT flows derive their
    context from `RuntimeContext` (see below) without touching these.

    Every field defaults to an empty/zero value; the header builders treat
    a falsy value as "not captured".
    """
    # Browser User-Agent string; when set, `_user_agent` reports it as the
    # `device-model` value.
    user_agent: str = ""
    # Screen geometry for Gov-Client-Screens (emitted only when both width
    # and height are non-zero).
    screen_width: int = 0
    screen_height: int = 0
    screen_colour_depth: int = 0
    # Browser window size for Gov-Client-Window-Size (emitted only when both
    # values are non-zero).
    window_width: int = 0
    window_height: int = 0
    # NOTE(review): not read by any header builder in this module; units and
    # sign convention are not established here — confirm before using.
    timezone_offset: int = 0
    # Overrides the hostname-derived fallback for Gov-Client-Device-ID.
    device_id: str = ""
    # Emitted as Gov-Client-MFA-Timestamp by `build_headers` for the
    # AUTH_USING_MFA connection method.
    mfa_timestamp: str = ""
    # Client public address/port for Gov-Client-Public-IP / -Public-Port.
    public_ip: str = ""
    public_port: int = 0
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class RuntimeContext:
    """Pod-side environment values required on every API call.

    Collected once at module load (cheap — all local syscalls). If any
    field is empty, the header emitter falls back to safe defaults so
    the call never goes out with an empty mandatory header.
    """
    # Interface MACs, kept with raw colons; `_format_mac_list`
    # percent-encodes them on output.
    mac_addresses: list[str] = field(default_factory=list)
    # Local addresses (IPv4 and IPv6) as produced by `_collect_local_ips`.
    local_ips: list[str] = field(default_factory=list)
    # Values for the `os-family` / `os-version` pairs of
    # Gov-Client-User-Agent.
    os_family: str = ""
    os_version: str = ""
    # Values for the `device-manufacturer` / `device-model` pairs of
    # Gov-Client-User-Agent.
    device_manufacturer: str = "Kubernetes"
    device_model: str = ""
    # OS account name reported as the `os=` pair of Gov-Client-User-IDs.
    os_user: str = ""
|
||
|
|
|
||
|
|
|
||
|
|
def _collect_mac_addresses() -> list[str]:
    """Read every non-loopback interface MAC from `/sys/class/net/*/address`.

    Colons are kept raw; `_format_mac_list` percent-encodes on output per spec.

    Returns:
        The MAC strings in interface-name order. Interfaces named ``lo``,
        entries without a readable ``address`` file, blank values and the
        all-zero MAC are skipped. An empty list is returned when the sysfs
        directory is missing, is not a directory, or cannot be listed.

    This runs at import time (via `_detect_runtime_context`), so it must
    never raise: any filesystem error degrades to "no MACs found".
    """
    try:
        interfaces = sorted(_NET_CLASS.iterdir())
    except OSError:
        # Missing, masked or unreadable sysfs (non-Linux host, hardened
        # container): report nothing rather than crash the module import.
        return []

    macs: list[str] = []
    for iface in interfaces:
        if iface.name == "lo":
            continue
        try:
            # EAFP: a missing file, or a plain-file entry that is not an
            # interface directory, surfaces as an OSError subclass.
            mac = (iface / "address").read_text().strip()
        except (OSError, UnicodeDecodeError):
            continue
        if mac and mac != _EMPTY_MAC:
            macs.append(mac)
    return macs
|
||
|
|
|
||
|
|
|
||
|
|
def _collect_local_ips() -> list[str]:
    """Every IP bound to this host — IPv4 + IPv6, loopback excluded.

    Two best-effort sources are merged and returned as a sorted,
    de-duplicated list; a failure in either source is ignored.
    """
    found: set[str] = set()

    # Source 1: whatever name resolution reports for our own hostname.
    try:
        for entry in socket.getaddrinfo(socket.gethostname(), None):
            family = entry[0]
            address = entry[4][0]
            if not isinstance(address, str):
                continue
            if family == socket.AF_INET:
                if not address.startswith("127."):
                    found.add(address)
            elif family == socket.AF_INET6:
                if not address.startswith("::1"):
                    # Drop the "%<zone>" suffix of scoped addresses.
                    found.add(address.split("%")[0])
    except (socket.gaierror, OSError):
        pass

    # Source 2: the primary outbound IP — `getaddrinfo(hostname)` can miss
    # it inside containers whose hostname has no DNS entry.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("10.255.255.255", 1))
            outbound = probe.getsockname()[0]
            found.add(outbound)
    except OSError:
        pass

    ordered = list(found)
    ordered.sort()
    return ordered
|
||
|
|
|
||
|
|
|
||
|
|
def _detect_runtime_context() -> RuntimeContext:
    """Snapshot the host's network and OS identity into a `RuntimeContext`.

    Blank values from `platform.uname()` are replaced with placeholders:
    "Linux" for the OS family, "unknown" for the release, and the socket
    hostname (then "pod") for the device model.
    """
    info = platform.uname()
    macs = _collect_mac_addresses()
    ips = _collect_local_ips()
    family = info.system if info.system else "Linux"
    release = info.release if info.release else "unknown"
    model = info.node or socket.gethostname() or "pod"
    user = _safe_getuser()
    return RuntimeContext(
        mac_addresses=macs,
        local_ips=ips,
        os_family=family,
        os_version=release,
        device_manufacturer="Kubernetes",
        device_model=model,
        os_user=user,
    )
|
||
|
|
|
||
|
|
|
||
|
|
def _safe_getuser() -> str:
    """Return the current OS user name without ever raising.

    If `getpass.getuser()` fails with `KeyError` or `OSError`, the `USER`
    environment variable is used, and "unknown" when that is unset too.
    """
    try:
        username = getpass.getuser()
    except (KeyError, OSError):
        username = os.environ.get("USER", "unknown")
    return username
|
||
|
|
|
||
|
|
|
||
|
|
# Host snapshot taken once when the module is imported; `build_headers`
# uses it whenever the caller does not pass an explicit `runtime`.
RUNTIME_CONTEXT: RuntimeContext = _detect_runtime_context()
|
||
|
|
|
||
|
|
|
||
|
|
def build_headers(session: SessionContext | None = None,
                  connection_method: str = CONNECTION_METHOD_BATCH,
                  runtime: RuntimeContext | None = None) -> dict[str, str]:
    """Assemble the complete header dict to send with an HMRC API call.

    With no arguments this produces the BATCH_PROCESS_DIRECT set used by
    the CronJob. For browser-initiated flows pass a populated
    `SessionContext` together with `connection_method=WEB_APP_VIA_SERVER`
    so the browser-derived headers are layered on top.

    Args:
        session: Browser-side values; an empty `SessionContext` is used
            when omitted.
        connection_method: Value emitted as Gov-Client-Connection-Method;
            it also selects which optional header groups are added.
        runtime: Host-side values; the import-time `RUNTIME_CONTEXT` is
            used when omitted.

    Returns:
        Header name → value. Later layers overwrite earlier ones on a
        name clash (static, per-request, mandatory runtime, web-app).
    """
    if not session:
        session = SessionContext()
    rt = runtime if runtime else RUNTIME_CONTEXT

    headers: dict[str, str] = {
        **_static_headers(),
        **_per_request_headers(),
        **_mandatory_runtime_headers(rt, session, connection_method),
    }

    if connection_method == CONNECTION_METHOD_WEB_APP:
        headers.update(_web_app_session_headers(session))

    wants_mfa = connection_method == CONNECTION_METHOD_MFA
    if wants_mfa and session.mfa_timestamp:
        headers["Gov-Client-MFA-Timestamp"] = session.mfa_timestamp

    return headers
|
||
|
|
|
||
|
|
|
||
|
|
def _static_headers() -> dict[str, str]:
    """Vendor-constant identity headers that apply to every connection method.

    Product-Name is percent-encoded per spec; License-IDs value is SHA-256-
    hashed per spec; Version is a key-value pair of `<software-name>=<version>`.
    Keys and values of the key-value headers are percent-encoded separately
    (the `=` separator stays raw), so a product name or version containing
    reserved characters still yields a well-formed header. For the current
    constants the encoding is a no-op.

    Gov-Vendor-Public-IP and Gov-Vendor-Forwarded are NOT emitted here — the
    HMRC validator rejects them for BATCH_PROCESS_DIRECT (where no vendor
    server sits between the client and the HMRC API). They're added in
    `_web_app_session_headers` for the WEB_APP_VIA_SERVER path only.

    Returns:
        Gov-Vendor-Product-Name, Gov-Vendor-Version and
        Gov-Vendor-License-IDs.
    """
    product = _pct(VENDOR_PRODUCT_NAME)
    version = _pct(VENDOR_PRODUCT_VERSION)
    # A hex digest contains only [0-9a-f], so it needs no percent-encoding.
    license_hash = hashlib.sha256(VENDOR_LICENSE_ID.encode()).hexdigest()
    return {
        "Gov-Vendor-Product-Name": product,
        "Gov-Vendor-Version": f"{product}={version}",
        "Gov-Vendor-License-IDs": f"{product}={license_hash}",
    }
|
||
|
|
|
||
|
|
|
||
|
|
def _per_request_headers() -> dict[str, str]:
    """Build the headers that are regenerated for every call.

    Local-IPs-Timestamp follows HMRC's exact `yyyy-MM-ddThh:mm:ss.sssZ`
    format — always UTC, always three millisecond digits. The timezone
    header is the fixed string "UTC+00:00"; the two `x-*` trace ids are
    freshly generated on each call.
    """
    # Work from a single integer millisecond reading so the seconds part
    # and the millisecond suffix come from the same instant.
    whole_seconds, millis = divmod(int(time.time() * 1000), 1000)
    stamp = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(whole_seconds))

    headers: dict[str, str] = {}
    headers["Gov-Client-Timezone"] = "UTC+00:00"
    headers["Gov-Client-Local-IPs-Timestamp"] = f"{stamp}.{millis:03d}Z"
    headers["x-correlation-id"] = str(uuid.uuid4())
    headers["x-request-id"] = secrets.token_hex(16)
    return headers
|
||
|
|
|
||
|
|
|
||
|
|
def _mandatory_runtime_headers(rt: RuntimeContext, session: SessionContext,
                               connection_method: str) -> dict[str, str]:
    """Host-derived headers that BATCH_PROCESS_DIRECT requires.

    Emits Connection-Method, Device-ID, User-IDs, User-Agent, Local-IPs
    and MAC-Addresses. The two remaining mandatory headers (Timezone and
    Local-IPs-Timestamp) are produced by `_per_request_headers`.

    The device id comes from the session when it carries one; otherwise
    the hostname-derived fallback is used.
    """
    if session.device_id:
        device_id = session.device_id
    else:
        device_id = _fallback_device_id()

    headers: dict[str, str] = {}
    headers["Gov-Client-Connection-Method"] = connection_method
    headers["Gov-Client-Device-ID"] = device_id
    headers["Gov-Client-User-IDs"] = _user_ids(rt, session)
    headers["Gov-Client-User-Agent"] = _user_agent(rt, session)
    headers["Gov-Client-Local-IPs"] = _format_ip_list(rt.local_ips)
    headers["Gov-Client-MAC-Addresses"] = _format_mac_list(rt.mac_addresses)
    return headers
|
||
|
|
|
||
|
|
|
||
|
|
def _web_app_session_headers(session: SessionContext) -> dict[str, str]:
    """WEB_APP_VIA_SERVER-only headers — browser context + vendor hop trail.

    Gov-Vendor-Public-IP and Gov-Vendor-Forwarded describe the vendor server
    that sits between the user's browser and HMRC — only meaningful for
    WEB_APP_VIA_SERVER. BATCH_PROCESS_DIRECT must omit them (validator
    rejects them there).

    Args:
        session: Browser-side values. Each header is emitted only when the
            fields it is built from are non-empty / non-zero.

    Returns:
        Any of Gov-Client-Screens, Gov-Client-Window-Size,
        Gov-Client-Public-IP, Gov-Client-Public-Port, Gov-Vendor-Public-IP
        and Gov-Vendor-Forwarded for which data is available.
    """
    out: dict[str, str] = {}
    if session.screen_width and session.screen_height:
        out["Gov-Client-Screens"] = (
            f"width={session.screen_width}&height={session.screen_height}"
            f"&scaling-factor=1&colour-depth={session.screen_colour_depth}")
    if session.window_width and session.window_height:
        out["Gov-Client-Window-Size"] = (f"width={session.window_width}&"
                                         f"height={session.window_height}")
    if session.public_ip:
        out["Gov-Client-Public-IP"] = session.public_ip
    if session.public_port:
        out["Gov-Client-Public-Port"] = str(session.public_port)

    # Prefer the configured vendor IP; otherwise use the first detected
    # local address.
    # NOTE(review): a pod-local address is unlikely to be a public IP —
    # confirm the validator accepts it or require HMRC_VENDOR_PUBLIC_IP.
    vendor_ip = VENDOR_PUBLIC_IP
    if not vendor_ip and RUNTIME_CONTEXT.local_ips:
        vendor_ip = RUNTIME_CONTEXT.local_ips[0]
    if vendor_ip:
        out["Gov-Vendor-Public-IP"] = vendor_ip
        # `by` is the server that received the request (us); `for` is the
        # party that sent it — the browser's public IP. Fall back to the
        # vendor IP only when the client address was never captured.
        sender_ip = session.public_ip or vendor_ip
        out["Gov-Vendor-Forwarded"] = f"by={vendor_ip}&for={sender_ip}"
    return out
|
||
|
|
|
||
|
|
|
||
|
|
def _user_ids(rt: RuntimeContext, session: SessionContext) -> str:
    """Render Gov-Client-User-IDs as `os=<device-user>&<app>=<app-user>`.

    The mandatory `os=` pair carries the runtime's OS user ("unknown" when
    blank). A second `hmrc-sync=` pair carries the OAuth user, read from
    the HMRC_OAUTH_USER environment variable (default "viktor"), so HMRC
    can correlate activity in a breach investigation. Both values are
    percent-encoded.
    """
    del session  # reserved for future per-session identity extension
    device_user = rt.os_user if rt.os_user else "unknown"
    app_user = os.environ.get("HMRC_OAUTH_USER", "viktor")
    return f"os={_pct(device_user)}&hmrc-sync={_pct(app_user)}"
|
||
|
|
|
||
|
|
|
||
|
|
def _user_agent(rt: RuntimeContext, session: SessionContext) -> str:
    """Per spec: `os-family=…&os-version=…&device-manufacturer=…&device-model=…`.

    For WEB_APP_VIA_SERVER with a captured browser UA, the browser string
    is encoded under `device-model` with the rest of the fields defaulting
    to our pod's values — HMRC's validator accepts this hybrid shape.

    Blank runtime fields are replaced with placeholders so no pair is ever
    emitted with an empty value: "Linux" / "unknown" (the same placeholders
    `_detect_runtime_context` uses), "Kubernetes" (the `RuntimeContext`
    default) and "pod". This matters for callers that pass a partially
    populated `RuntimeContext`.

    Args:
        rt: Host-side values for the four pairs.
        session: Supplies `user_agent`, which takes precedence over
            `rt.device_model` when non-empty.

    Returns:
        The four percent-encoded `key=value` pairs joined with `&`.
    """
    family = rt.os_family or "Linux"
    version = rt.os_version or "unknown"
    manufacturer = rt.device_manufacturer or "Kubernetes"
    model = session.user_agent or rt.device_model or "pod"
    pairs = [
        f"os-family={_pct(family)}",
        f"os-version={_pct(version)}",
        f"device-manufacturer={_pct(manufacturer)}",
        f"device-model={_pct(model)}",
    ]
    return "&".join(pairs)
|
||
|
|
|
||
|
|
|
||
|
|
def _format_ip_list(ips: list[str]) -> str:
    """Serialise addresses for Gov-Client-Local-IPs, joined with ','.

    Entries containing ':' (IPv6) are percent-encoded; everything else
    (IPv4) passes through untouched.

    HMRC's validator accepts an empty header only if the request truly
    has no IPs; on a live pod there is always at least one, so an empty
    input is answered with the loopback address to keep the header
    syntactically valid.
    """
    if ips:
        return ",".join(_pct(addr) if ":" in addr else addr for addr in ips)
    return "127.0.0.1"
|
||
|
|
|
||
|
|
|
||
|
|
def _format_mac_list(macs: list[str]) -> str:
    """Serialise MACs for Gov-Client-MAC-Addresses.

    Every address is percent-encoded (colons → %3A) and the results are
    comma-joined. An empty input is replaced by a single dummy MAC so the
    header is never blank; HMRC's validator treats blank as a violation.
    """
    addresses = macs if macs else ["02:00:00:00:00:00"]
    return ",".join(map(_pct, addresses))
|
||
|
|
|
||
|
|
|
||
|
|
def _fallback_device_id() -> str:
    """Derive a deterministic device id when the session supplies none.

    The id is a version-5 UUID in the DNS namespace over
    `hmrc-sync-<platform.node()>`, so it stays the same for as long as the
    reported node name stays the same.
    """
    seed = "hmrc-sync-" + platform.node()
    device_uuid = uuid.uuid5(uuid.NAMESPACE_DNS, seed)
    return str(device_uuid)
|
||
|
|
|
||
|
|
|
||
|
|
def _pct(s: str) -> str:
    """Percent-encode *s* with no characters exempted as "safe".

    Unlike the `urllib.parse.quote` default, '/' is encoded as well.
    """
    encoded = urllib.parse.quote(s, safe="")
    return encoded
|
||
|
|
|
||
|
|
|
||
|
|
def as_validator_payload(headers: dict[str, str]) -> dict[str, Any]:
    """Reshape headers for the HMRC fraud-header validator API body.

    Returns `{"headers": [{"name": ..., "value": ...}, ...]}` with one
    entry per header, in the mapping's iteration order.
    """
    entries: list[dict[str, str]] = []
    for name, value in headers.items():
        entries.append({"name": name, "value": value})
    return {"headers": entries}
|