"""Regex-based Meta UK payslip parser. Meta UK payslips come in three layout variants across 2019-2026: - **Variant A** (2019-mid-2022, seen as `Facebook UK Ltd`): Single-column `Description | This Period | This Year` layout. Parenthesized negatives `(152.90)` = -152.90. Date format `Date : 31 Aug 2021`. RSU labels: `RSU Gain Taxable`, `RSU Gain Nicable`, `RSU Net Cash UK`, plus a matching `RSU Net Gain` deduction. BIK items (Private Dental/Medical, EE Discount) appear as both earnings and deductions. - **Variant C** (late-2022 - 2023, `Facebook UK Limited`): Side-by-side `Payments | Deductions | Year To Date` (capital "To"). Date format `Pay Date : 30.11.2022` (dots). `Company Name : Facebook UK Limited` preamble. RSU labels use the abbreviated `RSU Gain Taxabl` / `Nicabl` and still have the `RSU Net Gain` offset. - **Variant B** (2024+, `Facebook UK Limited`): Side-by-side `Payments | Deductions | Year to Date` (lowercase "to"). Date format `Pay Date: 27/02/2026` (slashes). RSU labels are `RSU Tax Offset` + `RSU Excs Refund`; there is NO matching offset deduction — the vest grosses up Taxable Pay and PAYE is on the grossed-up figure. Parser returns `ExtractedPayslip`. On any structural miss it raises `ParserError` so the caller falls back to ClaudeExtractor. """ import re from datetime import date, datetime from decimal import Decimal from payslip_ingest.schema import ExtractedPayslip class ParserError(ValueError): """Raised when the Meta UK template cannot be matched.""" # Two amount notations: # "-1,234.56" (slashes-era) and "(1,234.56)" (variant A parenthesized) AMOUNT_RE = re.compile(r"-?\d{1,3}(?:,\d{3})*\.\d{2}|\(\d{1,3}(?:,\d{3})*\.\d{2}\)") # Pay Date / Date — three accepted formats: # "Pay Date: 27/02/2026" # "Pay Date : 30.11.2022" # "Date : 31 Aug 2021" PAY_DATE_SLASH_RE = re.compile(r"Pay Date\s*:\s*(\d{2}/\d{2}/\d{4})") PAY_DATE_DOT_RE = re.compile(r"Pay Date\s*:\s*(\d{2}\.\d{2}\.\d{4})") PAY_DATE_WORD_RE = re.compile(r"\bDate\s*:\s*(\d{1,2}\s+[A-Za-z]{3}\s+\d{4})") PERIOD_START_RE = re.compile(r"Period Start\s*:\s*(\d{2}/\d{2}/\d{4})") PERIOD_END_RE = re.compile(r"Period End\s*:\s*(\d{2}/\d{2}/\d{4})") EMPLOYER_RE = re.compile(r"Facebook UK (?:Limited|Ltd)\b") def parse_meta_uk(text: str) -> ExtractedPayslip: if not text.strip(): raise ParserError("empty text") employer_match = EMPLOYER_RE.search(text) if not employer_match: raise ParserError("does not look like a Meta UK payslip") employer = employer_match.group(0) lines = text.splitlines() if _is_variant_b_or_c(lines): return _parse_variant_bc(text, lines, employer) if _is_variant_a(lines): return _parse_variant_a(text, lines, employer) raise ParserError("neither side-by-side nor single-column header found") def _is_variant_b_or_c(lines: list[str]) -> bool: return any("Payments" in line and "Deductions" in line and re.search(r"Year [Tt]o Date", line) for line in lines) def _is_variant_a(lines: list[str]) -> bool: return any("Description" in line and "This Period" in line and "This Year" in line for line in lines) def _to_decimal(s: str) -> Decimal: s = s.strip() if s.startswith("(") and s.endswith(")"): s = "-" + s[1:-1] return Decimal(s.replace(",", "")) def _parse_date(text: str) -> date: """Try each supported format — whichever matches first wins.""" m = PAY_DATE_SLASH_RE.search(text) if m: return datetime.strptime(m.group(1), "%d/%m/%Y").date() m = PAY_DATE_DOT_RE.search(text) if m: return datetime.strptime(m.group(1), "%d.%m.%Y").date() m = PAY_DATE_WORD_RE.search(text) if m: raw = re.sub(r"\s+", " ", m.group(1)).strip() return datetime.strptime(raw, "%d %b %Y").date() raise ParserError("pay date not found") def _find_match(text: str, pattern: re.Pattern[str]) -> str | None: m = pattern.search(text) return m.group(1) if m else None def _last_amount(segment: str) -> tuple[str, Decimal | None]: """Return (label, rightmost numeric amount).""" matches = list(AMOUNT_RE.finditer(segment)) if not matches: return segment.strip(), None last = matches[-1] label = segment[:last.start()].strip() return label, _to_decimal(last.group()) # -------------------------------------------------------------------------- # Variant B / C — side-by-side Payments | Deductions | Year to/To Date # -------------------------------------------------------------------------- PAYMENTS_KNOWN = { "Salary", "Perform Bonus", "Bonus", "AE Pension EE", "AE Pension", "RSU Tax Offset", "RSU Excs Refund", "RSU Gain Taxabl", "RSU Gain Nicabl", "RSU Gain Taxable", "RSU Gain Nicable", "RSU Net Cash", "RSU Net Cash UK", } DEDUCTIONS_KNOWN = { "Tax paid", "Tax", "Employee NIC", "National Insurance", "Student Loans", "Student Loan", "RSU Net Gain", } RSU_VEST_LABELS = { "RSU Tax Offset", "RSU Excs Refund", "RSU Gain Taxabl", "RSU Gain Nicabl", "RSU Gain Taxable", "RSU Gain Nicable", "RSU Net Cash", "RSU Net Cash UK", } def _parse_variant_bc(text: str, lines: list[str], employer: str) -> ExtractedPayslip: header_idx, d_col, y_col = _find_bc_header(lines) payments, payments_order, deductions = _collect_bc_rows(lines, header_idx, d_col, y_col) gross_pay, net_pay = _parse_bc_totals_row(lines, header_idx, d_col, y_col) summary = _parse_bc_summary_block(lines) ae_pension = payments.get("AE Pension EE", payments.get("AE Pension", Decimal("0"))) pension_sacrifice = abs(ae_pension) if ae_pension < 0 else Decimal("0") rsu_vest = sum((payments.get(label, Decimal("0")) for label in RSU_VEST_LABELS), start=Decimal("0")) rsu_offset = deductions.get("RSU Net Gain", Decimal("0")) income_tax = deductions.get("Tax paid", deductions.get("Tax", Decimal("0"))) nic = deductions.get("Employee NIC", deductions.get("National Insurance", Decimal("0"))) student_loan = deductions.get("Student Loans", deductions.get("Student Loan", Decimal("0"))) other_deductions = {k: v for k, v in deductions.items() if k not in DEDUCTIONS_KNOWN} del payments_order # retained for future debugging; not used in validation pay_date = _parse_date(text) period_start_s = _find_match(text, PERIOD_START_RE) period_end_s = _find_match(text, PERIOD_END_RE) period_start = datetime.strptime(period_start_s, "%d/%m/%Y").date() if period_start_s else None period_end = datetime.strptime(period_end_s, "%d/%m/%Y").date() if period_end_s else None return ExtractedPayslip( pay_date=pay_date, pay_period_start=period_start, pay_period_end=period_end, employer=employer, currency="GBP", gross_pay=gross_pay, income_tax=income_tax, national_insurance=nic, pension_employee=Decimal("0"), pension_employer=Decimal("0"), student_loan=student_loan, rsu_vest=rsu_vest, rsu_offset=rsu_offset, salary=payments.get("Salary", Decimal("0")), bonus=payments.get("Perform Bonus", payments.get("Bonus", Decimal("0"))), pension_sacrifice=pension_sacrifice, taxable_pay=summary.get("taxable_pay"), ytd_tax_paid=summary.get("ytd_tax_paid"), ytd_taxable_pay=summary.get("ytd_taxable_pay"), ytd_gross=summary.get("ytd_gross"), other_deductions=other_deductions, net_pay=net_pay, ) def _find_bc_header(lines: list[str]) -> tuple[int, int, int]: for i, line in enumerate(lines): if ("Payments" in line and "Deductions" in line and re.search(r"Year [Tt]o Date", line)): # Columns anchored on left edge of "Deductions" / "Year [Tt]o Date" ytd_match = re.search(r"Year [Tt]o Date", line) assert ytd_match is not None return i, line.index("Deductions"), ytd_match.start() raise ParserError("variant B/C header not found") def _collect_bc_rows( lines: list[str], header_idx: int, d_col: int, y_col: int, ) -> tuple[dict[str, Decimal], list[tuple[str, Decimal]], dict[str, Decimal]]: payments: dict[str, Decimal] = {} order: list[tuple[str, Decimal]] = [] deductions: dict[str, Decimal] = {} for i in range(header_idx + 1, len(lines)): line = lines[i].rstrip() if "Total Payment" in line: return payments, order, deductions if not line.strip(): continue p_seg = line[:d_col] if len(line) > d_col else line d_seg = line[d_col:y_col] if len(line) > d_col else "" p_label, p_amount = _last_amount(p_seg) if p_label and p_amount is not None: payments[p_label] = p_amount order.append((p_label, p_amount)) d_label, d_amount = _last_amount(d_seg) if d_label and d_amount is not None: # RSU Net Gain can show as negative on the YTD side duplication; # normalize to absolute value on the deductions side. if d_label == "RSU Net Gain": d_amount = abs(d_amount) deductions[d_label] = d_amount return payments, order, deductions def _parse_bc_totals_row( lines: list[str], header_idx: int, d_col: int, y_col: int, ) -> tuple[Decimal, Decimal]: del y_col # "Net Pay:" aligns with the Amount column, not the left edge of YTD for i in range(header_idx + 1, len(lines)): line = lines[i] if "Total Payment" not in line: continue p_seg = line[:d_col] if len(line) > d_col else line _, gross_pay = _last_amount(p_seg) net_pay_idx = line.find("Net Pay") if net_pay_idx < 0: raise ParserError("Net Pay missing from totals row") _, net_pay = _last_amount(line[net_pay_idx:]) if gross_pay is None: raise ParserError("Total Payment amount missing") if net_pay is None: raise ParserError("Net Pay amount missing from totals row") return gross_pay, net_pay raise ParserError("totals row not found") def _parse_bc_summary_block(lines: list[str]) -> dict[str, Decimal]: """Pull Taxable Pay (this period + YTD), Tax Paid (YTD), Total Gross (YTD).""" result: dict[str, Decimal] = {} for line in lines: stripped = line.lstrip() if stripped.startswith("Taxable Pay:"): nums = AMOUNT_RE.findall(line) if len(nums) >= 1: result["taxable_pay"] = _to_decimal(nums[0]) if len(nums) >= 2: result["ytd_taxable_pay"] = _to_decimal(nums[1]) elif stripped.startswith("Total Gross:"): nums = AMOUNT_RE.findall(line) if len(nums) >= 2: result["ytd_gross"] = _to_decimal(nums[1]) elif stripped.startswith("Tax Paid:"): nums = AMOUNT_RE.findall(line) if len(nums) >= 2: result["ytd_tax_paid"] = _to_decimal(nums[1]) return result # -------------------------------------------------------------------------- # Variant A — single-column Description | This Period | This Year # -------------------------------------------------------------------------- VARIANT_A_PAYMENTS_KNOWN = { "Salary", "Bonus", "Perform Bonus", "Relocation Bonus", "AE Pension EE", "AE Pension", "Laundry Expense", "Transportation Allowance", "EE Edu Assist", "RSU Gain Taxable", "RSU Gain Nicable", "RSU Gain Taxabl", "RSU Gain Nicabl", "RSU Net Cash", "RSU Net Cash UK", # BIK earnings mirrored on the deduction side — we exclude them from # bonus/other_earnings so they don't double-count. "Private Dental Insurance", "Private Medical Insurance", "EE Discount BIK", } VARIANT_A_DEDUCTIONS_KNOWN = { "Tax", "National Insurance", "Student Loans", "Student Loan", "RSU Net Gain", "EE Discount BIK", } VARIANT_A_RSU_LABELS = { "RSU Gain Taxable", "RSU Gain Nicable", "RSU Gain Taxabl", "RSU Gain Nicabl", "RSU Net Cash", "RSU Net Cash UK", } # "Taxable Pay : This Period £15323.16 : To Date £52446.53" TAXABLE_PAY_A_RE = re.compile(r"Taxable Pay\s*:\s*This Period\s*£([\d,]+\.\d{2})") NET_PAY_A_RE = re.compile(r"Net Pay\s+(-?[\d,]+\.\d{2})") def _parse_variant_a(text: str, lines: list[str], employer: str) -> ExtractedPayslip: header_idx = _find_variant_a_header(lines) payments, deductions = _collect_a_blocks(lines, header_idx) gross_pay = _parse_a_gross(lines, header_idx, payments) net_pay = _parse_a_net(text) ae_pension = payments.get("AE Pension EE", payments.get("AE Pension", Decimal("0"))) pension_sacrifice = abs(ae_pension) if ae_pension < 0 else Decimal("0") rsu_vest = sum((payments.get(label, Decimal("0")) for label in VARIANT_A_RSU_LABELS), start=Decimal("0")) rsu_offset = deductions.get("RSU Net Gain", Decimal("0")) income_tax = deductions.get("Tax", Decimal("0")) nic = deductions.get("National Insurance", Decimal("0")) student_loan = deductions.get("Student Loans", deductions.get("Student Loan", Decimal("0"))) other_deductions = {k: v for k, v in deductions.items() if k not in VARIANT_A_DEDUCTIONS_KNOWN} bonus = payments.get("Perform Bonus", payments.get("Bonus", Decimal("0"))) taxable_pay_s = _find_match(text, TAXABLE_PAY_A_RE) taxable_pay = _to_decimal(taxable_pay_s) if taxable_pay_s else None pay_date = _parse_date(text) return ExtractedPayslip( pay_date=pay_date, pay_period_start=None, pay_period_end=None, employer=employer, currency="GBP", gross_pay=gross_pay, income_tax=income_tax, national_insurance=nic, pension_employee=Decimal("0"), pension_employer=Decimal("0"), student_loan=student_loan, rsu_vest=rsu_vest, rsu_offset=rsu_offset, salary=payments.get("Salary", Decimal("0")), bonus=bonus, pension_sacrifice=pension_sacrifice, taxable_pay=taxable_pay, ytd_tax_paid=None, ytd_taxable_pay=None, ytd_gross=None, other_deductions=other_deductions, net_pay=net_pay, ) def _find_variant_a_header(lines: list[str]) -> int: for i, line in enumerate(lines): if "Description" in line and "This Period" in line and "This Year" in line: return i raise ParserError("variant A header not found") def _collect_a_blocks( lines: list[str], header_idx: int, ) -> tuple[dict[str, Decimal], dict[str, Decimal]]: """Split variant A rows into Payments vs Deductions by the two `Total` anchors. Layout: header → payments rows → `Total ` → deductions rows → `Total ` → `Net Pay `. We collect rows into whichever block we're currently in. """ payments: dict[str, Decimal] = {} deductions: dict[str, Decimal] = {} block = payments total_count = 0 for i in range(header_idx + 1, len(lines)): raw = lines[i].rstrip() if not raw.strip(): continue stripped = raw.strip() if stripped.startswith("Total ") or stripped.startswith("Total\t"): total_count += 1 if total_count == 1: block = deductions continue if total_count == 2: break if "Net Pay" in raw: break matches = list(AMOUNT_RE.finditer(raw)) if not matches: continue label = raw[:matches[0].start()].strip() if not label: continue # "This Period" value is the first amount; "This Year" is the second. # If only one amount is present, it's a YTD-only row (e.g. Relocation # Bonus which doesn't apply this period) — skip it for the period totals. if len(matches) < 2: continue amount = _to_decimal(matches[0].group()) block[label] = amount return payments, deductions def _parse_a_gross( lines: list[str], header_idx: int, payments: dict[str, Decimal], ) -> Decimal: """Pull the first `Total ` after the header — that's gross pay.""" for i in range(header_idx + 1, len(lines)): stripped = lines[i].strip() if stripped.startswith("Total "): nums = AMOUNT_RE.findall(stripped) if nums: return _to_decimal(nums[0]) # Fallback: sum payments values if the Total line is missing. if payments: return sum(payments.values(), start=Decimal("0")) raise ParserError("Total (gross pay) row not found in variant A") def _parse_a_net(text: str) -> Decimal: m = NET_PAY_A_RE.search(text) if not m: raise ParserError("Net Pay line not found in variant A") return _to_decimal(m.group(1))