payslip-ingest/payslip_ingest/parsers/meta_uk.py
Viktor Barzin 3b9c69bfd3 backfill: cash_income_tax back-fill for variant-A NULL rows
Phase B of RSU tax spike fix. Vest-month spikes on the dashboard trace to
variant-A slips (2019–mid-2022) where `cash_income_tax` is NULL — the
dashboard's COALESCE fallback returns full PAYE, masquerading as cash tax.

Three changes:

1. Widen variant-A Taxable Pay regex. Original pattern only matched
   `Taxable Pay : This Period £...`; add case-insensitive variants that
   tolerate missing/different colons, elided "This", and uppercase labels.
   Covers older 2019-2020 templates that failed the previous match.

2. New `backfill_cash_income_tax` module — walks every NULL-cash-tax row
   with rsu_vest > 0, re-downloads the PDF from Paperless, runs the
   widened regex parser, falls back to Claude for taxable_pay extraction
   if regex still misses, and derives cash_income_tax pro-rata. Records
   provenance in new `cash_income_tax_source` column (regex/claude/
   fallback_null). Idempotent — only touches NULL rows.

3. Migration 0006 adds the `cash_income_tax_source` audit column.

CLI: `python -m payslip_ingest backfill-cash-tax [--limit N]`. Meant to
run as a one-shot K8s Job after `alembic upgrade head`.

Part of: code-860
2026-04-19 18:15:18 +00:00

524 lines
19 KiB
Python

"""Regex-based Meta UK payslip parser.
Meta UK payslips come in three layout variants across 2019-2026:
- **Variant A** (2019-mid-2022, seen as `Facebook UK Ltd`):
Single-column `Description | This Period | This Year` layout. Parenthesized
negatives `(152.90)` = -152.90. Date format `Date : 31 Aug 2021`. RSU
labels: `RSU Gain Taxable`, `RSU Gain Nicable`, `RSU Net Cash UK`, plus a
matching `RSU Net Gain` deduction. BIK items (Private Dental/Medical,
EE Discount) appear as both earnings and deductions.
- **Variant C** (late-2022 - 2023, `Facebook UK Limited`):
Side-by-side `Payments | Deductions | Year To Date` (capital "To"). Date
format `Pay Date : 30.11.2022` (dots). `Company Name : Facebook UK Limited`
preamble. RSU labels use the abbreviated `RSU Gain Taxabl` / `Nicabl` and
still have the `RSU Net Gain` offset.
- **Variant B** (2024+, `Facebook UK Limited`):
Side-by-side `Payments | Deductions | Year to Date` (lowercase "to"). Date
format `Pay Date: 27/02/2026` (slashes). RSU labels are `RSU Tax Offset`
+ `RSU Excs Refund`; there is NO matching offset deduction — the vest
grosses up Taxable Pay and PAYE is on the grossed-up figure.
Parser returns `ExtractedPayslip`. On any structural miss it raises
`ParserError` so the caller falls back to ClaudeExtractor.
"""
import re
from datetime import date, datetime
from decimal import Decimal
from payslip_ingest.schema import ExtractedPayslip
class ParserError(ValueError):
    """Signal that the text does not fit a known Meta UK payslip template."""
