Phase B of RSU tax spike fix. Vest-month spikes on the dashboard trace to variant-A slips (2019–mid-2022) where `cash_income_tax` is NULL — the dashboard's COALESCE fallback returns full PAYE, masquerading as cash tax. Three changes: 1. Widen variant-A Taxable Pay regex. Original pattern only matched `Taxable Pay : This Period £...`; add case-insensitive variants that tolerate missing/different colons, elided "This", and uppercase labels. Covers older 2019-2020 templates that failed the previous match. 2. New `backfill_cash_income_tax` module — walks every NULL-cash-tax row with rsu_vest > 0, re-downloads the PDF from Paperless, runs the widened regex parser, falls back to Claude for taxable_pay extraction if regex still misses, and derives cash_income_tax pro-rata. Records provenance in new `cash_income_tax_source` column (`regex` / `claude` / `fallback_null`). Idempotent — only touches NULL rows. 3. Migration 0006 adds the `cash_income_tax_source` audit column. CLI: `python -m payslip_ingest backfill-cash-tax [--limit N]`. Meant to run as a one-shot K8s Job after `alembic upgrade head`. Part of: code-860
151 lines
5.3 KiB
Python
151 lines
5.3 KiB
Python
"""Back-fill `cash_income_tax` on rows where it's NULL.
|
|
|
|
Dashboard Panel 11's RSU-attributed tax band is computed as
|
|
`income_tax - COALESCE(cash_income_tax, income_tax)`; rows where
|
|
`cash_income_tax` is NULL fall back to `income_tax` and collapse the band to
|
|
zero, making the cash-band swallow the RSU-band. Back-filling those rows
|
|
restores the split.
|
|
|
|
Strategy — for each NULL row with `rsu_vest > 0`:
|
|
1. Fetch the PDF from Paperless.
|
|
2. Run the widened regex parser. If it yields a non-NULL `cash_income_tax`,
|
|
UPDATE the row with source='regex'.
|
|
3. Otherwise call Claude (narrow prompt, just taxable_pay). If it returns a
|
|
taxable_pay, compute the pro-rata share and UPDATE with source='claude'.
|
|
4. If neither succeeds, mark source='fallback_null' so the audit column
|
|
shows we tried.
|
|
|
|
Idempotent — re-running only touches rows whose `cash_income_tax` is still NULL; rows previously marked `fallback_null` stay NULL and are retried. Safe to run
|
|
as a K8s one-shot Job or via CLI on demand.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from decimal import Decimal
|
|
from typing import Any
|
|
|
|
from sqlalchemy import select, update
|
|
from sqlalchemy.ext.asyncio import async_sessionmaker
|
|
|
|
from payslip_ingest.db import Payslip
|
|
from payslip_ingest.extractor import ClaudeExtractor, ExtractorError
|
|
from payslip_ingest.paperless import PaperlessClient
|
|
from payslip_ingest.parsers.meta_uk import ParserError, parse_meta_uk
|
|
from payslip_ingest.processor import _pdftotext
|
|
|
|
# Module-scoped logger; records are emitted under this module's dotted name.
log = logging.getLogger(name=__name__)
|
|
|
|
|
|
@dataclass()
class BackfillResult:
    """Counters describing one backfill run; every counter starts at zero.

    During a run, each attempted row bumps ``processed`` and exactly one of
    the other four counters.
    """

    # Candidate rows attempted, whether they succeeded or failed.
    processed: int = 0
    # Rows filled from the regex parser's `cash_income_tax`.
    regex_hits: int = 0
    # Rows filled from a Claude-extracted `taxable_pay`, derived pro rata.
    claude_hits: int = 0
    # Rows where no pass produced a value; only the provenance column was set.
    fallback_null: int = 0
    # Rows whose processing raised an exception.
    errors: int = 0
|
|
|
|
|
|
async def backfill_cash_income_tax(
    db_session_factory: async_sessionmaker[Any],
    paperless: PaperlessClient,
    extractor: ClaudeExtractor,
    limit: int | None = None,
) -> BackfillResult:
    """Walk every NULL `cash_income_tax` row with `rsu_vest > 0` and try to fill it.

    Candidates are processed oldest ``pay_date`` first, with ties broken by
    ``id`` so that a ``limit``-ed batch is deterministic. Each row is handled
    independently: an exception on one row is logged and counted in
    ``errors``, and the loop moves on.

    Args:
        db_session_factory: factory for async DB sessions; used for the
            candidate SELECT and passed through for the per-row writes.
        paperless: client used to re-download each payslip PDF.
        extractor: Claude extractor used when the regex parser misses.
        limit: if given, process at most this many candidate rows.

    Returns:
        A ``BackfillResult`` tallying how each processed row was resolved.

    Raises:
        ValueError: if ``limit`` is negative.
    """
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit}")

    stmt = (
        select(
            Payslip.id,
            Payslip.paperless_doc_id,
            Payslip.income_tax,
            Payslip.gross_pay,
            Payslip.pension_sacrifice,
        )
        .where(
            Payslip.cash_income_tax.is_(None),
            Payslip.rsu_vest > 0,
        )
        # The id tie-break keeps the chosen batch stable across runs when a
        # LIMIT is applied and several slips share a pay_date.
        .order_by(Payslip.pay_date, Payslip.id)
    )
    if limit is not None:
        # Cap in SQL rather than fetching every candidate and slicing locally.
        stmt = stmt.limit(limit)

    # Read the candidates in a short-lived session. Per-row writes open their
    # own sessions, so the read session is not held across slow network calls.
    async with db_session_factory() as session:
        rows = (await session.execute(stmt)).all()

    result = BackfillResult()
    for row in rows:
        try:
            await _backfill_one(row, db_session_factory, paperless, extractor, result)
        except Exception as exc:
            # Deliberately broad: one bad slip must not abort the whole job.
            log.exception("backfill failed for payslip id=%s doc_id=%s: %s", row.id,
                          row.paperless_doc_id, exc)
            result.errors += 1
        result.processed += 1
    return result
|
|
|
|
|
|
async def _backfill_one(
    row: Any,
    db_session_factory: async_sessionmaker[Any],
    paperless: PaperlessClient,
    extractor: ClaudeExtractor,
    result: BackfillResult,
) -> None:
    """Try to fill `cash_income_tax` for one candidate row and record the outcome.

    Pass 1 runs the regex parser over the PDF's extracted text. Pass 2 asks
    the Claude extractor for ``taxable_pay`` and derives the cash share pro
    rata. If neither pass yields a value, only the provenance column is set to
    ``fallback_null``. On normal return, exactly one of ``result.regex_hits``,
    ``result.claude_hits`` or ``result.fallback_null`` has been incremented.

    Args:
        row: a row from the candidate SELECT, exposing ``id``,
            ``paperless_doc_id``, ``income_tax``, ``gross_pay`` and
            ``pension_sacrifice``.
        db_session_factory: factory for the sessions used by the writes.
        paperless: client used to download the PDF and its metadata.
        extractor: Claude extractor used for the fallback pass.
        result: counters, mutated in place.

    Raises:
        Only ``ParserError`` and ``ExtractorError`` are handled here. Errors
        from Paperless, text extraction, or the DB writes propagate to the
        caller.
    """
    doc_id = row.paperless_doc_id
    pdf_bytes = await paperless.download_document(doc_id)

    # Pass 1 — widened regex parser.
    parsed = None
    text_body = _pdftotext(pdf_bytes) or ""
    if not text_body:
        log.debug("pdftotext returned no text for doc_id=%s; skipping regex pass", doc_id)
    else:
        try:
            parsed = parse_meta_uk(text_body)
        except ParserError as exc:
            # Expected for templates the regex still can't read; Claude is next.
            log.debug("regex parse failed for doc_id=%s: %s", doc_id, exc)

    if (parsed is not None
            and parsed.cash_income_tax is not None
            and parsed.taxable_pay is not None):
        await _apply_update(db_session_factory, row.id, parsed.cash_income_tax, "regex")
        result.regex_hits += 1
        return

    # Pass 2 — Claude fallback for the one field we need.
    metadata = await paperless.get_document(doc_id)
    try:
        extracted = await extractor.extract(pdf_bytes, metadata)
    except ExtractorError as exc:
        log.warning("Claude extraction failed for doc_id=%s: %s", doc_id, exc)
        extracted = None

    taxable_pay = extracted.taxable_pay if extracted is not None else None
    # Only a positive taxable_pay gives a usable denominator for the split.
    if taxable_pay and taxable_pay > 0:
        cash_tax = _derive_cash_tax(row.income_tax, row.gross_pay, row.pension_sacrifice,
                                    taxable_pay)
        await _apply_update(db_session_factory, row.id, cash_tax, "claude")
        result.claude_hits += 1
        return

    # Neither pass produced a value: record that we tried, leave the tax NULL.
    await _apply_source_only(db_session_factory, row.id, "fallback_null")
    result.fallback_null += 1
|
|
|
|
|
|
def _derive_cash_tax(income_tax: Decimal, gross_pay: Decimal, pension_sacrifice: Decimal,
|
|
taxable_pay: Decimal) -> Decimal:
|
|
if taxable_pay <= 0:
|
|
return income_tax
|
|
return (income_tax * (gross_pay - pension_sacrifice) / taxable_pay).quantize(Decimal("0.01"))
|
|
|
|
|
|
async def _apply_update(
    db_session_factory: async_sessionmaker[Any],
    payslip_id: int,
    cash_tax: Decimal,
    source: str,
) -> None:
    """Set `cash_income_tax` and its provenance on one payslip, if still NULL.

    The write runs in its own transaction, committed when the ``begin()``
    block exits. The ``cash_income_tax IS NULL`` guard turns the write into a
    no-op if the row was filled after the candidate SELECT (e.g. by ingest or
    a concurrent run). This keeps the backfill from overwriting an existing
    value.
    """
    async with db_session_factory() as session, session.begin():
        outcome = await session.execute(
            update(Payslip)
            .where(
                Payslip.id == payslip_id,
                Payslip.cash_income_tax.is_(None),
            )
            .values(
                cash_income_tax=cash_tax,
                cash_income_tax_source=source,
            ))
        updated = outcome.rowcount
    if updated == 0:
        log.warning(
            "payslip id=%s not updated (row missing or cash_income_tax already set)",
            payslip_id,
        )
|
|
|
|
|
|
async def _apply_source_only(
    db_session_factory: async_sessionmaker[Any],
    payslip_id: int,
    source: str,
) -> None:
    """Set only `cash_income_tax_source` on one payslip, if its tax is still NULL.

    ``cash_income_tax`` itself is left NULL, so the row stays a candidate for
    later runs. The ``cash_income_tax IS NULL`` guard prevents a row filled
    after the candidate SELECT from having its provenance overwritten by a
    value-less marker.
    """
    async with db_session_factory() as session, session.begin():
        outcome = await session.execute(
            update(Payslip)
            .where(
                Payslip.id == payslip_id,
                Payslip.cash_income_tax.is_(None),
            )
            .values(cash_income_tax_source=source))
        updated = outcome.rowcount
    if updated == 0:
        log.warning(
            "payslip id=%s source not updated (row missing or cash_income_tax already set)",
            payslip_id,
        )
|
|
|
|
|
|
# Explicit public surface. `_derive_cash_tax` is underscore-prefixed but still
# exported, presumably so tests can import it — confirm before dropping it.
__all__ = [
    "BackfillResult",
    "backfill_cash_income_tax",
    "_derive_cash_tax",
]
|