payslip-ingest/payslip_ingest/app.py
Viktor Barzin 26e43b1055 parser + P60 ingest: split income_tax cash/RSU, add P60 ground-truth
Meta variant-B payslips gross up Taxable Pay for RSU and compute PAYE on
the grossed-up figure, so `income_tax` on the slip is the total PAYE
(cash + RSU-attributed). Dashboards that stacked the raw figure made
vest-month tax look ~2x higher than "cash tax paid". Introduce
`cash_income_tax = income_tax * (gross_pay - pension_sacrifice) /
taxable_pay` as a derived column alongside the raw figure. Dashboards
can now stack cash vs RSU-attributed tax as separate segments.

Also capture YTD column values of `RSU Tax Offset` and `RSU Excs Refund`
from the Payments grid — needed for reconciliation against HMRC annual
figures.

P60 ingest: new parser under `parsers/p60.py` anchoring on statutory
HMRC line labels (`Tax year to 5 April YYYY`, `Employer PAYE reference`,
`In this employment` pay/tax row, NI letter bands). Processor routes
documents carrying the `p60` Paperless tag to `_handle_p60` which
writes to the new `payslip_ingest.p60_reference` table (one row per
tax_year+employer). App lifespan resolves the tag id at startup; missing
tag disables dispatch without breaking payslip ingest. Paperless tag
creation + webhook config are manual follow-ups.

Migrations:
- 0004 — cash_income_tax + ytd_rsu_tax_offset + ytd_rsu_excs_refund on
  payslip, all nullable.
- 0005 — p60_reference table with (tax_year, employer) unique +
  paperless_doc_id unique for idempotent re-uploads.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 15:23:05 +00:00

130 lines
4.4 KiB
Python

import asyncio
import contextlib
import hmac
import logging
import os
from collections.abc import AsyncIterator, Awaitable, Callable
from contextlib import asynccontextmanager
from typing import Any
from fastapi import FastAPI, Header, HTTPException, status
from prometheus_fastapi_instrumentator import Instrumentator
from sqlalchemy.ext.asyncio import async_sessionmaker
from payslip_ingest.db import create_engine_from_env, make_session_factory
from payslip_ingest.extractor import ClaudeExtractor
from payslip_ingest.paperless import PaperlessClient
from payslip_ingest.processor import process_document
from payslip_ingest.schema import WebhookPayload
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Environment variables that must be set to a non-empty value; checked by
# _verify_env() at the start of the app lifespan.
REQUIRED_ENV = [
    "PAPERLESS_URL",
    "PAPERLESS_API_TOKEN",
    "CLAUDE_AGENT_URL",
    "CLAUDE_AGENT_BEARER_TOKEN",
    "DB_CONNECTION_STRING",
    "WEBHOOK_BEARER_TOKEN",
]
# Type alias for the processor function — makes monkeypatching in tests explicit.
# The lifespan worker calls it positionally with: document id, session
# factory, Paperless client, Claude extractor, and the P60 tag id (or None).
ProcessorFn = Callable[
    [int, async_sessionmaker[Any], PaperlessClient, ClaudeExtractor, int | None],
    Awaitable[Any],
]
# Name of the Paperless tag whose id is looked up once at startup.
P60_TAG_NAME = "p60"
def _verify_env() -> None:
    """Fail fast when any variable in ``REQUIRED_ENV`` is unset or empty.

    Raises:
        RuntimeError: Naming every missing variable, comma-separated, in
            the order they appear in ``REQUIRED_ENV``.
    """
    missing: list[str] = []
    for name in REQUIRED_ENV:
        # An empty string counts as missing, same as an absent variable.
        if not os.environ.get(name):
            missing.append(name)
    if not missing:
        return
    raise RuntimeError(f"Missing required env vars: {', '.join(missing)}")
