"""HMRC OAuth token persistence — Vault-backed refresh-token store. The refresh_token is long-lived (HMRC grants 18 months). We keep it in Vault at `secret/viktor/hmrc_refresh_token` and let ESO sync it to a K8s Secret the pod mounts as an env var. On every refresh, we write the new token back to Vault so the next pod restart picks it up. Writing back requires Vault write access — the pod uses a short-lived K8s-auth Vault token with a narrow policy that only allows writing `secret/viktor/hmrc_refresh_token`. """ from __future__ import annotations import logging import os from dataclasses import dataclass import httpx log = logging.getLogger(__name__) VAULT_KEY = "secret/viktor/hmrc_refresh_token" PROD_BASE = "https://api.service.hmrc.gov.uk" TOKEN_PATH = "/oauth/token" @dataclass(frozen=True) class OAuthCreds: client_id: str client_secret: str redirect_uri: str @dataclass class TokenBundle: access_token: str refresh_token: str expires_in: int scope: str @classmethod def from_json(cls, data: dict[str, object]) -> TokenBundle: return cls( access_token=str(data["access_token"]), refresh_token=str(data["refresh_token"]), expires_in=int(data["expires_in"]), # type: ignore[arg-type] scope=str(data.get("scope", "")), ) def load_creds_from_env() -> OAuthCreds: return OAuthCreds( client_id=os.environ["HMRC_PROD_CLIENT_ID"], client_secret=os.environ["HMRC_PROD_CLIENT_SECRET"], redirect_uri=os.environ["HMRC_PROD_REDIRECT_URI"], ) async def exchange_code(creds: OAuthCreds, code: str) -> TokenBundle: """Swap a fresh authorization_code for an access+refresh token pair.""" async with httpx.AsyncClient(timeout=30.0) as client: resp = await client.post( f"{PROD_BASE}{TOKEN_PATH}", data={ "grant_type": "authorization_code", "client_id": creds.client_id, "client_secret": creds.client_secret, "redirect_uri": creds.redirect_uri, "code": code, }, headers={"Accept": "application/vnd.hmrc.1.0+json"}, ) resp.raise_for_status() return TokenBundle.from_json(resp.json()) async def refresh(creds: OAuthCreds, refresh_token: str) -> TokenBundle: """Exchange an old refresh_token for a fresh access+refresh pair. HMRC rotates the refresh_token on every refresh — the old one becomes invalid immediately after this call returns. Persist the new one to Vault atomically; a failure between the refresh and the Vault write leaves us stranded. """ async with httpx.AsyncClient(timeout=30.0) as client: resp = await client.post( f"{PROD_BASE}{TOKEN_PATH}", data={ "grant_type": "refresh_token", "client_id": creds.client_id, "client_secret": creds.client_secret, "refresh_token": refresh_token, }, headers={"Accept": "application/vnd.hmrc.1.0+json"}, ) resp.raise_for_status() return TokenBundle.from_json(resp.json()) def persist_to_vault(token: TokenBundle) -> None: """Write the new refresh_token back to Vault. Uses the hvac client with K8s-auth — the pod's service-account token at /var/run/secrets/kubernetes.io/serviceaccount/token logs into Vault's kubernetes auth method and receives a short-lived Vault token with write access to `secret/viktor/hmrc_refresh_token` only. """ import hvac addr = os.environ.get("VAULT_ADDR", "https://vault.viktorbarzin.me") role = os.environ.get("VAULT_K8S_ROLE", "hmrc-sync") jwt_path = "/var/run/secrets/kubernetes.io/serviceaccount/token" with open(jwt_path, encoding="utf-8") as fh: jwt = fh.read() client = hvac.Client(url=addr) client.auth.kubernetes.login(role=role, jwt=jwt) client.secrets.kv.v2.create_or_update_secret( path="viktor/hmrc_refresh_token", secret={ "refresh_token": token.refresh_token, "expires_in": token.expires_in, "scope": token.scope, }, ) log.info("Rotated HMRC refresh_token persisted to Vault")