import json
import logging
import re
import shutil
import subprocess
from dataclasses import dataclass
from decimal import Decimal
from typing import Any, Protocol

from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker

from payslip_ingest.db import Payslip
from payslip_ingest.extractor import ClaudeExtractor
from payslip_ingest.paperless import PaperlessClient
from payslip_ingest.parsers import ParserError, parse_meta_uk
from payslip_ingest.schema import ExtractedPayslip, validate_totals
from payslip_ingest.tax_year import derive_tax_year

log = logging.getLogger(__name__)

# Paperless's `payslip` tag has drifted over the years — it gets sprinkled on
# annual summaries (P60), performance/bonus letters, RSU grants and comp-review
# letters. Those are legitimate financial docs, but they aren't monthly
# payslips, and including them would skew every chart (a P60 looks like a
# single payslip 12x the normal size). We skip by title pattern before hitting
# Claude so we don't burn extraction budget on them either.
#
# The pattern is applied with `.search()`, so each alternative may match
# anywhere inside the title; matching is case-insensitive.
NON_PAYSLIP_TITLE_RE = re.compile(
    r"p[\s._-]?60"
    r"|performance.*(letter|year.end)|year.end.*letter"
    r"|compensation[_ ]emea|\bpsc\b|comp[-_ ]?letter"
    r"|rsu\s*grant",
    re.IGNORECASE,
)

# Some Paperless docs have no title at all, so the title filter can't catch
# them. These are detected by content signature in the pdftotext output.
# Only apply this to the first ~500 chars so we don't false-positive on a real
# payslip that happens to mention "P60" in a footnote somewhere.
#
# NOTE: re.DOTALL is not set, so the `.+` in the "Employer's summary"
# alternative does not cross a line break — both phrases must appear on the
# same line of extracted text for that alternative to match.
NON_PAYSLIP_CONTENT_RE = re.compile(
    r"P60 End of Year Certificate"
    r"|Employer's summary.+tax year ending"
    r"|Take-home income per month",
    re.IGNORECASE,
)

# Resolved once at import time. `shutil.which` returns None when the binary is
# not on PATH, in which case `_pdftotext` returns None instead of running it.
PDFTOTEXT_PATH = shutil.which("pdftotext")


class _SessionFactory(Protocol):
    """Structural type for a zero-argument callable that produces a session.

    Lets callers pass any factory (not only ``async_sessionmaker``) whose
    result can be used as ``async with factory() as session``.
    """

    def __call__(self) -> Any: ...
@dataclass
class ProcessResult:
    """Outcome of a single :func:`process_document` call."""

    doc_id: int
    # One of "inserted", "skipped" (row already exists) or
    # "skipped_non_payslip" (title/content filter rejected the document).
    status: str
    # Primary key of the inserted row; None when nothing was inserted.
    payslip_id: int | None = None
    # Result of validate_totals(); None when extraction never ran.
    validated: bool | None = None
    extractor: str | None = None  # "meta_uk_regex" | "claude" | None


# Sentinel default for `_extract(text=...)`. It distinguishes "caller did not
# run pdftotext" from "caller ran pdftotext and it produced nothing" (None).
_TEXT_NOT_EXTRACTED: Any = object()


