Meta UK pays one performance bonus per tax year (2 historically, 1 since the comp cycle change). If the same bonus amount appears on a second payslip in the same tax_year — typically from the extractor misreading a YTD figure as a current-period row — summing the column on the dashboard exaggerates total comp by 2x. `_dedup_bonus` keeps the first occurrence per (tax_year, amount) and stores subsequent matches with bonus=0. Original figure is preserved in raw_extraction for auditability; a warning is logged each time. Fixes the 2021 tax year inflation flagged by the user. Existing duplicates need a one-shot SQL cleanup (backfill task code-7z0). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
306 lines
12 KiB
Python
306 lines
12 KiB
Python
import json
|
|
import logging
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from decimal import Decimal
|
|
from typing import Any, Protocol
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import async_sessionmaker
|
|
|
|
from payslip_ingest.db import P60Reference, Payslip
|
|
from payslip_ingest.extractor import ClaudeExtractor
|
|
from payslip_ingest.paperless import PaperlessClient
|
|
from payslip_ingest.parsers import ExtractedP60, ParserError, parse_meta_uk, parse_p60
|
|
from payslip_ingest.parsers.p60 import P60ParserError
|
|
from payslip_ingest.schema import ExtractedPayslip, validate_totals
|
|
from payslip_ingest.tax_year import derive_tax_year
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# Paperless's `payslip` tag has drifted over the years — it gets sprinkled on
|
|
# annual summaries (P60), performance/bonus letters, RSU grants, comp-review
|
|
# letters. Those are legitimate financial docs but they aren't monthly payslips
|
|
# and including them would skew every chart (a P60 looks like a single payslip
|
|
# 12x normal size). We skip by title pattern before hitting Claude so we don't
|
|
# burn extraction budget on them either.
|
|
NON_PAYSLIP_TITLE_RE = re.compile(
|
|
r"p[\s._-]?60"
|
|
r"|performance.*(letter|year.end)|year.end.*letter"
|
|
r"|compensation[_ ]emea|\bpsc\b|comp[-_ ]?letter"
|
|
r"|rsu\s*grant",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
# Some Paperless docs have no title at all — the title filter can't catch
|
|
# them. These are detected by content signature in the pdftotext output.
|
|
# Only apply to the first ~500 chars so we don't accidentally false-positive
|
|
# a real payslip that happens to mention "P60" in a footnote somewhere.
|
|
NON_PAYSLIP_CONTENT_RE = re.compile(
|
|
r"P60 End of Year Certificate"
|
|
r"|Employer's summary.+tax year ending"
|
|
r"|Take-home income per month",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
PDFTOTEXT_PATH = shutil.which("pdftotext")
|
|
|
|
|
|
class _SessionFactory(Protocol):
|
|
|
|
def __call__(self) -> Any:
|
|
...
|
|
|
|
|
|
@dataclass
|
|
class ProcessResult:
|
|
doc_id: int
|
|
status: str
|
|
payslip_id: int | None = None
|
|
p60_id: int | None = None
|
|
validated: bool | None = None
|
|
extractor: str | None = None # "meta_uk_regex" | "claude" | "p60_regex" | None
|
|
|
|
|
|
async def process_document(
|
|
doc_id: int,
|
|
db_session_factory: async_sessionmaker[Any] | _SessionFactory,
|
|
paperless: PaperlessClient,
|
|
extractor: ClaudeExtractor,
|
|
p60_tag_id: int | None = None,
|
|
) -> ProcessResult:
|
|
async with db_session_factory() as session:
|
|
existing_payslip = await session.execute(
|
|
select(Payslip.id).where(Payslip.paperless_doc_id == doc_id))
|
|
if existing_payslip.scalar() is not None:
|
|
log.info("skipping doc_id=%s — already ingested as payslip", doc_id)
|
|
return ProcessResult(doc_id=doc_id, status="skipped")
|
|
existing_p60 = await session.execute(
|
|
select(P60Reference.id).where(P60Reference.paperless_doc_id == doc_id))
|
|
if existing_p60.scalar() is not None:
|
|
log.info("skipping doc_id=%s — already ingested as P60", doc_id)
|
|
return ProcessResult(doc_id=doc_id, status="skipped")
|
|
|
|
metadata = await paperless.get_document(doc_id)
|
|
tag_ids = metadata.get("tags") or []
|
|
if p60_tag_id is not None and p60_tag_id in tag_ids:
|
|
pdf_bytes = await paperless.download_document(doc_id)
|
|
return await _handle_p60(doc_id, pdf_bytes, db_session_factory)
|
|
|
|
title = (metadata.get("title") or "").strip()
|
|
if NON_PAYSLIP_TITLE_RE.search(title):
|
|
log.info("skipping doc_id=%s — title %r matches non-payslip pattern", doc_id, title)
|
|
return ProcessResult(doc_id=doc_id, status="skipped_non_payslip")
|
|
pdf_bytes = await paperless.download_document(doc_id)
|
|
|
|
# Content-level non-payslip check (catches P60s with no Paperless title,
|
|
# personal income spreadsheets, etc.) before we burn extractor budget.
|
|
text_peek = _pdftotext(pdf_bytes) or ""
|
|
if NON_PAYSLIP_CONTENT_RE.search(text_peek[:500]):
|
|
log.info("skipping doc_id=%s — content matches non-payslip signature", doc_id)
|
|
return ProcessResult(doc_id=doc_id, status="skipped_non_payslip")
|
|
|
|
extracted, which = await _extract(pdf_bytes, metadata, extractor)
|
|
|
|
# Sanity check: Viktor joined Meta UK in 2019. Any pay_date earlier
|
|
# than 2010 or a zero gross almost certainly means the extractor
|
|
# hallucinated on a non-payslip PDF that slipped past the title filter.
|
|
# Reject rather than poison the DB with a 1900-01-01 ghost row.
|
|
if extracted.pay_date.year < 2010:
|
|
raise ValueError(
|
|
f"doc_id={doc_id} extractor={which} produced implausible pay_date={extracted.pay_date}")
|
|
if extracted.gross_pay == 0 and extracted.net_pay == 0:
|
|
raise ValueError(f"doc_id={doc_id} extractor={which} produced zero gross and net")
|
|
|
|
validated = validate_totals(extracted)
|
|
if not validated:
|
|
log.warning(
|
|
"totals mismatch for doc_id=%s extractor=%s gross=%s net=%s — storing validated=False",
|
|
doc_id,
|
|
which,
|
|
extracted.gross_pay,
|
|
extracted.net_pay,
|
|
)
|
|
|
|
payslip_id = await _insert_payslip(db_session_factory, doc_id, extracted, validated)
|
|
status = "inserted" if payslip_id is not None else "skipped"
|
|
return ProcessResult(doc_id=doc_id,
|
|
status=status,
|
|
payslip_id=payslip_id,
|
|
validated=validated,
|
|
extractor=which)
|
|
|
|
|
|
async def _extract(
|
|
pdf_bytes: bytes,
|
|
metadata: dict[str, Any],
|
|
extractor: ClaudeExtractor,
|
|
) -> tuple[ExtractedPayslip, str]:
|
|
"""Try the regex parser first; fall back to Claude if it can't match.
|
|
|
|
The regex path runs in milliseconds and validates ~100% for Meta UK
|
|
payslips. Claude is expensive ($0.01-0.05 + 30-90s wall time) and only
|
|
succeeds ~15% of the time on Meta templates because it fumbles
|
|
pension-sacrifice arithmetic and YTD-vs-this-period columns.
|
|
"""
|
|
text = _pdftotext(pdf_bytes)
|
|
if text:
|
|
try:
|
|
parsed = parse_meta_uk(text)
|
|
log.info("regex parser hit: gross=%s net=%s", parsed.gross_pay, parsed.net_pay)
|
|
return parsed, "meta_uk_regex"
|
|
except ParserError as exc:
|
|
log.info("regex parser miss (%s) — falling back to Claude", exc)
|
|
|
|
extracted = await extractor.extract(pdf_bytes, metadata)
|
|
return extracted, "claude"
|
|
|
|
|
|
def _pdftotext(pdf_bytes: bytes) -> str | None:
|
|
if not PDFTOTEXT_PATH:
|
|
return None
|
|
try:
|
|
proc = subprocess.run(
|
|
[PDFTOTEXT_PATH, "-layout", "-enc", "UTF-8", "-", "-"],
|
|
input=pdf_bytes,
|
|
capture_output=True,
|
|
timeout=30,
|
|
check=False,
|
|
)
|
|
except (subprocess.SubprocessError, OSError) as exc:
|
|
log.warning("pdftotext failed: %s", exc)
|
|
return None
|
|
text = proc.stdout.decode("utf-8", errors="replace").strip()
|
|
return text or None
|
|
|
|
|
|
async def _insert_payslip(
|
|
db_session_factory: async_sessionmaker[Any] | _SessionFactory,
|
|
doc_id: int,
|
|
extracted: ExtractedPayslip,
|
|
validated: bool,
|
|
) -> int | None:
|
|
raw = json.loads(extracted.model_dump_json())
|
|
async with db_session_factory() as session, session.begin():
|
|
existing = await session.execute(
|
|
select(Payslip.id).where(Payslip.paperless_doc_id == doc_id))
|
|
existing_id = existing.scalar()
|
|
if existing_id is not None:
|
|
return None
|
|
|
|
tax_year = derive_tax_year(extracted.pay_date)
|
|
bonus = await _dedup_bonus(session, tax_year, extracted.bonus, doc_id)
|
|
|
|
row = Payslip(
|
|
paperless_doc_id=doc_id,
|
|
pay_date=extracted.pay_date,
|
|
pay_period_start=extracted.pay_period_start,
|
|
pay_period_end=extracted.pay_period_end,
|
|
employer=extracted.employer,
|
|
currency=extracted.currency,
|
|
gross_pay=extracted.gross_pay,
|
|
income_tax=extracted.income_tax,
|
|
national_insurance=extracted.national_insurance,
|
|
pension_employee=extracted.pension_employee,
|
|
pension_employer=extracted.pension_employer,
|
|
student_loan=extracted.student_loan,
|
|
rsu_vest=extracted.rsu_vest,
|
|
rsu_offset=extracted.rsu_offset,
|
|
salary=extracted.salary,
|
|
bonus=bonus,
|
|
pension_sacrifice=extracted.pension_sacrifice,
|
|
taxable_pay=extracted.taxable_pay,
|
|
ytd_tax_paid=extracted.ytd_tax_paid,
|
|
ytd_taxable_pay=extracted.ytd_taxable_pay,
|
|
ytd_gross=extracted.ytd_gross,
|
|
cash_income_tax=extracted.cash_income_tax,
|
|
ytd_rsu_tax_offset=extracted.ytd_rsu_tax_offset,
|
|
ytd_rsu_excs_refund=extracted.ytd_rsu_excs_refund,
|
|
other_deductions=_decimals_to_float(extracted.other_deductions),
|
|
net_pay=extracted.net_pay,
|
|
tax_year=tax_year,
|
|
raw_extraction=raw,
|
|
validated=validated,
|
|
)
|
|
session.add(row)
|
|
await session.flush()
|
|
return row.id
|
|
|
|
|
|
def _decimals_to_float(mapping: dict[str, Decimal]) -> dict[str, float]:
|
|
return {k: float(v) for k, v in mapping.items()}
|
|
|
|
|
|
async def _dedup_bonus(session: Any, tax_year: str, bonus: Decimal, doc_id: int) -> Decimal:
|
|
"""Zero out a repeated bonus within the same tax year.
|
|
|
|
Meta pays one performance bonus per year (capped at 2 historically; 1
|
|
since the comp cycle change). If the parser or Claude extractor picks
|
|
up the same amount twice — usually from a payslip that reprints the
|
|
YTD bonus figure as a current-period row — charting sums the amount
|
|
multiple times and exaggerates total comp. Rather than surface
|
|
duplicates, we keep the first occurrence (earliest pay_date in the
|
|
tax year) and ingest subsequent matches with bonus=0. The original
|
|
figure stays in `raw_extraction` for auditability.
|
|
"""
|
|
if bonus <= 0:
|
|
return bonus
|
|
existing = await session.execute(
|
|
select(Payslip.id).where(Payslip.tax_year == tax_year, Payslip.bonus == bonus))
|
|
if existing.scalar() is not None:
|
|
log.warning("duplicate bonus suppressed: doc_id=%s tax_year=%s bonus=%s (kept first)",
|
|
doc_id, tax_year, bonus)
|
|
return Decimal("0")
|
|
return bonus
|
|
|
|
|
|
async def _handle_p60(
|
|
doc_id: int,
|
|
pdf_bytes: bytes,
|
|
db_session_factory: async_sessionmaker[Any] | _SessionFactory,
|
|
) -> ProcessResult:
|
|
text = _pdftotext(pdf_bytes)
|
|
if not text:
|
|
raise ValueError(f"doc_id={doc_id} P60 pdftotext extraction returned empty")
|
|
try:
|
|
parsed = parse_p60(text)
|
|
except P60ParserError as exc:
|
|
raise ValueError(f"doc_id={doc_id} P60 parser miss: {exc}") from exc
|
|
log.info("p60 parsed: tax_year=%s employer=%s gross=%s tax=%s", parsed.tax_year,
|
|
parsed.employer, parsed.gross_pay, parsed.income_tax)
|
|
p60_id = await _insert_p60(db_session_factory, doc_id, parsed)
|
|
return ProcessResult(
|
|
doc_id=doc_id,
|
|
status="inserted" if p60_id is not None else "skipped",
|
|
p60_id=p60_id,
|
|
extractor="p60_regex",
|
|
)
|
|
|
|
|
|
async def _insert_p60(
|
|
db_session_factory: async_sessionmaker[Any] | _SessionFactory,
|
|
doc_id: int,
|
|
parsed: ExtractedP60,
|
|
) -> int | None:
|
|
async with db_session_factory() as session, session.begin():
|
|
existing = await session.execute(
|
|
select(P60Reference.id).where(P60Reference.paperless_doc_id == doc_id))
|
|
if existing.scalar() is not None:
|
|
return None
|
|
row = P60Reference(
|
|
paperless_doc_id=doc_id,
|
|
tax_year=parsed.tax_year,
|
|
employer=parsed.employer,
|
|
employer_paye_ref=parsed.employer_paye_ref,
|
|
gross_pay=parsed.gross_pay,
|
|
income_tax=parsed.income_tax,
|
|
national_insurance=parsed.national_insurance,
|
|
student_loan=parsed.student_loan,
|
|
tax_code=parsed.tax_code,
|
|
raw_extraction=parsed.to_raw(),
|
|
)
|
|
session.add(row)
|
|
await session.flush()
|
|
return row.id
|