payslip-ingest/payslip_ingest/backfill_cash_tax.py

152 lines
5.3 KiB
Python
Raw Permalink Normal View History

"""Back-fill `cash_income_tax` on rows where it's NULL.
Dashboard Panel 11's RSU-attributed tax band is computed as
`income_tax - COALESCE(cash_income_tax, income_tax)`; rows where
`cash_income_tax` is NULL fall back to `income_tax` and collapse the band to
zero, making the cash-band swallow the RSU-band. Back-filling those rows
restores the split.
Strategy for each NULL row with `rsu_vest > 0`:
1. Fetch the PDF from Paperless.
2. Run the widened regex parser. If it yields non-NULL `cash_income_tax` and
`taxable_pay`, UPDATE the row with source='regex'.
3. Otherwise call Claude (narrow prompt, just taxable_pay). If it returns a
taxable_pay, compute the pro-rata share and UPDATE with source='claude'.
4. If neither succeeds, mark source='fallback_null' so the audit column
shows we tried.
Idempotent: re-running only touches rows whose `cash_income_tax` is still
NULL (rows marked 'fallback_null' keep a NULL value and are retried). Safe to
run as a K8s one-shot Job or via CLI on demand.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from decimal import Decimal
from typing import Any
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import async_sessionmaker
from payslip_ingest.db import Payslip
from payslip_ingest.extractor import ClaudeExtractor, ExtractorError
from payslip_ingest.paperless import PaperlessClient
from payslip_ingest.parsers.meta_uk import ParserError, parse_meta_uk
from payslip_ingest.processor import _pdftotext
# Module logger; per-row failures and Claude extraction failures are reported here.
log = logging.getLogger(__name__)
@dataclass
class BackfillResult:
    """Counters describing the outcome of one back-fill run.

    Every visited row bumps ``processed``. A row that completes normally bumps
    exactly one of ``regex_hits``, ``claude_hits`` or ``fallback_null``. A row
    that raises bumps ``errors`` instead.
    """

    # Rows visited, successes and failures alike.
    processed: int = 0
    # Rows filled from the regex parser (source='regex').
    regex_hits: int = 0
    # Rows filled by pro-rating Claude's taxable_pay (source='claude').
    claude_hits: int = 0
    # Rows where neither pass produced a value (source='fallback_null').
    fallback_null: int = 0
    # Rows whose processing raised an exception.
    errors: int = 0
async def backfill_cash_income_tax(
    db_session_factory: async_sessionmaker[Any],
    paperless: PaperlessClient,
    extractor: ClaudeExtractor,
    limit: int | None = None,
) -> BackfillResult:
    """Walk every NULL `cash_income_tax` row with `rsu_vest > 0` and try to fill it.

    Candidate rows are visited in ``pay_date`` order. Each row is handled
    independently: a failure is logged with its traceback, counted in
    ``errors``, and the run continues with the next row.

    Args:
        db_session_factory: Factory for async DB sessions. One short-lived
            session is used to read the candidates; the per-row helpers open
            their own sessions for the updates.
        paperless: Client used to download each payslip PDF.
        extractor: Claude extractor used when the regex parser cannot help.
        limit: Maximum number of rows to process, or ``None`` for all.

    Returns:
        A ``BackfillResult`` with per-outcome counters for this run.

    Raises:
        ValueError: If ``limit`` is negative.
    """
    if limit is not None and limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit}")

    query = (
        select(Payslip.id, Payslip.paperless_doc_id, Payslip.income_tax, Payslip.gross_pay,
               Payslip.pension_sacrifice)
        .where(
            Payslip.cash_income_tax.is_(None),
            Payslip.rsu_vest > 0,
        )
        .order_by(Payslip.pay_date)
    )
    if limit is not None:
        # Let the database truncate rather than materialising every candidate row.
        query = query.limit(limit)

    # Read the candidate set up front and release the session; row updates
    # run in their own sessions/transactions inside the helpers.
    async with db_session_factory() as session:
        rows = (await session.execute(query)).all()

    result = BackfillResult()
    for row in rows:
        try:
            await _backfill_one(row, db_session_factory, paperless, extractor, result)
        except Exception as exc:
            # Per-row boundary: one bad document must not abort the whole run.
            log.exception("backfill failed for payslip id=%s doc_id=%s: %s", row.id,
                          row.paperless_doc_id, exc)
            result.errors += 1
        result.processed += 1
    return result
async def _backfill_one(
    row: Any,
    db_session_factory: async_sessionmaker[Any],
    paperless: PaperlessClient,
    extractor: ClaudeExtractor,
    result: BackfillResult,
) -> None:
    """Try to fill `cash_income_tax` for a single payslip row.

    The passes are tried in order and the first success wins:

    1. Regex: text extracted with `_pdftotext` is parsed with
       `parse_meta_uk`. If both `cash_income_tax` and `taxable_pay` come back
       non-None, the parsed `cash_income_tax` is stored with source 'regex'.
    2. Claude: `extractor.extract` is called with the PDF and the Paperless
       metadata. If it returns a positive `taxable_pay`, the value from
       `_derive_cash_tax` is stored with source 'claude'.
    3. Otherwise only `cash_income_tax_source='fallback_null'` is written.

    On a normal return, exactly one of ``result.regex_hits``,
    ``result.claude_hits`` or ``result.fallback_null`` is incremented.

    Args:
        row: Result row exposing ``id``, ``paperless_doc_id``, ``income_tax``,
            ``gross_pay`` and ``pension_sacrifice``.
        db_session_factory: Factory used by the update helpers.
        paperless: Client used to download the PDF and fetch its metadata.
        extractor: Claude extractor for the second pass.
        result: Counters, mutated in place.

    Raises:
        Exceptions from Paperless, `_pdftotext`, the extractor (other than
        `ExtractorError`) or the DB updates propagate to the caller.
    """
    pdf_bytes = await paperless.download_document(row.paperless_doc_id)

    # Pass 1 — widened regex parser.
    text_body = _pdftotext(pdf_bytes) or ""
    if text_body:
        try:
            parsed = parse_meta_uk(text_body)
        except ParserError as exc:
            # Expected for layouts the regex doesn't cover; fall through to Claude,
            # but leave a trace so unparsed layouts are visible in the job log.
            log.info("regex parse failed for doc_id=%s: %s", row.paperless_doc_id, exc)
        else:
            if parsed.cash_income_tax is not None and parsed.taxable_pay is not None:
                await _apply_update(db_session_factory, row.id, parsed.cash_income_tax, "regex")
                result.regex_hits += 1
                return

    # Pass 2 — Claude fallback for the one field we need.
    metadata = await paperless.get_document(row.paperless_doc_id)
    try:
        extracted = await extractor.extract(pdf_bytes, metadata)
    except ExtractorError as exc:
        log.warning("Claude extraction failed for doc_id=%s: %s", row.paperless_doc_id, exc)
        extracted = None

    taxable_pay = extracted.taxable_pay if extracted is not None else None
    if taxable_pay is not None and taxable_pay > 0:
        cash_tax = _derive_cash_tax(row.income_tax, row.gross_pay, row.pension_sacrifice,
                                    taxable_pay)
        await _apply_update(db_session_factory, row.id, cash_tax, "claude")
        result.claude_hits += 1
        return

    # Pass 3 — record that we tried; cash_income_tax itself stays NULL.
    await _apply_source_only(db_session_factory, row.id, "fallback_null")
    result.fallback_null += 1
def _derive_cash_tax(income_tax: Decimal, gross_pay: Decimal, pension_sacrifice: Decimal,
taxable_pay: Decimal) -> Decimal:
if taxable_pay <= 0:
return income_tax
return (income_tax * (gross_pay - pension_sacrifice) / taxable_pay).quantize(Decimal("0.01"))
async def _apply_update(
    db_session_factory: async_sessionmaker[Any],
    payslip_id: int,
    cash_tax: Decimal,
    source: str,
) -> None:
    """Write a back-filled `cash_income_tax` and its provenance for one payslip.

    Runs in a fresh session inside its own ``session.begin()`` transaction.
    """
    new_values = {
        "cash_income_tax": cash_tax,
        "cash_income_tax_source": source,
    }
    stmt = update(Payslip).where(Payslip.id == payslip_id).values(**new_values)
    async with db_session_factory() as session:
        async with session.begin():
            await session.execute(stmt)
async def _apply_source_only(
    db_session_factory: async_sessionmaker[Any],
    payslip_id: int,
    source: str,
) -> None:
    """Record `cash_income_tax_source` for one payslip without touching the value.

    `cash_income_tax` is not written, so a row selected by the back-fill
    (where it is NULL) keeps its NULL. Runs in a fresh session inside its
    own ``session.begin()`` transaction.
    """
    stmt = (
        update(Payslip)
        .where(Payslip.id == payslip_id)
        .values(cash_income_tax_source=source)
    )
    async with db_session_factory() as session:
        async with session.begin():
            await session.execute(stmt)
# Public surface of this module. `_derive_cash_tax` is listed despite its leading
# underscore — presumably so tests can import it; confirm before removing.
__all__ = ["BackfillResult", "backfill_cash_income_tax", "_derive_cash_tax"]