hmrc-sync/hmrc_sync/oauth.py
2026-05-07 17:06:11 +00:00

125 lines
4.2 KiB
Python

"""HMRC OAuth token persistence — Vault-backed refresh-token store.
The refresh_token is long-lived (HMRC grants 18 months). We keep it in
Vault at `secret/viktor/hmrc_refresh_token` and let ESO sync it to a K8s
Secret the pod mounts as an env var. On every refresh, we write the new
token back to Vault so the next pod restart picks it up.
Writing back requires Vault write access — the pod uses a short-lived
K8s-auth Vault token with a narrow policy that only allows writing
`secret/viktor/hmrc_refresh_token`.
"""
from __future__ import annotations
import logging
import os
from dataclasses import dataclass
import httpx
log = logging.getLogger(__name__)
VAULT_KEY = "secret/viktor/hmrc_refresh_token"
PROD_BASE = "https://api.service.hmrc.gov.uk"
TOKEN_PATH = "/oauth/token"
@dataclass(frozen=True)
class OAuthCreds:
client_id: str
client_secret: str
redirect_uri: str
@dataclass
class TokenBundle:
access_token: str
refresh_token: str
expires_in: int
scope: str
@classmethod
def from_json(cls, data: dict[str, object]) -> TokenBundle:
return cls(
access_token=str(data["access_token"]),
refresh_token=str(data["refresh_token"]),
expires_in=int(data["expires_in"]), # type: ignore[arg-type]
scope=str(data.get("scope", "")),
)
def load_creds_from_env() -> OAuthCreds:
return OAuthCreds(
client_id=os.environ["HMRC_PROD_CLIENT_ID"],
client_secret=os.environ["HMRC_PROD_CLIENT_SECRET"],
redirect_uri=os.environ["HMRC_PROD_REDIRECT_URI"],
)
async def exchange_code(creds: OAuthCreds, code: str) -> TokenBundle:
"""Swap a fresh authorization_code for an access+refresh token pair."""
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
f"{PROD_BASE}{TOKEN_PATH}",
data={
"grant_type": "authorization_code",
"client_id": creds.client_id,
"client_secret": creds.client_secret,
"redirect_uri": creds.redirect_uri,
"code": code,
},
headers={"Accept": "application/vnd.hmrc.1.0+json"},
)
resp.raise_for_status()
return TokenBundle.from_json(resp.json())
async def refresh(creds: OAuthCreds, refresh_token: str) -> TokenBundle:
"""Exchange an old refresh_token for a fresh access+refresh pair.
HMRC rotates the refresh_token on every refresh — the old one becomes
invalid immediately after this call returns. Persist the new one to
Vault atomically; a failure between the refresh and the Vault write
leaves us stranded.
"""
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
f"{PROD_BASE}{TOKEN_PATH}",
data={
"grant_type": "refresh_token",
"client_id": creds.client_id,
"client_secret": creds.client_secret,
"refresh_token": refresh_token,
},
headers={"Accept": "application/vnd.hmrc.1.0+json"},
)
resp.raise_for_status()
return TokenBundle.from_json(resp.json())
def persist_to_vault(token: TokenBundle) -> None:
"""Write the new refresh_token back to Vault.
Uses the hvac client with K8s-auth — the pod's service-account token
at /var/run/secrets/kubernetes.io/serviceaccount/token logs into
Vault's kubernetes auth method and receives a short-lived Vault token
with write access to `secret/viktor/hmrc_refresh_token` only.
"""
import hvac
addr = os.environ.get("VAULT_ADDR", "https://vault.viktorbarzin.me")
role = os.environ.get("VAULT_K8S_ROLE", "hmrc-sync")
jwt_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
with open(jwt_path, encoding="utf-8") as fh:
jwt = fh.read()
client = hvac.Client(url=addr)
client.auth.kubernetes.login(role=role, jwt=jwt)
client.secrets.kv.v2.create_or_update_secret(
path="viktor/hmrc_refresh_token",
secret={
"refresh_token": token.refresh_token,
"expires_in": token.expires_in,
"scope": token.scope,
},
)
log.info("Rotated HMRC refresh_token persisted to Vault")