payslip-ingest/tests/test_backfill_cash_tax.py
Viktor Barzin 3b9c69bfd3 backfill: cash_income_tax back-fill for variant-A NULL rows
Phase B of RSU tax spike fix. Vest-month spikes on the dashboard trace to
variant-A slips (2019–mid-2022) where `cash_income_tax` is NULL — the
dashboard's COALESCE fallback returns full PAYE, masquerading as cash tax.

Three changes:

1. Widen variant-A Taxable Pay regex. Original pattern only matched
   `Taxable Pay : This Period £...`; add case-insensitive variants that
   tolerate missing/different colons, elided "This", and uppercase labels.
   Covers older 2019-2020 templates that failed the previous match.

2. New `backfill_cash_tax` module (entry point `backfill_cash_income_tax`) — walks every NULL-cash-tax row
   with rsu_vest > 0, re-downloads the PDF from Paperless, runs the
   widened regex parser, falls back to Claude for taxable_pay extraction
   if regex still misses, and derives cash_income_tax pro-rata. Records
   provenance in new `cash_income_tax_source` column (regex/claude/
   fallback_null). Idempotent — only touches NULL rows.

3. Migration 0006 adds the `cash_income_tax_source` audit column.

CLI: `python -m payslip_ingest backfill-cash-tax [--limit N]`. Meant to
run as a one-shot K8s Job after `alembic upgrade head`.

Part of: code-860
2026-04-19 18:15:18 +00:00

238 lines
9.2 KiB
Python

"""Back-fill cash_income_tax tests — in-memory SQLite + mocked paperless/extractor."""
from collections.abc import AsyncIterator
from datetime import UTC, date, datetime
from decimal import Decimal
from typing import Any
from unittest.mock import AsyncMock
import pytest
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from payslip_ingest.backfill_cash_tax import _derive_cash_tax, backfill_cash_income_tax
from payslip_ingest.db import Base, Payslip
from payslip_ingest.schema import ExtractedPayslip
def test_derive_cash_tax_pro_rata() -> None:
    """Cash tax is income_tax scaled by (gross - sacrifice) / taxable_pay, to 2dp."""
    inputs = {
        "income_tax": Decimal("1000.00"),
        "gross_pay": Decimal("5000.00"),
        "pension_sacrifice": Decimal("100.00"),
        "taxable_pay": Decimal("5000.00"),
    }
    # 1000 * (5000 - 100) / 5000 = 980.00
    expected = Decimal("980.00")
    assert _derive_cash_tax(**inputs) == expected
def test_derive_cash_tax_zero_taxable_pay_falls_through() -> None:
    """A zero taxable_pay must not divide by zero; the leading 500 comes back as-is."""
    zero = Decimal("0")
    # Arguments stay positional, in the same order the original call used.
    derived = _derive_cash_tax(Decimal("500"), Decimal("1000"), zero, zero)
    assert derived == Decimal("500")
@pytest.fixture
async def session_factory() -> AsyncIterator[async_sessionmaker[Any]]:
    """Yield a sessionmaker bound to a throwaway in-memory aiosqlite engine.

    The ORM tables carry a `schema="payslip_ingest"` qualifier and SQLite has
    no CREATE SCHEMA. Attaching a second in-memory database under that name
    gives the qualifier something to resolve to, which is why the ATTACH runs
    before the tables are created.
    """
    memory_engine: AsyncEngine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with memory_engine.begin() as connection:
        await connection.exec_driver_sql("ATTACH DATABASE ':memory:' AS payslip_ingest")
        await connection.run_sync(Base.metadata.create_all)

    factory = async_sessionmaker(memory_engine, expire_on_commit=False)
    yield factory

    # Teardown: dispose of the engine once the test has finished with it.
    await memory_engine.dispose()