# Two amount notations, both with exactly two decimals:
# "-1,234.56" (slashes-era) and "(1,234.56)" (variant A parenthesized)
# NOTE(review): the integer part is 1-3 digits plus optional ",ddd" groups, so
# an unseparated figure such as "15323.16" only matches from "323.16" onwards.
# Confirm that table amounts always carry thousands separators.
AMOUNT_RE = re.compile(r"-?\d{1,3}(?:,\d{3})*\.\d{2}|\(\d{1,3}(?:,\d{3})*\.\d{2}\)")
# Pay Date / Date — three accepted formats:
# "Pay Date: 27/02/2026"
# "Pay Date : 30.11.2022"
# "Date : 31 Aug 2021"
# Each pattern captures only the date text in group 1.
PAY_DATE_SLASH_RE = re.compile(r"Pay Date\s*:\s*(\d{2}/\d{2}/\d{4})")
PAY_DATE_DOT_RE = re.compile(r"Pay Date\s*:\s*(\d{2}\.\d{2}\.\d{4})")
# Anchored on the word "Date" alone, so it also fires on "Pay Date : 31 Aug 2021".
PAY_DATE_WORD_RE = re.compile(r"\bDate\s*:\s*(\d{1,2}\s+[A-Za-z]{3}\s+\d{4})")
# Period boundaries are only recognised in the slash format.
PERIOD_START_RE = re.compile(r"Period Start\s*:\s*(\d{2}/\d{2}/\d{4})")
PERIOD_END_RE = re.compile(r"Period End\s*:\s*(\d{2}/\d{2}/\d{4})")
# Matches either spelling of the employer name: "Facebook UK Limited" or "Facebook UK Ltd".
EMPLOYER_RE = re.compile(r"Facebook UK (?:Limited|Ltd)\b")
def parse_meta_uk(text: str) -> ExtractedPayslip:
    """Parse extracted payslip text into an `ExtractedPayslip`.

    The employer name must be present; the layout is then detected from the
    table header, trying the side-by-side layout before the single-column one.

    Raises:
        ParserError: if the text is blank, the employer name is missing, or
            no known table header is found.
    """
    if text.strip() == "":
        raise ParserError("empty text")

    found = EMPLOYER_RE.search(text)
    if found is None:
        raise ParserError("does not look like a Meta UK payslip")
    employer = found.group(0)

    lines = text.splitlines()
    # Detection order matters: side-by-side (B/C) is checked first.
    layouts = (
        (_is_variant_b_or_c, _parse_variant_bc),
        (_is_variant_a, _parse_variant_a),
    )
    for detect, parse in layouts:
        if detect(lines):
            return parse(text, lines, employer)
    raise ParserError("neither side-by-side nor single-column header found")
def _is_variant_b_or_c(lines: list[str]) -> bool:
return any("Payments" in line and "Deductions" in line and re.search(r"Year [Tt]o Date", line)
for line in lines)
def _is_variant_a(lines: list[str]) -> bool:
return any("Description" in line and "This Period" in line and "This Year" in line
for line in lines)
def _to_decimal(s: str) -> Decimal:
s = s.strip()
if s.startswith("(") and s.endswith(")"):
s = "-" + s[1:-1]
return Decimal(s.replace(",", ""))
def _parse_date(text: str) -> date:
    """Extract the pay date from the payslip text.

    Formats are tried in a fixed order — slashes (`27/02/2026`), dots
    (`30.11.2022`), then day/abbreviated-month/year (`31 Aug 2021`) — and the
    first pattern that matches decides the result.

    Raises:
        ParserError: if no pattern matches, or if the matched text is not a
            real calendar date (e.g. `31/02/2026`). The regexes only check the
            digit/letter shape, so `strptime` can still reject the value;
            that failure is re-raised as `ParserError` so callers relying on
            `ParserError` for their fallback path see it.
    """
    candidates = (
        (PAY_DATE_SLASH_RE, "%d/%m/%Y"),
        (PAY_DATE_DOT_RE, "%d.%m.%Y"),
        # NOTE(review): %b is locale-dependent — presumably the process runs
        # with an English/C locale; confirm for the deployment environment.
        (PAY_DATE_WORD_RE, "%d %b %Y"),
    )
    for pattern, fmt in candidates:
        m = pattern.search(text)
        if m is None:
            continue
        # Collapse runs of whitespace (only possible in the word format).
        raw = re.sub(r"\s+", " ", m.group(1)).strip()
        try:
            return datetime.strptime(raw, fmt).date()
        except ValueError as err:
            raise ParserError(f"pay date {raw!r} is not a valid date") from err
    raise ParserError("pay date not found")
