from datetime import date from decimal import Decimal from pathlib import Path from typing import Any from unittest.mock import AsyncMock, MagicMock import pytest from payslip_ingest import processor from payslip_ingest.processor import process_document from payslip_ingest.schema import ExtractedPayslip FIXTURES = Path(__file__).parent / "fixtures" def _sample_extraction() -> ExtractedPayslip: return ExtractedPayslip( pay_date=date(2026, 3, 28), pay_period_start=date(2026, 3, 1), pay_period_end=date(2026, 3, 31), employer="Acme Ltd", currency="GBP", gross_pay=Decimal("5000.00"), income_tax=Decimal("800.00"), national_insurance=Decimal("350.00"), pension_employee=Decimal("250.00"), pension_employer=Decimal("150.00"), student_loan=Decimal("100.00"), rsu_vest=Decimal("0.00"), rsu_offset=Decimal("0.00"), other_deductions={"cycle_to_work": Decimal("50.00")}, net_pay=Decimal("3450.00"), ) class _FakeSession: """Minimal AsyncSession stand-in that records flushes and execute calls.""" def __init__(self, existing_ids: list[int | None]): self._existing_ids = existing_ids self.added: list[Any] = [] self.begin_calls = 0 async def __aenter__(self) -> "_FakeSession": return self async def __aexit__(self, *exc: object) -> None: return None def begin(self) -> "_FakeSession": self.begin_calls += 1 return self async def execute(self, stmt: Any) -> Any: result = MagicMock() # scalar() returns None when we treat the row as missing. result.scalar.return_value = self._existing_ids.pop(0) if self._existing_ids else None return result def add(self, row: Any) -> None: row.id = 1 self.added.append(row) async def flush(self) -> None: return None class _SessionFactory: def __init__(self, sessions: list[_FakeSession]): self._sessions = list(sessions) self.used: list[_FakeSession] = [] def __call__(self) -> _FakeSession: session = self._sessions.pop(0) self.used.append(session) return session @pytest.fixture() def paperless() -> AsyncMock: mock = AsyncMock() mock.get_document.return_value = {"id": 42, "title": "Payslip"} mock.download_document.return_value = b"PDFDATA" return mock @pytest.fixture() def extractor() -> AsyncMock: mock = AsyncMock() mock.extract.return_value = _sample_extraction() return mock async def test_process_document_inserts_new(paperless: AsyncMock, extractor: AsyncMock) -> None: factory = _SessionFactory([_FakeSession(existing_ids=[]), _FakeSession(existing_ids=[])]) result = await process_document(42, factory, paperless, extractor) assert result.status == "inserted" assert result.validated is True paperless.get_document.assert_awaited_once_with(42) paperless.download_document.assert_awaited_once_with(42) extractor.extract.assert_awaited_once() inserted_row = factory.used[1].added[0] assert inserted_row.paperless_doc_id == 42 assert inserted_row.tax_year == "2025/26" async def test_process_document_skips_existing(paperless: AsyncMock, extractor: AsyncMock) -> None: factory = _SessionFactory([_FakeSession(existing_ids=[99])]) result = await process_document(42, factory, paperless, extractor) assert result.status == "skipped" paperless.get_document.assert_not_called() extractor.extract.assert_not_called() @pytest.mark.parametrize("title", [ "p60-meta-2025", "20001_Tax_254680_P60_2021_To_2022", "2024_Performance@_Year-end Letter_Viktor Barzin_1", "254680_Viktor_Barzin_18 Compensation_EMEA_20230311_2022 YE PSC", "2024-comp-letter", "RSU Grant Agreement 2024", ]) async def test_process_document_skips_non_payslip_by_title(paperless: AsyncMock, extractor: AsyncMock, title: str) -> None: paperless.get_document.return_value = {"id": 42, "title": title} factory = _SessionFactory([_FakeSession(existing_ids=[])]) result = await process_document(42, factory, paperless, extractor) assert result.status == "skipped_non_payslip" paperless.download_document.assert_not_called() extractor.extract.assert_not_called() @pytest.mark.parametrize("title", [ "Payslip_2026-02-27", "20001_PY_254680_Jan_2022", "UKPY_254680_31_Jul_2019", ]) async def test_process_document_keeps_real_payslips(paperless: AsyncMock, extractor: AsyncMock, title: str) -> None: paperless.get_document.return_value = {"id": 42, "title": title} factory = _SessionFactory([_FakeSession(existing_ids=[]), _FakeSession(existing_ids=[])]) result = await process_document(42, factory, paperless, extractor) assert result.status == "inserted" extractor.extract.assert_awaited_once() async def test_process_document_flags_validation_failure(paperless: AsyncMock, extractor: AsyncMock) -> None: bad = _sample_extraction() bad_dict = bad.model_dump() bad_dict["net_pay"] = Decimal("9999.00") extractor.extract.return_value = ExtractedPayslip.model_validate(bad_dict) factory = _SessionFactory([_FakeSession(existing_ids=[]), _FakeSession(existing_ids=[])]) result = await process_document(42, factory, paperless, extractor) assert result.status == "inserted" assert result.validated is False assert factory.used[1].added[0].validated is False async def test_regex_parser_short_circuits_claude(paperless: AsyncMock, extractor: AsyncMock, monkeypatch: pytest.MonkeyPatch) -> None: """When pdftotext output matches the Meta template, Claude must not run.""" meta_text = (FIXTURES / "meta_uk_2026_02.txt").read_text(encoding="utf-8") monkeypatch.setattr(processor, "_pdftotext", lambda _: meta_text) factory = _SessionFactory([_FakeSession(existing_ids=[]), _FakeSession(existing_ids=[])]) result = await process_document(42, factory, paperless, extractor) assert result.status == "inserted" assert result.validated is True assert result.extractor == "meta_uk_regex" extractor.extract.assert_not_called() # Salary / bonus / pension_sacrifice from the regex parser should land on the row. row = factory.used[1].added[0] assert row.salary == Decimal("10003.33") assert row.pension_sacrifice == Decimal("600.20") assert row.rsu_vest == Decimal("30479.76") assert row.taxable_pay == Decimal("72096.92") async def test_regex_miss_falls_back_to_claude(paperless: AsyncMock, extractor: AsyncMock, monkeypatch: pytest.MonkeyPatch) -> None: """When pdftotext output doesn't match Meta, Claude is invoked.""" monkeypatch.setattr(processor, "_pdftotext", lambda _: "Some other employer's payslip\n") factory = _SessionFactory([_FakeSession(existing_ids=[]), _FakeSession(existing_ids=[])]) result = await process_document(42, factory, paperless, extractor) assert result.status == "inserted" assert result.extractor == "claude" extractor.extract.assert_awaited_once() async def test_rejects_implausible_pay_date(paperless: AsyncMock, extractor: AsyncMock) -> None: """Reject 1900-01-01-style hallucinations before they poison the DB.""" bad = _sample_extraction() bad_dict = bad.model_dump() bad_dict["pay_date"] = date(1900, 1, 1) extractor.extract.return_value = ExtractedPayslip.model_validate(bad_dict) factory = _SessionFactory([_FakeSession(existing_ids=[])]) with pytest.raises(ValueError, match="implausible pay_date"): await process_document(42, factory, paperless, extractor) async def test_skips_p60_by_content_when_title_is_null(paperless: AsyncMock, extractor: AsyncMock, monkeypatch: pytest.MonkeyPatch) -> None: """P60s get the `payslip` tag sometimes, and some have no title in Paperless. The title filter can't catch them, so we also check the pdftotext output for the `P60 End of Year Certificate` signature before hitting the extractor. """ paperless.get_document.return_value = {"id": 42, "title": None} monkeypatch.setattr(processor, "_pdftotext", lambda _: "P60 End of Year Certificate\nTax year to 5 April 2021\n") factory = _SessionFactory([_FakeSession(existing_ids=[])]) result = await process_document(42, factory, paperless, extractor) assert result.status == "skipped_non_payslip" extractor.extract.assert_not_called() async def test_rejects_zero_gross_zero_net(paperless: AsyncMock, extractor: AsyncMock) -> None: """Reject the other common hallucination: all zeros on a non-payslip.""" bad = _sample_extraction() bad_dict = bad.model_dump() bad_dict["gross_pay"] = Decimal("0") bad_dict["net_pay"] = Decimal("0") extractor.extract.return_value = ExtractedPayslip.model_validate(bad_dict) factory = _SessionFactory([_FakeSession(existing_ids=[])]) with pytest.raises(ValueError, match="zero gross and net"): await process_document(42, factory, paperless, extractor) async def test_p60_tag_routes_to_p60_handler(paperless: AsyncMock, extractor: AsyncMock, monkeypatch: pytest.MonkeyPatch) -> None: """A doc carrying the P60 tag id goes to _handle_p60 (not the payslip path).""" p60_text = (FIXTURES / "meta_uk_p60_2024_25.txt").read_text(encoding="utf-8") monkeypatch.setattr(processor, "_pdftotext", lambda _: p60_text) paperless.get_document.return_value = {"id": 42, "title": "P60 2024-25", "tags": [7]} # Two sessions: one for combined dedup, one for the P60 insert. factory = _SessionFactory([ _FakeSession(existing_ids=[]), _FakeSession(existing_ids=[]), ]) result = await process_document(42, factory, paperless, extractor, p60_tag_id=7) assert result.status == "inserted" assert result.extractor == "p60_regex" assert result.p60_id == 1 # Extractor (Claude) must not be called for a P60. extractor.extract.assert_not_called() inserted_row = factory.used[1].added[0] assert inserted_row.tax_year == "2024/25" assert inserted_row.gross_pay == Decimal("232630.34") assert inserted_row.income_tax == Decimal("95820.11") async def test_p60_tag_absent_follows_payslip_path(paperless: AsyncMock, extractor: AsyncMock, monkeypatch: pytest.MonkeyPatch) -> None: """A regular payslip (no P60 tag) still goes through the payslip path.""" meta_text = (FIXTURES / "meta_uk_2026_02.txt").read_text(encoding="utf-8") monkeypatch.setattr(processor, "_pdftotext", lambda _: meta_text) paperless.get_document.return_value = {"id": 42, "title": "Payslip", "tags": [3]} factory = _SessionFactory([ _FakeSession(existing_ids=[]), _FakeSession(existing_ids=[]), ]) result = await process_document(42, factory, paperless, extractor, p60_tag_id=7) assert result.status == "inserted" assert result.extractor == "meta_uk_regex" assert result.p60_id is None async def test_bonus_dedup_zeros_repeat_within_tax_year(paperless: AsyncMock, extractor: AsyncMock) -> None: """Same bonus amount already seen in tax_year → insert as 0. Meta pays one perf bonus per tax year (2 historically). A duplicate amount usually means the extractor misread the YTD figure as current period — we keep the first occurrence and suppress subsequent matches. """ sample = _sample_extraction().model_dump() sample["bonus"] = Decimal("25000.00") extractor.extract.return_value = ExtractedPayslip.model_validate(sample) # Insert session has 2 execute calls: paperless_doc_id dedup, bonus dedup. # existing_ids=[None, 1] → paperless_doc_id not found; bonus already seen (id=1). factory = _SessionFactory([ _FakeSession(existing_ids=[]), _FakeSession(existing_ids=[None, 1]), ]) result = await process_document(42, factory, paperless, extractor) assert result.status == "inserted" assert factory.used[1].added[0].bonus == Decimal("0") async def test_bonus_dedup_keeps_first_occurrence(paperless: AsyncMock, extractor: AsyncMock) -> None: """First occurrence of a bonus in a tax_year is preserved.""" sample = _sample_extraction().model_dump() sample["bonus"] = Decimal("7777.77") extractor.extract.return_value = ExtractedPayslip.model_validate(sample) factory = _SessionFactory([ _FakeSession(existing_ids=[]), _FakeSession(existing_ids=[None, None]), ]) result = await process_document(42, factory, paperless, extractor) assert result.status == "inserted" assert factory.used[1].added[0].bonus == Decimal("7777.77") async def test_bonus_dedup_skips_when_bonus_zero(paperless: AsyncMock, extractor: AsyncMock) -> None: """Bonus=0 rows skip the dedup query — avoids a pointless DB hit.""" sample = _sample_extraction().model_dump() sample["bonus"] = Decimal("0") extractor.extract.return_value = ExtractedPayslip.model_validate(sample) # existing_ids only contains 1 entry — if dedup ran we'd exhaust it. factory = _SessionFactory([ _FakeSession(existing_ids=[]), _FakeSession(existing_ids=[None]), ]) result = await process_document(42, factory, paperless, extractor) assert result.status == "inserted" assert factory.used[1].added[0].bonus == Decimal("0")