async def _insert_payslip(
    session_factory: async_sessionmaker[Any],
    **kwargs: Any,
) -> int:
    """Persist a single Payslip row and return its `id`.

    The baseline values below describe a June 2020 slip with a non-zero
    rsu_vest and with taxable_pay / cash_income_tax left as None. Any keyword
    argument replaces the baseline value of the same name.
    """
    baseline: dict[str, Any] = {
        "paperless_doc_id": 1,
        "created_at": datetime.now(UTC),
        "pay_date": date(2020, 6, 30),
        "employer": "Facebook UK Ltd",
        "currency": "GBP",
        "gross_pay": Decimal("20000.00"),
        "income_tax": Decimal("5000.00"),
        "national_insurance": Decimal("500.00"),
        "pension_employee": Decimal("0"),
        "pension_employer": Decimal("0"),
        "student_loan": Decimal("0"),
        "rsu_vest": Decimal("10000.00"),
        "rsu_offset": Decimal("0"),
        "salary": Decimal("5000.00"),
        "bonus": Decimal("0"),
        "pension_sacrifice": Decimal("100.00"),
        "taxable_pay": None,
        "cash_income_tax": None,
        "net_pay": Decimal("14500.00"),
        "tax_year": "2020/21",
        "raw_extraction": {},
        "validated": True,
    }
    columns = {**baseline, **kwargs}

    async with session_factory() as session, session.begin():
        payslip = Payslip(**columns)
        session.add(payslip)
        # Flush before reading `id` so the INSERT has been emitted.
        await session.flush()
        return payslip.id