def _find_match(text: str, pattern: re.Pattern[str]) -> str | None:
m = pattern.search(text)
return m.group(1) if m else None
def _last_amount(segment: str) -> tuple[str, Decimal | None]:
    """Split a column segment into its label and rightmost amount.

    Returns `(label, amount)`, where the label is everything before the last
    amount, stripped. If the segment holds no amount the whole stripped
    segment is returned as the label with `None` as the amount.
    """
    rightmost = None
    for rightmost in AMOUNT_RE.finditer(segment):
        pass  # exhaust the iterator; the loop variable keeps the final match
    if rightmost is None:
        return segment.strip(), None
    label = segment[:rightmost.start()].strip()
    return label, _to_decimal(rightmost.group())
# --------------------------------------------------------------------------
# Variant B / C — side-by-side Payments | Deductions | Year to/To Date
# --------------------------------------------------------------------------
# Payment labels whose amounts are summed into the period's RSU vest figure.
RSU_VEST_LABELS = {
    "RSU Tax Offset",
    "RSU Excs Refund",
    "RSU Gain Taxabl",
    "RSU Gain Nicabl",
    "RSU Gain Taxable",
    "RSU Gain Nicable",
    "RSU Net Cash",
    "RSU Net Cash UK",
}
# Recognised payment labels: the cash items plus every RSU label above.
PAYMENTS_KNOWN = {
    "Salary",
    "Perform Bonus",
    "Bonus",
    "AE Pension EE",
    "AE Pension",
} | RSU_VEST_LABELS
# Deduction labels mapped to dedicated fields; any other deduction label is
# reported under `other_deductions`.
DEDUCTIONS_KNOWN = {
    "Tax paid",
    "Tax",
    "Employee NIC",
    "National Insurance",
    "Student Loans",
    "Student Loan",
    "RSU Net Gain",
}
def _parse_variant_bc(text: str, lines: list[str], employer: str) -> ExtractedPayslip:
    """Build an `ExtractedPayslip` from a side-by-side (variant B/C) slip.

    Reads the three-column table, the totals row and the summary block, then
    maps known labels onto dedicated fields. Deductions with unrecognised
    labels are passed through as `other_deductions`.

    Raises:
        ParserError: propagated from the header, totals-row or pay-date
            helpers when the expected structure is missing.
    """
    zero = Decimal("0")

    def either(table: dict[str, Decimal], preferred: str, alternative: str) -> Decimal:
        # Value under the preferred label, else the alternative, else zero.
        return table.get(preferred, table.get(alternative, zero))

    def slash_date(raw: str | None) -> date | None:
        return datetime.strptime(raw, "%d/%m/%Y").date() if raw else None

    header_idx, d_col, y_col = _find_bc_header(lines)
    # The ordered payments list is returned by the collector but unused here.
    payments, _, deductions, ytd = _collect_bc_rows(lines, header_idx, d_col, y_col)
    gross_pay, net_pay = _parse_bc_totals_row(lines, header_idx, d_col, y_col)
    summary = _parse_bc_summary_block(lines)

    # A negative AE Pension payment is recorded as a positive sacrifice.
    ae_pension = either(payments, "AE Pension EE", "AE Pension")
    pension_sacrifice = abs(ae_pension) if ae_pension < 0 else zero

    rsu_vest = sum(
        (amount for label, amount in payments.items() if label in RSU_VEST_LABELS),
        start=zero,
    )
    income_tax = either(deductions, "Tax paid", "Tax")
    taxable_pay = summary.get("taxable_pay")
    cash_income_tax = _cash_income_tax(income_tax, gross_pay, pension_sacrifice, taxable_pay)

    other_deductions = {
        label: amount
        for label, amount in deductions.items()
        if label not in DEDUCTIONS_KNOWN
    }

    pay_date = _parse_date(text)
    period_start = slash_date(_find_match(text, PERIOD_START_RE))
    period_end = slash_date(_find_match(text, PERIOD_END_RE))

    return ExtractedPayslip(
        pay_date=pay_date,
        pay_period_start=period_start,
        pay_period_end=period_end,
        employer=employer,
        currency="GBP",
        gross_pay=gross_pay,
        income_tax=income_tax,
        national_insurance=either(deductions, "Employee NIC", "National Insurance"),
        pension_employee=Decimal("0"),
        pension_employer=Decimal("0"),
        student_loan=either(deductions, "Student Loans", "Student Loan"),
        rsu_vest=rsu_vest,
        rsu_offset=deductions.get("RSU Net Gain", zero),
        salary=payments.get("Salary", zero),
        bonus=either(payments, "Perform Bonus", "Bonus"),
        pension_sacrifice=pension_sacrifice,
        taxable_pay=taxable_pay,
        ytd_tax_paid=summary.get("ytd_tax_paid"),
        ytd_taxable_pay=summary.get("ytd_taxable_pay"),
        ytd_gross=summary.get("ytd_gross"),
        cash_income_tax=cash_income_tax,
        ytd_rsu_tax_offset=ytd.get("RSU Tax Offset"),
        ytd_rsu_excs_refund=ytd.get("RSU Excs Refund"),
        other_deductions=other_deductions,
        net_pay=net_pay,
    )
