# diff --git a/alembic/versions/0007_external_meta_deposits.py b/alembic/versions/0007_external_meta_deposits.py
# new file mode 100644
# index 0000000..3d49d8a
# --- /dev/null
# +++ b/alembic/versions/0007_external_meta_deposits.py
# @@ -0,0 +1,52 @@
"""Add external_meta_deposits for ActualBudget payroll reconciliation.

Daily sync pulls Meta payroll deposits from ActualBudget (the
jhonderson/actual-http-api sidecar) so the dashboard can overlay bank-
deposit reality with `payslip.net_pay`. If the delta exceeds a tolerance
threshold, the payslip parser likely got the net_pay wrong — useful for
catching parser regressions without manual audit.

Idempotent on `actualbudget_tx_id` — rerunning the sync only inserts new
transactions. Deletions in ActualBudget are NOT propagated here (append-
only — the audit trail matters more than a live mirror).
"""
import sqlalchemy as sa

from alembic import op

revision = "0007"
down_revision = "0006"
branch_labels = None
depends_on = None

SCHEMA = "payslip_ingest"

# Object names shared by upgrade() and downgrade() so the two cannot drift.
_TABLE = "external_meta_deposits"
_DEPOSIT_DATE_INDEX = "ix_external_meta_deposits_deposit_date"


def upgrade() -> None:
    """Create the deposits table, then the index on `deposit_date`."""
    columns = [
        sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
        # The unique constraint is what the sync's ON CONFLICT clause targets.
        sa.Column("actualbudget_tx_id", sa.String(), nullable=False, unique=True),
        sa.Column("deposit_date", sa.Date(), nullable=False),
        sa.Column("amount", sa.Numeric(12, 2), nullable=False),
        sa.Column("payee", sa.String(), nullable=True),
        sa.Column("memo", sa.String(), nullable=True),
        sa.Column(
            "synced_at",
            sa.TIMESTAMP(timezone=True),
            server_default=sa.text("now()"),
            nullable=False,
        ),
    ]
    op.create_table(_TABLE, *columns, schema=SCHEMA)
    op.create_index(_DEPOSIT_DATE_INDEX, _TABLE, ["deposit_date"], schema=SCHEMA)


def downgrade() -> None:
    """Drop the index first, then the table (reverse of upgrade)."""
    op.drop_index(_DEPOSIT_DATE_INDEX, table_name=_TABLE, schema=SCHEMA)
    op.drop_table(_TABLE, schema=SCHEMA)


