82 lines
2.7 KiB
Python
82 lines
2.7 KiB
Python
"""HMRC Individual Tax API v1.1 wrapper.
|
|
|
|
One method per endpoint we consume. Every request attaches the full fraud-
|
|
prevention header set built by `fraud_headers.build_headers()`.
|
|
|
|
Individual Tax API v1.1 returns tax-paid + income-breakdown figures per
|
|
employment per tax year — exactly the ground-truth data we reconcile
|
|
against the payslip-ingest monthly aggregate.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from hmrc_sync.fraud_headers import SessionContext, build_headers
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
PROD_BASE = "https://api.service.hmrc.gov.uk"
|
|
INDIVIDUAL_TAX_VERSION = "application/vnd.hmrc.1.1+json"
|
|
|
|
|
|
@dataclass
|
|
class HmrcResponse:
|
|
status_code: int
|
|
body: dict[str, Any]
|
|
duration_ms: int
|
|
request_id: str | None
|
|
correlation_id: str | None
|
|
fraud_headers_sent: dict[str, str]
|
|
|
|
|
|
class HmrcClient:
|
|
|
|
def __init__(self,
|
|
access_token: str,
|
|
session: SessionContext,
|
|
connection_method: str = "BATCH_PROCESS_DIRECT",
|
|
base_url: str = PROD_BASE):
|
|
self._access_token = access_token
|
|
self._session = session
|
|
self._connection_method = connection_method
|
|
self._base_url = base_url.rstrip("/")
|
|
|
|
async def individual_tax_summary(self, utr: str, tax_year: str) -> HmrcResponse:
|
|
"""GET /individuals/tax/sa/{utr}/summary/{taxYear}
|
|
|
|
`utr` is the 10-digit Self Assessment reference; tax_year format
|
|
is `YYYY-YY` (e.g. `2024-25`).
|
|
"""
|
|
path = f"/individuals/tax/sa/{utr}/summary/{tax_year}"
|
|
return await self._get(path)
|
|
|
|
async def _get(self, path: str) -> HmrcResponse:
|
|
fraud = build_headers(self._session, self._connection_method)
|
|
headers = {
|
|
"Accept": INDIVIDUAL_TAX_VERSION,
|
|
"Authorization": f"Bearer {self._access_token}",
|
|
}
|
|
headers.update(fraud)
|
|
started = time.perf_counter()
|
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
|
resp = await client.get(f"{self._base_url}{path}", headers=headers)
|
|
duration_ms = int((time.perf_counter() - started) * 1000)
|
|
body: dict[str, Any]
|
|
try:
|
|
body = resp.json() if resp.content else {}
|
|
except ValueError:
|
|
body = {"raw": resp.text[:2000]}
|
|
log.info("hmrc %s status=%s duration=%dms", path, resp.status_code, duration_ms)
|
|
return HmrcResponse(
|
|
status_code=resp.status_code,
|
|
body=body,
|
|
duration_ms=duration_ms,
|
|
request_id=resp.headers.get("x-request-id"),
|
|
correlation_id=resp.headers.get("x-correlation-id"),
|
|
fraud_headers_sent=fraud,
|
|
)
|