import json
import logging
import re
import shutil
import subprocess
from dataclasses import dataclass
from decimal import Decimal
from typing import Any, Protocol

from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker

from payslip_ingest.db import P60Reference, Payslip
from payslip_ingest.extractor import ClaudeExtractor
from payslip_ingest.paperless import PaperlessClient
from payslip_ingest.parsers import ExtractedP60, ParserError, parse_meta_uk, parse_p60
from payslip_ingest.parsers.p60 import P60ParserError
from payslip_ingest.schema import ExtractedPayslip, validate_totals
from payslip_ingest.tax_year import derive_tax_year

log = logging.getLogger(__name__)

# Paperless's `payslip` tag has drifted over the years — it gets sprinkled on
# annual summaries (P60), performance/bonus letters, RSU grants, comp-review
# letters. Those are legitimate financial docs but they aren't monthly payslips
# and including them would skew every chart (a P60 looks like a single payslip
# 12x normal size). We skip by title pattern before hitting Claude so we don't
# burn extraction budget on them either.
#
# NOTE(review): the first alternative has no word boundary, so it matches any
# "p", optional separator, "60" sequence — e.g. a title such as "Payslip 60"
# would also be skipped. Confirm no real payslip titles look like that.
NON_PAYSLIP_TITLE_RE = re.compile(
    r"p[\s._-]?60"
    r"|performance.*(letter|year.end)|year.end.*letter"
    r"|compensation[_ ]emea|\bpsc\b|comp[-_ ]?letter"
    r"|rsu\s*grant",
    re.IGNORECASE,
)

# Some Paperless docs have no title at all — the title filter can't catch
# them. These are detected by content signature in the pdftotext output.
# Only apply to the first ~500 chars so we don't accidentally false-positive
# a real payslip that happens to mention "P60" in a footnote somewhere.
#
# NOTE(review): the pattern is compiled without re.DOTALL, so the `.+` in the
# "Employer's summary" alternative cannot cross a newline; it only matches
# when both phrases land on the same line of the pdftotext output.
NON_PAYSLIP_CONTENT_RE = re.compile(
    r"P60 End of Year Certificate"
    r"|Employer's summary.+tax year ending"
    r"|Take-home income per month",
    re.IGNORECASE,
)

# Resolved once at import time; None when no `pdftotext` executable is found
# on PATH.
PDFTOTEXT_PATH = shutil.which("pdftotext")


class _SessionFactory(Protocol):
    """Structural type for a zero-argument callable that returns a session.

    Accepted wherever an `async_sessionmaker` is accepted in this module —
    presumably so a stand-in factory can be passed in tests (not shown here).
    """

    def __call__(self) -> Any: ...
@dataclass
class ProcessResult:
    """Outcome of a single `process_document` call.

    `status` is "inserted", "skipped" (the document was already ingested) or
    "skipped_non_payslip" (title or content matched a non-payslip pattern).
    The id / validated / extractor fields are filled in only on the paths
    that produce them.
    """

    doc_id: int
    status: str
    payslip_id: int | None = None
    p60_id: int | None = None
    validated: bool | None = None
    extractor: str | None = None  # "meta_uk_regex" | "claude" | "p60_regex" | None


# Sanity floor for extracted pay dates. Viktor joined Meta UK in 2019, so a
# pay_date before this year almost certainly means the extractor hallucinated
# on a non-payslip PDF.
_MIN_PLAUSIBLE_PAY_YEAR = 2010

# How many leading characters of the pdftotext output are checked against
# NON_PAYSLIP_CONTENT_RE. Kept small so a real payslip that mentions "P60" in
# a footnote is not rejected.
_CONTENT_PEEK_CHARS = 500


