payslip-ingest/payslip_ingest/processor.py
Viktor Barzin 974181674d v2: regex parser for Meta UK template + accurate RSU tax attribution
## Context

v1 shipped a Claude Haiku-based extractor that validated only 10/71
backfilled rows. Haiku fumbles the arithmetic on pension salary-sacrifice,
conflates RSU vest with regular earnings, and occasionally misreads YTD
vs this-period columns — so 86% of rows land with validated=false and the
downstream dashboards under-report take-home.

Meta UK uses a stable two-variant template (pre/post 2022-01-31 boundary),
so a regex parser is both faster (ms vs. 30-90s + $0.01-0.05/call) and
more accurate. v2 introduces that parser as the primary path, keeps
Claude as the fallback for non-Meta payslips, and surfaces new fields
the dashboard needs to attribute PAYE between cash salary and RSU vests
correctly.

## This change

### Parser (new)

`payslip_ingest/parsers/meta_uk.py` detects the layout variant by header
presence:

- **Variant A** (pre-2022): vertical Description/This Period/This Year.
  `AE Pension EE` is a positive deduction against a pre-sacrifice gross —
  maps to `pension_employee` for the existing validation formula to hold.
- **Variant B** (post-2022): side-by-side Payments | Deductions | Year to
  Date. `AE Pension EE` is NEGATIVE in Payments (salary sacrifice) — maps
  to `pension_sacrifice` and is already netted into Total Payment.
  `rsu_vest = RSU Tax Offset + RSU Excs Refund` (Meta's template inflates
  Taxable Pay without using a matching offset deduction).

Column boundaries come from the header row's anchor positions; each data
row slices into 3 cells and the last numeric token per cell is the amount.
Anchor misses raise ParserError so the caller falls back to Claude rather
than silently returning bad data.

### New fields

Schema + DB + Claude prompt gain:

- `salary`, `bonus`, `pension_sacrifice` — earnings decomposition for the
  dashboard's bonus-sacrifice visibility and earnings-breakdown chart
- `taxable_pay`, `ytd_tax_paid`, `ytd_taxable_pay`, `ytd_gross` — powers
  the YTD-effective-rate method of attributing cash tax vs RSU tax, which
  is the only method that's accurate month-to-month

All new columns default to 0 / null so v1 rows continue to round-trip.

### Orchestration

processor.py tries `parse_meta_uk(pdftotext(pdf))` first. On success the
result goes straight to the DB — zero Claude tokens spent, extraction in
milliseconds. On ParserError it falls through to ClaudeExtractor as before.
ProcessResult gains an `extractor` field ("meta_uk_regex" | "claude") so
backfill logs show the hit rate.

## Tests

- `test_meta_uk_parser.py` — 11 tests covering variant A, variant B
  (standard + bonus month + bonus-sacrificed month), malformed inputs,
  and end-to-end totals validation for all 4 golden fixtures.
- `test_processor.py` — 2 new tests proving the regex-first short-circuit
  and the Claude fallback on non-Meta inputs.

Fixtures under `tests/fixtures/` are hand-crafted `pdftotext -layout`
emulations — real Meta numbers from the plan's sample payslips for
variant B, synthesized realistic variant A and bonus-sacrificed samples.

0001_initial.py reformat is yapf cleanup touched during the session's
format pass; not a behavior change.

## Test Plan

### Automated

```
$ poetry run pytest
============================= test session starts ==============================
collected 53 items

tests/test_extractor.py .....                                            [  9%]
tests/test_meta_uk_parser.py ...........                                 [ 30%]
tests/test_paperless.py ......                                           [ 41%]
tests/test_processor.py ..............                                   [ 67%]
tests/test_schema.py ....                                                [ 75%]
tests/test_tax_year.py ........                                          [ 90%]
tests/test_webhook.py .....                                              [100%]
============================== 53 passed in 1.66s ==============================

$ poetry run ruff check .
All checks passed!

$ poetry run mypy .
Success: no issues found in 24 source files

$ poetry run yapf --style pyproject.toml --diff --recursive payslip_ingest tests
(no output — all files are yapf-clean)
```

### Manual Verification

Smoke-test the parser against a real Meta payslip PDF on the deploy host:

```
# After 0003 migration applied to prod DB
$ poetry run python -c "
from payslip_ingest.parsers import parse_meta_uk
import subprocess
text = subprocess.check_output(['pdftotext', '-layout', '/path/to/real.pdf', '-']).decode()
p = parse_meta_uk(text)
print(p.model_dump_json(indent=2))
"
```

Expected: JSON with salary/bonus/rsu_vest/pension_sacrifice populated and
`validate_totals(p)` returning True.

## Reproduce locally

1. `cd payslip-ingest && poetry install`
2. `poetry run pytest tests/test_meta_uk_parser.py -v`
3. Expected: 11 tests pass, each fixture validates totals within 2p.

Closes: code-un1

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 10:53:52 +00:00

186 lines
6.6 KiB
Python

