payslip-ingest/payslip_ingest/extractor.py
Viktor Barzin 9105b6b79d extractor: track rsu_vest + rsu_offset separately from cash pay
UK payslips for equity-comp employees report RSU vests as notional pay
for HMRC only. A paired same-magnitude deduction (Shares Retained /
Stock Tax Withholding / RSU Offset) nets it back out of cash. The UK
payslip's income_tax line shows tax on the grossed-up total, but the
actual RSU tax is handled by Schwab (US broker) via share sale. No
cash flows through UK payroll for RSU.

Previously the extractor folded RSU notional into gross_pay and
income_tax, which inflated the dashboard numbers — a payslip with
£25k RSU vest looked like 2x salary with 80% tax rate.

Changes:
- schema: add rsu_vest + rsu_offset fields (default 0).
- db + alembic 0002: add two new NUMERIC(12,2) columns with server
  default 0 (backward-compatible; existing rows get 0).
- validate_totals: include rsu_offset in deductions sum so the
  gross + rsu_vest inflation is properly netted out.
- extraction prompt: explicit rules for identifying RSU lines by the
  common Meta/Sage/Workday labels, and to NOT put them in
  other_deductions.

Dashboards in a follow-up commit: cash_gross = gross_pay - rsu_vest,
effective tax rate based on cash metrics.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 23:37:25 +00:00

257 lines
9.7 KiB
Python

