diff --git a/alembic/versions/0004_cash_income_tax.py b/alembic/versions/0004_cash_income_tax.py new file mode 100644 index 0000000..1aa7517 --- /dev/null +++ b/alembic/versions/0004_cash_income_tax.py @@ -0,0 +1,53 @@ +"""Add cash_income_tax + YTD RSU offset/refund columns. + +Meta variant-B payslips gross up Taxable Pay for RSU vest; PAYE on the slip +(`Tax paid`) is the total including the RSU-attributed portion. Storing the +verbatim figure is correct for HMRC accounting but makes vest-month tax look +~2x higher on dashboards that stack it against cash pay. + +`cash_income_tax` is the derived pro-rata share of PAYE that the cash portion +of gross (gross - pension_sacrifice) bears, computed as +`income_tax * (gross_pay - pension_sacrifice) / taxable_pay`. Dashboards can +stack the derived column and show the remainder as the RSU-attributed slice. + +`ytd_rsu_tax_offset` and `ytd_rsu_excs_refund` capture the Year-to-Date +column of the RSU lines in the Payments block — useful for reconciliation +against HMRC's annual figures once the P60 / HMRC API pipelines land. + +All three columns are nullable; existing rows get NULL until a one-shot +backfill runs. 
+""" +import sqlalchemy as sa + +from alembic import op + +revision = "0004" +down_revision = "0003" +branch_labels = None +depends_on = None + +SCHEMA = "payslip_ingest" + + +def upgrade() -> None: + op.add_column( + "payslip", + sa.Column("cash_income_tax", sa.Numeric(12, 2), nullable=True), + schema=SCHEMA, + ) + op.add_column( + "payslip", + sa.Column("ytd_rsu_tax_offset", sa.Numeric(12, 2), nullable=True), + schema=SCHEMA, + ) + op.add_column( + "payslip", + sa.Column("ytd_rsu_excs_refund", sa.Numeric(12, 2), nullable=True), + schema=SCHEMA, + ) + + +def downgrade() -> None: + op.drop_column("payslip", "ytd_rsu_excs_refund", schema=SCHEMA) + op.drop_column("payslip", "ytd_rsu_tax_offset", schema=SCHEMA) + op.drop_column("payslip", "cash_income_tax", schema=SCHEMA) diff --git a/alembic/versions/0005_p60_reference.py b/alembic/versions/0005_p60_reference.py new file mode 100644 index 0000000..946c9de --- /dev/null +++ b/alembic/versions/0005_p60_reference.py @@ -0,0 +1,63 @@ +"""Add p60_reference table for HMRC annual ground-truth reconciliation. + +P60 is the authoritative end-of-year certificate HMRC issues; its figures +match what HMRC has on file. Storing one row per (tax_year, employer) lets +the dashboard compare `SUM(payslip)` against the P60 totals and surface +missing-month gaps or parser drift. + +Columns mirror what the P60 explicitly prints; everything derived (effective +rate, deltas) stays in the dashboard SQL. `paperless_doc_id` is unique so +re-uploading the same PDF is idempotent. `raw_extraction` keeps the full +parsed dict for debugging parser regressions. 
+""" +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +from alembic import op + +revision = "0005" +down_revision = "0004" +branch_labels = None +depends_on = None + +SCHEMA = "payslip_ingest" + + +def upgrade() -> None: + op.create_table( + "p60_reference", + sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True), + sa.Column("tax_year", sa.String(), nullable=False), + sa.Column("employer", sa.String(), nullable=False), + sa.Column("employer_paye_ref", sa.String(), nullable=True), + sa.Column("gross_pay", sa.Numeric(12, 2), nullable=False), + sa.Column("income_tax", sa.Numeric(12, 2), nullable=False), + sa.Column("national_insurance", sa.Numeric(12, 2), nullable=False), + sa.Column("student_loan", sa.Numeric(12, 2), nullable=True), + sa.Column("tax_code", sa.String(), nullable=True), + sa.Column("paperless_doc_id", sa.Integer(), nullable=False, unique=True), + sa.Column( + "raw_extraction", + postgresql.JSONB().with_variant(sa.JSON(), "sqlite"), + nullable=False, + ), + sa.Column( + "created_at", + sa.TIMESTAMP(timezone=True), + nullable=False, + server_default=sa.text("now()"), + ), + sa.UniqueConstraint("tax_year", "employer", name="uq_p60_tax_year_employer"), + schema=SCHEMA, + ) + op.create_index( + "ix_p60_reference_tax_year", + "p60_reference", + ["tax_year"], + schema=SCHEMA, + ) + + +def downgrade() -> None: + op.drop_index("ix_p60_reference_tax_year", table_name="p60_reference", schema=SCHEMA) + op.drop_table("p60_reference", schema=SCHEMA) diff --git a/payslip_ingest/__main__.py b/payslip_ingest/__main__.py index f30293d..4d4a9da 100644 --- a/payslip_ingest/__main__.py +++ b/payslip_ingest/__main__.py @@ -51,6 +51,14 @@ async def _backfill(tag: str, limit: int | None) -> None: base_url=os.environ["CLAUDE_AGENT_URL"], bearer_token=os.environ["CLAUDE_AGENT_BEARER_TOKEN"], ) + # Resolve the P60 tag if present — needed for the dispatch branch even + # when backfilling a non-p60 tag (a P60-tagged doc carrying the payslip + # 
tag too should still route to the P60 handler). + p60_tag_id: int | None = None + try: + p60_tag_id = await paperless.get_tag_id("p60") + except Exception as exc: + click.echo(f"warning: p60 tag resolution failed — dispatch disabled: {exc}", err=True) processed = 0 failed = 0 try: @@ -59,7 +67,8 @@ async def _backfill(tag: str, limit: int | None) -> None: break doc_id = int(doc["id"]) try: - result = await process_document(doc_id, session_factory, paperless, extractor) + result = await process_document(doc_id, session_factory, paperless, extractor, + p60_tag_id) click.echo(f"doc_id={doc_id} status={result.status} validated={result.validated}") except Exception as exc: # Don't let a single bad doc (wrong tag, non-payslip PDF, Claude diff --git a/payslip_ingest/app.py b/payslip_ingest/app.py index b3308a7..1516bd2 100644 --- a/payslip_ingest/app.py +++ b/payslip_ingest/app.py @@ -30,10 +30,12 @@ REQUIRED_ENV = [ # Type alias for the processor function — makes monkeypatching in tests explicit. ProcessorFn = Callable[ - [int, async_sessionmaker[Any], PaperlessClient, ClaudeExtractor], + [int, async_sessionmaker[Any], PaperlessClient, ClaudeExtractor, int | None], Awaitable[Any], ] +P60_TAG_NAME = "p60" + def _verify_env() -> None: missing = [k for k in REQUIRED_ENV if not os.environ.get(k)] @@ -69,11 +71,20 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: processor: ProcessorFn = app.state.__dict__.get("processor_fn", process_document) + # Resolve the P60 Paperless tag once at startup. Missing tag → log + # and skip P60 dispatch; payslip ingest keeps working regardless. 
+ p60_tag_id: int | None = None + try: + p60_tag_id = await paperless.get_tag_id(P60_TAG_NAME) + log.info("p60 dispatch enabled: tag_id=%s", p60_tag_id) + except Exception as exc: + log.warning("p60 tag %r not found — dispatch disabled: %s", P60_TAG_NAME, exc) + async def worker() -> None: while True: doc_id = await queue.get() try: - await processor(doc_id, session_factory, paperless, extractor) + await processor(doc_id, session_factory, paperless, extractor, p60_tag_id) except Exception: log.exception("processing failed for doc_id=%s", doc_id) finally: @@ -84,6 +95,7 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: app.state.session_factory = session_factory app.state.paperless = paperless app.state.extractor = extractor + app.state.p60_tag_id = p60_tag_id try: yield diff --git a/payslip_ingest/db.py b/payslip_ingest/db.py index 821b4b7..5949532 100644 --- a/payslip_ingest/db.py +++ b/payslip_ingest/db.py @@ -63,6 +63,9 @@ class Payslip(Base): ytd_tax_paid: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True) ytd_taxable_pay: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True) ytd_gross: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True) + cash_income_tax: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True) + ytd_rsu_tax_offset: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True) + ytd_rsu_excs_refund: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True) other_deductions: Mapped[dict[str, Any] | None] = mapped_column(JSON_TYPE, nullable=True) net_pay: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False) tax_year: Mapped[str] = mapped_column(String, nullable=False) @@ -73,6 +76,32 @@ class Payslip(Base): server_default=text("now()")) +class P60Reference(Base): + """HMRC-issued annual P60. One row per (tax_year, employer). 
+ + Source of truth for annual PAYE/NI — lets the dashboard reconcile + `SUM(payslip_ingest.payslip)` against the figures HMRC actually has on + file, catching both missing-month gaps and parser drift. + """ + __tablename__ = "p60_reference" + __table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012 + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + tax_year: Mapped[str] = mapped_column(String, nullable=False, index=True) + employer: Mapped[str] = mapped_column(String, nullable=False) + employer_paye_ref: Mapped[str | None] = mapped_column(String, nullable=True) + gross_pay: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False) + income_tax: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False) + national_insurance: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False) + student_loan: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True) + tax_code: Mapped[str | None] = mapped_column(String, nullable=True) + paperless_doc_id: Mapped[int] = mapped_column(Integer, unique=True, nullable=False) + raw_extraction: Mapped[dict[str, Any]] = mapped_column(JSON_TYPE, nullable=False) + created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), + nullable=False, + server_default=text("now()")) + + def create_engine_from_env() -> AsyncEngine: url = os.environ["DB_CONNECTION_STRING"] return create_async_engine(url, pool_pre_ping=True) diff --git a/payslip_ingest/parsers/__init__.py b/payslip_ingest/parsers/__init__.py index fa10b88..de9fa21 100644 --- a/payslip_ingest/parsers/__init__.py +++ b/payslip_ingest/parsers/__init__.py @@ -1,3 +1,4 @@ from payslip_ingest.parsers.meta_uk import ParserError, parse_meta_uk +from payslip_ingest.parsers.p60 import ExtractedP60, P60ParserError, parse_p60 -__all__ = ["ParserError", "parse_meta_uk"] +__all__ = ["ExtractedP60", "P60ParserError", "ParserError", "parse_meta_uk", "parse_p60"] diff --git a/payslip_ingest/parsers/meta_uk.py 
b/payslip_ingest/parsers/meta_uk.py
index 68a744b..900859e 100644
--- a/payslip_ingest/parsers/meta_uk.py
+++ b/payslip_ingest/parsers/meta_uk.py
@@ -158,7 +158,7 @@ RSU_VEST_LABELS = {
 
 def _parse_variant_bc(text: str, lines: list[str], employer: str) -> ExtractedPayslip:
     header_idx, d_col, y_col = _find_bc_header(lines)
-    payments, payments_order, deductions = _collect_bc_rows(lines, header_idx, d_col, y_col)
+    payments, payments_order, deductions, ytd = _collect_bc_rows(lines, header_idx, d_col, y_col)
     gross_pay, net_pay = _parse_bc_totals_row(lines, header_idx, d_col, y_col)
     summary = _parse_bc_summary_block(lines)
 
@@ -173,6 +173,9 @@ def _parse_variant_bc(text: str, lines: list[str], employer: str) -> ExtractedPa
     nic = deductions.get("Employee NIC", deductions.get("National Insurance", Decimal("0")))
     student_loan = deductions.get("Student Loans", deductions.get("Student Loan", Decimal("0")))
 
+    taxable_pay = summary.get("taxable_pay")
+    cash_income_tax = _cash_income_tax(income_tax, gross_pay, pension_sacrifice, taxable_pay)
+
     other_deductions = {k: v for k, v in deductions.items() if k not in DEDUCTIONS_KNOWN}
     del payments_order  # retained for future debugging; not used in validation
 
@@ -199,15 +202,39 @@ def _parse_variant_bc(text: str, lines: list[str], employer: str) -> ExtractedPa
         salary=payments.get("Salary", Decimal("0")),
         bonus=payments.get("Perform Bonus", payments.get("Bonus", Decimal("0"))),
         pension_sacrifice=pension_sacrifice,
-        taxable_pay=summary.get("taxable_pay"),
+        taxable_pay=taxable_pay,
         ytd_tax_paid=summary.get("ytd_tax_paid"),
         ytd_taxable_pay=summary.get("ytd_taxable_pay"),
         ytd_gross=summary.get("ytd_gross"),
+        cash_income_tax=cash_income_tax,
+        ytd_rsu_tax_offset=ytd.get("RSU Tax Offset"),
+        ytd_rsu_excs_refund=ytd.get("RSU Excs Refund"),
         other_deductions=other_deductions,
         net_pay=net_pay,
     )
 
 