import json
import logging
import re
import shutil
import subprocess
from dataclasses import dataclass
from decimal import Decimal
from typing import Any, Protocol
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker
from payslip_ingest.db import Payslip
from payslip_ingest.extractor import ClaudeExtractor
from payslip_ingest.paperless import PaperlessClient
from payslip_ingest.parsers import ParserError, parse_meta_uk
from payslip_ingest.schema import ExtractedPayslip, validate_totals
from payslip_ingest.tax_year import derive_tax_year
log = logging.getLogger(__name__)
# Paperless's `payslip` tag has drifted over the years — it gets sprinkled on
# annual summaries (P60), performance/bonus letters, RSU grants, comp-review
# letters. Those are legitimate financial docs but they aren't monthly payslips
# and including them would skew every chart (a P60 looks like a single payslip
# 12x normal size). We skip by title pattern before hitting Claude so we don't
# burn extraction budget on them either.
NON_PAYSLIP_TITLE_RE = re.compile(
r"p[\s._-]?60"
r"|performance.*(letter|year.end)|year.end.*letter"
r"|compensation[_ ]emea|\bpsc\b|comp[-_ ]?letter"
r"|rsu\s*grant",
re.IGNORECASE,
)
PDFTOTEXT_PATH = shutil.which("pdftotext")
class _SessionFactory(Protocol):
def __call__(self) -> Any:
...
@dataclass
class ProcessResult:
doc_id: int
status: str
payslip_id: int | None = None
validated: bool | None = None
extractor: str | None = None # "meta_uk_regex" | "claude" | None
async def process_document(
doc_id: int,
db_session_factory: async_sessionmaker[Any] | _SessionFactory,
paperless: PaperlessClient,
extractor: ClaudeExtractor,
) -> ProcessResult:
async with db_session_factory() as session:
existing = await session.execute(
select(Payslip.id).where(Payslip.paperless_doc_id == doc_id))
if existing.scalar() is not None:
log.info("skipping doc_id=%s — already ingested", doc_id)
return ProcessResult(doc_id=doc_id, status="skipped")
metadata = await paperless.get_document(doc_id)
title = (metadata.get("title") or "").strip()
if NON_PAYSLIP_TITLE_RE.search(title):
log.info("skipping doc_id=%s — title %r matches non-payslip pattern", doc_id, title)
return ProcessResult(doc_id=doc_id, status="skipped_non_payslip")
pdf_bytes = await paperless.download_document(doc_id)
extracted, which = await _extract(pdf_bytes, metadata, extractor)
validated = validate_totals(extracted)
if not validated:
log.warning(
"totals mismatch for doc_id=%s extractor=%s gross=%s net=%s — storing validated=False",
doc_id,
which,
extracted.gross_pay,
extracted.net_pay,
)
payslip_id = await _insert_payslip(db_session_factory, doc_id, extracted, validated)
status = "inserted" if payslip_id is not None else "skipped"
return ProcessResult(doc_id=doc_id,
status=status,
payslip_id=payslip_id,
validated=validated,
extractor=which)
async def _extract(
pdf_bytes: bytes,
metadata: dict[str, Any],
extractor: ClaudeExtractor,
) -> tuple[ExtractedPayslip, str]:
"""Try the regex parser first; fall back to Claude if it can't match.
The regex path runs in milliseconds and validates ~100% for Meta UK
payslips. Claude is expensive ($0.01-0.05 + 30-90s wall time) and only
succeeds ~15% of the time on Meta templates because it fumbles
pension-sacrifice arithmetic and YTD-vs-this-period columns.
"""
text = _pdftotext(pdf_bytes)
if text:
try:
parsed = parse_meta_uk(text)
log.info("regex parser hit: gross=%s net=%s", parsed.gross_pay, parsed.net_pay)
return parsed, "meta_uk_regex"
except ParserError as exc:
log.info("regex parser miss (%s) — falling back to Claude", exc)
extracted = await extractor.extract(pdf_bytes, metadata)
return extracted, "claude"
def _pdftotext(pdf_bytes: bytes) -> str | None:
if not PDFTOTEXT_PATH:
return None
try:
proc = subprocess.run(
[PDFTOTEXT_PATH, "-layout", "-enc", "UTF-8", "-", "-"],
input=pdf_bytes,
capture_output=True,
timeout=30,
check=False,
)
except (subprocess.SubprocessError, OSError) as exc:
log.warning("pdftotext failed: %s", exc)
return None
text = proc.stdout.decode("utf-8", errors="replace").strip()
return text or None
async def _insert_payslip(
db_session_factory: async_sessionmaker[Any] | _SessionFactory,
doc_id: int,
extracted: ExtractedPayslip,
validated: bool,
) -> int | None:
raw = json.loads(extracted.model_dump_json())
async with db_session_factory() as session, session.begin():
existing = await session.execute(
select(Payslip.id).where(Payslip.paperless_doc_id == doc_id))
existing_id = existing.scalar()
if existing_id is not None:
return None
row = Payslip(
paperless_doc_id=doc_id,
pay_date=extracted.pay_date,
pay_period_start=extracted.pay_period_start,
pay_period_end=extracted.pay_period_end,
employer=extracted.employer,
currency=extracted.currency,
gross_pay=extracted.gross_pay,
income_tax=extracted.income_tax,
national_insurance=extracted.national_insurance,
pension_employee=extracted.pension_employee,
pension_employer=extracted.pension_employer,
student_loan=extracted.student_loan,
rsu_vest=extracted.rsu_vest,
rsu_offset=extracted.rsu_offset,
salary=extracted.salary,
bonus=extracted.bonus,
pension_sacrifice=extracted.pension_sacrifice,
taxable_pay=extracted.taxable_pay,
ytd_tax_paid=extracted.ytd_tax_paid,
ytd_taxable_pay=extracted.ytd_taxable_pay,
ytd_gross=extracted.ytd_gross,
other_deductions=_decimals_to_float(extracted.other_deductions),
net_pay=extracted.net_pay,
tax_year=derive_tax_year(extracted.pay_date),
raw_extraction=raw,
validated=validated,
)
session.add(row)
await session.flush()
return row.id
def _decimals_to_float(mapping: dict[str, Decimal]) -> dict[str, float]:
return {k: float(v) for k, v in mapping.items()}