parser + P60 ingest: split income_tax cash/RSU, add P60 ground-truth

Meta variant-B payslips gross up Taxable Pay for RSU and compute PAYE on
the grossed-up figure, so `income_tax` on the slip is the total PAYE
(cash + RSU-attributed). Dashboards that stacked the raw figure made
vest-month tax look ~2x higher than "cash tax paid". Introduce
`cash_income_tax = income_tax * (gross_pay - pension_sacrifice) /
taxable_pay` as a derived column alongside the raw figure. Dashboards
can now stack cash vs RSU-attributed tax as separate segments.

Also capture YTD column values of `RSU Tax Offset` and `RSU Excs Refund`
from the Payments grid — needed for reconciliation against HMRC annual
figures.

P60 ingest: new parser under `parsers/p60.py` anchoring on statutory
HMRC line labels (`Tax year to 5 April YYYY`, `Employer PAYE reference`,
`In this employment` pay/tax row, NI letter bands). Processor routes
documents carrying the `p60` Paperless tag to `_handle_p60` which
writes to the new `payslip_ingest.p60_reference` table (one row per
tax_year+employer). App lifespan resolves the tag id at startup; missing
tag disables dispatch without breaking payslip ingest. Paperless tag
creation + webhook config are manual follow-ups.

Migrations:
- 0004 — cash_income_tax + ytd_rsu_tax_offset + ytd_rsu_excs_refund on
  payslip, all nullable.
- 0005 — p60_reference table with (tax_year, employer) unique +
  paperless_doc_id unique for idempotent re-uploads.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Viktor Barzin 2026-04-19 15:23:05 +00:00
parent d91f34ddb4
commit 26e43b1055
14 changed files with 644 additions and 15 deletions

View file

@ -10,10 +10,11 @@ from typing import Any, Protocol
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker
from payslip_ingest.db import Payslip
from payslip_ingest.db import P60Reference, Payslip
from payslip_ingest.extractor import ClaudeExtractor
from payslip_ingest.paperless import PaperlessClient
from payslip_ingest.parsers import ParserError, parse_meta_uk
from payslip_ingest.parsers import ExtractedP60, ParserError, parse_meta_uk, parse_p60
from payslip_ingest.parsers.p60 import P60ParserError
from payslip_ingest.schema import ExtractedPayslip, validate_totals
from payslip_ingest.tax_year import derive_tax_year
@ -58,8 +59,9 @@ class ProcessResult:
doc_id: int
status: str
payslip_id: int | None = None
p60_id: int | None = None
validated: bool | None = None
extractor: str | None = None # "meta_uk_regex" | "claude" | None
extractor: str | None = None # "meta_uk_regex" | "claude" | "p60_regex" | None
async def process_document(
@ -67,15 +69,26 @@ async def process_document(
db_session_factory: async_sessionmaker[Any] | _SessionFactory,
paperless: PaperlessClient,
extractor: ClaudeExtractor,
p60_tag_id: int | None = None,
) -> ProcessResult:
async with db_session_factory() as session:
existing = await session.execute(
existing_payslip = await session.execute(
select(Payslip.id).where(Payslip.paperless_doc_id == doc_id))
if existing.scalar() is not None:
log.info("skipping doc_id=%s — already ingested", doc_id)
if existing_payslip.scalar() is not None:
log.info("skipping doc_id=%s — already ingested as payslip", doc_id)
return ProcessResult(doc_id=doc_id, status="skipped")
existing_p60 = await session.execute(
select(P60Reference.id).where(P60Reference.paperless_doc_id == doc_id))
if existing_p60.scalar() is not None:
log.info("skipping doc_id=%s — already ingested as P60", doc_id)
return ProcessResult(doc_id=doc_id, status="skipped")
metadata = await paperless.get_document(doc_id)
tag_ids = metadata.get("tags") or []
if p60_tag_id is not None and p60_tag_id in tag_ids:
pdf_bytes = await paperless.download_document(doc_id)
return await _handle_p60(doc_id, pdf_bytes, db_session_factory)
title = (metadata.get("title") or "").strip()
if NON_PAYSLIP_TITLE_RE.search(title):
log.info("skipping doc_id=%s — title %r matches non-payslip pattern", doc_id, title)
@ -199,6 +212,9 @@ async def _insert_payslip(
ytd_tax_paid=extracted.ytd_tax_paid,
ytd_taxable_pay=extracted.ytd_taxable_pay,
ytd_gross=extracted.ytd_gross,
cash_income_tax=extracted.cash_income_tax,
ytd_rsu_tax_offset=extracted.ytd_rsu_tax_offset,
ytd_rsu_excs_refund=extracted.ytd_rsu_excs_refund,
other_deductions=_decimals_to_float(extracted.other_deductions),
net_pay=extracted.net_pay,
tax_year=derive_tax_year(extracted.pay_date),
@ -212,3 +228,53 @@ async def _insert_payslip(
def _decimals_to_float(mapping: dict[str, Decimal]) -> dict[str, float]:
return {k: float(v) for k, v in mapping.items()}
async def _handle_p60(
doc_id: int,
pdf_bytes: bytes,
db_session_factory: async_sessionmaker[Any] | _SessionFactory,
) -> ProcessResult:
text = _pdftotext(pdf_bytes)
if not text:
raise ValueError(f"doc_id={doc_id} P60 pdftotext extraction returned empty")
try:
parsed = parse_p60(text)
except P60ParserError as exc:
raise ValueError(f"doc_id={doc_id} P60 parser miss: {exc}") from exc
log.info("p60 parsed: tax_year=%s employer=%s gross=%s tax=%s", parsed.tax_year,
parsed.employer, parsed.gross_pay, parsed.income_tax)
p60_id = await _insert_p60(db_session_factory, doc_id, parsed)
return ProcessResult(
doc_id=doc_id,
status="inserted" if p60_id is not None else "skipped",
p60_id=p60_id,
extractor="p60_regex",
)
async def _insert_p60(
db_session_factory: async_sessionmaker[Any] | _SessionFactory,
doc_id: int,
parsed: ExtractedP60,
) -> int | None:
async with db_session_factory() as session, session.begin():
existing = await session.execute(
select(P60Reference.id).where(P60Reference.paperless_doc_id == doc_id))
if existing.scalar() is not None:
return None
row = P60Reference(
paperless_doc_id=doc_id,
tax_year=parsed.tax_year,
employer=parsed.employer,
employer_paye_ref=parsed.employer_paye_ref,
gross_pay=parsed.gross_pay,
income_tax=parsed.income_tax,
national_insurance=parsed.national_insurance,
student_loan=parsed.student_loan,
tax_code=parsed.tax_code,
raw_extraction=parsed.to_raw(),
)
session.add(row)
await session.flush()
return row.id