## Context
v1 shipped a Claude Haiku-based extractor that validated only 10/71
backfilled rows. Haiku fumbles the arithmetic on pension salary-sacrifice,
conflates RSU vest with regular earnings, and occasionally misreads YTD
vs this-period columns — so 86% of rows land with validated=false and the
downstream dashboards under-report take-home.
Meta UK uses a stable two-variant template (pre/post 2022-01-31 boundary),
so a regex parser is both faster (ms vs. 30-90s + $0.01-0.05/call) and
more accurate. v2 introduces that parser as the primary path, keeps
Claude as the fallback for non-Meta payslips, and surfaces new fields
the dashboard needs to attribute PAYE between cash salary and RSU vests
correctly.
## This change
### Parser (new)
`payslip_ingest/parsers/meta_uk.py` detects the layout variant by header
presence:
- **Variant A** (pre-2022): vertical Description/This Period/This Year.
`AE Pension EE` is a positive deduction against a pre-sacrifice gross, so
it maps to `pension_employee` and the existing validation formula still holds.
- **Variant B** (post-2022): side-by-side Payments | Deductions | Year to
Date. `AE Pension EE` is NEGATIVE in Payments (salary sacrifice) — maps
to `pension_sacrifice` and is already netted into Total Payment.
`rsu_vest = RSU Tax Offset + RSU Excs Refund` (Meta's template inflates
Taxable Pay without using a matching offset deduction).
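The variant-specific mapping above can be sketched roughly as follows. This is a minimal illustration, not the code in `meta_uk.py`: the function name `map_pension_and_rsu`, the `variant` flag, and the `lines` dict (label to amount) are hypothetical, and the sketch assumes the RSU sum applies to Variant B only.

```python
from decimal import Decimal

ZERO = Decimal("0")


def map_pension_and_rsu(variant: str, lines: dict[str, Decimal]) -> dict[str, Decimal]:
    """Map raw payslip lines to schema fields (hypothetical helper)."""
    pension = lines.get("AE Pension EE", ZERO)
    out = {"pension_employee": ZERO, "pension_sacrifice": ZERO, "rsu_vest": ZERO}
    if variant == "A":
        # Variant A: a positive deduction against a pre-sacrifice gross.
        out["pension_employee"] = pension
    else:
        # Variant B: a negative Payments line; store its magnitude.
        out["pension_sacrifice"] = abs(pension)
        # Assumption: rsu_vest is the sum of the two RSU payment lines.
        out["rsu_vest"] = (lines.get("RSU Tax Offset", ZERO) +
                           lines.get("RSU Excs Refund", ZERO))
    return out


variant_b = map_pension_and_rsu("B", {
    "AE Pension EE": Decimal("-600.20"),
    "RSU Tax Offset": Decimal("1000.00"),
    "RSU Excs Refund": Decimal("25.50"),
})
print(variant_b)
```

Keeping the sign handling in one place makes it harder for a negative Payments line to end up double-counted as a deduction.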
Column boundaries come from the header row's anchor positions; each data
row slices into 3 cells and the last numeric token per cell is the amount.
Anchor misses raise ParserError so the caller falls back to Claude rather
than silently returning bad data.
### New fields
Schema + DB + Claude prompt gain:
- `salary`, `bonus`, `pension_sacrifice` — earnings decomposition for the
dashboard's bonus-sacrifice visibility and earnings-breakdown chart
- `taxable_pay`, `ytd_tax_paid`, `ytd_taxable_pay`, `ytd_gross`: inputs to
the YTD-effective-rate method of attributing cash tax vs RSU tax, which
is the only method that's accurate month-to-month
All new columns default to 0 / null so v1 rows continue to round-trip.
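To illustrate the defaulting rule, here is a minimal sketch using a standard-library dataclass. The real schema appears to be a pydantic model (`ExtractedPayslip`), so `PayslipRow` and `load_row` are hypothetical stand-ins, and only a subset of fields is shown. The split between `0` defaults and `None` defaults follows the Claude prompt schema, where the decomposition fields are plain numbers and the summary fields are nullable.

```python
from dataclasses import dataclass, fields
from typing import Any, Optional


@dataclass
class PayslipRow:
    # Fields that already existed in v1 (subset, for illustration).
    gross_pay: float
    net_pay: float
    # New in v2: earnings decomposition defaults to 0.
    salary: float = 0.0
    bonus: float = 0.0
    pension_sacrifice: float = 0.0
    # New in v2: summary-block values default to null.
    taxable_pay: Optional[float] = None
    ytd_tax_paid: Optional[float] = None
    ytd_taxable_pay: Optional[float] = None
    ytd_gross: Optional[float] = None


def load_row(raw: dict[str, Any]) -> PayslipRow:
    """Build a row from stored data, ignoring keys the model does not know."""
    known = {f.name for f in fields(PayslipRow)}
    return PayslipRow(**{k: v for k, v in raw.items() if k in known})


# A v1-shaped row has none of the new keys and still loads cleanly.
v1_row = load_row({"gross_pay": 5000.0, "net_pay": 3500.0})
print(v1_row)
```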
### Orchestration
`processor.py` tries `parse_meta_uk(pdftotext(pdf))` first. On success the
result goes straight to the DB, with zero Claude tokens spent and extraction
in milliseconds. On `ParserError` it falls through to `ClaudeExtractor` as before.
ProcessResult gains an `extractor` field ("meta_uk_regex" | "claude") so
backfill logs show the hit rate.
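The regex-first flow can be sketched as below. This is a simplified, synchronous illustration: the real `ClaudeExtractor.extract` is async and takes PDF bytes plus metadata, and the `process` function, the callable parameters, and the local `ParserError` and `ProcessResult` definitions here are assumptions rather than the code in `processor.py`.

```python
from dataclasses import dataclass
from typing import Any, Callable


class ParserError(ValueError):
    """Stand-in for the regex parser's error type."""


@dataclass
class ProcessResult:
    payslip: dict[str, Any]
    extractor: str  # "meta_uk_regex" or "claude"


def process(text: str,
            regex_parser: Callable[[str], dict[str, Any]],
            claude_fallback: Callable[[str], dict[str, Any]]) -> ProcessResult:
    """Try the cheap parser first; fall back only on ParserError."""
    try:
        return ProcessResult(regex_parser(text), "meta_uk_regex")
    except ParserError:
        return ProcessResult(claude_fallback(text), "claude")


def fake_regex_parser(text: str) -> dict[str, Any]:
    # Hypothetical stub: accepts only text that contains a known header.
    if "Payments" not in text:
        raise ParserError("header anchor not found")
    return {"source": "regex"}


def fake_claude(text: str) -> dict[str, Any]:
    # Hypothetical stub standing in for the Claude extraction path.
    return {"source": "claude"}


hit = process("Payments | Deductions", fake_regex_parser, fake_claude)
miss = process("some other employer", fake_regex_parser, fake_claude)
print(hit.extractor, miss.extractor)
```

Catching only `ParserError` keeps unexpected bugs in the parser visible instead of silently routing them to the fallback.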
## Tests
- `test_meta_uk_parser.py` — 11 tests covering variant A, variant B
(standard + bonus month + bonus-sacrificed month), malformed inputs,
and end-to-end totals validation for all 4 golden fixtures.
- `test_processor.py` — 2 new tests proving the regex-first short-circuit
and the Claude fallback on non-Meta inputs.
Fixtures under `tests/fixtures/` are hand-crafted emulations of
`pdftotext -layout` output: variant B uses real Meta numbers from the plan's
sample payslips, while variant A and the bonus-sacrificed samples are
synthesized but realistic.
The `0001_initial.py` reformat is yapf cleanup picked up during the session's
format pass; it is not a behavior change.
## Test Plan
### Automated
```
$ poetry run pytest
============================= test session starts ==============================
collected 53 items
tests/test_extractor.py ..... [ 9%]
tests/test_meta_uk_parser.py ........... [ 30%]
tests/test_paperless.py ...... [ 41%]
tests/test_processor.py .............. [ 67%]
tests/test_schema.py .... [ 75%]
tests/test_tax_year.py ........ [ 90%]
tests/test_webhook.py ..... [100%]
============================== 53 passed in 1.66s ==============================
$ poetry run ruff check .
All checks passed!
$ poetry run mypy .
Success: no issues found in 24 source files
$ poetry run yapf --style pyproject.toml --diff --recursive payslip_ingest tests
(no output — all files are yapf-clean)
```
### Manual Verification
Smoke-test the parser against a real Meta payslip PDF on the deploy host:
```
# After 0003 migration applied to prod DB
$ poetry run python -c "
from payslip_ingest.parsers import parse_meta_uk
import subprocess
text = subprocess.check_output(['pdftotext', '-layout', '/path/to/real.pdf', '-']).decode()
p = parse_meta_uk(text)
print(p.model_dump_json(indent=2))
"
```
Expected: JSON with salary/bonus/rsu_vest/pension_sacrifice populated and
`validate_totals(p)` returning True.
## Reproduce locally
1. `cd payslip-ingest && poetry install`
2. `poetry run pytest tests/test_meta_uk_parser.py -v`
3. Expected: 11 tests pass, each fixture validates totals within 2p.
Closes: code-un1
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
import asyncio
import base64
import json
import shutil
from typing import Any

import httpx
from pydantic import ValidationError

from payslip_ingest.schema import ExtractedPayslip

AGENT_PATH = ".claude/agents/payslip-extractor"

PDFTOTEXT_PATH = shutil.which("pdftotext")

EXTRACTION_PROMPT = (
    "You are extracting fields from a UK payslip PDF. Return ONLY a single JSON object "
    "matching this exact schema — no prose, no markdown fences.\n"
    "\n"
    "Schema:\n"
    "{\n"
    '  "pay_date": "YYYY-MM-DD",\n'
    '  "pay_period_start": "YYYY-MM-DD or null",\n'
    '  "pay_period_end": "YYYY-MM-DD or null",\n'
    '  "employer": "string or null",\n'
    '  "currency": "GBP",\n'
    '  "gross_pay": number,\n'
    '  "income_tax": number,\n'
    '  "national_insurance": number,\n'
    '  "pension_employee": number,\n'
    '  "pension_employer": number,\n'
    '  "student_loan": number,\n'
    '  "rsu_vest": number,\n'
    '  "rsu_offset": number,\n'
    '  "salary": number,\n'
    '  "bonus": number,\n'
    '  "pension_sacrifice": number,\n'
    '  "taxable_pay": number or null,\n'
    '  "ytd_tax_paid": number or null,\n'
    '  "ytd_taxable_pay": number or null,\n'
    '  "ytd_gross": number or null,\n'
    '  "other_deductions": {"label": number, ...},\n'
    '  "net_pay": number\n'
    "}\n"
    "\n"
    "Rules:\n"
    "- Report numbers as the payslip shows them; do not compute sums.\n"
    "- Unknown numeric fields → 0 (for required) or null (for nullable), not empty strings.\n"
    "- `rsu_vest`: notional stock value from the EARNINGS block labelled "
    '"RSU Vest", "RSU Tax Offset", "RSU Excs Refund" (sum both if present), '
    '"Restricted Stock Units", "Notional Pay", "GSU Vest". For Meta UK this is '
    "the grossed-up RSU value — Schwab handles the sell-to-cover via share sale.\n"
    "- `rsu_offset`: the matching DEDUCTION-side entry if the template uses one "
    '("Shares Retained", "Notional Pay Offset"). Meta\'s template does NOT — '
    "leave as 0 for Meta.\n"
    "- `salary`: basic pay line (usually labelled \"Salary\" or \"Basic Pay\").\n"
    "- `bonus`: bonus line (\"Perform Bonus\", \"Bonus\", \"Performance Bonus\"). 0 if absent.\n"
    "- `pension_sacrifice`: absolute value of any NEGATIVE pension line in the "
    'EARNINGS/PAYMENTS block (e.g. "AE Pension EE -600.20"). This is pre-tax '
    "salary-sacrifice and is already subtracted from gross. Use `pension_employee` "
    "instead for any POSITIVE pension deduction on the Deductions side.\n"
    "- `taxable_pay`: value from the \"Taxable Pay\" line in the summary block, "
    'THIS PERIOD column. For Meta this is the post-sacrifice + RSU-grossed-up base '
    "that PAYE is computed on. Null if the payslip does not surface it.\n"
    "- `ytd_tax_paid`, `ytd_taxable_pay`, `ytd_gross`: YTD column values from the "
    "same summary block. Null if not present.\n"
    "- `other_deductions` covers cycle-to-work, share-save, private medical, court "
    "orders, anything not mapped above — ONLY for lines in the Deductions column "
    "of a post-2022 Meta layout or a standalone deduction on other templates. Do "
    "NOT add negative Payments lines here (they are already netted into gross).\n"
    "- All money in GBP unless the payslip is denominated otherwise.\n"
    '- If a field\'s value is ambiguous, pick "this period" (not YTD) for the main '
    "fields; use YTD only for `ytd_*` fields.")

POLL_INTERVAL_SECONDS = 3
MAX_POLL_SECONDS = 600
BUSY_RETRY_DELAY_SECONDS = 10
MAX_BUSY_RETRIES = 90
DEFAULT_MAX_BUDGET_USD = 1.0
DEFAULT_TIMEOUT_SECONDS = 600

TERMINAL_STATUSES = {"completed", "failed", "timeout", "error"}


class ExtractorError(RuntimeError):
    pass


def _build_prompt(pdf_bytes: bytes) -> str:
    """Shrink the prompt: prefer pdftotext output over raw base64.

    Base64 of a 200KB PDF expands to ~270KB of tokens, which makes even Haiku
    take 5-10 minutes per extraction. pdftotext normally yields 2-5KB of clean
    text that Claude processes in seconds. We ship the PDF bytes as a fallback
    only when pdftotext isn't available or fails (scanned-image PDFs, etc.).
    """
    if PDFTOTEXT_PATH:
        try:
            import subprocess
            proc = subprocess.run(
                [PDFTOTEXT_PATH, "-layout", "-enc", "UTF-8", "-", "-"],
                input=pdf_bytes,
                capture_output=True,
                timeout=30,
                check=False,
            )
            text = proc.stdout.decode("utf-8", errors="replace").strip()
            if text:
                return f"{EXTRACTION_PROMPT}\n\nPAYSLIP_TEXT:\n{text}\n"
        except (subprocess.SubprocessError, OSError):
            pass
    encoded = base64.b64encode(pdf_bytes).decode("ascii")
    return f"{EXTRACTION_PROMPT}\n\nPDF_BASE64:\n{encoded}\n"


class ClaudeExtractor:
    """Calls claude-agent-service to extract structured fields from a payslip PDF.

    The agent service serializes execution (one job at a time, 409 when busy);
    we back off and retry so the caller-side queue doesn't have to know.
    """

    def __init__(
        self,
        base_url: str,
        bearer_token: str,
        client: httpx.AsyncClient | None = None,
    ):
        self._base_url = base_url.rstrip("/")
        self._headers = {"Authorization": f"Bearer {bearer_token}"}
        self._client = client or httpx.AsyncClient(timeout=60.0)
        self._owns_client = client is None

    async def aclose(self) -> None:
        if self._owns_client:
            await self._client.aclose()

    async def __aenter__(self) -> "ClaudeExtractor":
        return self

    async def __aexit__(self, *exc: object) -> None:
        await self.aclose()

    async def extract(self, pdf_bytes: bytes, doc_metadata: dict[str, Any]) -> ExtractedPayslip:
        job_id = await self._submit_job(pdf_bytes, doc_metadata)
        output_lines = await self._poll_until_done(job_id)
        payload = _parse_output(output_lines)
        try:
            return ExtractedPayslip.model_validate(payload)
        except ValidationError as exc:
            raise ExtractorError(f"Extracted payload failed schema validation: {exc}") from exc

    async def _submit_job(self, pdf_bytes: bytes, doc_metadata: dict[str, Any]) -> str:
        prompt = _build_prompt(pdf_bytes)
        body = {
            "prompt": prompt,
            "agent": AGENT_PATH,
            "max_budget_usd": DEFAULT_MAX_BUDGET_USD,
            "timeout_seconds": DEFAULT_TIMEOUT_SECONDS,
            "metadata": {
                "paperless_doc_id": doc_metadata.get("id")
            },
        }
        for _ in range(MAX_BUSY_RETRIES):
            resp = await self._client.post(f"{self._base_url}/execute",
                                           headers=self._headers,
                                           json=body)
            if resp.status_code == 409:
                await asyncio.sleep(BUSY_RETRY_DELAY_SECONDS)
                continue
            resp.raise_for_status()
            job_id = resp.json().get("job_id")
            if not isinstance(job_id, str):
                raise ExtractorError(f"Missing job_id in response: {resp.json()}")
            return job_id
        raise ExtractorError(f"Agent service remained busy after {MAX_BUSY_RETRIES} retries")

    async def _poll_until_done(self, job_id: str) -> list[str]:
        max_iterations = max(1, MAX_POLL_SECONDS // max(1, POLL_INTERVAL_SECONDS))
        for _ in range(max_iterations):
            resp = await self._client.get(f"{self._base_url}/jobs/{job_id}", headers=self._headers)
            resp.raise_for_status()
            job = resp.json()
            status = job.get("status")
            if status in TERMINAL_STATUSES:
                if status != "completed":
                    raise ExtractorError(f"Job {job_id} terminated with status={status}: {job}")
                output = job.get("output", [])
                if not isinstance(output, list):
                    raise ExtractorError(f"Job {job_id} output is not a list: {output!r}")
                return [str(line) for line in output]
            await asyncio.sleep(POLL_INTERVAL_SECONDS)
        raise TimeoutError(f"Job {job_id} did not complete within {MAX_POLL_SECONDS}s")


def _parse_output(output_lines: list[str]) -> dict[str, Any]:
    """Extract the JSON payload from claude CLI --output-format json stream.

    The CLI emits one JSON object per line; the final 'result' message holds the
    assistant's final text. We walk from the end, parse each line, and return
    the first embedded JSON object we can recover from the assistant response.
    """
    non_empty = [line.strip() for line in output_lines if line.strip()]
    if not non_empty:
        raise ExtractorError("Agent produced no output")

    for line in reversed(non_empty):
        try:
            parsed = json.loads(line)
        except json.JSONDecodeError:
            continue
        text = _extract_assistant_text(parsed)
        if text is None:
            continue
        payload = _first_json_object(text)
        if payload is not None:
            return payload

    # Fallback: the last line itself might be the JSON object.
    try:
        candidate = json.loads(non_empty[-1])
    except json.JSONDecodeError as exc:
        raise ExtractorError(f"Could not parse JSON from agent output: {exc}") from exc
    if isinstance(candidate, dict):
        return candidate
    raise ExtractorError(f"Last agent line is not a JSON object: {candidate!r}")


def _extract_assistant_text(parsed: Any) -> str | None:
    if not isinstance(parsed, dict):
        return None
    result = parsed.get("result")
    if parsed.get("type") == "result" and isinstance(result, str):
        return result
    message = parsed.get("message")
    if isinstance(message, dict):
        content = message.get("content")
        if isinstance(content, list):
            texts = [
                block.get("text", "") for block in content
                if isinstance(block, dict) and block.get("type") == "text"
            ]
            combined = "".join(str(t) for t in texts)
            if combined:
                return combined
        if isinstance(content, str):
            return content
    text = parsed.get("text")
    if isinstance(text, str):
        return text
    return None


def _first_json_object(text: str) -> dict[str, Any] | None:
    start = text.find("{")
    while start != -1:
        depth = 0
        for i in range(start, len(text)):
            ch = text[i]
            if ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    candidate = text[start:i + 1]
                    try:
                        obj = json.loads(candidate)
                    except json.JSONDecodeError:
                        break
                    if isinstance(obj, dict):
                        return obj
                    break
        start = text.find("{", start + 1)
    return None