payslip-ingest/payslip_ingest/processor.py
Viktor Barzin 9105b6b79d extractor: track rsu_vest + rsu_offset separately from cash pay
UK payslips for equity-comp employees report RSU vests as notional pay
for HMRC only. A paired same-magnitude deduction (Shares Retained /
Stock Tax Withholding / RSU Offset) nets it back out of cash. The UK
payslip's income_tax line shows tax on the grossed-up total, but the
actual RSU tax is handled by Schwab (US broker) via share sale. No
cash flows through UK payroll for RSU.

Previously the extractor folded RSU notional into gross_pay and
income_tax, which inflated the dashboard numbers — a payslip with
£25k RSU vest looked like 2x salary with 80% tax rate.

Changes:
- schema: add rsu_vest + rsu_offset fields (default 0).
- db + alembic 0002: add two new NUMERIC(12,2) columns with server
  default 0 (backward-compatible; existing rows get 0).
- validate_totals: include rsu_offset in deductions sum so the
  gross + rsu_vest inflation is properly netted out.
- extraction prompt: explicit rules for identifying RSU lines by the
  common Meta/Sage/Workday labels, and to NOT put them in
  other_deductions.

Dashboards in a follow-up commit: cash_gross = gross_pay - rsu_vest,
effective tax rate based on cash metrics.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 23:37:25 +00:00

124 lines
4.4 KiB
Python

import json
import logging
import re
from dataclasses import dataclass
from decimal import Decimal
from typing import Any, Protocol
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker
from payslip_ingest.db import Payslip
from payslip_ingest.extractor import ClaudeExtractor
from payslip_ingest.paperless import PaperlessClient
from payslip_ingest.schema import ExtractedPayslip, validate_totals
from payslip_ingest.tax_year import derive_tax_year
log = logging.getLogger(__name__)
# Paperless's `payslip` tag has drifted over the years — it gets sprinkled on
# annual summaries (P60), performance/bonus letters, RSU grants, comp-review
# letters. Those are legitimate financial docs but they aren't monthly payslips
# and including them would skew every chart (a P60 looks like a single payslip
# 12x normal size). We skip by title pattern before hitting Claude so we don't
# burn extraction budget on them either.
NON_PAYSLIP_TITLE_RE = re.compile(
r"p[\s._-]?60"
r"|performance.*(letter|year.end)|year.end.*letter"
r"|compensation[_ ]emea|\bpsc\b|comp[-_ ]?letter"
r"|rsu\s*grant",
re.IGNORECASE,
)
class _SessionFactory(Protocol):
def __call__(self) -> Any:
...
@dataclass
class ProcessResult:
doc_id: int
status: str
payslip_id: int | None = None
validated: bool | None = None
async def process_document(
doc_id: int,
db_session_factory: async_sessionmaker[Any] | _SessionFactory,
paperless: PaperlessClient,
extractor: ClaudeExtractor,
) -> ProcessResult:
async with db_session_factory() as session:
existing = await session.execute(
select(Payslip.id).where(Payslip.paperless_doc_id == doc_id))
if existing.scalar() is not None:
log.info("skipping doc_id=%s — already ingested", doc_id)
return ProcessResult(doc_id=doc_id, status="skipped")
metadata = await paperless.get_document(doc_id)
title = (metadata.get("title") or "").strip()
if NON_PAYSLIP_TITLE_RE.search(title):
log.info("skipping doc_id=%s — title %r matches non-payslip pattern", doc_id, title)
return ProcessResult(doc_id=doc_id, status="skipped_non_payslip")
pdf_bytes = await paperless.download_document(doc_id)
extracted = await extractor.extract(pdf_bytes, metadata)
validated = validate_totals(extracted)
if not validated:
log.warning(
"totals mismatch for doc_id=%s gross=%s net=%s — storing validated=False",
doc_id,
extracted.gross_pay,
extracted.net_pay,
)
payslip_id = await _insert_payslip(db_session_factory, doc_id, extracted, validated)
status = "inserted" if payslip_id is not None else "skipped"
return ProcessResult(doc_id=doc_id, status=status, payslip_id=payslip_id, validated=validated)
async def _insert_payslip(
db_session_factory: async_sessionmaker[Any] | _SessionFactory,
doc_id: int,
extracted: ExtractedPayslip,
validated: bool,
) -> int | None:
raw = json.loads(extracted.model_dump_json())
async with db_session_factory() as session, session.begin():
existing = await session.execute(
select(Payslip.id).where(Payslip.paperless_doc_id == doc_id))
existing_id = existing.scalar()
if existing_id is not None:
return None
row = Payslip(
paperless_doc_id=doc_id,
pay_date=extracted.pay_date,
pay_period_start=extracted.pay_period_start,
pay_period_end=extracted.pay_period_end,
employer=extracted.employer,
currency=extracted.currency,
gross_pay=extracted.gross_pay,
income_tax=extracted.income_tax,
national_insurance=extracted.national_insurance,
pension_employee=extracted.pension_employee,
pension_employer=extracted.pension_employer,
student_loan=extracted.student_loan,
rsu_vest=extracted.rsu_vest,
rsu_offset=extracted.rsu_offset,
other_deductions=_decimals_to_float(extracted.other_deductions),
net_pay=extracted.net_pay,
tax_year=derive_tax_year(extracted.pay_date),
raw_extraction=raw,
validated=validated,
)
session.add(row)
await session.flush()
return row.id
def _decimals_to_float(mapping: dict[str, Decimal]) -> dict[str, float]:
return {k: float(v) for k, v in mapping.items()}