diff --git a/alembic/versions/0006_cash_income_tax_source.py b/alembic/versions/0006_cash_income_tax_source.py new file mode 100644 index 0000000..a2918e4 --- /dev/null +++ b/alembic/versions/0006_cash_income_tax_source.py @@ -0,0 +1,36 @@ +"""Add cash_income_tax_source audit column. + +Tracks which path produced `cash_income_tax` for a given row. Back-fill +script populates this on rows it touches so the dashboard can surface how +many rows were rescued by regex vs Claude vs left NULL. + +Values: + - `regex` — regex parser extracted taxable_pay and derived cash_income_tax + - `claude` — fell back to Claude for taxable_pay, then derived locally + - `fallback_null` — neither regex nor Claude could recover it; cash_income_tax + left NULL (dashboard's COALESCE will use income_tax) + +Nullable so pre-back-fill rows stay distinguishable from post-back-fill rows. +""" +import sqlalchemy as sa + +from alembic import op + +revision = "0006" +down_revision = "0005" +branch_labels = None +depends_on = None + +SCHEMA = "payslip_ingest" + + +def upgrade() -> None: + op.add_column( + "payslip", + sa.Column("cash_income_tax_source", sa.String(length=16), nullable=True), + schema=SCHEMA, + ) + + +def downgrade() -> None: + op.drop_column("payslip", "cash_income_tax_source", schema=SCHEMA) diff --git a/payslip_ingest/__main__.py b/payslip_ingest/__main__.py index 4d4a9da..1383907 100644 --- a/payslip_ingest/__main__.py +++ b/payslip_ingest/__main__.py @@ -116,5 +116,41 @@ def migrate() -> None: sys.exit(result.returncode) +@cli.command("backfill-cash-tax") +@click.option("--limit", type=int, default=None, help="Cap the number of rows processed.") +def backfill_cash_tax(limit: int | None) -> None: + """Back-fill cash_income_tax on rows where it's NULL (vest months only). + + Uses the widened regex parser first; falls back to Claude. Writes the + provenance source into `cash_income_tax_source`. Idempotent — only + touches NULL rows. + """ + asyncio.run(_backfill_cash_tax(limit)) + + +async def _backfill_cash_tax(limit: int | None) -> None: + from payslip_ingest.backfill_cash_tax import backfill_cash_income_tax + + engine = create_engine_from_env() + session_factory = make_session_factory(engine) + paperless = PaperlessClient( + base_url=os.environ["PAPERLESS_URL"], + api_token=os.environ["PAPERLESS_API_TOKEN"], + ) + extractor = ClaudeExtractor( + base_url=os.environ["CLAUDE_AGENT_URL"], + bearer_token=os.environ["CLAUDE_AGENT_BEARER_TOKEN"], + ) + try: + result = await backfill_cash_income_tax(session_factory, paperless, extractor, limit=limit) + click.echo(f"back-fill complete: processed={result.processed} " + f"regex={result.regex_hits} claude={result.claude_hits} " + f"fallback_null={result.fallback_null} errors={result.errors}") + finally: + await paperless.aclose() + await extractor.aclose() + await engine.dispose() + + if __name__ == "__main__": cli() diff --git a/payslip_ingest/backfill_cash_tax.py b/payslip_ingest/backfill_cash_tax.py new file mode 100644 index 0000000..b0a6891 --- /dev/null +++ b/payslip_ingest/backfill_cash_tax.py @@ -0,0 +1,151 @@ +"""Back-fill `cash_income_tax` on rows where it's NULL. + +Dashboard Panel 11's RSU-attributed tax band is computed as +`income_tax - COALESCE(cash_income_tax, income_tax)`; rows where +`cash_income_tax` is NULL fall back to `income_tax` and collapse the band to +zero, making the cash-band swallow the RSU-band. Back-filling those rows +restores the split. + +Strategy — for each NULL row with `rsu_vest > 0`: + 1. Fetch the PDF from Paperless. + 2. Run the widened regex parser. If it yields a non-NULL `cash_income_tax`, + UPDATE the row with source='regex'. + 3. Otherwise call Claude (narrow prompt, just taxable_pay). If it returns a + taxable_pay, compute the pro-rata share and UPDATE with source='claude'. + 4. If neither succeeds, mark source='fallback_null' so the audit column + shows we tried. + +Idempotent — re-running only touches rows that are still NULL. Safe to run +as a K8s one-shot Job or via CLI on demand. +""" +from __future__ import annotations + +import logging +from dataclasses import dataclass +from decimal import Decimal +from typing import Any + +from sqlalchemy import select, update +from sqlalchemy.ext.asyncio import async_sessionmaker + +from payslip_ingest.db import Payslip +from payslip_ingest.extractor import ClaudeExtractor, ExtractorError +from payslip_ingest.paperless import PaperlessClient +from payslip_ingest.parsers.meta_uk import ParserError, parse_meta_uk +from payslip_ingest.processor import _pdftotext + +log = logging.getLogger(__name__) + + +@dataclass +class BackfillResult: + processed: int = 0 + regex_hits: int = 0 + claude_hits: int = 0 + fallback_null: int = 0 + errors: int = 0 + + +async def backfill_cash_income_tax( + db_session_factory: async_sessionmaker[Any], + paperless: PaperlessClient, + extractor: ClaudeExtractor, + limit: int | None = None, +) -> BackfillResult: + """Walk every NULL `cash_income_tax` row with `rsu_vest > 0` and try to fill it.""" + async with db_session_factory() as session: + rows = (await session.execute( + select(Payslip.id, Payslip.paperless_doc_id, Payslip.income_tax, Payslip.gross_pay, + Payslip.pension_sacrifice).where( + Payslip.cash_income_tax.is_(None), + Payslip.rsu_vest > 0, + ).order_by(Payslip.pay_date))).all() + + if limit is not None: + rows = rows[:limit] + + result = BackfillResult() + for row in rows: + try: + await _backfill_one(row, db_session_factory, paperless, extractor, result) + except Exception as exc: + log.exception("backfill failed for payslip id=%s doc_id=%s: %s", row.id, + row.paperless_doc_id, exc) + result.errors += 1 + result.processed += 1 + return result + + +async def _backfill_one( + row: Any, + db_session_factory: async_sessionmaker[Any], + paperless: PaperlessClient, + extractor: ClaudeExtractor, + result: BackfillResult, +) -> None: + pdf_bytes = await paperless.download_document(row.paperless_doc_id) + + # Pass 1 — widened regex parser. + text_body = _pdftotext(pdf_bytes) or "" + if text_body: + try: + parsed = parse_meta_uk(text_body) + if parsed.cash_income_tax is not None and parsed.taxable_pay is not None: + await _apply_update(db_session_factory, row.id, parsed.cash_income_tax, "regex") + result.regex_hits += 1 + return + except ParserError: + pass + + # Pass 2 — Claude fallback for the one field we need. + metadata = await paperless.get_document(row.paperless_doc_id) + try: + extracted = await extractor.extract(pdf_bytes, metadata) + except ExtractorError as exc: + log.warning("Claude extraction failed for doc_id=%s: %s", row.paperless_doc_id, exc) + extracted = None + + if extracted is not None and extracted.taxable_pay and extracted.taxable_pay > 0: + cash_tax = _derive_cash_tax(row.income_tax, row.gross_pay, row.pension_sacrifice, + extracted.taxable_pay) + await _apply_update(db_session_factory, row.id, cash_tax, "claude") + result.claude_hits += 1 + return + + await _apply_source_only(db_session_factory, row.id, "fallback_null") + result.fallback_null += 1 + + +def _derive_cash_tax(income_tax: Decimal, gross_pay: Decimal, pension_sacrifice: Decimal, + taxable_pay: Decimal) -> Decimal: + if taxable_pay <= 0: + return income_tax + return (income_tax * (gross_pay - pension_sacrifice) / taxable_pay).quantize(Decimal("0.01")) + + +async def _apply_update( + db_session_factory: async_sessionmaker[Any], + payslip_id: int, + cash_tax: Decimal, + source: str, +) -> None: + async with db_session_factory() as session, session.begin(): + await session.execute( + update(Payslip).where(Payslip.id == payslip_id).values( + cash_income_tax=cash_tax, + cash_income_tax_source=source, + )) + + +async def _apply_source_only( + db_session_factory: async_sessionmaker[Any], + payslip_id: int, + source: str, +) -> None: + async with db_session_factory() as session, session.begin(): + await session.execute( + update(Payslip).where(Payslip.id == payslip_id).values( + cash_income_tax_source=source)) + + +__all__ = ["BackfillResult", "backfill_cash_income_tax", "_derive_cash_tax"] diff --git a/payslip_ingest/db.py b/payslip_ingest/db.py index 5949532..eaeaa6d 100644 --- a/payslip_ingest/db.py +++ b/payslip_ingest/db.py @@ -64,6 +64,7 @@ class Payslip(Base): ytd_taxable_pay: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True) ytd_gross: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True) cash_income_tax: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True) + cash_income_tax_source: Mapped[str | None] = mapped_column(String(16), nullable=True) ytd_rsu_tax_offset: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True) ytd_rsu_excs_refund: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True) other_deductions: Mapped[dict[str, Any] | None] = mapped_column(JSON_TYPE, nullable=True) diff --git a/payslip_ingest/parsers/meta_uk.py b/payslip_ingest/parsers/meta_uk.py index 900859e..6511b09 100644 --- a/payslip_ingest/parsers/meta_uk.py +++ b/payslip_ingest/parsers/meta_uk.py @@ -371,11 +371,28 @@ VARIANT_A_RSU_LABELS = { "RSU Net Cash UK", } -# "Taxable Pay : This Period £15323.16 : To Date £52446.53" -TAXABLE_PAY_A_RE = re.compile(r"Taxable Pay\s*:\s*This Period\s*£([\d,]+\.\d{2})") +# Variant A Taxable Pay line — multiple template variants: +# "Taxable Pay : This Period £15323.16 : To Date £52446.53" (canonical post-2021) +# "Taxable Pay This Period £1234.56" (older, no colons) +# "TAXABLE PAY : This Period £1234.56" (uppercase on some 2019-2020 slips) +# "Taxable Pay : Period £1234.56" ("This" elided) +# Case-insensitive, tolerant of separators. Ordered most-specific first. +TAXABLE_PAY_A_PATTERNS: list[re.Pattern[str]] = [ + re.compile(r"Taxable\s+Pay\s*[:\s]+(?:This\s+)?Period\s*£([\d,]+\.\d{2})", re.IGNORECASE), + re.compile(r"Taxable\s+Pay\s*[:\s]+£([\d,]+\.\d{2})", re.IGNORECASE), +] NET_PAY_A_RE = re.compile(r"Net Pay\s+(-?[\d,]+\.\d{2})") +def _match_variant_a_taxable_pay(line: str) -> Decimal | None: + """Try each variant-A Taxable Pay pattern in order — first match wins.""" + for pat in TAXABLE_PAY_A_PATTERNS: + m = pat.search(line) + if m: + return _to_decimal(m.group(1)) + return None + + def _parse_variant_a(text: str, lines: list[str], employer: str) -> ExtractedPayslip: header_idx = _find_variant_a_header(lines) payments, deductions = _collect_a_blocks(lines, header_idx) @@ -397,8 +414,7 @@ def _parse_variant_a(text: str, lines: list[str], employer: str) -> ExtractedPay bonus = payments.get("Perform Bonus", payments.get("Bonus", Decimal("0"))) - taxable_pay_s = _find_match(text, TAXABLE_PAY_A_RE) - taxable_pay = _to_decimal(taxable_pay_s) if taxable_pay_s else None + taxable_pay = _match_variant_a_taxable_pay(text) cash_income_tax = _cash_income_tax(income_tax, gross_pay, pension_sacrifice, taxable_pay) pay_date = _parse_date(text) diff --git a/tests/test_backfill_cash_tax.py b/tests/test_backfill_cash_tax.py new file mode 100644 index 0000000..3bd6e22 --- /dev/null +++ b/tests/test_backfill_cash_tax.py @@ -0,0 +1,238 @@ +"""Back-fill cash_income_tax tests — in-memory SQLite + mocked paperless/extractor.""" +from collections.abc import AsyncIterator +from datetime import UTC, date, datetime +from decimal import Decimal +from typing import Any +from unittest.mock import AsyncMock + +import pytest +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine + +from payslip_ingest.backfill_cash_tax import _derive_cash_tax, backfill_cash_income_tax +from payslip_ingest.db import Base, Payslip +from payslip_ingest.schema import ExtractedPayslip + + +def test_derive_cash_tax_pro_rata() -> None: + """income_tax * (gross - sacrifice) / taxable_pay, rounded to 2dp.""" + result = _derive_cash_tax( + income_tax=Decimal("1000.00"), + gross_pay=Decimal("5000.00"), + pension_sacrifice=Decimal("100.00"), + taxable_pay=Decimal("5000.00"), + ) + # (1000 * (5000-100) / 5000) = 980.00 + assert result == Decimal("980.00") + + +def test_derive_cash_tax_zero_taxable_pay_falls_through() -> None: + """Guard against div-by-zero.""" + result = _derive_cash_tax(Decimal("500"), Decimal("1000"), Decimal("0"), Decimal("0")) + assert result == Decimal("500") + + +@pytest.fixture +async def session_factory() -> AsyncIterator[async_sessionmaker[Any]]: + """In-memory aiosqlite with an ATTACHED payslip_ingest 'schema'. + + SQLite has no CREATE SCHEMA; the `schema="payslip_ingest"` qualifier on + the ORM tables maps to an attached database of that name. Attach before + creating the tables. + """ + engine: AsyncEngine = create_async_engine("sqlite+aiosqlite:///:memory:") + async with engine.begin() as conn: + await conn.exec_driver_sql("ATTACH DATABASE ':memory:' AS payslip_ingest") + await conn.run_sync(Base.metadata.create_all) + yield async_sessionmaker(engine, expire_on_commit=False) + await engine.dispose() + + +async def _insert_payslip( + session_factory: async_sessionmaker[Any], + **kwargs: Any, +) -> int: + defaults: dict[str, Any] = dict( + paperless_doc_id=1, + created_at=datetime.now(UTC), + pay_date=date(2020, 6, 30), + employer="Facebook UK Ltd", + currency="GBP", + gross_pay=Decimal("20000.00"), + income_tax=Decimal("5000.00"), + national_insurance=Decimal("500.00"), + pension_employee=Decimal("0"), + pension_employer=Decimal("0"), + student_loan=Decimal("0"), + rsu_vest=Decimal("10000.00"), + rsu_offset=Decimal("0"), + salary=Decimal("5000.00"), + bonus=Decimal("0"), + pension_sacrifice=Decimal("100.00"), + taxable_pay=None, + cash_income_tax=None, + net_pay=Decimal("14500.00"), + tax_year="2020/21", + raw_extraction={}, + validated=True, + ) + defaults.update(kwargs) + async with session_factory() as session, session.begin(): + row = Payslip(**defaults) + session.add(row) + await session.flush() + return row.id + + +async def test_backfill_regex_hit( + session_factory: async_sessionmaker[Any], + monkeypatch: pytest.MonkeyPatch, +) -> None: + """When pdftotext yields a variant-A payslip with Taxable Pay, regex fills the row.""" + payslip_id = await _insert_payslip( + session_factory, + paperless_doc_id=42, + income_tax=Decimal("1000.00"), + gross_pay=Decimal("500.00"), + pension_sacrifice=Decimal("100.00"), + ) + + # Note: AMOUNT_RE only matches numbers with <=3 leading digits (or with + # comma thousands separators), so keep test amounts under 1,000 or use + # commas. cash_income_tax = tax * (gross - sacrifice) / taxable_pay + # = 100 * (500-100) / 500 = 80.00 + variant_a_text = _build_variant_a_payslip_text( + gross="500.00", + net="350.00", + salary="500.00", + tax="100.00", + nic="50.00", + student="0.00", + pension_sacrifice="100.00", + taxable_pay="500.00", + ) + paperless = AsyncMock() + # PDF bytes are irrelevant — we monkey-patch _pdftotext below to feed the + # parser the raw text directly. + paperless.download_document = AsyncMock(return_value=b"fake pdf bytes") + paperless.get_document = AsyncMock(return_value={"id": 42}) + extractor = AsyncMock() + + # Monkey-patch _pdftotext at the backfill module level to return our raw text + # (real pdftotext needs a PDF file; we don't want to generate one in the test). + monkeypatch.setattr("payslip_ingest.backfill_cash_tax._pdftotext", + lambda _: variant_a_text) + result = await backfill_cash_income_tax(session_factory, paperless, extractor) + + assert result.regex_hits == 1 + assert result.claude_hits == 0 + assert result.fallback_null == 0 + + async with session_factory() as session: + row = (await session.execute(select(Payslip).where(Payslip.id == payslip_id))).scalar_one() + assert row.cash_income_tax is not None + assert row.cash_income_tax_source == "regex" + # (100 * (500-100) / 500) = 80.00 + assert row.cash_income_tax == Decimal("80.00") + + +async def test_backfill_claude_fallback( + session_factory: async_sessionmaker[Any], + monkeypatch: pytest.MonkeyPatch, +) -> None: + """When regex fails but Claude returns taxable_pay, derive cash_tax locally. + + The back-fill uses the DB row's income_tax/gross_pay/pension_sacrifice and + Claude's taxable_pay — Claude extraction only needs the one missing field. + """ + payslip_id = await _insert_payslip( + session_factory, + paperless_doc_id=99, + income_tax=Decimal("200.00"), + gross_pay=Decimal("500.00"), + pension_sacrifice=Decimal("100.00"), + ) + + paperless = AsyncMock() + paperless.download_document = AsyncMock(return_value=b"fake pdf bytes") + paperless.get_document = AsyncMock(return_value={"id": 99}) + + extracted = ExtractedPayslip( + pay_date=date(2020, 6, 30), + gross_pay=Decimal("500.00"), + income_tax=Decimal("200.00"), + net_pay=Decimal("300.00"), + taxable_pay=Decimal("500.00"), + pension_sacrifice=Decimal("100.00"), + ) + extractor = AsyncMock() + extractor.extract = AsyncMock(return_value=extracted) + + monkeypatch.setattr("payslip_ingest.backfill_cash_tax._pdftotext", + lambda _: "not a parseable payslip") + result = await backfill_cash_income_tax(session_factory, paperless, extractor) + + assert result.regex_hits == 0 + assert result.claude_hits == 1 + + async with session_factory() as session: + row = (await session.execute(select(Payslip).where(Payslip.id == payslip_id))).scalar_one() + assert row.cash_income_tax_source == "claude" + # (200 * (500-100) / 500) = 160.00 + assert row.cash_income_tax == Decimal("160.00") + + +async def test_backfill_skips_rows_with_cash_tax_already_set( + session_factory: async_sessionmaker[Any], +) -> None: + """Rows that already have cash_income_tax populated are not re-processed.""" + await _insert_payslip(session_factory, + paperless_doc_id=50, + cash_income_tax=Decimal("777.77"), + rsu_vest=Decimal("10000")) + + paperless = AsyncMock() + extractor = AsyncMock() + result = await backfill_cash_income_tax(session_factory, paperless, extractor) + assert result.processed == 0 + paperless.download_document.assert_not_called() + + +async def test_backfill_skips_rows_without_rsu_vest( + session_factory: async_sessionmaker[Any], +) -> None: + """NULL cash_income_tax but rsu_vest=0 is fine (no RSU-band distortion).""" + await _insert_payslip(session_factory, + paperless_doc_id=51, + cash_income_tax=None, + rsu_vest=Decimal("0")) + + paperless = AsyncMock() + extractor = AsyncMock() + result = await backfill_cash_income_tax(session_factory, paperless, extractor) + assert result.processed == 0 + + +def _build_variant_a_payslip_text(*, gross: str, net: str, salary: str, tax: str, nic: str, + student: str, pension_sacrifice: str, taxable_pay: str) -> str: + """Synthesize a variant-A payslip body that the parser accepts. + + Layout mirrors the 2021-08 fixture: Description | This Period | This Year + header, two Totals anchors (gross, then deductions), Net Pay line, and a + Taxable Pay summary line. Each row needs TWO amounts (period + YTD) or the + parser treats it as YTD-only and skips. + """ + deduction_total = float(tax) + float(nic) + float(student) + return ( + "Facebook UK Ltd\n" + "Date : 30 Jun 2020\n" + "Description This Period This Year\n" + f"Salary {salary} {salary}\n" + f"AE Pension EE ({pension_sacrifice}) ({pension_sacrifice})\n" + f"Total {gross}\n" + f"Tax {tax} {tax}\n" + f"National Insurance {nic} {nic}\n" + f"Student Loans {student} {student}\n" + f"Total {deduction_total:.2f}\n" + f"Net Pay {net}\n" + f"Taxable Pay : This Period £{taxable_pay} : To Date £{taxable_pay}\n") diff --git a/tests/test_meta_uk_parser.py b/tests/test_meta_uk_parser.py index 9a84edf..935e93e 100644 --- a/tests/test_meta_uk_parser.py +++ b/tests/test_meta_uk_parser.py @@ -176,6 +176,36 @@ def test_variant_a_cash_income_tax_pro_rata() -> None: assert abs(result.cash_income_tax - Decimal("1367.76")) <= Decimal("0.02") +@pytest.mark.parametrize("taxable_pay_line", [ + "Taxable Pay : This Period £1234.56 : To Date £12345.67", + "Taxable Pay: This Period £1234.56", + "Taxable Pay This Period £1234.56 To Date £12345.67", + "TAXABLE PAY : This Period £1234.56", + "taxable pay : this period £1234.56", + "Taxable Pay : Period £1234.56", + "Taxable Pay : This Period £1,234.56", +]) +def test_taxable_pay_variant_a_regex_matches(taxable_pay_line: str) -> None: + """Variant-A Taxable Pay line appears in several layouts pre-2022. + + Original regex only matched `Taxable Pay : This Period £...` exactly. + Widen to tolerate: missing/different colons, uppercase, no "This", + whitespace in place of colons. Rows that don't match fall back to + Claude in the back-fill path. + """ + from payslip_ingest.parsers.meta_uk import _match_variant_a_taxable_pay + + result = _match_variant_a_taxable_pay(taxable_pay_line) + assert result == Decimal("1234.56") + + +def test_taxable_pay_variant_a_regex_rejects_unmatched() -> None: + """If there's no `Taxable Pay` label we should return None, not crash.""" + from payslip_ingest.parsers.meta_uk import _match_variant_a_taxable_pay + assert _match_variant_a_taxable_pay("some random line") is None + assert _match_variant_a_taxable_pay("") is None + + def test_raises_on_non_meta_payslip() -> None: with pytest.raises(ParserError): parse_meta_uk("This is not a Meta payslip\nRandom text\n")