def _cash_income_tax(
income_tax: Decimal,
gross_pay: Decimal,
pension_sacrifice: Decimal,
taxable_pay: Decimal | None,
) -> Decimal:
"""Derived pro-rata PAYE attributable to cash pay.
Meta variant-B grosses up Taxable Pay for RSU and computes PAYE on the
grossed-up figure, so `income_tax` on the slip is total PAYE (cash + RSU).
The cash-attributable share is `income_tax * cash_base / taxable_pay`,
where `cash_base = gross_pay - pension_sacrifice`.
Variant A doesn't surface `taxable_pay` — fall back to the full figure
(it predates the variant-B grossing behaviour anyway).
"""
if taxable_pay is None or taxable_pay == 0:
return income_tax
return (income_tax * (gross_pay - pension_sacrifice) / taxable_pay).quantize(Decimal("0.01"))
def _find_bc_header(lines: list[str]) -> tuple[int, int, int]:
for i, line in enumerate(lines):
if ("Payments" in line and "Deductions" in line and re.search(r"Year [Tt]o Date", line)):
# Columns anchored on left edge of "Deductions" / "Year [Tt]o Date"
ytd_match = re.search(r"Year [Tt]o Date", line)
assert ytd_match is not None
return i, line.index("Deductions"), ytd_match.start()
raise ParserError("variant B/C header not found")
def _collect_bc_rows(
    lines: list[str],
    header_idx: int,
    d_col: int,
    y_col: int,
) -> tuple[dict[str, Decimal], list[tuple[str, Decimal]], dict[str, Decimal], dict[str, Decimal]]:
    """Read the table rows between the header and the "Total Payment" row.

    Each row is cut at `d_col` and `y_col` into payments / deductions /
    year-to-date segments; every segment that yields both a label and an
    amount is recorded.

    Returns `(payments, payments_in_order, deductions, ytd)`. If no
    "Total Payment" row exists, rows are collected to the end of `lines`.
    """
    payments: dict[str, Decimal] = {}
    order: list[tuple[str, Decimal]] = []
    deductions: dict[str, Decimal] = {}
    ytd: dict[str, Decimal] = {}

    for source_row in lines[header_idx + 1:]:
        row = source_row.rstrip()
        if "Total Payment" in row:
            break
        if not row.strip():
            continue

        # Slicing past the end of a short row simply yields an empty segment.
        pay_label, pay_amount = _last_amount(row[:d_col])
        if pay_label and pay_amount is not None:
            payments[pay_label] = pay_amount
            order.append((pay_label, pay_amount))

        ded_label, ded_amount = _last_amount(row[d_col:y_col])
        if ded_label and ded_amount is not None:
            # RSU Net Gan can be printed with a negative sign; it is always
            # stored as a positive deduction.
            deductions[ded_label] = abs(ded_amount) if ded_label == "RSU Net Gain" else ded_amount

        ytd_label, ytd_amount = _last_amount(row[y_col:])
        if ytd_label and ytd_amount is not None:
            ytd[ytd_label] = ytd_amount

    return payments, order, deductions, ytd
