backfill: cash_income_tax back-fill for variant-A NULL rows

Phase B of RSU tax spike fix. Vest-month spikes on the dashboard trace to
variant-A slips (2019–mid-2022) where `cash_income_tax` is NULL — the
dashboard's COALESCE fallback returns full PAYE, masquerading as cash tax.

Three changes:

1. Widen variant-A Taxable Pay regex. Original pattern only matched
   `Taxable Pay : This Period £...`; add case-insensitive variants that
   tolerate missing/different colons, elided "This", and uppercase labels.
   Covers older 2019-2020 templates that failed the previous match.

2. New `backfill_cash_income_tax` module — walks every NULL-cash-tax row
   with rsu_vest > 0, re-downloads the PDF from Paperless, runs the
   widened regex parser, falls back to Claude for taxable_pay extraction
   if regex still misses, and derives cash_income_tax pro-rata. Records
   provenance in new `cash_income_tax_source` column (regex/claude/
   fallback_null). Idempotent — only touches NULL rows.

3. Migration 0006 adds the `cash_income_tax_source` audit column.

CLI: `python -m payslip_ingest backfill-cash-tax [--limit N]`. Meant to
run as a one-shot K8s Job after `alembic upgrade head`.

Part of: code-860
This commit is contained in:
Viktor Barzin 2026-04-19 18:15:18 +00:00
parent 4f70681dcb
commit 3b9c69bfd3
7 changed files with 512 additions and 4 deletions

View file

@ -0,0 +1,36 @@
"""Add cash_income_tax_source audit column.
Tracks which path produced `cash_income_tax` for a given row. Back-fill
script populates this on rows it touches so the dashboard can surface how
many rows were rescued by regex vs Claude vs left NULL.
Values:
- `regex`: the regex parser extracted taxable_pay and derived cash_income_tax
- `claude`: fell back to Claude for taxable_pay, then derived locally
- `fallback_null`: neither regex nor Claude could recover it; cash_income_tax
  left NULL (dashboard's COALESCE will use income_tax)
Nullable so pre-back-fill rows stay distinguishable from post-back-fill rows.
"""
import sqlalchemy as sa
from alembic import op
# Alembic revision identifiers: this migration's id and the revision it follows.
revision = "0006"
down_revision = "0005"
branch_labels = None
depends_on = None
# Schema qualifier passed to the add_column / drop_column operations below.
SCHEMA = "payslip_ingest"
def upgrade() -> None:
    """Add the nullable ``cash_income_tax_source`` column to ``payslip``."""
    source_column = sa.Column(
        "cash_income_tax_source",
        sa.String(length=16),
        nullable=True,
    )
    op.add_column("payslip", source_column, schema=SCHEMA)
def downgrade() -> None:
    """Drop the ``cash_income_tax_source`` column from ``payslip``."""
    op.drop_column(
        table_name="payslip",
        column_name="cash_income_tax_source",
        schema=SCHEMA,
    )

View file

@ -116,5 +116,41 @@ def migrate() -> None:
sys.exit(result.returncode)
@cli.command("backfill-cash-tax")
@click.option(
    "--limit",
    # Reject negative values up front: the back-fill slices its work list with
    # this number, and a negative slice bound would drop rows from the END of
    # the list instead of capping it.
    type=click.IntRange(min=0),
    default=None,
    help="Cap the number of rows processed.",
)
def backfill_cash_tax(limit: int | None) -> None:
    """Back-fill cash_income_tax on rows where it's NULL (vest months only).

    Uses the widened regex parser first; falls back to Claude. Writes the
    provenance source into `cash_income_tax_source`. Idempotent: only
    touches NULL rows.
    """
    # click commands are synchronous; drive the async implementation to completion.
    asyncio.run(_backfill_cash_tax(limit))