async def process_document(
    doc_id: int,
    db_session_factory: async_sessionmaker[Any] | _SessionFactory,
    paperless: PaperlessClient,
    extractor: ClaudeExtractor,
) -> ProcessResult:
    """Ingest one Paperless document as a payslip row.

    Steps, in order: skip if a row for ``doc_id`` already exists; skip if the
    Paperless title or the first 500 characters of extracted text match a
    non-payslip pattern; extract fields (regex parser first, Claude as the
    fallback); sanity-check the result; validate totals; insert the row.

    Args:
        doc_id: Paperless document id.
        db_session_factory: Callable returning an async session usable as
            ``async with factory() as session``.
        paperless: Client used to fetch document metadata and PDF bytes.
        extractor: Fallback extractor used when the regex parser misses.

    Returns:
        A ProcessResult describing what happened to the document.

    Raises:
        ValueError: If the extracted pay date is before 2010, or if gross and
            net pay are both zero.
    """
    # Cheap dedupe first, so no Paperless or extractor work is done for
    # documents that are already stored.
    async with db_session_factory() as session:
        existing = await session.execute(
            select(Payslip.id).where(Payslip.paperless_doc_id == doc_id))
        if existing.scalar() is not None:
            log.info("skipping doc_id=%s — already ingested", doc_id)
            return ProcessResult(doc_id=doc_id, status="skipped")

    metadata = await paperless.get_document(doc_id)
    title = (metadata.get("title") or "").strip()
    if NON_PAYSLIP_TITLE_RE.search(title):
        log.info("skipping doc_id=%s — title %r matches non-payslip pattern", doc_id, title)
        return ProcessResult(doc_id=doc_id, status="skipped_non_payslip")

    pdf_bytes = await paperless.download_document(doc_id)

    # Content-level non-payslip check (catches P60s with no Paperless title,
    # personal income spreadsheets, etc.) before we burn extractor budget.
    # pdftotext runs exactly once per document: the same text is handed to
    # _extract below instead of spawning the subprocess a second time.
    text = await _pdftotext_async(pdf_bytes)
    if NON_PAYSLIP_CONTENT_RE.search((text or "")[:500]):
        log.info("skipping doc_id=%s — content matches non-payslip signature", doc_id)
        return ProcessResult(doc_id=doc_id, status="skipped_non_payslip")

    extracted, which = await _extract(pdf_bytes, metadata, extractor, text=text)

    # Sanity check: Viktor joined Meta UK in 2019. Any pay_date earlier than
    # 2010, or a result where gross and net are BOTH zero, almost certainly
    # means the extractor hallucinated on a non-payslip PDF that slipped past
    # the filters. Reject rather than poison the DB with a 1900-01-01 ghost row.
    if extracted.pay_date.year < 2010:
        raise ValueError(
            f"doc_id={doc_id} extractor={which} produced implausible pay_date={extracted.pay_date}")
    if extracted.gross_pay == 0 and extracted.net_pay == 0:
        raise ValueError(f"doc_id={doc_id} extractor={which} produced zero gross and net")

    # A totals mismatch is recorded, not rejected: the row is stored with
    # validated=False so it can be reviewed later.
    validated = validate_totals(extracted)
    if not validated:
        log.warning(
            "totals mismatch for doc_id=%s extractor=%s gross=%s net=%s — storing validated=False",
            doc_id,
            which,
            extracted.gross_pay,
            extracted.net_pay,
        )

    payslip_id = await _insert_payslip(db_session_factory, doc_id, extracted, validated)
    # _insert_payslip returns None when a row for doc_id appeared in the
    # meantime (its own existence check found one).
    status = "inserted" if payslip_id is not None else "skipped"
    return ProcessResult(doc_id=doc_id,
                         status=status,
                         payslip_id=payslip_id,
                         validated=validated,
                         extractor=which)


async def _extract(
    pdf_bytes: bytes,
    metadata: dict[str, Any],
    extractor: ClaudeExtractor,
    *,
    text: str | None = _TEXT_NOT_EXTRACTED,
) -> tuple[ExtractedPayslip, str]:
    """Try the regex parser first; fall back to Claude if it can't match.

    The regex path runs in milliseconds and validates ~100% for Meta UK
    payslips. Claude is expensive ($0.01-0.05 + 30-90s wall time) and only
    succeeds ~15% of the time on Meta templates because it fumbles
    pension-sacrifice arithmetic and YTD-vs-this-period columns.

    Args:
        pdf_bytes: Raw PDF content.
        metadata: Paperless document metadata, forwarded to the extractor.
        extractor: Fallback extractor.
        text: pdftotext output already computed by the caller. Pass None to
            say "pdftotext yielded nothing"; omit the argument to have this
            function run pdftotext itself.

    Returns:
        ``(payslip, source)`` where source is "meta_uk_regex" or "claude".
    """
    if text is _TEXT_NOT_EXTRACTED:
        text = await _pdftotext_async(pdf_bytes)

    if text:
        try:
            parsed = parse_meta_uk(text)
        except ParserError as exc:
            log.info("regex parser miss (%s) — falling back to Claude", exc)
        else:
            log.info("regex parser hit: gross=%s net=%s", parsed.gross_pay, parsed.net_pay)
            return parsed, "meta_uk_regex"

    extracted = await extractor.extract(pdf_bytes, metadata)
    return extracted, "claude"