def _parse_bc_totals_row(
    lines: list[str],
    header_idx: int,
    d_col: int,
    y_col: int,
) -> tuple[Decimal, Decimal]:
    """Return `(gross_pay, net_pay)` from the first "Total Payment" row.

    Gross pay is the rightmost amount left of `d_col`; net pay is the
    rightmost amount from the "Net Pay" label onwards on the same row.

    Raises:
        ParserError: if there is no totals row after the header, the row has
            no "Net Pay" label, or either amount is missing.
    """
    # "Net Pay:" aligns with the Amount column, not the left edge of YTD,
    # so the YTD offset is deliberately unused.
    del y_col

    totals = next(
        (row for row in lines[header_idx + 1:] if "Total Payment" in row),
        None,
    )
    if totals is None:
        raise ParserError("totals row not found")

    net_at = totals.find("Net Pay")
    if net_at < 0:
        raise ParserError("Net Pay missing from totals row")

    _, gross_pay = _last_amount(totals[:d_col])
    _, net_pay = _last_amount(totals[net_at:])
    if gross_pay is None:
        raise ParserError("Total Payment amount missing")
    if net_pay is None:
        raise ParserError("Net Pay amount missing from totals row")
    return gross_pay, net_pay
def _parse_bc_summary_block(lines: list[str]) -> dict[str, Decimal]:
    """Collect summary figures from lines starting with a known label.

    Possible keys: `taxable_pay` (first amount on a "Taxable Pay:" line),
    `ytd_taxable_pay`, `ytd_gross` and `ytd_tax_paid` (second amount on the
    "Taxable Pay:", "Total Gross:" and "Tax Paid:" lines respectively). Keys
    are omitted when the line or the amount is absent; a later matching line
    overwrites an earlier one.
    """
    # (line prefix, key for the first amount or None, key for the second amount)
    wanted = (
        ("Taxable Pay:", "taxable_pay", "ytd_taxable_pay"),
        ("Total Gross:", None, "ytd_gross"),
        ("Tax Paid:", None, "ytd_tax_paid"),
    )
    result: dict[str, Decimal] = {}
    for row in lines:
        body = row.lstrip()
        for prefix, period_key, ytd_key in wanted:
            if not body.startswith(prefix):
                continue
            nums = AMOUNT_RE.findall(row)
            if period_key is not None and nums:
                result[period_key] = _to_decimal(nums[0])
            if len(nums) > 1:
                result[ytd_key] = _to_decimal(nums[1])
            break  # prefixes are mutually exclusive
    return result