async def _backfill_cash_tax(limit: int | None) -> None:
    """Build the DB engine and HTTP clients from the environment and run the back-fill.

    Reads ``PAPERLESS_URL``, ``PAPERLESS_API_TOKEN``, ``CLAUDE_AGENT_URL`` and
    ``CLAUDE_AGENT_BEARER_TOKEN`` from ``os.environ`` (a missing variable raises
    ``KeyError``), then echoes the result counters.

    Every resource is registered for cleanup as soon as it exists, so a failure
    while constructing a later one (e.g. a missing env var) or an error inside
    one ``aclose()`` no longer leaks the others.
    """
    from contextlib import AsyncExitStack

    from payslip_ingest.backfill_cash_tax import backfill_cash_income_tax

    async with AsyncExitStack() as cleanup:
        # Callbacks run in reverse registration order: extractor, paperless, engine.
        engine = create_engine_from_env()
        cleanup.push_async_callback(engine.dispose)
        session_factory = make_session_factory(engine)

        paperless = PaperlessClient(
            base_url=os.environ["PAPERLESS_URL"],
            api_token=os.environ["PAPERLESS_API_TOKEN"],
        )
        cleanup.push_async_callback(paperless.aclose)

        extractor = ClaudeExtractor(
            base_url=os.environ["CLAUDE_AGENT_URL"],
            bearer_token=os.environ["CLAUDE_AGENT_BEARER_TOKEN"],
        )
        cleanup.push_async_callback(extractor.aclose)

        result = await backfill_cash_income_tax(session_factory, paperless, extractor, limit=limit)
        click.echo(f"back-fill complete: processed={result.processed} "
                   f"regex={result.regex_hits} claude={result.claude_hits} "
                   f"fallback_null={result.fallback_null} errors={result.errors}")
# Script entry point: invoke the `cli` command group when run directly.
if __name__ == "__main__":
cli()

View file