async def process_document(
    doc_id: int,
    db_session_factory: async_sessionmaker[Any] | _SessionFactory,
    paperless: PaperlessClient,
    extractor: ClaudeExtractor,
    p60_tag_id: int | None = None,
) -> ProcessResult:
    """Ingest one Paperless document as a payslip or a P60.

    Steps, in order:

    1. Skip if `doc_id` is already stored as a `Payslip` or `P60Reference`.
    2. If `p60_tag_id` is given and present on the document, parse it as a P60.
    3. Skip documents whose title, or leading extracted text, matches a
       non-payslip pattern — before any extractor budget is spent.
    4. Extract (regex first, Claude fallback), sanity-check, validate totals
       and insert.

    Args:
        doc_id: Paperless document id.
        db_session_factory: Callable returning an async session context manager.
        paperless: Client used to fetch document metadata and PDF bytes.
        extractor: Fallback extractor used when the regex parser misses.
        p60_tag_id: Paperless tag id that marks P60 documents; None disables
            the P60 path.

    Returns:
        A `ProcessResult` describing what happened.

    Raises:
        ValueError: The extraction is implausible (pay_date before
            `_MIN_PLAUSIBLE_PAY_YEAR`, or both gross and net are zero), or a
            P60-tagged document could not be parsed.
    """
    # The session is only needed for the two "already ingested" lookups, so it
    # is released before any network or subprocess work starts.
    async with db_session_factory() as session:
        existing_payslip = await session.execute(
            select(Payslip.id).where(Payslip.paperless_doc_id == doc_id))
        if existing_payslip.scalar() is not None:
            log.info("skipping doc_id=%s — already ingested as payslip", doc_id)
            return ProcessResult(doc_id=doc_id, status="skipped")

        existing_p60 = await session.execute(
            select(P60Reference.id).where(P60Reference.paperless_doc_id == doc_id))
        if existing_p60.scalar() is not None:
            log.info("skipping doc_id=%s — already ingested as P60", doc_id)
            return ProcessResult(doc_id=doc_id, status="skipped")

    metadata = await paperless.get_document(doc_id)
    tag_ids = metadata.get("tags") or []

    # The P60 tag wins over the title filter: a tagged P60 is parsed and
    # stored as a reference row rather than skipped.
    if p60_tag_id is not None and p60_tag_id in tag_ids:
        pdf_bytes = await paperless.download_document(doc_id)
        return await _handle_p60(doc_id, pdf_bytes, db_session_factory)

    title = (metadata.get("title") or "").strip()
    if NON_PAYSLIP_TITLE_RE.search(title):
        log.info("skipping doc_id=%s — title %r matches non-payslip pattern", doc_id, title)
        return ProcessResult(doc_id=doc_id, status="skipped_non_payslip")

    pdf_bytes = await paperless.download_document(doc_id)

    # Content-level non-payslip check (catches P60s with no Paperless title,
    # personal income spreadsheets, etc.) before we burn extractor budget.
    text = await _pdftotext_async(pdf_bytes)
    if NON_PAYSLIP_CONTENT_RE.search((text or "")[:_CONTENT_PEEK_CHARS]):
        log.info("skipping doc_id=%s — content matches non-payslip signature", doc_id)
        return ProcessResult(doc_id=doc_id, status="skipped_non_payslip")

    # Hand the already-extracted text over so pdftotext is not run a second
    # time on the same bytes.
    extracted, which = await _extract(pdf_bytes, metadata, extractor, text=text)

    # Sanity check: a pay_date earlier than _MIN_PLAUSIBLE_PAY_YEAR, or gross
    # and net both zero, almost certainly means the extractor hallucinated on
    # a non-payslip PDF that slipped past the filters above. Reject rather
    # than poison the DB with a 1900-01-01 ghost row.
    if extracted.pay_date.year < _MIN_PLAUSIBLE_PAY_YEAR:
        raise ValueError(
            f"doc_id={doc_id} extractor={which} produced implausible pay_date={extracted.pay_date}")
    if extracted.gross_pay == 0 and extracted.net_pay == 0:
        raise ValueError(f"doc_id={doc_id} extractor={which} produced zero gross and net")

    validated = validate_totals(extracted)
    if not validated:
        log.warning(
            "totals mismatch for doc_id=%s extractor=%s gross=%s net=%s — storing validated=False",
            doc_id,
            which,
            extracted.gross_pay,
            extracted.net_pay,
        )

    payslip_id = await _insert_payslip(db_session_factory, doc_id, extracted, validated)
    # None means another writer inserted the same doc between the check at the
    # top and the insert transaction.
    status = "inserted" if payslip_id is not None else "skipped"
    return ProcessResult(doc_id=doc_id,
                         status=status,
                         payslip_id=payslip_id,
                         validated=validated,
                         extractor=which)


