payslip-ingest/payslip_ingest/extractor.py
Viktor Barzin 974181674d v2: regex parser for Meta UK template + accurate RSU tax attribution
## Context

v1 shipped a Claude Haiku-based extractor that validated only 10/71
backfilled rows. Haiku fumbles the arithmetic on pension salary-sacrifice,
conflates RSU vest with regular earnings, and occasionally misreads YTD
vs this-period columns. As a result, 61/71 rows (86%) land with
validated=false and the downstream dashboards under-report take-home.

Meta UK uses a stable two-variant template (pre/post 2022-01-31 boundary),
so a regex parser is both faster (ms vs. 30-90s + $0.01-0.05/call) and
more accurate. v2 introduces that parser as the primary path, keeps
Claude as the fallback for non-Meta payslips, and surfaces new fields
the dashboard needs to attribute PAYE between cash salary and RSU vests
correctly.

## This change

### Parser (new)

`payslip_ingest/parsers/meta_uk.py` detects the layout variant by header
presence:

- **Variant A** (pre-2022): vertical Description/This Period/This Year.
  `AE Pension EE` is a positive deduction against a pre-sacrifice gross —
  maps to `pension_employee` for the existing validation formula to hold.
- **Variant B** (post-2022): side-by-side Payments | Deductions | Year to
  Date. `AE Pension EE` is NEGATIVE in Payments (salary sacrifice) — maps
  to `pension_sacrifice` and is already netted into Total Payment.
  `rsu_vest = RSU Tax Offset + RSU Excs Refund` (Meta's template inflates
  Taxable Pay without using a matching offset deduction).
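
The per-variant field mapping described above can be sketched roughly as follows. This is an illustration only: `map_pension_and_rsu` and the `{label: amount}` input dict are hypothetical names, not the parser's actual internals, and only the three fields named in the bullets are covered.

```python
from decimal import Decimal


def map_pension_and_rsu(variant: str, lines: dict[str, Decimal]) -> dict[str, Decimal]:
    """Map raw this-period line amounts to schema fields for one variant.

    `lines` is a hypothetical {label: amount} dict; the real parser's
    intermediate representation may look different.
    """
    zero = Decimal("0")
    pension = lines.get("AE Pension EE", zero)
    fields = {"pension_employee": zero, "pension_sacrifice": zero, "rsu_vest": zero}
    if variant == "A":
        # Variant A: positive deduction taken from a pre-sacrifice gross.
        fields["pension_employee"] = pension
    elif variant == "B":
        # Variant B: negative Payments line, stored as a positive sacrifice.
        fields["pension_sacrifice"] = abs(pension)
        fields["rsu_vest"] = (lines.get("RSU Tax Offset", zero)
                              + lines.get("RSU Excs Refund", zero))
    else:
        raise ValueError(f"unknown variant: {variant!r}")
    return fields
```

Keeping the two pension fields separate is what lets one totals check serve both layouts: in variant B the sacrifice is already netted into Total Payment, so it must not be subtracted a second time.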

Column boundaries come from the header row's anchor positions; each data
row slices into 3 cells and the last numeric token per cell is the amount.
Anchor misses raise ParserError so the caller falls back to Claude rather
than silently returning bad data.

### New fields

Schema + DB + Claude prompt gain:

- `salary`, `bonus`, `pension_sacrifice` — earnings decomposition for the
  dashboard's bonus-sacrifice visibility and earnings-breakdown chart
- `taxable_pay`, `ytd_tax_paid`, `ytd_taxable_pay`, `ytd_gross` — powers
  the YTD-effective-rate method of attributing cash tax vs RSU tax, which
  is the only method that's accurate month-to-month

All new columns default to 0 / null so v1 rows continue to round-trip.

### Orchestration

processor.py tries `parse_meta_uk(pdftotext(pdf))` first. On success the
result goes straight to the DB — zero Claude tokens spent, extraction in
milliseconds. On ParserError it falls through to ClaudeExtractor as before.
ProcessResult gains an `extractor` field ("meta_uk_regex" | "claude") so
backfill logs show the hit rate.
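
The regex-first short-circuit can be sketched as below. This is a simplified, synchronous illustration: the real `ClaudeExtractor.extract` is async, and `extract_with_fallback`, the injected callables, and the local `ParserError` and `ProcessResult` definitions are stand-ins rather than the code in `processor.py`.

```python
from dataclasses import dataclass
from typing import Any, Callable


class ParserError(ValueError):
    """Stand-in for the project's ParserError."""


@dataclass
class ProcessResult:
    """Stand-in carrying only the fields relevant to this sketch."""
    payslip: Any
    extractor: str  # "meta_uk_regex" or "claude"


def extract_with_fallback(
    text: str,
    pdf_bytes: bytes,
    regex_parser: Callable[[str], Any],
    claude_extract: Callable[[bytes], Any],
) -> ProcessResult:
    """Try the regex parser first; call Claude only on ParserError."""
    try:
        return ProcessResult(regex_parser(text), "meta_uk_regex")
    except ParserError:
        # Only ParserError triggers the fallback; other exceptions propagate.
        return ProcessResult(claude_extract(pdf_bytes), "claude")
```

Catching only `ParserError` is deliberate in this sketch: an unexpected exception in the parser would surface as a failure instead of being masked by a Claude call.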

## Tests

- `test_meta_uk_parser.py` — 11 tests covering variant A, variant B
  (standard + bonus month + bonus-sacrificed month), malformed inputs,
  and end-to-end totals validation for all 4 golden fixtures.
- `test_processor.py` — 2 new tests proving the regex-first short-circuit
  and the Claude fallback on non-Meta inputs.

Fixtures under `tests/fixtures/` are hand-crafted emulations of
`pdftotext -layout` output: variant B uses real Meta numbers from the
plan's sample payslips, while the variant A and bonus-sacrificed samples
are synthesized with realistic values.

The `0001_initial.py` diff is yapf reformatting picked up during the
session's format pass; it is not a behavior change.

## Test Plan

### Automated

```
$ poetry run pytest
============================= test session starts ==============================
collected 53 items

tests/test_extractor.py .....                                            [  9%]
tests/test_meta_uk_parser.py ...........                                 [ 30%]
tests/test_paperless.py ......                                           [ 41%]
tests/test_processor.py ..............                                   [ 67%]
tests/test_schema.py ....                                                [ 75%]
tests/test_tax_year.py ........                                          [ 90%]
tests/test_webhook.py .....                                              [100%]
============================== 53 passed in 1.66s ==============================

$ poetry run ruff check .
All checks passed!

$ poetry run mypy .
Success: no issues found in 24 source files

$ poetry run yapf --style pyproject.toml --diff --recursive payslip_ingest tests
(no output — all files are yapf-clean)
```

### Manual Verification

Smoke-test the parser against a real Meta payslip PDF on the deploy host:

```
# After 0003 migration applied to prod DB
$ poetry run python -c "
from payslip_ingest.parsers import parse_meta_uk
import subprocess
text = subprocess.check_output(['pdftotext', '-layout', '/path/to/real.pdf', '-']).decode()
p = parse_meta_uk(text)
print(p.model_dump_json(indent=2))
"
```

Expected: JSON with salary/bonus/rsu_vest/pension_sacrifice populated;
calling `validate_totals(p)` on the result should return True.

## Reproduce locally

1. `cd payslip-ingest && poetry install`
2. `poetry run pytest tests/test_meta_uk_parser.py -v`
3. Expected: 11 tests pass, each fixture validates totals within 2p.

Closes: code-un1

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 10:53:52 +00:00

import asyncio
import base64
import json
import shutil
from typing import Any

import httpx
from pydantic import ValidationError

from payslip_ingest.schema import ExtractedPayslip

AGENT_PATH = ".claude/agents/payslip-extractor"
PDFTOTEXT_PATH = shutil.which("pdftotext")
EXTRACTION_PROMPT = (
"You are extracting fields from a UK payslip PDF. Return ONLY a single JSON object "
"matching this exact schema — no prose, no markdown fences.\n"
"\n"
"Schema:\n"
"{\n"
' "pay_date": "YYYY-MM-DD",\n'
' "pay_period_start": "YYYY-MM-DD or null",\n'
' "pay_period_end": "YYYY-MM-DD or null",\n'
' "employer": "string or null",\n'
' "currency": "GBP",\n'
' "gross_pay": number,\n'
' "income_tax": number,\n'
' "national_insurance": number,\n'
' "pension_employee": number,\n'
' "pension_employer": number,\n'
' "student_loan": number,\n'
' "rsu_vest": number,\n'
' "rsu_offset": number,\n'
' "salary": number,\n'
' "bonus": number,\n'
' "pension_sacrifice": number,\n'
' "taxable_pay": number or null,\n'
' "ytd_tax_paid": number or null,\n'
' "ytd_taxable_pay": number or null,\n'
' "ytd_gross": number or null,\n'
' "other_deductions": {"label": number, ...},\n'
' "net_pay": number\n'
"}\n"
"\n"
"Rules:\n"
"- Report numbers as the payslip shows them; do not compute sums.\n"
"- Unknown numeric fields → 0 (for required) or null (for nullable), not empty strings.\n"
"- `rsu_vest`: notional stock value from the EARNINGS block labelled "
'"RSU Vest", "RSU Tax Offset", "RSU Excs Refund" (sum both if present), '
'"Restricted Stock Units", "Notional Pay", "GSU Vest". For Meta UK this is '
"the grossed-up RSU value — Schwab handles the sell-to-cover via share sale.\n"
"- `rsu_offset`: the matching DEDUCTION-side entry if the template uses one "
'("Shares Retained", "Notional Pay Offset"). Meta\'s template does NOT — '
"leave as 0 for Meta.\n"
"- `salary`: basic pay line (usually labelled \"Salary\" or \"Basic Pay\").\n"
"- `bonus`: bonus line (\"Perform Bonus\", \"Bonus\", \"Performance Bonus\"). 0 if absent.\n"
"- `pension_sacrifice`: absolute value of any NEGATIVE pension line in the "
'EARNINGS/PAYMENTS block (e.g. "AE Pension EE -600.20"). This is pre-tax '
"salary-sacrifice and is already subtracted from gross. Use `pension_employee` "
"instead for any POSITIVE pension deduction on the Deductions side.\n"
"- `taxable_pay`: value from the \"Taxable Pay\" line in the summary block, "
'THIS PERIOD column. For Meta this is the post-sacrifice + RSU-grossed-up base '
"that PAYE is computed on. Null if the payslip does not surface it.\n"
"- `ytd_tax_paid`, `ytd_taxable_pay`, `ytd_gross`: YTD column values from the "
"same summary block. Null if not present.\n"
"- `other_deductions` covers cycle-to-work, share-save, private medical, court "
"orders, anything not mapped above — ONLY for lines in the Deductions column "
"of a post-2022 Meta layout or a standalone deduction on other templates. Do "
"NOT add negative Payments lines here (they are already netted into gross).\n"
"- All money in GBP unless the payslip is denominated otherwise.\n"
'- If a field\'s value is ambiguous, pick "this period" (not YTD) for the main '
"fields; use YTD only for `ytd_*` fields.")
POLL_INTERVAL_SECONDS = 3
MAX_POLL_SECONDS = 600
BUSY_RETRY_DELAY_SECONDS = 10
MAX_BUSY_RETRIES = 90
DEFAULT_MAX_BUDGET_USD = 1.0
DEFAULT_TIMEOUT_SECONDS = 600
TERMINAL_STATUSES = {"completed", "failed", "timeout", "error"}


class ExtractorError(RuntimeError):
    pass


def _build_prompt(pdf_bytes: bytes) -> str:
    """Shrink the prompt: prefer pdftotext output over raw base64.

    Base64 of a 200KB PDF expands to ~270KB of text, which makes even Haiku
    take 5-10 minutes per extraction. pdftotext normally yields 2-5KB of clean
    text that Claude processes in seconds. We ship the PDF bytes as a fallback
    only when pdftotext isn't available or fails (scanned-image PDFs, etc.).
    """
    if PDFTOTEXT_PATH:
        try:
            import subprocess
            proc = subprocess.run(
                [PDFTOTEXT_PATH, "-layout", "-enc", "UTF-8", "-", "-"],
                input=pdf_bytes,
                capture_output=True,
                timeout=30,
                check=False,
            )
            text = proc.stdout.decode("utf-8", errors="replace").strip()
            if text:
                return f"{EXTRACTION_PROMPT}\n\nPAYSLIP_TEXT:\n{text}\n"
        except (subprocess.SubprocessError, OSError):
            # Fall through to the base64 fallback below.
            pass
    encoded = base64.b64encode(pdf_bytes).decode("ascii")
    return f"{EXTRACTION_PROMPT}\n\nPDF_BASE64:\n{encoded}\n"


class ClaudeExtractor:
    """Calls claude-agent-service to extract structured fields from a payslip PDF.

    The agent service serializes execution (one job at a time, 409 when busy);
    we back off and retry so the caller-side queue doesn't have to know.
    """

    def __init__(
        self,
        base_url: str,
        bearer_token: str,
        client: httpx.AsyncClient | None = None,
    ):
        self._base_url = base_url.rstrip("/")
        self._headers = {"Authorization": f"Bearer {bearer_token}"}
        self._client = client or httpx.AsyncClient(timeout=60.0)
        self._owns_client = client is None

    async def aclose(self) -> None:
        if self._owns_client:
            await self._client.aclose()

    async def __aenter__(self) -> "ClaudeExtractor":
        return self

    async def __aexit__(self, *exc: object) -> None:
        await self.aclose()

    async def extract(self, pdf_bytes: bytes, doc_metadata: dict[str, Any]) -> ExtractedPayslip:
        job_id = await self._submit_job(pdf_bytes, doc_metadata)
        output_lines = await self._poll_until_done(job_id)
        payload = _parse_output(output_lines)
        try:
            return ExtractedPayslip.model_validate(payload)
        except ValidationError as exc:
            raise ExtractorError(f"Extracted payload failed schema validation: {exc}") from exc

    async def _submit_job(self, pdf_bytes: bytes, doc_metadata: dict[str, Any]) -> str:
        prompt = _build_prompt(pdf_bytes)
        body = {
            "prompt": prompt,
            "agent": AGENT_PATH,
            "max_budget_usd": DEFAULT_MAX_BUDGET_USD,
            "timeout_seconds": DEFAULT_TIMEOUT_SECONDS,
            "metadata": {
                "paperless_doc_id": doc_metadata.get("id")
            },
        }
        for _ in range(MAX_BUSY_RETRIES):
            resp = await self._client.post(f"{self._base_url}/execute",
                                           headers=self._headers,
                                           json=body)
            if resp.status_code == 409:
                await asyncio.sleep(BUSY_RETRY_DELAY_SECONDS)
                continue
            resp.raise_for_status()
            job_id = resp.json().get("job_id")
            if not isinstance(job_id, str):
                raise ExtractorError(f"Missing job_id in response: {resp.json()}")
            return job_id
        raise ExtractorError(f"Agent service remained busy after {MAX_BUSY_RETRIES} retries")

    async def _poll_until_done(self, job_id: str) -> list[str]:
        max_iterations = max(1, MAX_POLL_SECONDS // max(1, POLL_INTERVAL_SECONDS))
        for _ in range(max_iterations):
            resp = await self._client.get(f"{self._base_url}/jobs/{job_id}", headers=self._headers)
            resp.raise_for_status()
            job = resp.json()
            status = job.get("status")
            if status in TERMINAL_STATUSES:
                if status != "completed":
                    raise ExtractorError(f"Job {job_id} terminated with status={status}: {job}")
                output = job.get("output", [])
                if not isinstance(output, list):
                    raise ExtractorError(f"Job {job_id} output is not a list: {output!r}")
                return [str(line) for line in output]
            await asyncio.sleep(POLL_INTERVAL_SECONDS)
        raise TimeoutError(f"Job {job_id} did not complete within {MAX_POLL_SECONDS}s")


def _parse_output(output_lines: list[str]) -> dict[str, Any]:
    """Extract the JSON payload from claude CLI --output-format json stream.

    The CLI emits one JSON object per line; the final 'result' message holds the
    assistant's final text. We walk from the end, parse each line, and return
    the first embedded JSON object we can recover from the assistant response.
    """
    non_empty = [line.strip() for line in output_lines if line.strip()]
    if not non_empty:
        raise ExtractorError("Agent produced no output")
    for line in reversed(non_empty):
        try:
            parsed = json.loads(line)
        except json.JSONDecodeError:
            continue
        text = _extract_assistant_text(parsed)
        if text is None:
            continue
        payload = _first_json_object(text)
        if payload is not None:
            return payload
    # Fallback: the last line itself might be the JSON object.
    try:
        candidate = json.loads(non_empty[-1])
    except json.JSONDecodeError as exc:
        raise ExtractorError(f"Could not parse JSON from agent output: {exc}") from exc
    if isinstance(candidate, dict):
        return candidate
    raise ExtractorError(f"Last agent line is not a JSON object: {candidate!r}")


def _extract_assistant_text(parsed: Any) -> str | None:
    if not isinstance(parsed, dict):
        return None
    result = parsed.get("result")
    if parsed.get("type") == "result" and isinstance(result, str):
        return result
    message = parsed.get("message")
    if isinstance(message, dict):
        content = message.get("content")
        if isinstance(content, list):
            texts = [
                block.get("text", "") for block in content
                if isinstance(block, dict) and block.get("type") == "text"
            ]
            combined = "".join(str(t) for t in texts)
            if combined:
                return combined
        if isinstance(content, str):
            return content
    text = parsed.get("text")
    if isinstance(text, str):
        return text
    return None


def _first_json_object(text: str) -> dict[str, Any] | None:
    start = text.find("{")
    while start != -1:
        depth = 0
        for i in range(start, len(text)):
            ch = text[i]
            if ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    candidate = text[start:i + 1]
                    try:
                        obj = json.loads(candidate)
                    except json.JSONDecodeError:
                        break
                    if isinstance(obj, dict):
                        return obj
                    break
        start = text.find("{", start + 1)
    return None