# --------------------------------------------------------------------------
# Variant A — single-column Description | This Period | This Year
# --------------------------------------------------------------------------
# Payment labels whose amounts are summed into the period's RSU vest figure.
VARIANT_A_RSU_LABELS = {
    "RSU Gain Taxable",
    "RSU Gain Nicable",
    "RSU Gain Taxabl",
    "RSU Gain Nicabl",
    "RSU Net Cash",
    "RSU Net Cash UK",
}
# Recognised payment labels: cash items, BIK items and every RSU label above.
VARIANT_A_PAYMENTS_KNOWN = {
    "Salary",
    "Bonus",
    "Perform Bonus",
    "Relocation Bonus",
    "AE Pension EE",
    "AE Pension",
    "Laundry Expense",
    "Transportation Allowance",
    "EE Edu Assist",
    # BIK earnings mirrored on the deduction side — we exclude them from
    # bonus/other_earnings so they don't double-count.
    "Private Dental Insurance",
    "Private Medical Insurance",
    "EE Discount BIK",
} | VARIANT_A_RSU_LABELS
# Deduction labels mapped to dedicated fields; any other deduction label is
# reported under `other_deductions`.
VARIANT_A_DEDUCTIONS_KNOWN = {
    "Tax",
    "National Insurance",
    "Student Loans",
    "Student Loan",
    "RSU Net Gain",
}
# Variant A Taxable Pay line — multiple template variants:
# "Taxable Pay : This Period £15323.16 : To Date £52446.53" (canonical post-2021)
# "Taxable Pay This Period £1234.56" (older, no colons)
# "TAXABLE PAY : This Period £1234.56" (uppercase on some 2019-2020 slips)
# "Taxable Pay : Period £1234.56" ("This" elided)
# Case-insensitive, tolerant of separators. Ordered most-specific first.
# Group 1 of each pattern is the this-period amount without the "£" sign;
# `[\d,]+` accepts the figure with or without thousands commas.
TAXABLE_PAY_A_PATTERNS: list[re.Pattern[str]] = [
    # "[This] Period" wording between the label and the amount.
    re.compile(r"Taxable\s+Pay\s*[:\s]+(?:This\s+)?Period\s*£([\d,]+\.\d{2})", re.IGNORECASE),
    # Amount directly after the label, with no "Period" wording at all.
    re.compile(r"Taxable\s+Pay\s*[:\s]+£([\d,]+\.\d{2})", re.IGNORECASE),
]
# "Net Pay <amount>" — whitespace-separated, optional leading minus, no "£".
NET_PAY_A_RE = re.compile(r"Net Pay\s+(-?[\d,]+\.\d{2})")
def _match_variant_a_taxable_pay(line: str) -> Decimal | None:
    """Return the this-period Taxable Pay found in `line`, or None.

    The patterns in `TAXABLE_PAY_A_PATTERNS` are searched in list order and
    the first one that matches supplies the amount.
    """
    hits = (pat.search(line) for pat in TAXABLE_PAY_A_PATTERNS)
    first_hit = next((hit for hit in hits if hit), None)
    if first_hit is None:
        return None
    return _to_decimal(first_hit.group(1))
def _parse_variant_a(text: str, lines: list[str], employer: str) -> ExtractedPayslip:
    """Build an `ExtractedPayslip` from a single-column (variant A) slip.

    Reads the payments and deductions blocks, the gross and net totals and
    the Taxable Pay line, then maps known labels onto dedicated fields.
    Period dates and year-to-date figures are not extracted for this layout
    and are returned as `None`.

    Raises:
        ParserError: propagated from the header, gross, net-pay or pay-date
            helpers when the expected structure is missing.
    """
    zero = Decimal("0")

    def either(table: dict[str, Decimal], preferred: str, alternative: str) -> Decimal:
        # Value under the preferred label, else the alternative, else zero.
        return table.get(preferred, table.get(alternative, zero))

    header_idx = _find_variant_a_header(lines)
    payments, deductions = _collect_a_blocks(lines, header_idx)
    gross_pay = _parse_a_gross(lines, header_idx, payments)
    net_pay = _parse_a_net(text)

    # A negative AE Pension payment is recorded as a positive sacrifice.
    ae_pension = either(payments, "AE Pension EE", "AE Pension")
    pension_sacrifice = abs(ae_pension) if ae_pension < 0 else zero

    rsu_vest = sum(
        (amount for label, amount in payments.items() if label in VARIANT_A_RSU_LABELS),
        start=zero,
    )
    income_tax = deductions.get("Tax", zero)
    other_deductions = {
        label: amount
        for label, amount in deductions.items()
        if label not in VARIANT_A_DEDUCTIONS_KNOWN
    }

    taxable_pay = _match_variant_a_taxable_pay(text)
    cash_income_tax = _cash_income_tax(income_tax, gross_pay, pension_sacrifice, taxable_pay)
    pay_date = _parse_date(text)

    return ExtractedPayslip(
        pay_date=pay_date,
        pay_period_start=None,
        pay_period_end=None,
        employer=employer,
        currency="GBP",
        gross_pay=gross_pay,
        income_tax=income_tax,
        national_insurance=deductions.get("National Insurance", zero),
        pension_employee=Decimal("0"),
        pension_employer=Decimal("0"),
        student_loan=either(deductions, "Student Loans", "Student Loan"),
        rsu_vest=rsu_vest,
        rsu_offset=deductions.get("RSU Net Gain", zero),
        salary=payments.get("Salary", zero),
        bonus=either(payments, "Perform Bonus", "Bonus"),
        pension_sacrifice=pension_sacrifice,
        taxable_pay=taxable_pay,
        ytd_tax_paid=None,
        ytd_taxable_pay=None,
        ytd_gross=None,
        cash_income_tax=cash_income_tax,
        ytd_rsu_tax_offset=None,
        ytd_rsu_excs_refund=None,
        other_deductions=other_deductions,
        net_pay=net_pay,
    )