@ -0,0 +1,151 @@
"""Back-fill `cash_income_tax` on rows where it's NULL.
Dashboard Panel 11's RSU-attributed tax band is computed as
`income_tax - COALESCE(cash_income_tax, income_tax)`; rows where
`cash_income_tax` is NULL fall back to `income_tax` and collapse the band to
zero, making the cash-band swallow the RSU-band. Back-filling those rows
restores the split.
Strategy for each NULL row with `rsu_vest > 0`:
1. Fetch the PDF from Paperless.
2. Run the widened regex parser. If it yields a non-NULL `cash_income_tax`,
UPDATE the row with source='regex'.
3. Otherwise call Claude (narrow prompt, just taxable_pay). If it returns a
taxable_pay, compute the pro-rata share and UPDATE with source='claude'.
4. If neither succeeds, mark source='fallback_null' so the audit column
shows we tried.
Idempotent: re-running only touches rows whose `cash_income_tax` is still NULL
(rows previously marked `fallback_null` are therefore retried). Safe to run
as a K8s one-shot Job or via CLI on demand.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from decimal import Decimal
from typing import Any
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import async_sessionmaker
from payslip_ingest.db import Payslip
from payslip_ingest.extractor import ClaudeExtractor, ExtractorError
from payslip_ingest.paperless import PaperlessClient
from payslip_ingest.parsers.meta_uk import ParserError, parse_meta_uk
from payslip_ingest.processor import _pdftotext
log = logging.getLogger(__name__)
@dataclass
class BackfillResult:
    """Counters describing the outcome of one back-fill run (all start at zero)."""

    # Rows the driver loop attempted.
    processed: int = 0
    # Rows filled from the regex parser's output.
    regex_hits: int = 0
    # Rows filled using a taxable_pay returned by the Claude extractor.
    claude_hits: int = 0
    # Rows neither path could fill; only the audit column was stamped.
    fallback_null: int = 0
    # Rows whose processing raised an exception.
    errors: int = 0
async def backfill_cash_income_tax(
    db_session_factory: async_sessionmaker[Any],
    paperless: PaperlessClient,
    extractor: ClaudeExtractor,
    limit: int | None = None,
) -> BackfillResult:
    """Walk every NULL `cash_income_tax` row with `rsu_vest > 0` and try to fill it.

    Args:
        db_session_factory: Factory for the sessions used to read and update rows.
        paperless: Client used to download each payslip PDF and its metadata.
        extractor: Claude extractor used when the regex parser cannot fill a row.
        limit: Optional cap on the number of rows visited (oldest `pay_date` first).

    Returns:
        Counters for regex hits, Claude hits, fallback rows and errors.

    Raises:
        ValueError: If `limit` is negative.
    """
    # A negative bound used to be applied as a list slice, which silently
    # dropped rows from the tail instead of capping the run.
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit}")

    candidates = (
        select(
            Payslip.id,
            Payslip.paperless_doc_id,
            Payslip.income_tax,
            Payslip.gross_pay,
            Payslip.pension_sacrifice,
        )
        .where(
            Payslip.cash_income_tax.is_(None),
            Payslip.rsu_vest > 0,
        )
        # id breaks ties so that a capped run picks a deterministic set of rows.
        .order_by(Payslip.pay_date, Payslip.id)
    )
    if limit is not None:
        # Let the database apply the cap instead of fetching every row first.
        candidates = candidates.limit(limit)

    async with db_session_factory() as session:
        rows = (await session.execute(candidates)).all()

    # NOTE(review): rows stamped `fallback_null` keep a NULL cash_income_tax, so
    # they are selected again on every run (and sit first under --limit).
    result = BackfillResult()
    for row in rows:
        try:
            await _backfill_one(row, db_session_factory, paperless, extractor, result)
        except Exception as exc:
            # One bad document must not abort the whole run; record and move on.
            log.exception("backfill failed for payslip id=%s doc_id=%s: %s", row.id,
                          row.paperless_doc_id, exc)
            result.errors += 1
        result.processed += 1
    return result
async def _backfill_one(
    row: Any,
    db_session_factory: async_sessionmaker[Any],
    paperless: PaperlessClient,
    extractor: ClaudeExtractor,
    result: BackfillResult,
) -> None:
    """Try to fill `cash_income_tax` for one row and bump the matching counter.

    Order of attempts: regex parser on the PDF text, then the Claude extractor
    combined with the row's stored income_tax / gross_pay / pension_sacrifice,
    and finally a `fallback_null` stamp on the audit column.

    Exceptions other than `ParserError` and `ExtractorError` propagate to the
    caller.
    """
    pdf_bytes = await paperless.download_document(row.paperless_doc_id)

    # Pass 1 — widened regex parser.
    text_body = _pdftotext(pdf_bytes) or ""
    if text_body:
        try:
            parsed = parse_meta_uk(text_body)
        except ParserError as exc:
            # Not fatal (Claude is tried next), but leave a trace of why the
            # regex pass did not apply instead of discarding the reason.
            log.debug("regex parser rejected doc_id=%s: %s", row.paperless_doc_id, exc)
        else:
            if parsed.cash_income_tax is not None and parsed.taxable_pay is not None:
                await _apply_update(db_session_factory, row.id, parsed.cash_income_tax, "regex")
                result.regex_hits += 1
                return

    # Pass 2 — Claude fallback for the one field we need.
    metadata = await paperless.get_document(row.paperless_doc_id)
    try:
        extracted = await extractor.extract(pdf_bytes, metadata)
    except ExtractorError as exc:
        log.warning("Claude extraction failed for doc_id=%s: %s", row.paperless_doc_id, exc)
        extracted = None

    taxable_pay = extracted.taxable_pay if extracted is not None else None
    if taxable_pay is not None and taxable_pay > 0:
        cash_tax = _derive_cash_tax(row.income_tax, row.gross_pay, row.pension_sacrifice,
                                    taxable_pay)
        await _apply_update(db_session_factory, row.id, cash_tax, "claude")
        result.claude_hits += 1
        return

    # Neither pass produced a value: record that we tried.
    await _apply_source_only(db_session_factory, row.id, "fallback_null")
    result.fallback_null += 1
def _derive_cash_tax(income_tax: Decimal, gross_pay: Decimal, pension_sacrifice: Decimal,
taxable_pay: Decimal) -> Decimal:
if taxable_pay <= 0:
return income_tax
return (income_tax * (gross_pay - pension_sacrifice) / taxable_pay).quantize(Decimal("0.01"))
async def _apply_update(
    db_session_factory: async_sessionmaker[Any],
    payslip_id: int,
    cash_tax: Decimal,
    source: str,
) -> None:
    """Write `cash_income_tax` and its provenance for one payslip, in its own transaction.

    The UPDATE is additionally restricted to rows whose `cash_income_tax` is
    still NULL. The row was selected before the PDF download and extraction
    ran, so without this guard a value written in the meantime would be
    overwritten, contradicting the "only touches NULL rows" contract.
    """
    statement = (
        update(Payslip)
        .where(
            Payslip.id == payslip_id,
            Payslip.cash_income_tax.is_(None),
        )
        .values(
            cash_income_tax=cash_tax,
            cash_income_tax_source=source,
        )
    )
    async with db_session_factory() as session, session.begin():
        await session.execute(statement)
async def _apply_source_only(
    db_session_factory: async_sessionmaker[Any],
    payslip_id: int,
    source: str,
) -> None:
    """Stamp only the provenance column for one payslip, in its own transaction.

    Restricted to rows whose `cash_income_tax` is still NULL so that a row
    filled in the meantime is not relabelled (e.g. as `fallback_null`) while
    it actually holds a value.
    """
    statement = (
        update(Payslip)
        .where(
            Payslip.id == payslip_id,
            Payslip.cash_income_tax.is_(None),
        )
        .values(cash_income_tax_source=source)
    )
    async with db_session_factory() as session, session.begin():
        await session.execute(statement)
# `_derive_cash_tax` is listed despite its leading underscore, so a star-import
# of this module exposes it alongside the two public names.
__all__ = ["BackfillResult", "backfill_cash_income_tax", "_derive_cash_tax"]

View file

@ -64,6 +64,7 @@ class Payslip(Base):
ytd_taxable_pay: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True)
ytd_gross: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True)
cash_income_tax: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True)
cash_income_tax_source: Mapped[str | None] = mapped_column(String(16), nullable=True)
ytd_rsu_tax_offset: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True)
ytd_rsu_excs_refund: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True)
other_deductions: Mapped[dict[str, Any] | None] = mapped_column(JSON_TYPE, nullable=True)

View file

@ -371,11 +371,28 @@ VARIANT_A_RSU_LABELS = {
"RSU Net Cash UK",
}
# "Taxable Pay : This Period £15323.16 : To Date £52446.53"
TAXABLE_PAY_A_RE = re.compile(r"Taxable Pay\s*:\s*This Period\s*£([\d,]+\.\d{2})")
# Variant A Taxable Pay line — multiple template variants:
# "Taxable Pay : This Period £15323.16 : To Date £52446.53" (canonical post-2021)
# "Taxable Pay This Period £1234.56" (older, no colons)
# "TAXABLE PAY : This Period £1234.56" (uppercase on some 2019-2020 slips)
# "Taxable Pay : Period £1234.56" ("This" elided)
# "Taxable Pay : £1234.56" (no "Period" label at all — second pattern only)
# Case-insensitive, tolerant of separators. Ordered most-specific first.
# Each pattern captures the amount (digits, optional commas, two decimals) in group 1.
TAXABLE_PAY_A_PATTERNS: list[re.Pattern[str]] = [
# Label, colon/whitespace separators, optional "This", then "Period £<amount>".
re.compile(r"Taxable\s+Pay\s*[:\s]+(?:This\s+)?Period\s*£([\d,]+\.\d{2})", re.IGNORECASE),
# Fallback: label followed directly (after separators) by "£<amount>".
re.compile(r"Taxable\s+Pay\s*[:\s]+£([\d,]+\.\d{2})", re.IGNORECASE),
]
NET_PAY_A_RE = re.compile(r"Net Pay\s+(-?[\d,]+\.\d{2})")
def _match_variant_a_taxable_pay(line: str) -> Decimal | None:
    """Return the amount captured by the first variant-A Taxable Pay pattern that matches.

    Patterns are searched in their declared order against `line`; `None` is
    returned when none of them matches.
    """
    # Lazy generator: patterns after the first hit are never evaluated.
    attempts = (pattern.search(line) for pattern in TAXABLE_PAY_A_PATTERNS)
    first_hit = next((found for found in attempts if found), None)
    if first_hit is None:
        return None
    return _to_decimal(first_hit.group(1))
def _parse_variant_a(text: str, lines: list[str], employer: str) -> ExtractedPayslip:
header_idx = _find_variant_a_header(lines)
payments, deductions = _collect_a_blocks(lines, header_idx)
@ -397,8 +414,7 @@ def _parse_variant_a(text: str, lines: list[str], employer: str) -> ExtractedPay
bonus = payments.get("Perform Bonus", payments.get("Bonus", Decimal("0")))
taxable_pay_s = _find_match(text, TAXABLE_PAY_A_RE)
taxable_pay = _to_decimal(taxable_pay_s) if taxable_pay_s else None
taxable_pay = _match_variant_a_taxable_pay(text)
cash_income_tax = _cash_income_tax(income_tax, gross_pay, pension_sacrifice, taxable_pay)
pay_date = _parse_date(text)

View file

@ -0,0 +1,238 @@
"""Back-fill cash_income_tax tests — in-memory SQLite + mocked paperless/extractor."""
from collections.abc import AsyncIterator
from datetime import UTC, date, datetime
from decimal import Decimal
from typing import Any
from unittest.mock import AsyncMock
import pytest
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from payslip_ingest.backfill_cash_tax import _derive_cash_tax, backfill_cash_income_tax
from payslip_ingest.db import Base, Payslip
from payslip_ingest.schema import ExtractedPayslip
def test_derive_cash_tax_pro_rata() -> None:
    """income_tax * (gross - sacrifice) / taxable_pay, rounded to 2dp."""
    # (1000 * (5000-100) / 5000) = 980.00
    expected = Decimal("980.00")
    inputs = {
        "income_tax": Decimal("1000.00"),
        "gross_pay": Decimal("5000.00"),
        "pension_sacrifice": Decimal("100.00"),
        "taxable_pay": Decimal("5000.00"),
    }
    assert _derive_cash_tax(**inputs) == expected
def test_derive_cash_tax_zero_taxable_pay_falls_through() -> None:
    """Guard against div-by-zero: zero taxable pay hands back income_tax unchanged."""
    income_tax = Decimal("500")
    derived = _derive_cash_tax(income_tax, Decimal("1000"), Decimal("0"), Decimal("0"))
    assert derived == Decimal("500")
@pytest.fixture
async def session_factory() -> AsyncIterator[async_sessionmaker[Any]]:
    """Yield a session factory bound to in-memory aiosqlite with a `payslip_ingest` 'schema'.

    SQLite has no CREATE SCHEMA; the `schema="payslip_ingest"` qualifier on
    the ORM tables maps to an attached database of that name, so the ATTACH
    must run before the tables are created. The engine is disposed after
    the test.
    """
    memory_engine: AsyncEngine = create_async_engine("sqlite+aiosqlite:///:memory:")
    async with memory_engine.begin() as connection:
        await connection.exec_driver_sql("ATTACH DATABASE ':memory:' AS payslip_ingest")
        await connection.run_sync(Base.metadata.create_all)
    factory = async_sessionmaker(memory_engine, expire_on_commit=False)
    yield factory
    await memory_engine.dispose()
async def _insert_payslip(
    session_factory: async_sessionmaker[Any],
    **kwargs: Any,
) -> int:
    """Insert one `Payslip` row and return its id.

    Starts from a vest-month row with NULL `taxable_pay` / `cash_income_tax`;
    any keyword argument overrides the column of the same name.
    """
    baseline: dict[str, Any] = {
        "paperless_doc_id": 1,
        "created_at": datetime.now(UTC),
        "pay_date": date(2020, 6, 30),
        "employer": "Facebook UK Ltd",
        "currency": "GBP",
        "gross_pay": Decimal("20000.00"),
        "income_tax": Decimal("5000.00"),
        "national_insurance": Decimal("500.00"),
        "pension_employee": Decimal("0"),
        "pension_employer": Decimal("0"),
        "student_loan": Decimal("0"),
        "rsu_vest": Decimal("10000.00"),
        "rsu_offset": Decimal("0"),
        "salary": Decimal("5000.00"),
        "bonus": Decimal("0"),
        "pension_sacrifice": Decimal("100.00"),
        "taxable_pay": None,
        "cash_income_tax": None,
        "net_pay": Decimal("14500.00"),
        "tax_year": "2020/21",
        "raw_extraction": {},
        "validated": True,
    }
    columns = {**baseline, **kwargs}
    async with session_factory() as session, session.begin():
        payslip = Payslip(**columns)
        session.add(payslip)
        # Flush so the primary key is assigned before we read it.
        await session.flush()
        return payslip.id
async def test_backfill_regex_hit(
    session_factory: async_sessionmaker[Any],
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """When pdftotext yields a variant-A payslip with Taxable Pay, regex fills the row.

    Also checks that the regex hit short-circuits the run: no metadata fetch,
    no Claude call, no error counted.
    """
    payslip_id = await _insert_payslip(
        session_factory,
        paperless_doc_id=42,
        income_tax=Decimal("1000.00"),
        gross_pay=Decimal("500.00"),
        pension_sacrifice=Decimal("100.00"),
    )
    # Note: AMOUNT_RE only matches numbers with <=3 leading digits (or with
    # comma thousands separators), so keep test amounts under 1,000 or use
    # commas. cash_income_tax = tax * (gross - sacrifice) / taxable_pay
    # = 100 * (500-100) / 500 = 80.00
    variant_a_text = _build_variant_a_payslip_text(
        gross="500.00",
        net="350.00",
        salary="500.00",
        tax="100.00",
        nic="50.00",
        student="0.00",
        pension_sacrifice="100.00",
        taxable_pay="500.00",
    )
    paperless = AsyncMock()
    # PDF bytes are irrelevant — we monkey-patch _pdftotext below to feed the
    # parser the raw text directly.
    paperless.download_document = AsyncMock(return_value=b"fake pdf bytes")
    paperless.get_document = AsyncMock(return_value={"id": 42})
    extractor = AsyncMock()
    # Monkey-patch _pdftotext at the backfill module level to return our raw text
    # (real pdftotext needs a PDF file; we don't want to generate one in the test).
    monkeypatch.setattr("payslip_ingest.backfill_cash_tax._pdftotext",
                        lambda _: variant_a_text)

    result = await backfill_cash_income_tax(session_factory, paperless, extractor)

    assert result.regex_hits == 1
    assert result.claude_hits == 0
    assert result.fallback_null == 0
    assert result.errors == 0
    # A regex hit must not reach the Claude path at all.
    paperless.get_document.assert_not_called()
    extractor.extract.assert_not_called()

    async with session_factory() as session:
        row = (await session.execute(select(Payslip).where(Payslip.id == payslip_id))).scalar_one()
    assert row.cash_income_tax is not None
    assert row.cash_income_tax_source == "regex"
    # (100 * (500-100) / 500) = 80.00
    assert row.cash_income_tax == Decimal("80.00")
async def test_backfill_claude_fallback(
    session_factory: async_sessionmaker[Any],
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """When regex fails but Claude returns taxable_pay, derive cash_tax locally.

    The back-fill uses the DB row's income_tax/gross_pay/pension_sacrifice and
    Claude's taxable_pay — Claude extraction only needs the one missing field.
    """
    payslip_id = await _insert_payslip(
        session_factory,
        paperless_doc_id=99,
        income_tax=Decimal("200.00"),
        gross_pay=Decimal("500.00"),
        pension_sacrifice=Decimal("100.00"),
    )
    paperless = AsyncMock()
    paperless.download_document = AsyncMock(return_value=b"fake pdf bytes")
    paperless.get_document = AsyncMock(return_value={"id": 99})
    extracted = ExtractedPayslip(
        pay_date=date(2020, 6, 30),
        gross_pay=Decimal("500.00"),
        income_tax=Decimal("200.00"),
        net_pay=Decimal("300.00"),
        taxable_pay=Decimal("500.00"),
        pension_sacrifice=Decimal("100.00"),
    )
    extractor = AsyncMock()
    extractor.extract = AsyncMock(return_value=extracted)
    monkeypatch.setattr("payslip_ingest.backfill_cash_tax._pdftotext",
                        lambda _: "not a parseable payslip")

    result = await backfill_cash_income_tax(session_factory, paperless, extractor)

    assert result.regex_hits == 0
    assert result.claude_hits == 1
    assert result.fallback_null == 0
    assert result.errors == 0
    # Exactly one Claude call for the single candidate row.
    extractor.extract.assert_awaited_once()

    async with session_factory() as session:
        row = (await session.execute(select(Payslip).where(Payslip.id == payslip_id))).scalar_one()
    assert row.cash_income_tax_source == "claude"
    # (200 * (500-100) / 500) = 160.00
    assert row.cash_income_tax == Decimal("160.00")
async def test_backfill_skips_rows_with_cash_tax_already_set(
    session_factory: async_sessionmaker[Any],
) -> None:
    """Rows that already have cash_income_tax populated are not re-processed."""
    already_filled = {
        "paperless_doc_id": 50,
        "cash_income_tax": Decimal("777.77"),
        "rsu_vest": Decimal("10000"),
    }
    await _insert_payslip(session_factory, **already_filled)
    paperless, extractor = AsyncMock(), AsyncMock()

    outcome = await backfill_cash_income_tax(session_factory, paperless, extractor)

    assert outcome.processed == 0
    paperless.download_document.assert_not_called()
async def test_backfill_skips_rows_without_rsu_vest(
    session_factory: async_sessionmaker[Any],
) -> None:
    """NULL cash_income_tax but rsu_vest=0 is fine (no RSU-band distortion)."""
    await _insert_payslip(session_factory,
                          paperless_doc_id=51,
                          cash_income_tax=None,
                          rsu_vest=Decimal("0"))
    paperless = AsyncMock()
    extractor = AsyncMock()

    result = await backfill_cash_income_tax(session_factory, paperless, extractor)

    assert result.processed == 0
    # Same check as the already-filled case: a skipped row triggers no download.
    paperless.download_document.assert_not_called()
def _build_variant_a_payslip_text(*, gross: str, net: str, salary: str, tax: str, nic: str,
student: str, pension_sacrifice: str, taxable_pay: str) -> str:
"""Synthesize a variant-A payslip body that the parser accepts.
Layout mirrors the 2021-08 fixture: Description | This Period | This Year
header, two Totals anchors (gross, then deductions), Net Pay line, and a
Taxable Pay summary line. Each row needs TWO amounts (period + YTD) or the
parser treats it as YTD-only and skips.
"""
deduction_total = float(tax) + float(nic) + float(student)
return (
"Facebook UK Ltd\n"
"Date : 30 Jun 2020\n"
"Description This Period This Year\n"
f"Salary {salary} {salary}\n"
f"AE Pension EE ({pension_sacrifice}) ({pension_sacrifice})\n"
f"Total {gross}\n"
f"Tax {tax} {tax}\n"
f"National Insurance {nic} {nic}\n"
f"Student Loans {student} {student}\n"
f"Total {deduction_total:.2f}\n"
f"Net Pay {net}\n"
f"Taxable Pay : This Period £{taxable_pay} : To Date £{taxable_pay}\n")

View file

@ -176,6 +176,36 @@ def test_variant_a_cash_income_tax_pro_rata() -> None:
assert abs(result.cash_income_tax - Decimal("1367.76")) <= Decimal("0.02")
@pytest.mark.parametrize("taxable_pay_line", [
    "Taxable Pay : This Period £1234.56 : To Date £12345.67",
    "Taxable Pay: This Period £1234.56",
    "Taxable Pay This Period £1234.56 To Date £12345.67",
    "TAXABLE PAY : This Period £1234.56",
    "taxable pay : this period £1234.56",
    "Taxable Pay : Period £1234.56",
    "Taxable Pay : This Period £1,234.56",
    # No "Period" label at all — only the second, label-then-amount pattern
    # can match these.
    "Taxable Pay : £1234.56",
    "Taxable Pay £1234.56",
])
def test_taxable_pay_variant_a_regex_matches(taxable_pay_line: str) -> None:
    """Variant-A Taxable Pay line appears in several layouts pre-2022.

    Original regex only matched `Taxable Pay : This Period £...` exactly.
    Widen to tolerate: missing/different colons, uppercase, no "This",
    whitespace in place of colons, and a bare label followed by the amount.
    Rows that don't match fall back to Claude in the back-fill path.
    """
    from payslip_ingest.parsers.meta_uk import _match_variant_a_taxable_pay

    result = _match_variant_a_taxable_pay(taxable_pay_line)
    assert result == Decimal("1234.56")
def test_taxable_pay_variant_a_regex_rejects_unmatched() -> None:
    """If there's no `Taxable Pay` label we should return None, not crash."""
    from payslip_ingest.parsers.meta_uk import _match_variant_a_taxable_pay

    for unlabelled in ("some random line", ""):
        assert _match_variant_a_taxable_pay(unlabelled) is None
def test_raises_on_non_meta_payslip() -> None:
    """Text that is not a Meta payslip makes the parser raise ParserError."""
    foreign_text = "This is not a Meta payslip\nRandom text\n"
    with pytest.raises(ParserError):
        parse_meta_uk(foreign_text)