+def _cash_income_tax(
+    income_tax: Decimal,
+    gross_pay: Decimal,
+    pension_sacrifice: Decimal,
+    taxable_pay: Decimal | None,
+) -> Decimal:
+    """Derived pro-rata PAYE attributable to cash pay.
+
+    `income_tax` is the slip's total PAYE; when Taxable Pay is grossed up for
+    RSU the cash share is `income_tax * cash_base / taxable_pay`, where
+    `cash_base = gross_pay - pension_sacrifice`. The share is capped to the
+    0..1 range so the result never exceeds the PAYE actually deducted.
+
+    Falls back to `income_tax` when `taxable_pay` is None or not positive.
+    """
+    cash_base = gross_pay - pension_sacrifice
+    if taxable_pay is None or taxable_pay <= 0 or cash_base >= taxable_pay:
+        return income_tax
+    return (income_tax * max(cash_base, Decimal(0)) / taxable_pay).quantize(Decimal("0.01"))
+
+
 def _find_bc_header(lines: list[str]) -> tuple[int, int, int]:
     for i, line in enumerate(lines):
         if ("Payments" in line and "Deductions" in line and re.search(r"Year [Tt]o Date", line)):
@@ -223,18 +250,20 @@ def _find_bc_header(lines: list[str]) -> tuple[int, int, int]:
     header_idx: int,
     d_col: int,
     y_col: int,