async def _pdftotext_async(pdf_bytes: bytes) -> str | None:
    """Run :func:`_pdftotext` in a worker thread.

    ``_pdftotext`` blocks on a subprocess for up to 30 seconds; calling it
    directly from a coroutine would stall the event loop for that long.
    """
    # Function-local import so this helper does not depend on the module's
    # top-level import list.
    import asyncio

    return await asyncio.to_thread(_pdftotext, pdf_bytes)


def _pdftotext(pdf_bytes: bytes) -> str | None:
    """Convert a PDF to layout-preserving text with the ``pdftotext`` binary.

    The PDF is piped through stdin and the text read from stdout. This is
    best-effort: any failure yields None rather than an exception.

    Returns:
        The stripped text, or None if the binary is unavailable, the run
        failed or timed out, or no text was produced.
    """
    if not PDFTOTEXT_PATH:
        return None
    try:
        proc = subprocess.run(
            [PDFTOTEXT_PATH, "-layout", "-enc", "UTF-8", "-", "-"],
            input=pdf_bytes,
            capture_output=True,
            timeout=30,
            check=False,
        )
    except (subprocess.SubprocessError, OSError) as exc:
        log.warning("pdftotext failed: %s", exc)
        return None

    if proc.returncode != 0:
        # Surface the failure instead of ignoring it silently. Whatever was
        # written to stdout is still used below (best-effort, as before).
        log.warning(
            "pdftotext exited with status %s: %s",
            proc.returncode,
            proc.stderr.decode("utf-8", errors="replace").strip()[:200],
        )

    text = proc.stdout.decode("utf-8", errors="replace").strip()
    return text or None


async def _insert_payslip(
    db_session_factory: async_sessionmaker[Any] | _SessionFactory,
    doc_id: int,
    extracted: ExtractedPayslip,
    validated: bool,
) -> int | None:
    """Insert a payslip row unless one already exists for ``doc_id``.

    The existence check and the insert run inside one transaction
    (``session.begin()``), which commits when the ``async with`` exits.

    Returns:
        The new row's id, or None if a row for ``doc_id`` was already present.
    """
    # Round-trip through JSON so the stored payload contains only
    # JSON-compatible values.
    raw = json.loads(extracted.model_dump_json())

    async with db_session_factory() as session, session.begin():
        existing = await session.execute(
            select(Payslip.id).where(Payslip.paperless_doc_id == doc_id))
        if existing.scalar() is not None:
            return None

        row = Payslip(
            paperless_doc_id=doc_id,
            pay_date=extracted.pay_date,
            pay_period_start=extracted.pay_period_start,
            pay_period_end=extracted.pay_period_end,
            employer=extracted.employer,
            currency=extracted.currency,
            gross_pay=extracted.gross_pay,
            income_tax=extracted.income_tax,
            national_insurance=extracted.national_insurance,
            pension_employee=extracted.pension_employee,
            pension_employer=extracted.pension_employer,
            student_loan=extracted.student_loan,
            rsu_vest=extracted.rsu_vest,
            rsu_offset=extracted.rsu_offset,
            salary=extracted.salary,
            bonus=extracted.bonus,
            pension_sacrifice=extracted.pension_sacrifice,
            taxable_pay=extracted.taxable_pay,
            ytd_tax_paid=extracted.ytd_tax_paid,
            ytd_taxable_pay=extracted.ytd_taxable_pay,
            ytd_gross=extracted.ytd_gross,
            other_deductions=_decimals_to_float(extracted.other_deductions),
            net_pay=extracted.net_pay,
            tax_year=derive_tax_year(extracted.pay_date),
            raw_extraction=raw,
            validated=validated,
        )
        session.add(row)
        # Flush so the database assigns row.id before the value is returned.
        await session.flush()
        return row.id


def _decimals_to_float(mapping: dict[str, Decimal]) -> dict[str, float]:
    """Return a copy of ``mapping`` with every Decimal value cast to float."""
    return {k: float(v) for k, v in mapping.items()}