async def _extract(
    pdf_bytes: bytes,
    metadata: dict[str, Any],
    extractor: ClaudeExtractor,
    text: str | None = None,
) -> tuple[ExtractedPayslip, str]:
    """Try the regex parser first; fall back to Claude if it can't match.

    The regex path runs in milliseconds and validates ~100% for Meta UK
    payslips. Claude is expensive ($0.01-0.05 + 30-90s wall time) and only
    succeeds ~15% of the time on Meta templates because it fumbles
    pension-sacrifice arithmetic and YTD-vs-this-period columns.

    Args:
        pdf_bytes: Raw PDF content.
        metadata: Paperless metadata, forwarded to the Claude extractor.
        extractor: Fallback extractor.
        text: pdftotext output for `pdf_bytes` if the caller already has it;
            when None the text is extracted here.

    Returns:
        `(extracted, extractor_name)` where `extractor_name` is
        "meta_uk_regex" or "claude".
    """
    if text is None:
        text = await _pdftotext_async(pdf_bytes)
    if text:
        try:
            parsed = parse_meta_uk(text)
        except ParserError as exc:
            log.info("regex parser miss (%s) — falling back to Claude", exc)
        else:
            log.info("regex parser hit: gross=%s net=%s", parsed.gross_pay, parsed.net_pay)
            return parsed, "meta_uk_regex"
    extracted = await extractor.extract(pdf_bytes, metadata)
    return extracted, "claude"


def _pdftotext(pdf_bytes: bytes) -> str | None:
    """Convert PDF bytes to layout-preserving text with the `pdftotext` CLI.

    Best-effort: returns None when the binary is not installed, the process
    cannot be started or times out, or the output is empty. This call blocks
    for up to 30 seconds; coroutines should use `_pdftotext_async` instead.
    """
    if not PDFTOTEXT_PATH:
        return None
    try:
        proc = subprocess.run(
            [PDFTOTEXT_PATH, "-layout", "-enc", "UTF-8", "-", "-"],
            input=pdf_bytes,
            capture_output=True,
            timeout=30,
            check=False,
        )
    except (subprocess.SubprocessError, OSError) as exc:
        log.warning("pdftotext failed: %s", exc)
        return None
    if proc.returncode != 0:
        # check=False keeps this best-effort, but a failing conversion should
        # still leave a trace; whatever reached stdout is used as before.
        log.warning("pdftotext exited with status %s: %s", proc.returncode,
                    proc.stderr.decode("utf-8", errors="replace").strip())
    text = proc.stdout.decode("utf-8", errors="replace").strip()
    return text or None


async def _pdftotext_async(pdf_bytes: bytes) -> str | None:
    """Run `_pdftotext` in a worker thread so the event loop is not blocked."""
    import asyncio  # function-local so this block does not need a new file-level import

    return await asyncio.to_thread(_pdftotext, pdf_bytes)


async def _insert_payslip(
    db_session_factory: async_sessionmaker[Any] | _SessionFactory,
    doc_id: int,
    extracted: ExtractedPayslip,
    validated: bool,
) -> int | None:
    """Insert a payslip row and return its id, or None if `doc_id` already exists.

    The existence check, bonus de-duplication and insert share one
    transaction. The full extraction is stored as JSON in `raw_extraction`,
    so the original bonus figure survives even when `_dedup_bonus` zeroes it.
    """
    raw = json.loads(extracted.model_dump_json())
    async with db_session_factory() as session, session.begin():
        existing = await session.execute(
            select(Payslip.id).where(Payslip.paperless_doc_id == doc_id))
        if existing.scalar() is not None:
            return None

        tax_year = derive_tax_year(extracted.pay_date)
        bonus = await _dedup_bonus(session, tax_year, extracted.bonus, doc_id)

        row = Payslip(
            paperless_doc_id=doc_id,
            pay_date=extracted.pay_date,
            pay_period_start=extracted.pay_period_start,
            pay_period_end=extracted.pay_period_end,
            employer=extracted.employer,
            currency=extracted.currency,
            gross_pay=extracted.gross_pay,
            income_tax=extracted.income_tax,
            national_insurance=extracted.national_insurance,
            pension_employee=extracted.pension_employee,
            pension_employer=extracted.pension_employer,
            student_loan=extracted.student_loan,
            rsu_vest=extracted.rsu_vest,
            rsu_offset=extracted.rsu_offset,
            salary=extracted.salary,
            bonus=bonus,
            pension_sacrifice=extracted.pension_sacrifice,
            taxable_pay=extracted.taxable_pay,
            ytd_tax_paid=extracted.ytd_tax_paid,
            ytd_taxable_pay=extracted.ytd_taxable_pay,
            ytd_gross=extracted.ytd_gross,
            cash_income_tax=extracted.cash_income_tax,
            ytd_rsu_tax_offset=extracted.ytd_rsu_tax_offset,
            ytd_rsu_excs_refund=extracted.ytd_rsu_excs_refund,
            other_deductions=_decimals_to_float(extracted.other_deductions),
            net_pay=extracted.net_pay,
            tax_year=tax_year,
            raw_extraction=raw,
            validated=validated,
        )
        session.add(row)
        # Flush so the database assigns the primary key before we return it.
        await session.flush()
        return row.id