-) -> tuple[dict[str, Decimal], list[tuple[str, Decimal]], dict[str, Decimal]]:
+) -> tuple[dict[str, Decimal], list[tuple[str, Decimal]], dict[str, Decimal], dict[str, Decimal]]:
     payments: dict[str, Decimal] = {}
     order: list[tuple[str, Decimal]] = []
     deductions: dict[str, Decimal] = {}
+    ytd: dict[str, Decimal] = {}
     for i in range(header_idx + 1, len(lines)):
         line = lines[i].rstrip()
         if "Total Payment" in line:
-            return payments, order, deductions
+            return payments, order, deductions, ytd
         if not line.strip():
             continue
         p_seg = line[:d_col] if len(line) > d_col else line
         d_seg = line[d_col:y_col] if len(line) > d_col else ""
+        y_seg = line[y_col:] if len(line) > y_col else ""
         p_label, p_amount = _last_amount(p_seg)
         if p_label and p_amount is not None:
             payments[p_label] = p_amount
@@ -246,7 +275,10 @@ def _collect_bc_rows(
             if d_label == "RSU Net Gain":
                 d_amount = abs(d_amount)
             deductions[d_label] = d_amount
-    return payments, order, deductions
+        y_label, y_amount = _last_amount(y_seg)
+        if y_label and y_amount is not None:
+            ytd[y_label] = y_amount
+    return payments, order, 
deductions, ytd
 
 
 def _parse_bc_totals_row(
@@ -367,6 +399,7 @@ def _parse_variant_a(text: str, lines: list[str], employer: str) -> ExtractedPay
 
     taxable_pay_s = _find_match(text, TAXABLE_PAY_A_RE)
     taxable_pay = _to_decimal(taxable_pay_s) if taxable_pay_s else None
+    cash_income_tax = _cash_income_tax(income_tax, gross_pay, pension_sacrifice, taxable_pay)
 
     pay_date = _parse_date(text)
 
@@ -391,6 +424,9 @@ def _parse_variant_a(text: str, lines: list[str], employer: str) -> ExtractedPay
         ytd_tax_paid=None,
         ytd_taxable_pay=None,
         ytd_gross=None,
+        cash_income_tax=cash_income_tax,
+        ytd_rsu_tax_offset=None,
+        ytd_rsu_excs_refund=None,
         other_deductions=other_deductions,
         net_pay=net_pay,
     )
diff --git a/payslip_ingest/parsers/p60.py b/payslip_ingest/parsers/p60.py
new file mode 100644
index 0000000..bb9833a
--- /dev/null
+++ b/payslip_ingest/parsers/p60.py
@@ -0,0 +1,176 @@
+"""Regex-based parser for HMRC P60 End of Year Certificates.
+
+UK P60 format is statutory — every employer's P60 has the same line anchors:
+`Tax year to 5 April YYYY`, `Employer PAYE reference`, `In this employment`
+(gross pay), `Tax deducted`, etc. We lean on those anchors rather than
+column layout because pdftotext output varies between employers.
+
+Handles two employer spellings that Meta has used on P60s over the years:
+`Facebook UK Ltd` (pre-2022) and `Facebook UK Limited` (2022+). Returns an
+`ExtractedP60` dataclass; on structural miss raises `P60ParserError`.
+"""
+import re
+from dataclasses import dataclass
+from decimal import Decimal
+
+from payslip_ingest.parsers.meta_uk import AMOUNT_RE, EMPLOYER_RE, _to_decimal
+
+
+class P60ParserError(ValueError):
+    """Raised when the P60 template cannot be matched."""
+
+
+TAX_YEAR_RE = re.compile(r"Tax year to 5 April\s+(\d{4})")
+# HMRC PAYE references are `NNN/XXXXXXX` — 3 digits + slash + alphanumeric.
+PAYE_REF_RE = re.compile(r"Employer PAYE reference\s+(\d{3}\s*/\s*[A-Z0-9]+)")
+# Wrap AMOUNT_RE.pattern in a non-capturing group so its top-level `|`
+# stays scoped inside — otherwise embedding it in a larger regex flips
+# the alternation into the outer context.
+AMOUNT_FRAG = "(?:" + AMOUNT_RE.pattern + ")"
+# The canonical P60 has a row `In this employment £PAY £TAX`. We
+# capture both amounts: group 1 = pay, group 2 = tax deducted.
+IN_EMPLOYMENT_RE = re.compile(r"In this employment[^\n\d]+£?\s*(" + AMOUNT_FRAG +
+                              r")[^\n\d]+£?\s*(" + AMOUNT_FRAG + r")")
+# Fallback: some P60 layouts (e.g. older printouts) put pay and tax on
+# separate lines — a `Total for year` row has both, same shape.
+TOTAL_FOR_YEAR_RE = re.compile(r"Total for year[^\n\d]+£?\s*(" + AMOUNT_FRAG + r")[^\n\d]+£?\s*(" +
+                               AMOUNT_FRAG + r")")
+# NI totals are split by letter (A/B/C/H). Anchor on lines that start with
+# a single letter followed by three whitespace-separated amounts; take the
+# 3rd amount as the employee's contribution for that letter band.
+NI_LETTER_LINE_RE = re.compile(
+    r"^[A-Z]\s+£?\s*" + AMOUNT_FRAG + r"\s+£?\s*" + AMOUNT_FRAG + r"\s+£?\s*(" + AMOUNT_FRAG + r")",
+    re.MULTILINE)
+# Student loan is optional — not every P60 has one. Zero is still "has one".
+STUDENT_LOAN_RE = re.compile(r"Student Loan (?:repayments|deductions)[^\n\d]*£?\s*(" + AMOUNT_FRAG +
+                             r")")
+# Codes ending in digits (K-codes such as K475, flat-rate D0/D1, optionally
+# S/C-prefixed) are tried first: the generic "ends in a letter" alternative
+# cannot match them and would cut e.g. SK475 down to "SK".
+TAX_CODE_RE = re.compile(r"Final tax code\s+([SC]?K\d+|[SC]?D\d|[0-9A-Z]+[A-Z])")
+
+
+@dataclass
+class ExtractedP60:
+    """Figures parsed from one P60; optional fields are None when not printed."""
+
+    tax_year: str  # "2024/25"
+    employer: str
+    employer_paye_ref: str | None
+    gross_pay: Decimal
+    income_tax: Decimal
+    national_insurance: Decimal
+    student_loan: Decimal | None
+    tax_code: str | None
+
+    def to_raw(self) -> dict[str, str | None]:
+        """Snapshot for `raw_extraction` JSON column."""
+        return {
+            "tax_year": self.tax_year,
+            "employer": self.employer,
+            "employer_paye_ref": self.employer_paye_ref,
+            "gross_pay": str(self.gross_pay),
+            "income_tax": str(self.income_tax),
+            "national_insurance": str(self.national_insurance),
+            "student_loan": str(self.student_loan) if self.student_loan is not None else None,
+            "tax_code": self.tax_code,
+        }
+
+
+def parse_p60(text: str) -> ExtractedP60:
+    """Parse pdftotext output of a P60 into an `ExtractedP60`.
+
+    Raises:
+        P60ParserError: if the text is empty, lacks the `P60` marker, or a
+            required anchor (tax year, employer, pay/tax row, NI letter row)
+            cannot be matched. PAYE reference, student loan and tax code are
+            optional and come back as None when absent.
+    """
+    if not text.strip():
+        raise P60ParserError("empty text")
+    if "P60" not in text:
+        raise P60ParserError("does not look like a P60 (missing 'P60' marker)")
+
+    tax_year = _parse_tax_year(text)
+    employer = _parse_employer(text)
+    paye_ref = _parse_paye_ref(text)
+
+    gross_pay, income_tax = _parse_pay_and_tax(text)
+    ni = _sum_ni(text)
+    student_loan = _optional_amount(text, STUDENT_LOAN_RE)
+    tax_code = _match_group(text, TAX_CODE_RE)
+
+    return ExtractedP60(
+        tax_year=tax_year,
+        employer=employer,
+        employer_paye_ref=paye_ref,
+        gross_pay=gross_pay,
+        income_tax=income_tax,
+        national_insurance=ni,
+        student_loan=student_loan,
+        tax_code=tax_code,
+    )
+
+
+def _parse_tax_year(text: str) -> str:
+    """Return the tax year as `YYYY/YY` from the `Tax year to 5 April YYYY` anchor."""
+    m = TAX_YEAR_RE.search(text)
+    if not m:
+        raise P60ParserError("`Tax year to 5 April YYYY` anchor not found")
+    ending_year = int(m.group(1))
+    # "to 5 April 2025" → the tax year is 2024/25.
+    return f"{ending_year - 1}/{str(ending_year)[-2:]}"
+
+
+def _parse_employer(text: str) -> str:
+    """Return the employer name matched by `EMPLOYER_RE`; raise if it is absent."""
+    m = EMPLOYER_RE.search(text)
+    if not m:
+        raise P60ParserError("employer name not found (expected Facebook UK Ltd/Limited)")
+    return m.group(0)
+
+
+def _parse_paye_ref(text: str) -> str | None:
+    """Return the PAYE reference with internal whitespace removed, or None if absent."""
+    m = PAYE_REF_RE.search(text)
+    if not m:
+        return None
+    return re.sub(r"\s+", "", m.group(1))
+
+
+def _parse_pay_and_tax(text: str) -> tuple[Decimal, Decimal]:
+    """Return (gross_pay, income_tax) from the `In this employment` row.
+
+    Falls back to `Total for year` if the primary row isn't present — some
+    older / reformatted P60s only print the totals line.
+    """
+    m = IN_EMPLOYMENT_RE.search(text) or TOTAL_FOR_YEAR_RE.search(text)
+    if not m:
+        raise P60ParserError("Neither `In this employment` nor `Total for year` pay/tax row found")
+    return _to_decimal(m.group(1)), _to_decimal(m.group(2))
+
+
+def _optional_amount(text: str, pattern: re.Pattern[str]) -> Decimal | None:
+    """Return group 1 of `pattern` as a Decimal, or None when it does not match."""
+    m = pattern.search(text)
+    return _to_decimal(m.group(1)) if m else None
+
+
+def _sum_ni(text: str) -> Decimal:
+    """Sum the employee contribution across all NI letter rows (A/B/C/H ...).
+
+    Raises:
+        P60ParserError: if no letter row matches — national insurance is a
+            required figure, and a silent 0 could not be told apart from a
+            genuine zero.
+    """
+    amounts = [_to_decimal(m.group(1)) for m in NI_LETTER_LINE_RE.finditer(text)]
+    if not amounts:
+        raise P60ParserError("no NI table-letter row found")
+    return sum(amounts, Decimal("0"))
+
+
+def _match_group(text: str, pattern: re.Pattern[str]) -> str | None:
+    """Return stripped group 1 of `pattern`, or None when it does not match."""
+    m = pattern.search(text)
+    return m.group(1).strip() if m else None
diff --git a/payslip_ingest/processor.py b/payslip_ingest/processor.py
index aa8a617..c10f955 100644
--- a/payslip_ingest/processor.py
+++ b/payslip_ingest/processor.py
@@ -10,10 +10,11 @@ from typing import Any, Protocol
 from sqlalchemy import select
 from sqlalchemy.ext.asyncio import async_sessionmaker
 
-from payslip_ingest.db import Payslip
+from payslip_ingest.db import P60Reference, Payslip
 from payslip_ingest.extractor import ClaudeExtractor
 from payslip_ingest.paperless import PaperlessClient
-from payslip_ingest.parsers import ParserError, parse_meta_uk
+from payslip_ingest.parsers import ExtractedP60, ParserError, parse_meta_uk, parse_p60 +from payslip_ingest.parsers.p60 import P60ParserError from payslip_ingest.schema import ExtractedPayslip, validate_totals from payslip_ingest.tax_year import derive_tax_year @@ -58,8 +59,9 @@ class ProcessResult: doc_id: int status: str payslip_id: int | None = None + p60_id: int | None = None validated: bool | None = None - extractor: str | None = None # "meta_uk_regex" | "claude" | None + extractor: str | None = None # "meta_uk_regex" | "claude" | "p60_regex" | None async def process_document( @@ -67,15 +69,26 @@ async def process_document( db_session_factory: async_sessionmaker[Any] | _SessionFactory, paperless: PaperlessClient, extractor: ClaudeExtractor, + p60_tag_id: int | None = None, ) -> ProcessResult: async with db_session_factory() as session: - existing = await session.execute( + existing_payslip = await session.execute( select(Payslip.id).where(Payslip.paperless_doc_id == doc_id)) - if existing.scalar() is not None: - log.info("skipping doc_id=%s — already ingested", doc_id) + if existing_payslip.scalar() is not None: + log.info("skipping doc_id=%s — already ingested as payslip", doc_id) + return ProcessResult(doc_id=doc_id, status="skipped") + existing_p60 = await session.execute( + select(P60Reference.id).where(P60Reference.paperless_doc_id == doc_id)) + if existing_p60.scalar() is not None: + log.info("skipping doc_id=%s — already ingested as P60", doc_id) return ProcessResult(doc_id=doc_id, status="skipped") metadata = await paperless.get_document(doc_id) + tag_ids = metadata.get("tags") or [] + if p60_tag_id is not None and p60_tag_id in tag_ids: + pdf_bytes = await paperless.download_document(doc_id) + return await _handle_p60(doc_id, pdf_bytes, db_session_factory) + title = (metadata.get("title") or "").strip() if NON_PAYSLIP_TITLE_RE.search(title): log.info("skipping doc_id=%s — title %r matches non-payslip pattern", doc_id, title) @@ -199,6 +212,9 @@ async 
def _insert_payslip(
             ytd_tax_paid=extracted.ytd_tax_paid,
             ytd_taxable_pay=extracted.ytd_taxable_pay,
             ytd_gross=extracted.ytd_gross,
+            cash_income_tax=extracted.cash_income_tax,
+            ytd_rsu_tax_offset=extracted.ytd_rsu_tax_offset,
+            ytd_rsu_excs_refund=extracted.ytd_rsu_excs_refund,
             other_deductions=_decimals_to_float(extracted.other_deductions),
             net_pay=extracted.net_pay,
             tax_year=derive_tax_year(extracted.pay_date),
@@ -212,3 +228,74 @@ async def _insert_payslip(
 
 def _decimals_to_float(mapping: dict[str, Decimal]) -> dict[str, float]:
     return {k: float(v) for k, v in mapping.items()}
+
+
+async def _handle_p60(
+    doc_id: int,
+    pdf_bytes: bytes,
+    db_session_factory: async_sessionmaker[Any] | _SessionFactory,
+) -> ProcessResult:
+    """Parse a P60 PDF with the regex parser and store it as a `P60Reference`.
+
+    Raises:
+        ValueError: if pdftotext yields no text or the P60 parser cannot match
+            the document.
+    """
+    text = _pdftotext(pdf_bytes)
+    if not text:
+        raise ValueError(f"doc_id={doc_id} P60 pdftotext extraction returned empty")
+    try:
+        parsed = parse_p60(text)
+    except P60ParserError as exc:
+        raise ValueError(f"doc_id={doc_id} P60 parser miss: {exc}") from exc
+    log.info("p60 parsed: tax_year=%s employer=%s gross=%s tax=%s", parsed.tax_year,
+             parsed.employer, parsed.gross_pay, parsed.income_tax)
+    p60_id = await _insert_p60(db_session_factory, doc_id, parsed)
+    return ProcessResult(
+        doc_id=doc_id,
+        status="inserted" if p60_id is not None else "skipped",
+        p60_id=p60_id,
+        extractor="p60_regex",
+    )
+
+
+async def _insert_p60(
+    db_session_factory: async_sessionmaker[Any] | _SessionFactory,
+    doc_id: int,
+    parsed: ExtractedP60,
+) -> int | None:
+    """Insert one `P60Reference` row and return its id, or None if already stored.
+
+    "Already stored" covers both this Paperless document and any other document
+    holding the same (tax_year, employer): the table's
+    `uq_p60_tax_year_employer` constraint allows one row per pair, so a second
+    upload of the same certificate is skipped here rather than left to fail
+    on the constraint at flush time.
+    """
+    async with db_session_factory() as session, session.begin():
+        same_doc = await session.execute(
+            select(P60Reference.id).where(P60Reference.paperless_doc_id == doc_id))
+        if same_doc.scalar() is not None:
+            return None
+        same_year = await session.execute(
+            select(P60Reference.id).where(P60Reference.tax_year == parsed.tax_year,
+                                          P60Reference.employer == parsed.employer))
+        if same_year.scalar() is not None:
+            log.warning("skipping doc_id=%s — P60 for tax_year=%s employer=%s already stored",
+                        doc_id, parsed.tax_year, parsed.employer)
+            return None
+        row = P60Reference(
+            paperless_doc_id=doc_id,
+            tax_year=parsed.tax_year,
+            employer=parsed.employer,
+            employer_paye_ref=parsed.employer_paye_ref,
+            gross_pay=parsed.gross_pay,
+            income_tax=parsed.income_tax,
+            national_insurance=parsed.national_insurance,
+            student_loan=parsed.student_loan,
+            tax_code=parsed.tax_code,
+            raw_extraction=parsed.to_raw(),
+        )
+        session.add(row)
+        await session.flush()
+        return row.id
diff --git a/payslip_ingest/schema.py b/payslip_ingest/schema.py
index 0f501c2..5150d5a 100644
--- a/payslip_ingest/schema.py
+++ b/payslip_ingest/schema.py
@@ -50,6 +50,16 @@ class ExtractedPayslip(BaseModel):
     ytd_tax_paid: Decimal | None = None
     ytd_taxable_pay: Decimal | None = None
     ytd_gross: Decimal | None = None
+    # Derived pro-rata share of income_tax attributable to cash pay
+    # (= income_tax * (gross_pay - pension_sacrifice) / taxable_pay). The
+    # parsers fall back to the full income_tax when taxable_pay is missing;
+    # the field defaults to None when an extraction path does not set it.
+    cash_income_tax: Decimal | None = None
+    # Year-to-Date column values of the RSU Tax Offset / RSU Excs Refund rows
+    # in the Payments block — captured for reconciliation with HMRC annual
+    # figures (P60 + Individual Tax API).
+    ytd_rsu_tax_offset: Decimal | None = None
+    ytd_rsu_excs_refund: Decimal | None = None
     other_deductions: dict[str, Decimal] = Field(default_factory=dict)
     net_pay: Decimal
 
diff --git a/tests/fixtures/meta_uk_p60_2024_25.txt b/tests/fixtures/meta_uk_p60_2024_25.txt
new file mode 100644
index 0000000..a9ab091
--- /dev/null
+++ b/tests/fixtures/meta_uk_p60_2024_25.txt
@@ -0,0 +1,51 @@
+P60 End of Year Certificate
+
+Tax year to 5 April 2025
+
+Employee's details
+
+Surname BARZIN
+First two forenames VIKTOR
+National Insurance number AA 12 34 56 A
+Works/Payroll number 254680
+
+Pay and Income Tax details
+
+ Pay Tax deducted
+
+In previous employment(s) £0.00 £0.00
+In this employment £232,630.34 £95,820.11
+
+Total for year £232,630.34 £95,820.11
+
+Final tax code 1257L
+
+National Insurance contributions in this employment
+
+NI table letter Earnings at Earnings above Total of employee's
+ LEL LEL up to UEL contributions in
+ this employment
+A £6,396.00 £47,268.00 £5,172.40
+
+Statutory payments included in the pay 'In this 
employment' figure + +Statutory Maternity Pay £0.00 +Statutory Paternity Pay £0.00 + +Student Loan deductions in this employment £0.00 + +Other details + +Your employer's full name and address + +Facebook UK Limited +10 Brock Street +London +NW1 3FG + +Employer PAYE reference 120/FA12345 + +This form shows your total pay for Income Tax purposes in this employment +for the year. Any overtime, bonus, commission etc, Statutory Sick Pay, +Statutory Maternity Pay, Statutory Paternity Pay or Shared Parental Pay, +Statutory Parental Bereavement Pay is included. diff --git a/tests/test_meta_uk_parser.py b/tests/test_meta_uk_parser.py index 120629b..9a84edf 100644 --- a/tests/test_meta_uk_parser.py +++ b/tests/test_meta_uk_parser.py @@ -38,6 +38,15 @@ def test_parses_variant_b_modern() -> None: assert result.ytd_taxable_pay == Decimal("373601.64") assert result.ytd_gross == Decimal("232630.34") + # Derived cash-only PAYE: income_tax * (gross - pension_sacrifice) / taxable_pay + # = 31311.90 * 39282.69 / 72096.92 = 17060.59 (vs 31311.90 total PAYE) + assert result.cash_income_tax is not None + assert abs(result.cash_income_tax - Decimal("17060.59")) <= Decimal("0.02") + + # YTD column of RSU lines in the Payments grid + assert result.ytd_rsu_tax_offset == Decimal("124674.27") + assert result.ytd_rsu_excs_refund == Decimal("3221.32") + def test_parses_variant_b_with_bonus() -> None: """March 2025 — variant B, bonus + RSU + multiple other deductions.""" @@ -145,6 +154,28 @@ def test_parses_variant_a_2021_08() -> None: assert result.taxable_pay == Decimal("15323.16") +def test_cash_income_tax_falls_back_when_taxable_pay_missing() -> None: + """When taxable_pay is None, cash_income_tax == income_tax (no RSU grossing).""" + from payslip_ingest.parsers.meta_uk import _cash_income_tax + + assert _cash_income_tax(Decimal("1000"), Decimal("5000"), Decimal("100"), + None) == Decimal("1000") + assert _cash_income_tax(Decimal("1000"), Decimal("5000"), Decimal("100"), + Decimal("0")) == 
Decimal("1000") + + +def test_variant_a_cash_income_tax_pro_rata() -> None: + """Variant A fixture with taxable_pay → cash_income_tax is pro-rata. + + 2021-06 has taxable_pay=5095.86 (= gross_pay), pension_sacrifice=152.90, + income_tax=1410.07 → cash_income_tax = 1410.07 * 4942.96 / 5095.86 = 1367.76. + """ + result = parse_meta_uk(_load("meta_uk_2021_06_variant_a_bik.txt")) + assert result.taxable_pay == Decimal("5095.86") + assert result.cash_income_tax is not None + assert abs(result.cash_income_tax - Decimal("1367.76")) <= Decimal("0.02") + + def test_raises_on_non_meta_payslip() -> None: with pytest.raises(ParserError): parse_meta_uk("This is not a Meta payslip\nRandom text\n") diff --git a/tests/test_p60_parser.py b/tests/test_p60_parser.py new file mode 100644 index 0000000..c0ca22b --- /dev/null +++ b/tests/test_p60_parser.py @@ -0,0 +1,74 @@ +from decimal import Decimal +from pathlib import Path + +import pytest + +from payslip_ingest.parsers.p60 import P60ParserError, parse_p60 + +FIXTURES = Path(__file__).parent / "fixtures" + + +def _load(name: str) -> str: + return (FIXTURES / name).read_text(encoding="utf-8") + + +def test_parses_meta_uk_p60_2024_25() -> None: + result = parse_p60(_load("meta_uk_p60_2024_25.txt")) + + assert result.tax_year == "2024/25" + assert result.employer == "Facebook UK Limited" + assert result.employer_paye_ref == "120/FA12345" + assert result.gross_pay == Decimal("232630.34") + assert result.income_tax == Decimal("95820.11") + assert result.national_insurance == Decimal("5172.40") + assert result.student_loan == Decimal("0.00") + assert result.tax_code == "1257L" + + +def test_parse_p60_raises_on_non_p60_text() -> None: + with pytest.raises(P60ParserError, match="does not look like a P60"): + parse_p60("Payslip for March 2025\nGross: £1000\n") + + +def test_parse_p60_raises_on_empty_text() -> None: + with pytest.raises(P60ParserError): + parse_p60("") + + +def test_parse_p60_raises_without_tax_year_anchor() -> None: + with 
pytest.raises(P60ParserError, match="Tax year"): + parse_p60("P60\nSome other content without the required anchor\n") + + +def test_parse_p60_handles_old_facebook_uk_ltd_spelling() -> None: + """Pre-2022 P60s list the employer as `Facebook UK Ltd` (no `Limited`).""" + text = _load("meta_uk_p60_2024_25.txt").replace("Facebook UK Limited", "Facebook UK Ltd") + result = parse_p60(text) + assert result.employer == "Facebook UK Ltd" + + +def test_parse_p60_student_loan_missing_is_none() -> None: + """P60s for years without student-loan deductions omit that line entirely.""" + text = _load("meta_uk_p60_2024_25.txt") + # Strip the Student Loan line (simulating a year pre-loan). + stripped = "\n".join(line for line in text.splitlines() if "Student Loan" not in line) + result = parse_p60(stripped) + assert result.student_loan is None + + +def test_parse_p60_tax_code_missing_is_none() -> None: + """Some historical P60s may not print a `Final tax code` line.""" + text = _load("meta_uk_p60_2024_25.txt").replace("Final tax code", "XXX") + result = parse_p60(text) + assert result.tax_code is None + + +def test_parse_p60_sums_ni_across_letter_bands() -> None: + """Employees who cross NI letter bands mid-year get one row per letter.""" + text = _load("meta_uk_p60_2024_25.txt") + # Append a second NI letter row — same shape as the A row in the fixture. 
+ extra = "C £6,396.00 £47,268.00 £1,000.00\n" + augmented = text + "\n" + extra + result = parse_p60(augmented) + # 5172.40 (letter A, in fixture) + 1000.00 (letter C, appended) + assert result.national_insurance == Decimal("6172.40") diff --git a/tests/test_processor.py b/tests/test_processor.py index 5b7fa76..f095d8e 100644 --- a/tests/test_processor.py +++ b/tests/test_processor.py @@ -246,3 +246,45 @@ async def test_rejects_zero_gross_zero_net(paperless: AsyncMock, extractor: Asyn factory = _SessionFactory([_FakeSession(existing_ids=[])]) with pytest.raises(ValueError, match="zero gross and net"): await process_document(42, factory, paperless, extractor) + + +async def test_p60_tag_routes_to_p60_handler(paperless: AsyncMock, extractor: AsyncMock, + monkeypatch: pytest.MonkeyPatch) -> None: + """A doc carrying the P60 tag id goes to _handle_p60 (not the payslip path).""" + p60_text = (FIXTURES / "meta_uk_p60_2024_25.txt").read_text(encoding="utf-8") + monkeypatch.setattr(processor, "_pdftotext", lambda _: p60_text) + paperless.get_document.return_value = {"id": 42, "title": "P60 2024-25", "tags": [7]} + + # Two sessions: one for combined dedup, one for the P60 insert. + factory = _SessionFactory([ + _FakeSession(existing_ids=[]), + _FakeSession(existing_ids=[]), + ]) + result = await process_document(42, factory, paperless, extractor, p60_tag_id=7) + + assert result.status == "inserted" + assert result.extractor == "p60_regex" + assert result.p60_id == 1 + # Extractor (Claude) must not be called for a P60. 
+ extractor.extract.assert_not_called() + inserted_row = factory.used[1].added[0] + assert inserted_row.tax_year == "2024/25" + assert inserted_row.gross_pay == Decimal("232630.34") + assert inserted_row.income_tax == Decimal("95820.11") + + +async def test_p60_tag_absent_follows_payslip_path(paperless: AsyncMock, extractor: AsyncMock, + monkeypatch: pytest.MonkeyPatch) -> None: + """A regular payslip (no P60 tag) still goes through the payslip path.""" + meta_text = (FIXTURES / "meta_uk_2026_02.txt").read_text(encoding="utf-8") + monkeypatch.setattr(processor, "_pdftotext", lambda _: meta_text) + paperless.get_document.return_value = {"id": 42, "title": "Payslip", "tags": [3]} + + factory = _SessionFactory([ + _FakeSession(existing_ids=[]), + _FakeSession(existing_ids=[]), + ]) + result = await process_document(42, factory, paperless, extractor, p60_tag_id=7) + assert result.status == "inserted" + assert result.extractor == "meta_uk_regex" + assert result.p60_id is None