def _verify_bearer(authorization: str | None, expected: str) -> None:
if not expected:
raise HTTPException(status_code=401, detail="Service unauthenticated")
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Missing bearer token")
token = authorization.removeprefix("Bearer ")
if not hmac.compare_digest(token, expected):
raise HTTPException(status_code=401, detail="Invalid token")
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Create shared resources at startup and release them at shutdown.

    Startup verifies the required environment variables, then builds the
    DB engine and session factory, the Paperless and Claude clients, an
    in-memory queue of document ids, and a single background task that
    drains the queue through the processor function. The queue, session
    factory, both clients and the resolved P60 tag id are published on
    ``app.state``.

    Shutdown cancels the worker and then closes both clients and disposes
    the engine; each release step runs even if an earlier one raises.

    Args:
        app: The FastAPI application whose ``state`` is populated.

    Raises:
        RuntimeError: From ``_verify_env`` when a required variable is
            missing.
    """
    _verify_env()
    engine = create_engine_from_env()
    session_factory = make_session_factory(engine)
    paperless = PaperlessClient(
        base_url=os.environ["PAPERLESS_URL"],
        api_token=os.environ["PAPERLESS_API_TOKEN"],
    )
    extractor = ClaudeExtractor(
        base_url=os.environ["CLAUDE_AGENT_URL"],
        bearer_token=os.environ["CLAUDE_AGENT_BEARER_TOKEN"],
    )
    queue: asyncio.Queue[int] = asyncio.Queue()
    # Allow a replacement processor to be injected through ``app.state``.
    # getattr() follows normal attribute lookup, so it finds a value assigned
    # as ``app.state.processor_fn = fn`` as well as one placed directly in
    # the instance ``__dict__``; reading ``__dict__`` alone misses the former.
    # NOTE(review): Starlette's State keeps assigned attributes in an
    # internal mapping — confirm against the pinned Starlette version.
    processor: ProcessorFn = getattr(app.state, "processor_fn", process_document)
    # Resolve the P60 Paperless tag once at startup. Missing tag → log
    # and skip P60 dispatch; payslip ingest keeps working regardless.
    p60_tag_id: int | None = None
    try:
        p60_tag_id = await paperless.get_tag_id(P60_TAG_NAME)
        log.info("p60 dispatch enabled: tag_id=%s", p60_tag_id)
    except Exception as exc:
        log.warning("p60 tag %r not found — dispatch disabled: %s", P60_TAG_NAME, exc)

    async def worker() -> None:
        """Process queued document ids one at a time, forever."""
        while True:
            doc_id = await queue.get()
            try:
                await processor(doc_id, session_factory, paperless, extractor, p60_tag_id)
            except Exception:
                # One bad document must not stop the worker; log and move on.
                log.exception("processing failed for doc_id=%s", doc_id)
            finally:
                queue.task_done()

    worker_task = asyncio.create_task(worker())
    app.state.queue = queue
    app.state.session_factory = session_factory
    app.state.paperless = paperless
    app.state.extractor = extractor
    app.state.p60_tag_id = p60_tag_id
    try:
        yield
    finally:
        worker_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await worker_task
        # Chain the releases so a failure closing one resource does not
        # leak the ones after it.
        try:
            await paperless.aclose()
        finally:
            try:
                await extractor.aclose()
            finally:
                await engine.dispose()
# The ASGI application; `lifespan` sets up and tears down the shared
# clients, the document queue and the background worker.
app = FastAPI(title="Payslip Ingest", lifespan=lifespan)
# Instrument the app for Prometheus and serve the metrics at /metrics.
Instrumentator().instrument(app).expose(app, endpoint="/metrics")
@app.post("/webhook", status_code=status.HTTP_202_ACCEPTED)
async def webhook(
    payload: WebhookPayload,
    authorization: str | None = Header(default=None),
) -> dict[str, Any]:
    """Authenticate the caller and enqueue the document id for processing.

    Args:
        payload: Request body; only ``document_id`` is read here.
        authorization: Value of the ``Authorization`` header, if present.

    Returns:
        A dict with ``status`` set to ``"accepted"`` and the queued
        ``document_id``. The response is sent with HTTP 202.

    Raises:
        HTTPException: 401 from ``_verify_bearer`` when the bearer token is
            missing or wrong.
    """
    expected_token = os.environ.get("WEBHOOK_BEARER_TOKEN", "")
    _verify_bearer(authorization, expected_token)
    pending: asyncio.Queue[int] = app.state.queue
    document_id = payload.document_id
    # Only enqueue here; the background worker does the actual processing.
    await pending.put(document_id)
    return {"status": "accepted", "document_id": document_id}
@app.get("/healthz")
async def healthz() -> dict[str, Any]:
    """Report liveness together with the number of queued documents.

    Returns:
        A dict with ``status`` set to ``"ok"`` and ``queue_depth`` set to
        the queue's current size, or 0 when no queue is on ``app.state``.
    """
    pending: asyncio.Queue[int] | None = getattr(app.state, "queue", None)
    if pending is None:
        # The queue is only attached once the lifespan has started.
        depth = 0
    else:
        depth = pending.qsize()
    return {"status": "ok", "queue_depth": depth}