# diff --git a/payslip_ingest/__main__.py b/payslip_ingest/__main__.py
index 1383907..1432d8e 100644 --- a/payslip_ingest/__main__.py +++ b/payslip_ingest/__main__.py @@ -116,6 +116,39 @@ def migrate() -> None: sys.exit(result.returncode) +@cli.command("sync-meta-deposits") +def sync_meta_deposits_cmd() -> None: + """Pull Meta payroll deposits from ActualBudget into external_meta_deposits. + + Reads from the jhonderson/actual-http-api sidecar. Requires env vars: + ACTUALBUDGET_HTTP_API_URL, ACTUALBUDGET_API_KEY, + ACTUALBUDGET_ENCRYPTION_PASSWORD, ACTUALBUDGET_BUDGET_SYNC_ID. + """ + asyncio.run(_sync_meta_deposits()) + + +async def _sync_meta_deposits() -> None: + from payslip_ingest.sync.actualbudget import ActualBudgetClient, sync_meta_deposits + + engine = create_engine_from_env() + session_factory = make_session_factory(engine) + client = ActualBudgetClient( + base_url=os.environ["ACTUALBUDGET_HTTP_API_URL"], + api_key=os.environ["ACTUALBUDGET_API_KEY"], + encryption_password=os.environ["ACTUALBUDGET_ENCRYPTION_PASSWORD"], + budget_sync_id=os.environ["ACTUALBUDGET_BUDGET_SYNC_ID"], + ) + try: + result = await sync_meta_deposits(client, session_factory) + click.echo(f"sync complete: accounts={result.accounts_scanned} " + f"transactions={result.transactions_fetched} " + f"meta_matched={result.meta_deposits_matched} " + f"inserted={result.inserted} existing={result.skipped_existing}") + finally: + await client.aclose() + await engine.dispose() + + @cli.command("backfill-cash-tax") @click.option("--limit", type=int, default=None, help="Cap the number of rows processed.") def backfill_cash_tax(limit: int | None) -> None: diff --git a/payslip_ingest/db.py b/payslip_ingest/db.py index eaeaa6d..faf9ceb 100644 --- a/payslip_ingest/db.py +++ b/payslip_ingest/db.py @@ -77,6 +77,28 @@ class Payslip(Base): server_default=text("now()")) +class ExternalMetaDeposit(Base): + """Meta payroll deposit as recorded by ActualBudget — ground-truth against + `payslip.net_pay`. 
Synced daily by a CronJob that reads from the + jhonderson/actual-http-api sidecar. + + Idempotent on `actualbudget_tx_id` — same transaction id from AB means the + same deposit, re-runs are no-ops. Deletions in AB are not propagated. + """ + __tablename__ = "external_meta_deposits" + __table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012 + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + actualbudget_tx_id: Mapped[str] = mapped_column(String, unique=True, nullable=False) + deposit_date: Mapped[date] = mapped_column(Date, nullable=False) + amount: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False) + payee: Mapped[str | None] = mapped_column(String, nullable=True) + memo: Mapped[str | None] = mapped_column(String, nullable=True) + synced_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), + nullable=False, + server_default=text("now()")) + + class P60Reference(Base): """HMRC-issued annual P60. One row per (tax_year, employer). diff --git a/payslip_ingest/sync/__init__.py b/payslip_ingest/sync/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/payslip_ingest/sync/actualbudget.py b/payslip_ingest/sync/actualbudget.py new file mode 100644 index 0000000..6ed58f7 --- /dev/null +++ b/payslip_ingest/sync/actualbudget.py @@ -0,0 +1,206 @@ +"""ActualBudget HTTP API client — pull Meta payroll deposits. + +Reads from the jhonderson/actual-http-api sidecar in the actualbudget +namespace. Looks up accounts on the given budget, enumerates all +transactions across them, keeps only transactions whose payee name +matches a Meta pattern (META, FACEBOOK, META PLATFORMS etc.). + +Idempotent: each sync run upserts on `actualbudget_tx_id`; existing rows +are untouched. Deletions in ActualBudget are NOT propagated. 
+""" +from __future__ import annotations + +import logging +import re +from dataclasses import dataclass +from datetime import date, datetime +from decimal import Decimal +from typing import Any + +import httpx +from sqlalchemy import select +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.dialects.sqlite import insert as sqlite_insert +from sqlalchemy.ext.asyncio import async_sessionmaker + +from payslip_ingest.db import ExternalMetaDeposit + +log = logging.getLogger(__name__) + +# Payee pattern. ActualBudget normalizes payee strings but the raw bank +# description can include country code / spacing variants. Match the +# common forms observed in the viktor budget. +META_PAYEE_RE = re.compile(r"\b(META|FACEBOOK)\b", re.IGNORECASE) + + +class ActualBudgetError(RuntimeError): + pass + + +@dataclass +class SyncResult: + accounts_scanned: int = 0 + transactions_fetched: int = 0 + meta_deposits_matched: int = 0 + inserted: int = 0 + skipped_existing: int = 0 + + +class ActualBudgetClient: + """Narrow client for the jhonderson/actual-http-api endpoints we need.""" + + def __init__( + self, + base_url: str, + api_key: str, + encryption_password: str, + budget_sync_id: str, + client: httpx.AsyncClient | None = None, + ): + self._base_url = base_url.rstrip("/") + self._budget = budget_sync_id + self._headers = { + "accept": "application/json", + "x-api-key": api_key, + "budget-encryption-password": encryption_password, + } + self._client = client or httpx.AsyncClient(timeout=60.0) + self._owns_client = client is None + + async def aclose(self) -> None: + if self._owns_client: + await self._client.aclose() + + async def __aenter__(self) -> ActualBudgetClient: + return self + + async def __aexit__(self, *exc: object) -> None: + await self.aclose() + + async def list_accounts(self) -> list[dict[str, Any]]: + resp = await self._client.get( + f"{self._base_url}/v1/budgets/{self._budget}/accounts", + headers=self._headers, + ) + 
resp.raise_for_status() + data = resp.json().get("data", []) + if not isinstance(data, list): + raise ActualBudgetError(f"accounts response not a list: {data!r}") + return data + + async def list_transactions(self, account_id: str) -> list[dict[str, Any]]: + """List all transactions for an account. + + jhonderson/actual-http-api GET endpoint may return `data` as a list. + If the endpoint is missing (older image), surface a clear error so + the operator can switch to the SQLite-mount fallback. + """ + resp = await self._client.get( + f"{self._base_url}/v1/budgets/{self._budget}/accounts/{account_id}/transactions", + headers=self._headers, + ) + if resp.status_code == 404: + raise ActualBudgetError( + "transaction-list endpoint not found — the http-api image may be too old; " + "fall back to reading SQLite directly") + resp.raise_for_status() + data = resp.json().get("data", []) + if not isinstance(data, list): + raise ActualBudgetError(f"transactions response not a list: {data!r}") + return data + + +async def sync_meta_deposits( + client: ActualBudgetClient, + db_session_factory: async_sessionmaker[Any], +) -> SyncResult: + """Enumerate every transaction across every account, upsert Meta deposits.""" + accounts = await client.list_accounts() + result = SyncResult(accounts_scanned=len(accounts)) + + for account in accounts: + account_id = account.get("id") + if not isinstance(account_id, str): + log.warning("skipping account without id: %r", account) + continue + + txs = await client.list_transactions(account_id) + result.transactions_fetched += len(txs) + + for tx in txs: + if not _is_meta_deposit(tx): + continue + result.meta_deposits_matched += 1 + + was_new = await _upsert(db_session_factory, tx) + if was_new: + result.inserted += 1 + else: + result.skipped_existing += 1 + + return result + + +def _is_meta_deposit(tx: dict[str, Any]) -> bool: + """Positive deposit (credit) where payee contains META / FACEBOOK.""" + amount = tx.get("amount") + if not 
isinstance(amount, int | float): + return False + # ActualBudget stores amounts in cents (int); positive = incoming. + if amount <= 0: + return False + payee = tx.get("payee_name") or tx.get("payee") or "" + if not isinstance(payee, str): + return False + return bool(META_PAYEE_RE.search(payee)) + + +async def _upsert( + db_session_factory: async_sessionmaker[Any], + tx: dict[str, Any], +) -> bool: + """Insert the row; return True if newly inserted, False if it already existed. + + Uses a dialect-aware ON CONFLICT DO NOTHING upsert — Postgres in prod and + SQLite in tests both support this. + """ + tx_id = tx["id"] + amount_cents = int(tx["amount"]) + amount = (Decimal(amount_cents) / Decimal(100)).quantize(Decimal("0.01")) + deposit_date = _parse_date(tx["date"]) + payee = tx.get("payee_name") or tx.get("payee") or None + memo = tx.get("notes") or tx.get("memo") or None + + async with db_session_factory() as session: + existing = await session.execute( + select(ExternalMetaDeposit.id).where( + ExternalMetaDeposit.actualbudget_tx_id == tx_id)) + if existing.scalar() is not None: + return False + + async with db_session_factory() as session, session.begin(): + bind = session.bind + dialect = bind.dialect.name if bind is not None else "postgresql" + stmt_cls = pg_insert if dialect == "postgresql" else sqlite_insert + stmt = stmt_cls(ExternalMetaDeposit).values( + actualbudget_tx_id=tx_id, + deposit_date=deposit_date, + amount=amount, + payee=payee, + memo=memo, + ).on_conflict_do_nothing(index_elements=[ExternalMetaDeposit.actualbudget_tx_id]) + await session.execute(stmt) + return True + + +def _parse_date(raw: str) -> date: + return datetime.strptime(raw, "%Y-%m-%d").date() + + +__all__ = [ + "ActualBudgetClient", + "ActualBudgetError", + "META_PAYEE_RE", + "SyncResult", + "sync_meta_deposits", +] diff --git a/tests/test_sync_actualbudget.py b/tests/test_sync_actualbudget.py new file mode 100644 index 0000000..586dcf1 --- /dev/null +++ 
# b/tests/test_sync_actualbudget.py
# @@ -0,0 +1,179 @@
"""Unit tests for the ActualBudget sync client."""
from collections.abc import AsyncIterator
from datetime import UTC, date, datetime
from decimal import Decimal
from typing import Any

import httpx
import pytest
import respx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine

from payslip_ingest.db import Base, ExternalMetaDeposit
from payslip_ingest.sync.actualbudget import (
    META_PAYEE_RE,
    ActualBudgetClient,
    ActualBudgetError,
    sync_meta_deposits,
)

BASE_URL = "http://budget-http-api-viktor.actualbudget.svc.cluster.local"
BUDGET_ID = "sync-id-viktor"

# The two endpoints every sync test stubs.
_ACCOUNTS_URL = f"{BASE_URL}/v1/budgets/{BUDGET_ID}/accounts"
_TRANSACTIONS_URL = f"{BASE_URL}/v1/budgets/{BUDGET_ID}/accounts/account-1/transactions"


def test_meta_payee_regex_matches_canonical() -> None:
    for payee in ("META PLATFORMS IRELAND", "Facebook UK Ltd", "meta"):
        assert META_PAYEE_RE.search(payee)


def test_meta_payee_regex_rejects_other() -> None:
    # "Metamask" and "facebookish" only fail because of the word boundary.
    for payee in ("Amazon", "Metamask", "facebookish"):
        assert not META_PAYEE_RE.search(payee)


def _register_sqlite_now(dbapi_conn: Any, _: Any) -> None:
    """SQLite doesn't have now() — map it to datetime('now') so the Postgres
    `server_default=text("now()")` on the ORM models works in tests."""

    def _now() -> str:
        return datetime.now(UTC).isoformat(" ")

    dbapi_conn.create_function("now", 0, _now)


@pytest.fixture
async def session_factory() -> AsyncIterator[async_sessionmaker[Any]]:
    """In-memory SQLite with a `payslip_ingest` schema attached and all tables created."""
    from sqlalchemy import event

    engine: AsyncEngine = create_async_engine("sqlite+aiosqlite:///:memory:")
    event.listen(engine.sync_engine, "connect", _register_sqlite_now)
    async with engine.begin() as conn:
        # The models live in a named schema; ATTACH gives SQLite one to match.
        await conn.exec_driver_sql("ATTACH DATABASE ':memory:' AS payslip_ingest")
        await conn.run_sync(Base.metadata.create_all)
    factory = async_sessionmaker(engine, expire_on_commit=False)
    yield factory
    await engine.dispose()


def _mock_accounts() -> dict[str, Any]:
    account = {"id": "account-1", "name": "Barclays Personal"}
    return {"data": [account]}


def _tx(tx_id: str, day: str, amount: int, payee_name: str, notes: str | None) -> dict[str, Any]:
    """Build one transaction dict in the shape the mocked API returns."""
    return {
        "id": tx_id,
        "date": day,
        "amount": amount,
        "payee_name": payee_name,
        "notes": notes,
    }


def _mock_transactions() -> dict[str, Any]:
    return {
        "data": [
            # 6500.00 GBP in cents
            _tx("tx-001", "2026-03-28", 650000, "META PLATFORMS IRELAND", "Salary March"),
            _tx("tx-002", "2026-02-28", 620000, "Facebook UK Ltd", None),
            _tx("tx-003", "2026-03-15", 12000, "Tesco", "groceries"),
            # outgoing
            _tx("tx-004", "2026-03-01", -5000, "META (refund out)", None),
        ]
    }


def _stub_api(transactions_response: httpx.Response) -> None:
    """Register the accounts route plus the given transactions response.

    Must be called from inside a `@respx.mock`-decorated test.
    """
    respx.get(_ACCOUNTS_URL).mock(return_value=httpx.Response(200, json=_mock_accounts()))
    respx.get(_TRANSACTIONS_URL).mock(return_value=transactions_response)


@respx.mock
async def test_sync_meta_deposits_inserts_matches(
    session_factory: async_sessionmaker[Any],
) -> None:
    _stub_api(httpx.Response(200, json=_mock_transactions()))

    async with ActualBudgetClient(BASE_URL, "k", "pwd", BUDGET_ID) as client:
        result = await sync_meta_deposits(client, session_factory)

    assert result.accounts_scanned == 1
    assert result.transactions_fetched == 4
    assert result.meta_deposits_matched == 2
    assert result.inserted == 2

    query = select(ExternalMetaDeposit).order_by(ExternalMetaDeposit.deposit_date)
    async with session_factory() as session:
        rows = (await session.execute(query)).scalars().all()
    assert len(rows) == 2

    february, march = rows[0], rows[1]
    assert february.actualbudget_tx_id == "tx-002"
    assert february.deposit_date == date(2026, 2, 28)
    assert february.amount == Decimal("6200.00")
    assert february.payee == "Facebook UK Ltd"
    assert march.actualbudget_tx_id == "tx-001"
    assert march.amount == Decimal("6500.00")


@respx.mock
async def test_sync_meta_deposits_is_idempotent(
    session_factory: async_sessionmaker[Any],
) -> None:
    """Running twice inserts each tx only once."""
    _stub_api(httpx.Response(200, json=_mock_transactions()))

    async with ActualBudgetClient(BASE_URL, "k", "pwd", BUDGET_ID) as client:
        first = await sync_meta_deposits(client, session_factory)
        second = await sync_meta_deposits(client, session_factory)

    assert first.inserted == 2
    assert second.inserted == 0
    assert second.skipped_existing == 2

    async with session_factory() as session:
        stored_ids = (await session.execute(select(ExternalMetaDeposit.id))).scalars().all()
    count = len(stored_ids)
    assert count == 2


@respx.mock
async def test_sync_surfaces_missing_endpoint_error(
    session_factory: async_sessionmaker[Any],
) -> None:
    """404 on transactions endpoint must raise — triggers SQLite fallback."""
    _stub_api(httpx.Response(404))

    async with ActualBudgetClient(BASE_URL, "k", "pwd", BUDGET_ID) as client:
        with pytest.raises(ActualBudgetError, match="endpoint not found"):
            await sync_meta_deposits(client, session_factory)


async def _insert_existing_deposit(
    session_factory: async_sessionmaker[Any],
    tx_id: str,
) -> None:
    """Seed one already-synced deposit row carrying the given ActualBudget id."""
    deposit = ExternalMetaDeposit(
        actualbudget_tx_id=tx_id,
        deposit_date=date(2026, 3, 28),
        amount=Decimal("6500.00"),
        payee="META",
        memo=None,
        synced_at=datetime.now(UTC),
    )
    async with session_factory() as session, session.begin():
        session.add(deposit)