def _find_variant_a_header(lines: list[str]) -> int:
for i, line in enumerate(lines):
if "Description" in line and "This Period" in line and "This Year" in line:
return i
raise ParserError("variant A header not found")
def _collect_a_blocks(
    lines: list[str],
    header_idx: int,
) -> tuple[dict[str, Decimal], dict[str, Decimal]]:
    """Split variant A rows into payments and deductions.

    Layout: header → payments rows → `Total <gross>` → deductions rows →
    `Total <deductions>` → `Net Pay <net>`. The first `Total` row switches
    collection from payments to deductions; the second `Total` row, or any
    row mentioning "Net Pay", ends it.

    Returns `(payments, deductions)`, each mapping a row label to its
    "This Period" amount.
    """
    payments: dict[str, Decimal] = {}
    deductions: dict[str, Decimal] = {}
    in_deductions = False

    for source_row in lines[header_idx + 1:]:
        raw = source_row.rstrip()
        stripped = raw.strip()
        if not stripped:
            continue

        if stripped.startswith(("Total ", "Total\t")):
            if in_deductions:
                break  # second Total: end of the deductions block
            in_deductions = True
            continue
        if "Net Pay" in raw:
            break

        hits = list(AMOUNT_RE.finditer(raw))
        # "This Period" is the first amount and "This Year" the second. A row
        # with a single amount is YTD-only (e.g. a Relocation Bonus that does
        # not apply this period), so it is left out of the period figures.
        if len(hits) < 2:
            continue
        label = raw[:hits[0].start()].strip()
        if label:
            target = deductions if in_deductions else payments
            target[label] = _to_decimal(hits[0].group())

    return payments, deductions
def _parse_a_gross(
    lines: list[str],
    header_idx: int,
    payments: dict[str, Decimal],
) -> Decimal:
    """Return gross pay: the first amount on the first `Total` row after the header.

    A `Total` row is one whose stripped text starts with "Total " or
    "Total\\t" — the same anchor used to split the payments and deductions
    blocks. Only the first such row is considered, because the next `Total`
    row is the deductions total, not gross pay.

    If there is no `Total` row, or the first one carries no amount, gross
    pay falls back to the sum of the collected `payments`.

    Raises:
        ParserError: if no usable `Total` row exists and `payments` is empty.
    """
    for row in lines[header_idx + 1:]:
        stripped = row.strip()
        if not stripped.startswith(("Total ", "Total\t")):
            continue
        nums = AMOUNT_RE.findall(stripped)
        if nums:
            return _to_decimal(nums[0])
        # The first Total row has no amount: stop here rather than mistake
        # the later deductions total for gross pay.
        break
    # Fallback: sum payments values if the Total line is missing or empty.
    if payments:
        return sum(payments.values(), start=Decimal("0"))
    raise ParserError("Total (gross pay) row not found in variant A")
def _parse_a_net(text: str) -> Decimal:
    """Return the amount following the first "Net Pay" label in `text`.

    Raises:
        ParserError: if no "Net Pay <amount>" occurrence is found.
    """
    found = NET_PAY_A_RE.search(text)
    if found is None:
        raise ParserError("Net Pay line not found in variant A")
    return _to_decimal(found.group(1))