import asyncio
import base64
import json
import shutil
from typing import Any
import httpx
from pydantic import ValidationError
from payslip_ingest.schema import ExtractedPayslip
AGENT_PATH = ".claude/agents/payslip-extractor"
PDFTOTEXT_PATH = shutil.which("pdftotext")
EXTRACTION_PROMPT = (
"You are extracting fields from a UK payslip PDF. Return ONLY a single JSON object "
"matching this exact schema — no prose, no markdown fences.\n"
"\n"
"Schema:\n"
"{\n"
' "pay_date": "YYYY-MM-DD",\n'
' "pay_period_start": "YYYY-MM-DD or null",\n'
' "pay_period_end": "YYYY-MM-DD or null",\n'
' "employer": "string or null",\n'
' "currency": "GBP",\n'
' "gross_pay": number,\n'
' "income_tax": number,\n'
' "national_insurance": number,\n'
' "pension_employee": number,\n'
' "pension_employer": number,\n'
' "student_loan": number,\n'
' "rsu_vest": number,\n'
' "rsu_offset": number,\n'
' "other_deductions": {"label": number, ...},\n'
' "net_pay": number\n'
"}\n"
"\n"
"Rules:\n"
"- Report numbers as the payslip shows them; do not compute sums.\n"
"- Unknown numeric fields → 0, not null.\n"
"- `rsu_vest`: any notional/reporting entry in the EARNINGS block labelled "
'"RSU Vest", "Restricted Stock Units", "Stock Value", "Notional Pay", '
'"Share Award", "Equity Vest", "GSU Vest". For Meta UK payslips this is '
"the grossed-up RSU value reported for HMRC only; Schwab handles actual "
"tax withholding via share sale.\n"
"- `rsu_offset`: the matching DEDUCTION that nets the RSU out of cash pay — "
'labels vary: "Shares Retained", "Stock Tax Withholding", "RSU Offset", '
'"Notional Pay Offset", "Shares Withheld". For Meta this is typically equal '
"in magnitude to rsu_vest so cash net is unaffected.\n"
"- If either rsu_vest or rsu_offset is present, BOTH should be populated; "
"do NOT put them in `other_deductions`.\n"
"- `other_deductions` covers cycle-to-work, share-save, benefits-in-kind, court orders, "
"anything not in the main fields (and NOT RSU — those have dedicated fields).\n"
"- All money in GBP unless the payslip is denominated otherwise.\n"
'- If a field\'s value is ambiguous, pick the value from the "this period" column, not YTD.')
POLL_INTERVAL_SECONDS = 3
MAX_POLL_SECONDS = 600
BUSY_RETRY_DELAY_SECONDS = 10
MAX_BUSY_RETRIES = 90
DEFAULT_MAX_BUDGET_USD = 1.0
DEFAULT_TIMEOUT_SECONDS = 600
TERMINAL_STATUSES = {"completed", "failed", "timeout", "error"}
class ExtractorError(RuntimeError):
pass
def _build_prompt(pdf_bytes: bytes) -> str:
"""Shrink the prompt: prefer pdftotext output over raw base64.
Base64 of a 200KB PDF expands to ~270KB of tokens, which makes even Haiku
take 5-10 minutes per extraction. pdftotext normally yields 2-5KB of clean
text that Claude processes in seconds. We ship the PDF bytes as a fallback
only when pdftotext isn't available or fails (scanned-image PDFs, etc.).
"""
if PDFTOTEXT_PATH:
try:
import subprocess
proc = subprocess.run(
[PDFTOTEXT_PATH, "-layout", "-enc", "UTF-8", "-", "-"],
input=pdf_bytes,
capture_output=True,
timeout=30,
check=False,
)
text = proc.stdout.decode("utf-8", errors="replace").strip()
if text:
return f"{EXTRACTION_PROMPT}\n\nPAYSLIP_TEXT:\n{text}\n"
except (subprocess.SubprocessError, OSError):
pass
encoded = base64.b64encode(pdf_bytes).decode("ascii")
return f"{EXTRACTION_PROMPT}\n\nPDF_BASE64:\n{encoded}\n"
class ClaudeExtractor:
"""Calls claude-agent-service to extract structured fields from a payslip PDF.
The agent service serializes execution (one job at a time, 409 when busy);
we back off and retry so the caller-side queue doesn't have to know.
"""
def __init__(
self,
base_url: str,
bearer_token: str,
client: httpx.AsyncClient | None = None,
):
self._base_url = base_url.rstrip("/")
self._headers = {"Authorization": f"Bearer {bearer_token}"}
self._client = client or httpx.AsyncClient(timeout=60.0)
self._owns_client = client is None
async def aclose(self) -> None:
if self._owns_client:
await self._client.aclose()
async def __aenter__(self) -> "ClaudeExtractor":
return self
async def __aexit__(self, *exc: object) -> None:
await self.aclose()
async def extract(self, pdf_bytes: bytes, doc_metadata: dict[str, Any]) -> ExtractedPayslip:
job_id = await self._submit_job(pdf_bytes, doc_metadata)
output_lines = await self._poll_until_done(job_id)
payload = _parse_output(output_lines)
try:
return ExtractedPayslip.model_validate(payload)
except ValidationError as exc:
raise ExtractorError(f"Extracted payload failed schema validation: {exc}") from exc
async def _submit_job(self, pdf_bytes: bytes, doc_metadata: dict[str, Any]) -> str:
prompt = _build_prompt(pdf_bytes)
body = {
"prompt": prompt,
"agent": AGENT_PATH,
"max_budget_usd": DEFAULT_MAX_BUDGET_USD,
"timeout_seconds": DEFAULT_TIMEOUT_SECONDS,
"metadata": {
"paperless_doc_id": doc_metadata.get("id")
},
}
for _ in range(MAX_BUSY_RETRIES):
resp = await self._client.post(f"{self._base_url}/execute",
headers=self._headers,
json=body)
if resp.status_code == 409:
await asyncio.sleep(BUSY_RETRY_DELAY_SECONDS)
continue
resp.raise_for_status()
job_id = resp.json().get("job_id")
if not isinstance(job_id, str):
raise ExtractorError(f"Missing job_id in response: {resp.json()}")
return job_id
raise ExtractorError(f"Agent service remained busy after {MAX_BUSY_RETRIES} retries")
async def _poll_until_done(self, job_id: str) -> list[str]:
max_iterations = max(1, MAX_POLL_SECONDS // max(1, POLL_INTERVAL_SECONDS))
for _ in range(max_iterations):
resp = await self._client.get(f"{self._base_url}/jobs/{job_id}", headers=self._headers)
resp.raise_for_status()
job = resp.json()
status = job.get("status")
if status in TERMINAL_STATUSES:
if status != "completed":
raise ExtractorError(f"Job {job_id} terminated with status={status}: {job}")
output = job.get("output", [])
if not isinstance(output, list):
raise ExtractorError(f"Job {job_id} output is not a list: {output!r}")
return [str(line) for line in output]
await asyncio.sleep(POLL_INTERVAL_SECONDS)
raise TimeoutError(f"Job {job_id} did not complete within {MAX_POLL_SECONDS}s")
def _parse_output(output_lines: list[str]) -> dict[str, Any]:
"""Extract the JSON payload from claude CLI --output-format json stream.
The CLI emits one JSON object per line; the final 'result' message holds the
assistant's final text. We walk from the end, parse each line, and return
the first embedded JSON object we can recover from the assistant response.
"""
non_empty = [line.strip() for line in output_lines if line.strip()]
if not non_empty:
raise ExtractorError("Agent produced no output")
for line in reversed(non_empty):
try:
parsed = json.loads(line)
except json.JSONDecodeError:
continue
text = _extract_assistant_text(parsed)
if text is None:
continue
payload = _first_json_object(text)
if payload is not None:
return payload
# Fallback: the last line itself might be the JSON object.
try:
candidate = json.loads(non_empty[-1])
except json.JSONDecodeError as exc:
raise ExtractorError(f"Could not parse JSON from agent output: {exc}") from exc
if isinstance(candidate, dict):
return candidate
raise ExtractorError(f"Last agent line is not a JSON object: {candidate!r}")
def _extract_assistant_text(parsed: Any) -> str | None:
if not isinstance(parsed, dict):
return None
result = parsed.get("result")
if parsed.get("type") == "result" and isinstance(result, str):
return result
message = parsed.get("message")
if isinstance(message, dict):
content = message.get("content")
if isinstance(content, list):
texts = [
block.get("text", "") for block in content
if isinstance(block, dict) and block.get("type") == "text"
]
combined = "".join(str(t) for t in texts)
if combined:
return combined
if isinstance(content, str):
return content
text = parsed.get("text")
if isinstance(text, str):
return text
return None
def _first_json_object(text: str) -> dict[str, Any] | None:
start = text.find("{")
while start != -1:
depth = 0
for i in range(start, len(text)):
ch = text[i]
if ch == "{":
depth += 1
elif ch == "}":
depth -= 1
if depth == 0:
candidate = text[start:i + 1]
try:
obj = json.loads(candidate)
except json.JSONDecodeError:
break
if isinstance(obj, dict):
return obj
break
start = text.find("{", start + 1)
return None