125 lines
4.2 KiB
Python
125 lines
4.2 KiB
Python
"""HMRC OAuth token persistence — Vault-backed refresh-token store.
|
|
|
|
The refresh_token is long-lived (HMRC grants 18 months). We keep it in
|
|
Vault at `secret/viktor/hmrc_refresh_token` and let ESO sync it to a K8s
|
|
Secret the pod mounts as an env var. On every refresh, we write the new
|
|
token back to Vault so the next pod restart picks it up.
|
|
|
|
Writing back requires Vault write access — the pod uses a short-lived
|
|
K8s-auth Vault token with a narrow policy that only allows writing
|
|
`secret/viktor/hmrc_refresh_token`.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from dataclasses import dataclass
|
|
|
|
import httpx
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Full Vault path of the refresh-token secret, as given in the module docstring.
VAULT_KEY = "secret/viktor/hmrc_refresh_token"
# Base URL of the HMRC production API.
PROD_BASE = "https://api.service.hmrc.gov.uk"
# Token endpoint path; appended to PROD_BASE when requesting tokens.
TOKEN_PATH = "/oauth/token"
|
|
|
|
|
|
@dataclass(frozen=True)
class OAuthCreds:
    """Immutable OAuth client credentials for the HMRC token endpoint.

    The dataclass-generated ``__repr__`` would print ``client_secret`` in
    clear text, so any log line or traceback that formats this object
    would leak it. An explicit ``__repr__`` masks the secret; the field
    itself is still readable via attribute access.
    """

    # OAuth client identifier sent as ``client_id`` in token requests.
    client_id: str
    # OAuth client secret sent as ``client_secret``; never shown in repr().
    client_secret: str
    # Redirect URI sent with the authorization_code grant.
    redirect_uri: str

    def __repr__(self) -> str:
        """Return a debug representation with the client secret masked."""
        return (
            f"{type(self).__name__}("
            f"client_id={self.client_id!r}, "
            f"client_secret='***', "
            f"redirect_uri={self.redirect_uri!r})"
        )
|
|
|
|
|
|
@dataclass
class TokenBundle:
    """Access/refresh token pair parsed from a token-endpoint response.

    ``__repr__`` is written by hand so that neither token value ends up in
    logs or tracebacks; the dataclass-generated one would print both.
    """

    # Bearer token from the response's ``access_token`` field.
    access_token: str
    # Refresh token from the response's ``refresh_token`` field.
    refresh_token: str
    # Value of the response's ``expires_in`` field, coerced to int.
    expires_in: int
    # Value of the response's ``scope`` field; empty string when absent.
    scope: str

    def __repr__(self) -> str:
        """Return a debug representation with both tokens masked."""
        return (
            f"{type(self).__name__}("
            f"access_token='***', refresh_token='***', "
            f"expires_in={self.expires_in!r}, scope={self.scope!r})"
        )

    @classmethod
    def from_json(cls, data: dict[str, object]) -> TokenBundle:
        """Build a bundle from the decoded JSON body of a token response.

        Args:
            data: Decoded JSON object. ``access_token``, ``refresh_token``
                and ``expires_in`` are required; ``scope`` is optional.

        Returns:
            The populated ``TokenBundle``.

        Raises:
            KeyError: A required field is missing.
            ValueError: A token field is null or empty. Without this check
                ``str(None)`` would silently yield the literal ``"None"``,
                which could then be stored as if it were a real token.
        """
        access_token = data["access_token"]
        refresh_token = data["refresh_token"]
        for field_name, value in (
            ("access_token", access_token),
            ("refresh_token", refresh_token),
        ):
            if value is None or value == "":
                raise ValueError(f"token response has an empty {field_name}")
        return cls(
            access_token=str(access_token),
            refresh_token=str(refresh_token),
            expires_in=int(data["expires_in"]),  # type: ignore[arg-type]
            scope=str(data.get("scope", "")),
        )
|
|
|
|
|
|
def load_creds_from_env() -> OAuthCreds:
    """Assemble the production OAuth credentials from environment variables.

    Reads ``HMRC_PROD_CLIENT_ID``, ``HMRC_PROD_CLIENT_SECRET`` and
    ``HMRC_PROD_REDIRECT_URI``, in that order.

    Raises:
        KeyError: The first of those variables that is not set.
    """
    env = os.environ
    client_id = env["HMRC_PROD_CLIENT_ID"]
    client_secret = env["HMRC_PROD_CLIENT_SECRET"]
    redirect_uri = env["HMRC_PROD_REDIRECT_URI"]
    return OAuthCreds(
        client_id=client_id,
        client_secret=client_secret,
        redirect_uri=redirect_uri,
    )
|
|
|
|
|
|
async def exchange_code(creds: OAuthCreds, code: str) -> TokenBundle:
    """Trade a fresh authorization code for an access+refresh token pair.

    POSTs an ``authorization_code`` grant as form data to the token
    endpoint (``PROD_BASE`` + ``TOKEN_PATH``) with a 30-second timeout.

    Args:
        creds: Client id, client secret and redirect URI to send.
        code: The authorization code to exchange.

    Returns:
        The ``TokenBundle`` parsed from the JSON response body.

    Raises:
        Whatever ``raise_for_status()`` raises for an error status, and
        whatever ``TokenBundle.from_json`` raises for a malformed body.
    """
    token_url = f"{PROD_BASE}{TOKEN_PATH}"
    form = {
        "grant_type": "authorization_code",
        "client_id": creds.client_id,
        "client_secret": creds.client_secret,
        "redirect_uri": creds.redirect_uri,
        "code": code,
    }
    accept = {"Accept": "application/vnd.hmrc.1.0+json"}

    async with httpx.AsyncClient(timeout=30.0) as http:
        response = await http.post(token_url, data=form, headers=accept)
        # Fail on an error status before attempting to parse the body.
        response.raise_for_status()
        return TokenBundle.from_json(response.json())
|
|
|
|
|
|
async def refresh(creds: OAuthCreds, refresh_token: str) -> TokenBundle:
    """Trade an existing refresh_token for a fresh access+refresh pair.

    HMRC rotates the refresh_token on every refresh: the old one becomes
    invalid as soon as this call returns. This function only performs the
    HTTP exchange — it does not store anything. The caller must persist
    the returned refresh_token to Vault straight away; a failure between
    the refresh and the Vault write leaves us stranded without a valid
    token.

    Args:
        creds: Client id and client secret to send.
        refresh_token: The currently valid refresh token.

    Returns:
        The ``TokenBundle`` parsed from the JSON response body.

    Raises:
        Whatever ``raise_for_status()`` raises for an error status, and
        whatever ``TokenBundle.from_json`` raises for a malformed body.
    """
    token_url = f"{PROD_BASE}{TOKEN_PATH}"
    form = {
        "grant_type": "refresh_token",
        "client_id": creds.client_id,
        "client_secret": creds.client_secret,
        "refresh_token": refresh_token,
    }
    accept = {"Accept": "application/vnd.hmrc.1.0+json"}

    async with httpx.AsyncClient(timeout=30.0) as http:
        response = await http.post(token_url, data=form, headers=accept)
        # Fail on an error status before attempting to parse the body.
        response.raise_for_status()
        return TokenBundle.from_json(response.json())
|
|
|
|
|
|
def persist_to_vault(token: TokenBundle) -> None:
    """Write the new refresh_token back to Vault.

    Logs in to Vault's kubernetes auth method with the pod's
    service-account token (read from
    /var/run/secrets/kubernetes.io/serviceaccount/token), then writes the
    secret with the KV v2 API. The Vault address and role come from
    ``VAULT_ADDR`` and ``VAULT_K8S_ROLE``, with built-in defaults.

    The KV mount point and secret path are derived from ``VAULT_KEY``
    instead of being spelled out a second time, so the constant and the
    write target cannot drift apart.

    Args:
        token: Bundle whose ``refresh_token``, ``expires_in`` and ``scope``
            are stored as the secret's fields.

    Raises:
        OSError: The service-account token file cannot be read.
        Any exception raised by hvac for a failed login or write is
        propagated unchanged.
    """
    import hvac

    addr = os.environ.get("VAULT_ADDR", "https://vault.viktorbarzin.me")
    role = os.environ.get("VAULT_K8S_ROLE", "hmrc-sync")
    jwt_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
    with open(jwt_path, encoding="utf-8") as fh:
        # A JWT never contains whitespace; drop any stray trailing newline
        # so it is not sent as part of the login credential.
        jwt = fh.read().strip()

    # "secret/viktor/hmrc_refresh_token" -> mount "secret", path "viktor/...".
    mount_point, _, secret_path = VAULT_KEY.partition("/")

    client = hvac.Client(url=addr)
    client.auth.kubernetes.login(role=role, jwt=jwt)
    client.secrets.kv.v2.create_or_update_secret(
        path=secret_path,
        mount_point=mount_point,
        secret={
            "refresh_token": token.refresh_token,
            "expires_in": token.expires_in,
            "scope": token.scope,
        },
    )
    log.info("Rotated HMRC refresh_token persisted to Vault")
|