payslip-ingest/payslip_ingest/processor.py
Viktor Barzin 92e4ecaf78 processor: dedup bonus within tax year — zero out repeats
Meta UK pays a fixed number of performance bonuses per tax year (two
historically, one since the comp cycle change). If the same bonus
amount appears on a second payslip in the same tax_year — typically
from the extractor misreading a YTD figure as a current-period row —
summing the column on the dashboard counts that bonus twice.

`_dedup_bonus` keeps the first occurrence per (tax_year, amount) and
stores subsequent matches with bonus=0. Original figure is preserved
in raw_extraction for auditability; a warning is logged each time.

Fixes the 2021 tax year inflation flagged by the user. Existing
duplicates need a one-shot SQL cleanup (backfill task code-7z0).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 15:33:07 +00:00


import json
import logging
import re
import shutil
import subprocess
from dataclasses import dataclass
from decimal import Decimal
from typing import Any, Protocol

from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker

from payslip_ingest.db import P60Reference, Payslip
from payslip_ingest.extractor import ClaudeExtractor
from payslip_ingest.paperless import PaperlessClient
from payslip_ingest.parsers import ExtractedP60, ParserError, parse_meta_uk, parse_p60
from payslip_ingest.parsers.p60 import P60ParserError
from payslip_ingest.schema import ExtractedPayslip, validate_totals
from payslip_ingest.tax_year import derive_tax_year

log = logging.getLogger(__name__)

# Paperless's `payslip` tag has drifted over the years — it gets sprinkled on
# annual summaries (P60), performance/bonus letters, RSU grants, comp-review
# letters. Those are legitimate financial docs but they aren't monthly payslips
# and including them would skew every chart (a P60 looks like a single payslip
# 12x normal size). We skip by title pattern before hitting Claude so we don't
# burn extraction budget on them either.
NON_PAYSLIP_TITLE_RE = re.compile(
    r"p[\s._-]?60"
    r"|performance.*(letter|year.end)|year.end.*letter"
    r"|compensation[_ ]emea|\bpsc\b|comp[-_ ]?letter"
    r"|rsu\s*grant",
    re.IGNORECASE,
)
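# Illustrative titles this catches (hypothetical examples, not real
# Paperless data): "P60 2021", "Performance Year-End Letter",
# "Compensation EMEA", "RSU Grant Notice". A plain "Payslip 2023-04"
# passes through untouched.
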
# Some Paperless docs have no title at all — the title filter can't catch
# them. These are detected by content signature in the pdftotext output.
# Only apply to the first ~500 chars so we don't accidentally false-positive
# a real payslip that happens to mention "P60" in a footnote somewhere.
NON_PAYSLIP_CONTENT_RE = re.compile(
    r"P60 End of Year Certificate"
    r"|Employer's summary.+tax year ending"
    r"|Take-home income per month",
    re.IGNORECASE,
)
PDFTOTEXT_PATH = shutil.which("pdftotext")
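# NOTE: resolved once at import. If poppler's pdftotext binary is absent,
# PDFTOTEXT_PATH is None and _pdftotext() returns None, so the content
# filter above is skipped and extraction falls through to Claude.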


class _SessionFactory(Protocol):
    """Any zero-argument factory returning an async session context.

    Structural type so callers aren't forced to hand over a real
    async_sessionmaker (e.g. tests can pass a stub).
    """

    def __call__(self) -> Any: ...


@dataclass
class ProcessResult:
    doc_id: int
    status: str  # "inserted" | "skipped" | "skipped_non_payslip"
    payslip_id: int | None = None
    p60_id: int | None = None
    validated: bool | None = None
    extractor: str | None = None  # "meta_uk_regex" | "claude" | "p60_regex" | None


async def process_document(
    doc_id: int,
    db_session_factory: async_sessionmaker[Any] | _SessionFactory,
    paperless: PaperlessClient,
    extractor: ClaudeExtractor,
    p60_tag_id: int | None = None,
) -> ProcessResult:
    async with db_session_factory() as session:
        existing_payslip = await session.execute(
            select(Payslip.id).where(Payslip.paperless_doc_id == doc_id))
        if existing_payslip.scalar() is not None:
            log.info("skipping doc_id=%s — already ingested as payslip", doc_id)
            return ProcessResult(doc_id=doc_id, status="skipped")
        existing_p60 = await session.execute(
            select(P60Reference.id).where(P60Reference.paperless_doc_id == doc_id))
        if existing_p60.scalar() is not None:
            log.info("skipping doc_id=%s — already ingested as P60", doc_id)
            return ProcessResult(doc_id=doc_id, status="skipped")

    metadata = await paperless.get_document(doc_id)
    tag_ids = metadata.get("tags") or []
    if p60_tag_id is not None and p60_tag_id in tag_ids:
        pdf_bytes = await paperless.download_document(doc_id)
        return await _handle_p60(doc_id, pdf_bytes, db_session_factory)

    title = (metadata.get("title") or "").strip()
    if NON_PAYSLIP_TITLE_RE.search(title):
        log.info("skipping doc_id=%s — title %r matches non-payslip pattern", doc_id, title)
        return ProcessResult(doc_id=doc_id, status="skipped_non_payslip")

    pdf_bytes = await paperless.download_document(doc_id)
    # Content-level non-payslip check (catches P60s with no Paperless title,
    # personal income spreadsheets, etc.) before we burn extractor budget.
    text_peek = _pdftotext(pdf_bytes) or ""
    if NON_PAYSLIP_CONTENT_RE.search(text_peek[:500]):
        log.info("skipping doc_id=%s — content matches non-payslip signature", doc_id)
        return ProcessResult(doc_id=doc_id, status="skipped_non_payslip")

    extracted, which = await _extract(pdf_bytes, metadata, extractor)

    # Sanity check: Viktor joined Meta UK in 2019. Any pay_date earlier
    # than 2010 or a zero gross almost certainly means the extractor
    # hallucinated on a non-payslip PDF that slipped past the title filter.
    # Reject rather than poison the DB with a 1900-01-01 ghost row.
    if extracted.pay_date.year < 2010:
        raise ValueError(
            f"doc_id={doc_id} extractor={which} produced implausible pay_date={extracted.pay_date}")
    if extracted.gross_pay == 0 and extracted.net_pay == 0:
        raise ValueError(f"doc_id={doc_id} extractor={which} produced zero gross and net")

    validated = validate_totals(extracted)
    if not validated:
        log.warning(
            "totals mismatch for doc_id=%s extractor=%s gross=%s net=%s — storing validated=False",
            doc_id,
            which,
            extracted.gross_pay,
            extracted.net_pay,
        )

    payslip_id = await _insert_payslip(db_session_factory, doc_id, extracted, validated)
    status = "inserted" if payslip_id is not None else "skipped"
    return ProcessResult(doc_id=doc_id,
                         status=status,
                         payslip_id=payslip_id,
                         validated=validated,
                         extractor=which)


async def _extract(
    pdf_bytes: bytes,
    metadata: dict[str, Any],
    extractor: ClaudeExtractor,
) -> tuple[ExtractedPayslip, str]:
    """Try the regex parser first; fall back to Claude only on a miss.

    The regex path runs in milliseconds and its output validates ~100% of
    the time for Meta UK payslips. Claude is expensive ($0.01-0.05 per doc
    plus 30-90s wall time) and its extractions validate only ~15% of the
    time on Meta templates, because it fumbles pension-sacrifice arithmetic
    and YTD-vs-this-period columns.
    """
    text = _pdftotext(pdf_bytes)
    if text:
        try:
            parsed = parse_meta_uk(text)
            log.info("regex parser hit: gross=%s net=%s", parsed.gross_pay, parsed.net_pay)
            return parsed, "meta_uk_regex"
        except ParserError as exc:
            log.info("regex parser miss (%s) — falling back to Claude", exc)
    extracted = await extractor.extract(pdf_bytes, metadata)
    return extracted, "claude"


def _pdftotext(pdf_bytes: bytes) -> str | None:
    if not PDFTOTEXT_PATH:
        return None
    try:
        # The two "-" args: read the PDF from stdin, write text to stdout.
        proc = subprocess.run(
            [PDFTOTEXT_PATH, "-layout", "-enc", "UTF-8", "-", "-"],
            input=pdf_bytes,
            capture_output=True,
            timeout=30,
            check=False,
        )
    except (subprocess.SubprocessError, OSError) as exc:
        log.warning("pdftotext failed: %s", exc)
        return None
    # On a nonzero exit, stdout is typically empty and we fall through to
    # None via the `text or None` below.
    text = proc.stdout.decode("utf-8", errors="replace").strip()
    return text or None


async def _insert_payslip(
    db_session_factory: async_sessionmaker[Any] | _SessionFactory,
    doc_id: int,
    extracted: ExtractedPayslip,
    validated: bool,
) -> int | None:
    # Round-trip through JSON so Decimals and dates land as JSON-native
    # types in the raw_extraction column.
    raw = json.loads(extracted.model_dump_json())
    async with db_session_factory() as session, session.begin():
        # Re-check inside the insert transaction in case the same doc was
        # ingested between process_document's early check and now.
        existing = await session.execute(
            select(Payslip.id).where(Payslip.paperless_doc_id == doc_id))
        existing_id = existing.scalar()
        if existing_id is not None:
            return None
        tax_year = derive_tax_year(extracted.pay_date)
        bonus = await _dedup_bonus(session, tax_year, extracted.bonus, doc_id)
        row = Payslip(
            paperless_doc_id=doc_id,
            pay_date=extracted.pay_date,
            pay_period_start=extracted.pay_period_start,
            pay_period_end=extracted.pay_period_end,
            employer=extracted.employer,
            currency=extracted.currency,
            gross_pay=extracted.gross_pay,
            income_tax=extracted.income_tax,
            national_insurance=extracted.national_insurance,
            pension_employee=extracted.pension_employee,
            pension_employer=extracted.pension_employer,
            student_loan=extracted.student_loan,
            rsu_vest=extracted.rsu_vest,
            rsu_offset=extracted.rsu_offset,
            salary=extracted.salary,
            bonus=bonus,
            pension_sacrifice=extracted.pension_sacrifice,
            taxable_pay=extracted.taxable_pay,
            ytd_tax_paid=extracted.ytd_tax_paid,
            ytd_taxable_pay=extracted.ytd_taxable_pay,
            ytd_gross=extracted.ytd_gross,
            cash_income_tax=extracted.cash_income_tax,
            ytd_rsu_tax_offset=extracted.ytd_rsu_tax_offset,
            ytd_rsu_excs_refund=extracted.ytd_rsu_excs_refund,
            other_deductions=_decimals_to_float(extracted.other_deductions),
            net_pay=extracted.net_pay,
            tax_year=tax_year,
            raw_extraction=raw,
            validated=validated,
        )
        session.add(row)
        await session.flush()
        return row.id


def _decimals_to_float(mapping: dict[str, Decimal]) -> dict[str, float]:
    return {k: float(v) for k, v in mapping.items()}
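# e.g. {"cycle_to_work": Decimal("83.33")} -> {"cycle_to_work": 83.33}
# ("cycle_to_work" is a made-up key; the conversion exists because the
# other_deductions column stores JSON-native types rather than Decimal).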


async def _dedup_bonus(session: Any, tax_year: str, bonus: Decimal, doc_id: int) -> Decimal:
    """Zero out a repeated bonus within the same tax year.

    Meta pays a fixed number of performance bonuses per year (two
    historically; one since the comp cycle change). If the parser or
    Claude extractor picks up the same amount twice — usually from a
    payslip that reprints the YTD bonus figure as a current-period row —
    charting sums the amount multiple times and exaggerates total comp.
    Rather than surface duplicates, we keep the first-ingested occurrence
    per (tax_year, amount) and ingest subsequent matches with bonus=0.
    The original figure stays in `raw_extraction` for auditability.
    """
    if bonus <= 0:
        return bonus
    existing = await session.execute(
        select(Payslip.id).where(Payslip.tax_year == tax_year, Payslip.bonus == bonus))
    if existing.scalar() is not None:
        log.warning("duplicate bonus suppressed: doc_id=%s tax_year=%s bonus=%s (kept first)",
                    doc_id, tax_year, bonus)
        return Decimal("0")
    return bonus
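# A sketch of the one-shot cleanup for pre-existing duplicates mentioned in
# the commit message (backfill task code-7z0). Illustrative only: it assumes
# the Payslip model maps to a table named "payslips" and that the lowest id
# per (tax_year, bonus) group is the first-ingested row _dedup_bonus would
# have kept:
#
#   UPDATE payslips SET bonus = 0
#   WHERE bonus > 0 AND id NOT IN (
#       SELECT MIN(id) FROM payslips WHERE bonus > 0
#       GROUP BY tax_year, bonus
#   );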


async def _handle_p60(
    doc_id: int,
    pdf_bytes: bytes,
    db_session_factory: async_sessionmaker[Any] | _SessionFactory,
) -> ProcessResult:
    text = _pdftotext(pdf_bytes)
    if not text:
        raise ValueError(f"doc_id={doc_id} P60 pdftotext extraction returned empty")
    try:
        parsed = parse_p60(text)
    except P60ParserError as exc:
        raise ValueError(f"doc_id={doc_id} P60 parser miss: {exc}") from exc
    log.info("p60 parsed: tax_year=%s employer=%s gross=%s tax=%s", parsed.tax_year,
             parsed.employer, parsed.gross_pay, parsed.income_tax)
    p60_id = await _insert_p60(db_session_factory, doc_id, parsed)
    return ProcessResult(
        doc_id=doc_id,
        status="inserted" if p60_id is not None else "skipped",
        p60_id=p60_id,
        extractor="p60_regex",
    )


async def _insert_p60(
    db_session_factory: async_sessionmaker[Any] | _SessionFactory,
    doc_id: int,
    parsed: ExtractedP60,
) -> int | None:
    async with db_session_factory() as session, session.begin():
        existing = await session.execute(
            select(P60Reference.id).where(P60Reference.paperless_doc_id == doc_id))
        if existing.scalar() is not None:
            return None
        row = P60Reference(
            paperless_doc_id=doc_id,
            tax_year=parsed.tax_year,
            employer=parsed.employer,
            employer_paye_ref=parsed.employer_paye_ref,
            gross_pay=parsed.gross_pay,
            income_tax=parsed.income_tax,
            national_insurance=parsed.national_insurance,
            student_loan=parsed.student_loan,
            tax_code=parsed.tax_code,
            raw_extraction=parsed.to_raw(),
        )
        session.add(row)
        await session.flush()
        return row.id
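

# ---------------------------------------------------------------------------
# Usage sketch. Hypothetical wiring: PaperlessClient / ClaudeExtractor
# construction is project-specific and not shown, and the DSN is a
# placeholder.
#
#   import asyncio
#   from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
#
#   async def main(paperless: PaperlessClient, extractor: ClaudeExtractor,
#                  doc_ids: list[int]) -> None:
#       engine = create_async_engine("postgresql+asyncpg://localhost/payslips")
#       factory = async_sessionmaker(engine, expire_on_commit=False)
#       for doc_id in doc_ids:
#           result = await process_document(doc_id, factory, paperless, extractor)
#           log.info("doc %s -> %s (extractor=%s)", doc_id, result.status,
#                    result.extractor)
#
#   asyncio.run(main(paperless, extractor, [1201, 1202]))
# ---------------------------------------------------------------------------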