async def test_backfill_regex_hit(
    session_factory: async_sessionmaker[Any],
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A variant-A slip whose text carries Taxable Pay is filled via the regex path."""
    # AMOUNT_RE only matches numbers with <=3 leading digits (or with comma
    # thousands separators), so every amount in the slip text stays under 1,000.
    slip_text = _build_variant_a_payslip_text(
        gross="500.00",
        net="350.00",
        salary="500.00",
        tax="100.00",
        nic="50.00",
        student="0.00",
        pension_sacrifice="100.00",
        taxable_pay="500.00",
    )
    # Real pdftotext needs an actual PDF file; replace the backfill module's
    # _pdftotext so the parser is fed the synthesized text directly.
    monkeypatch.setattr("payslip_ingest.backfill_cash_tax._pdftotext",
                        lambda _: slip_text)

    row_id = await _insert_payslip(
        session_factory,
        paperless_doc_id=42,
        income_tax=Decimal("1000.00"),
        gross_pay=Decimal("500.00"),
        pension_sacrifice=Decimal("100.00"),
    )

    # The downloaded bytes are never parsed because _pdftotext is patched.
    paperless = AsyncMock()
    paperless.download_document = AsyncMock(return_value=b"fake pdf bytes")
    paperless.get_document = AsyncMock(return_value={"id": 42})
    extractor = AsyncMock()

    outcome = await backfill_cash_income_tax(session_factory, paperless, extractor)

    assert outcome.regex_hits == 1
    assert outcome.claude_hits == 0
    assert outcome.fallback_null == 0

    lookup = select(Payslip).where(Payslip.id == row_id)
    async with session_factory() as session:
        fetched = await session.execute(lookup)
        stored = fetched.scalar_one()

    assert stored.cash_income_tax is not None
    assert stored.cash_income_tax_source == "regex"
    # tax * (gross - sacrifice) / taxable_pay = 100 * (500 - 100) / 500 = 80.00,
    # using the Tax figure of 100.00 from the slip text.
    # NOTE(review): the inserted row has income_tax=1000.00, so 80.00 only holds
    # if the regex path derives from the parsed slip — confirm in backfill_cash_tax.
    assert stored.cash_income_tax == Decimal("80.00")
async def test_backfill_claude_fallback(
    session_factory: async_sessionmaker[Any],
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With unparseable text, the extractor's taxable_pay drives the derivation.

    The expected figure combines the stored row's income_tax, gross_pay and
    pension_sacrifice with the taxable_pay returned by the mocked extractor;
    taxable_pay is the only field the row is missing.
    """
    # Text the regex parser cannot use, forcing the fallback path.
    monkeypatch.setattr("payslip_ingest.backfill_cash_tax._pdftotext",
                        lambda _: "not a parseable payslip")

    row_id = await _insert_payslip(
        session_factory,
        paperless_doc_id=99,
        income_tax=Decimal("200.00"),
        gross_pay=Decimal("500.00"),
        pension_sacrifice=Decimal("100.00"),
    )

    claude_answer = ExtractedPayslip(
        pay_date=date(2020, 6, 30),
        gross_pay=Decimal("500.00"),
        income_tax=Decimal("200.00"),
        net_pay=Decimal("300.00"),
        taxable_pay=Decimal("500.00"),
        pension_sacrifice=Decimal("100.00"),
    )
    extractor = AsyncMock()
    extractor.extract = AsyncMock(return_value=claude_answer)

    paperless = AsyncMock()
    paperless.download_document = AsyncMock(return_value=b"fake pdf bytes")
    paperless.get_document = AsyncMock(return_value={"id": 99})

    outcome = await backfill_cash_income_tax(session_factory, paperless, extractor)

    assert outcome.regex_hits == 0
    assert outcome.claude_hits == 1

    lookup = select(Payslip).where(Payslip.id == row_id)
    async with session_factory() as session:
        fetched = await session.execute(lookup)
        stored = fetched.scalar_one()

    assert stored.cash_income_tax_source == "claude"
    # 200 * (500 - 100) / 500 = 160.00
    assert stored.cash_income_tax == Decimal("160.00")
async def test_backfill_skips_rows_with_cash_tax_already_set(
    session_factory: async_sessionmaker[Any],
) -> None:
    """A row whose cash_income_tax is already populated is not picked up again."""
    already_filled = {
        "paperless_doc_id": 50,
        "cash_income_tax": Decimal("777.77"),
        "rsu_vest": Decimal("10000"),
    }
    await _insert_payslip(session_factory, **already_filled)
    paperless, extractor = AsyncMock(), AsyncMock()

    outcome = await backfill_cash_income_tax(session_factory, paperless, extractor)

    assert outcome.processed == 0
    # Nothing qualified, so no PDF may have been fetched.
    paperless.download_document.assert_not_called()
async def test_backfill_skips_rows_without_rsu_vest(
    session_factory: async_sessionmaker[Any],
) -> None:
    """NULL cash_income_tax but rsu_vest=0 is fine (no RSU-band distortion).

    Such a row must be left untouched: it is not counted as processed and no
    PDF is downloaded for it.
    """
    await _insert_payslip(
        session_factory,
        paperless_doc_id=51,
        cash_income_tax=None,
        rsu_vest=Decimal("0"),
    )
    paperless = AsyncMock()
    extractor = AsyncMock()

    result = await backfill_cash_income_tax(session_factory, paperless, extractor)

    assert result.processed == 0
    # Same guard as the already-populated skip test: a skipped row must not
    # trigger a Paperless download.
    paperless.download_document.assert_not_called()
def _build_variant_a_payslip_text(*, gross: str, net: str, salary: str, tax: str, nic: str,
student: str, pension_sacrifice: str, taxable_pay: str) -> str:
"""Synthesize a variant-A payslip body that the parser accepts.
Layout mirrors the 2021-08 fixture: Description | This Period | This Year
header, two Totals anchors (gross, then deductions), Net Pay line, and a
Taxable Pay summary line. Each row needs TWO amounts (period + YTD) or the
parser treats it as YTD-only and skips.
"""
deduction_total = float(tax) + float(nic) + float(student)
return (
"Facebook UK Ltd\n"
"Date : 30 Jun 2020\n"
"Description This Period This Year\n"
f"Salary {salary} {salary}\n"
f"AE Pension EE ({pension_sacrifice}) ({pension_sacrifice})\n"
f"Total {gross}\n"
f"Tax {tax} {tax}\n"
f"National Insurance {nic} {nic}\n"
f"Student Loans {student} {student}\n"
f"Total {deduction_total:.2f}\n"
f"Net Pay {net}\n"
f"Taxable Pay : This Period £{taxable_pay} : To Date £{taxable_pay}\n")