def _decimals_to_float(mapping: dict[str, Decimal]) -> dict[str, float]:
    """Return a copy of `mapping` with every Decimal value converted to float."""
    return {k: float(v) for k, v in mapping.items()}


async def _dedup_bonus(session: Any, tax_year: str, bonus: Decimal, doc_id: int) -> Decimal:
    """Zero out a repeated bonus within the same tax year.

    Meta pays one performance bonus per year (capped at 2 historically; 1
    since the comp cycle change). If the parser or Claude extractor picks up
    the same amount twice — usually from a payslip that reprints the YTD
    bonus figure as a current-period row — charting sums the amount multiple
    times and exaggerates total comp.

    Rather than surface duplicates, the row that was ingested first keeps the
    bonus and any later document with the same amount in the same tax year is
    stored with bonus=0. "First" is ingestion order, not pay_date order: the
    lookup only asks whether a matching row already exists. The original
    figure stays in `raw_extraction` for auditability.

    Args:
        session: Open session; the lookup runs inside the caller's transaction.
        tax_year: Tax year of the payslip being inserted.
        bonus: Extracted bonus amount.
        doc_id: Paperless document id, used only for logging.

    Returns:
        `bonus` unchanged, or `Decimal("0")` when an existing row in the same
        tax year already carries the same amount.
    """
    if bonus <= 0:
        return bonus
    existing = await session.execute(
        select(Payslip.id).where(Payslip.tax_year == tax_year, Payslip.bonus == bonus))
    if existing.scalar() is not None:
        log.warning("duplicate bonus suppressed: doc_id=%s tax_year=%s bonus=%s (kept first)",
                    doc_id, tax_year, bonus)
        return Decimal("0")
    return bonus


async def _handle_p60(
    doc_id: int,
    pdf_bytes: bytes,
    db_session_factory: async_sessionmaker[Any] | _SessionFactory,
) -> ProcessResult:
    """Parse a P60 PDF with the regex parser and store it as a reference row.

    There is no Claude fallback on this path, so an empty pdftotext result or
    a parser miss is an error.

    Raises:
        ValueError: pdftotext produced no text, or `parse_p60` could not
            parse it (the `P60ParserError` is chained as the cause).
    """
    text = await _pdftotext_async(pdf_bytes)
    if not text:
        raise ValueError(f"doc_id={doc_id} P60 pdftotext extraction returned empty")
    try:
        parsed = parse_p60(text)
    except P60ParserError as exc:
        raise ValueError(f"doc_id={doc_id} P60 parser miss: {exc}") from exc
    log.info("p60 parsed: tax_year=%s employer=%s gross=%s tax=%s", parsed.tax_year,
             parsed.employer, parsed.gross_pay, parsed.income_tax)
    p60_id = await _insert_p60(db_session_factory, doc_id, parsed)
    return ProcessResult(
        doc_id=doc_id,
        status="inserted" if p60_id is not None else "skipped",
        p60_id=p60_id,
        extractor="p60_regex",
    )


async def _insert_p60(
    db_session_factory: async_sessionmaker[Any] | _SessionFactory,
    doc_id: int,
    parsed: ExtractedP60,
) -> int | None:
    """Insert a P60 reference row and return its id, or None if `doc_id` already exists.

    The existence check and the insert run in the same transaction.
    """
    async with db_session_factory() as session, session.begin():
        existing = await session.execute(
            select(P60Reference.id).where(P60Reference.paperless_doc_id == doc_id))
        if existing.scalar() is not None:
            return None
        row = P60Reference(
            paperless_doc_id=doc_id,
            tax_year=parsed.tax_year,
            employer=parsed.employer,
            employer_paye_ref=parsed.employer_paye_ref,
            gross_pay=parsed.gross_pay,
            income_tax=parsed.income_tax,
            national_insurance=parsed.national_insurance,
            student_loan=parsed.student_loan,
            tax_code=parsed.tax_code,
            raw_extraction=parsed.to_raw(),
        )
        session.add(row)
        # Flush so the database assigns the primary key before we return it.
        await session.flush()
        return row.id