sync: ActualBudget Meta deposit overlay (Phase C)

Adds daily sync of Meta payroll deposits from ActualBudget into
payslip_ingest.external_meta_deposits, enabling the dashboard to overlay
bank deposits against payslip net_pay and surface parser drift on net.

- Migration 0007: new table external_meta_deposits, unique on
  actualbudget_tx_id, indexed on deposit_date.
- payslip_ingest.sync.actualbudget: narrow client for the
  jhonderson/actual-http-api sidecar (list accounts + transactions).
  Filters on payee regex (META|FACEBOOK, word-boundary). Idempotent
  upsert — ON CONFLICT DO NOTHING on actualbudget_tx_id. Surfaces
  clear error if the transactions endpoint is missing so the operator
  can switch to a SQLite-mount fallback.
- CLI command: `python -m payslip_ingest sync-meta-deposits` driven by
  4 env vars (ACTUALBUDGET_HTTP_API_URL, API_KEY, ENCRYPTION_PASSWORD,
  BUDGET_SYNC_ID).
- Tests: 5 — regex positive/negative, full sync insert, idempotency,
  404-endpoint failure mode.

Part of: code-860
This commit is contained in:
Viktor Barzin 2026-04-19 18:20:50 +00:00
parent 3b9c69bfd3
commit 08f28ad581
6 changed files with 492 additions and 0 deletions

View file

@ -0,0 +1,52 @@
"""Add external_meta_deposits for ActualBudget payroll reconciliation.
Daily sync pulls Meta payroll deposits from ActualBudget (the
jhonderson/actual-http-api sidecar) so the dashboard can overlay bank-
deposit reality with `payslip.net_pay`. If the delta exceeds a tolerance
threshold, the payslip parser likely got the net_pay wrong useful for
catching parser regressions without manual audit.
Idempotent on `actualbudget_tx_id` rerunning the sync only inserts new
transactions. Deletions in ActualBudget are NOT propagated here (append-
only the audit trail matters more than a live mirror).
"""
import sqlalchemy as sa
from alembic import op
revision = "0007"
down_revision = "0006"
branch_labels = None
depends_on = None
SCHEMA = "payslip_ingest"
def upgrade() -> None:
op.create_table(
"external_meta_deposits",
sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
sa.Column("actualbudget_tx_id", sa.String(), nullable=False, unique=True),
sa.Column("deposit_date", sa.Date(), nullable=False),
sa.Column("amount", sa.Numeric(12, 2), nullable=False),
sa.Column("payee", sa.String(), nullable=True),
sa.Column("memo", sa.String(), nullable=True),
sa.Column("synced_at",
sa.TIMESTAMP(timezone=True),
server_default=sa.text("now()"),
nullable=False),
schema=SCHEMA,
)
op.create_index(
"ix_external_meta_deposits_deposit_date",
"external_meta_deposits",
["deposit_date"],
schema=SCHEMA,
)
def downgrade() -> None:
op.drop_index("ix_external_meta_deposits_deposit_date",
table_name="external_meta_deposits",
schema=SCHEMA)
op.drop_table("external_meta_deposits", schema=SCHEMA)

View file

@ -116,6 +116,39 @@ def migrate() -> None:
sys.exit(result.returncode)
@cli.command("sync-meta-deposits")
def sync_meta_deposits_cmd() -> None:
"""Pull Meta payroll deposits from ActualBudget into external_meta_deposits.
Reads from the jhonderson/actual-http-api sidecar. Requires env vars:
ACTUALBUDGET_HTTP_API_URL, ACTUALBUDGET_API_KEY,
ACTUALBUDGET_ENCRYPTION_PASSWORD, ACTUALBUDGET_BUDGET_SYNC_ID.
"""
asyncio.run(_sync_meta_deposits())
async def _sync_meta_deposits() -> None:
from payslip_ingest.sync.actualbudget import ActualBudgetClient, sync_meta_deposits
engine = create_engine_from_env()
session_factory = make_session_factory(engine)
client = ActualBudgetClient(
base_url=os.environ["ACTUALBUDGET_HTTP_API_URL"],
api_key=os.environ["ACTUALBUDGET_API_KEY"],
encryption_password=os.environ["ACTUALBUDGET_ENCRYPTION_PASSWORD"],
budget_sync_id=os.environ["ACTUALBUDGET_BUDGET_SYNC_ID"],
)
try:
result = await sync_meta_deposits(client, session_factory)
click.echo(f"sync complete: accounts={result.accounts_scanned} "
f"transactions={result.transactions_fetched} "
f"meta_matched={result.meta_deposits_matched} "
f"inserted={result.inserted} existing={result.skipped_existing}")
finally:
await client.aclose()
await engine.dispose()
@cli.command("backfill-cash-tax")
@click.option("--limit", type=int, default=None, help="Cap the number of rows processed.")
def backfill_cash_tax(limit: int | None) -> None:

View file

@ -77,6 +77,28 @@ class Payslip(Base):
server_default=text("now()"))
class ExternalMetaDeposit(Base):
"""Meta payroll deposit as recorded by ActualBudget — ground-truth against
`payslip.net_pay`. Synced daily by a CronJob that reads from the
jhonderson/actual-http-api sidecar.
Idempotent on `actualbudget_tx_id` same transaction id from AB means the
same deposit, re-runs are no-ops. Deletions in AB are not propagated.
"""
__tablename__ = "external_meta_deposits"
__table_args__ = {"schema": SCHEMA_NAME} # noqa: RUF012
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
actualbudget_tx_id: Mapped[str] = mapped_column(String, unique=True, nullable=False)
deposit_date: Mapped[date] = mapped_column(Date, nullable=False)
amount: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
payee: Mapped[str | None] = mapped_column(String, nullable=True)
memo: Mapped[str | None] = mapped_column(String, nullable=True)
synced_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
nullable=False,
server_default=text("now()"))
class P60Reference(Base):
"""HMRC-issued annual P60. One row per (tax_year, employer).

View file

View file

@ -0,0 +1,206 @@
"""ActualBudget HTTP API client — pull Meta payroll deposits.
Reads from the jhonderson/actual-http-api sidecar in the actualbudget
namespace. Looks up accounts on the given budget, enumerates all
transactions across them, keeps only transactions whose payee name
matches a Meta pattern (META, FACEBOOK, META PLATFORMS etc.).
Idempotent: each sync run upserts on `actualbudget_tx_id`; existing rows
are untouched. Deletions in ActualBudget are NOT propagated.
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from datetime import date, datetime
from decimal import Decimal
from typing import Any
import httpx
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy.ext.asyncio import async_sessionmaker
from payslip_ingest.db import ExternalMetaDeposit
log = logging.getLogger(__name__)
# Payee pattern. ActualBudget normalizes payee strings but the raw bank
# description can include country code / spacing variants. Match the
# common forms observed in the viktor budget.
META_PAYEE_RE = re.compile(r"\b(META|FACEBOOK)\b", re.IGNORECASE)
class ActualBudgetError(RuntimeError):
pass
@dataclass
class SyncResult:
accounts_scanned: int = 0
transactions_fetched: int = 0
meta_deposits_matched: int = 0
inserted: int = 0
skipped_existing: int = 0
class ActualBudgetClient:
"""Narrow client for the jhonderson/actual-http-api endpoints we need."""
def __init__(
self,
base_url: str,
api_key: str,
encryption_password: str,
budget_sync_id: str,
client: httpx.AsyncClient | None = None,
):
self._base_url = base_url.rstrip("/")
self._budget = budget_sync_id
self._headers = {
"accept": "application/json",
"x-api-key": api_key,
"budget-encryption-password": encryption_password,
}
self._client = client or httpx.AsyncClient(timeout=60.0)
self._owns_client = client is None
async def aclose(self) -> None:
if self._owns_client:
await self._client.aclose()
async def __aenter__(self) -> ActualBudgetClient:
return self
async def __aexit__(self, *exc: object) -> None:
await self.aclose()
async def list_accounts(self) -> list[dict[str, Any]]:
resp = await self._client.get(
f"{self._base_url}/v1/budgets/{self._budget}/accounts",
headers=self._headers,
)
resp.raise_for_status()
data = resp.json().get("data", [])
if not isinstance(data, list):
raise ActualBudgetError(f"accounts response not a list: {data!r}")
return data
async def list_transactions(self, account_id: str) -> list[dict[str, Any]]:
"""List all transactions for an account.
jhonderson/actual-http-api GET endpoint may return `data` as a list.
If the endpoint is missing (older image), surface a clear error so
the operator can switch to the SQLite-mount fallback.
"""
resp = await self._client.get(
f"{self._base_url}/v1/budgets/{self._budget}/accounts/{account_id}/transactions",
headers=self._headers,
)
if resp.status_code == 404:
raise ActualBudgetError(
"transaction-list endpoint not found — the http-api image may be too old; "
"fall back to reading SQLite directly")
resp.raise_for_status()
data = resp.json().get("data", [])
if not isinstance(data, list):
raise ActualBudgetError(f"transactions response not a list: {data!r}")
return data
async def sync_meta_deposits(
client: ActualBudgetClient,
db_session_factory: async_sessionmaker[Any],
) -> SyncResult:
"""Enumerate every transaction across every account, upsert Meta deposits."""
accounts = await client.list_accounts()
result = SyncResult(accounts_scanned=len(accounts))
for account in accounts:
account_id = account.get("id")
if not isinstance(account_id, str):
log.warning("skipping account without id: %r", account)
continue
txs = await client.list_transactions(account_id)
result.transactions_fetched += len(txs)
for tx in txs:
if not _is_meta_deposit(tx):
continue
result.meta_deposits_matched += 1
was_new = await _upsert(db_session_factory, tx)
if was_new:
result.inserted += 1
else:
result.skipped_existing += 1
return result
def _is_meta_deposit(tx: dict[str, Any]) -> bool:
"""Positive deposit (credit) where payee contains META / FACEBOOK."""
amount = tx.get("amount")
if not isinstance(amount, int | float):
return False
# ActualBudget stores amounts in cents (int); positive = incoming.
if amount <= 0:
return False
payee = tx.get("payee_name") or tx.get("payee") or ""
if not isinstance(payee, str):
return False
return bool(META_PAYEE_RE.search(payee))
async def _upsert(
db_session_factory: async_sessionmaker[Any],
tx: dict[str, Any],
) -> bool:
"""Insert the row; return True if newly inserted, False if it already existed.
Uses a dialect-aware ON CONFLICT DO NOTHING upsert Postgres in prod and
SQLite in tests both support this.
"""
tx_id = tx["id"]
amount_cents = int(tx["amount"])
amount = (Decimal(amount_cents) / Decimal(100)).quantize(Decimal("0.01"))
deposit_date = _parse_date(tx["date"])
payee = tx.get("payee_name") or tx.get("payee") or None
memo = tx.get("notes") or tx.get("memo") or None
async with db_session_factory() as session:
existing = await session.execute(
select(ExternalMetaDeposit.id).where(
ExternalMetaDeposit.actualbudget_tx_id == tx_id))
if existing.scalar() is not None:
return False
async with db_session_factory() as session, session.begin():
bind = session.bind
dialect = bind.dialect.name if bind is not None else "postgresql"
stmt_cls = pg_insert if dialect == "postgresql" else sqlite_insert
stmt = stmt_cls(ExternalMetaDeposit).values(
actualbudget_tx_id=tx_id,
deposit_date=deposit_date,
amount=amount,
payee=payee,
memo=memo,
).on_conflict_do_nothing(index_elements=[ExternalMetaDeposit.actualbudget_tx_id])
await session.execute(stmt)
return True
def _parse_date(raw: str) -> date:
return datetime.strptime(raw, "%Y-%m-%d").date()
__all__ = [
"ActualBudgetClient",
"ActualBudgetError",
"META_PAYEE_RE",
"SyncResult",
"sync_meta_deposits",
]

View file

@ -0,0 +1,179 @@
"""Unit tests for the ActualBudget sync client."""
from collections.abc import AsyncIterator
from datetime import UTC, date, datetime
from decimal import Decimal
from typing import Any
import httpx
import pytest
import respx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, create_async_engine
from payslip_ingest.db import Base, ExternalMetaDeposit
from payslip_ingest.sync.actualbudget import (
META_PAYEE_RE,
ActualBudgetClient,
ActualBudgetError,
sync_meta_deposits,
)
BASE_URL = "http://budget-http-api-viktor.actualbudget.svc.cluster.local"
BUDGET_ID = "sync-id-viktor"
def test_meta_payee_regex_matches_canonical() -> None:
assert META_PAYEE_RE.search("META PLATFORMS IRELAND")
assert META_PAYEE_RE.search("Facebook UK Ltd")
assert META_PAYEE_RE.search("meta")
def test_meta_payee_regex_rejects_other() -> None:
assert not META_PAYEE_RE.search("Amazon")
assert not META_PAYEE_RE.search("Metamask") # requires word boundary
assert not META_PAYEE_RE.search("facebookish")
def _register_sqlite_now(dbapi_conn: Any, _: Any) -> None:
"""SQLite doesn't have now() — map it to datetime('now') so the Postgres
`server_default=text("now()")` on the ORM models works in tests."""
dbapi_conn.create_function("now", 0, lambda: datetime.now(UTC).isoformat(" "))
@pytest.fixture
async def session_factory() -> AsyncIterator[async_sessionmaker[Any]]:
engine: AsyncEngine = create_async_engine("sqlite+aiosqlite:///:memory:")
from sqlalchemy import event
event.listen(engine.sync_engine, "connect", _register_sqlite_now)
async with engine.begin() as conn:
await conn.exec_driver_sql("ATTACH DATABASE ':memory:' AS payslip_ingest")
await conn.run_sync(Base.metadata.create_all)
yield async_sessionmaker(engine, expire_on_commit=False)
await engine.dispose()
def _mock_accounts() -> dict[str, Any]:
return {"data": [{"id": "account-1", "name": "Barclays Personal"}]}
def _mock_transactions() -> dict[str, Any]:
return {
"data": [
{
"id": "tx-001",
"date": "2026-03-28",
"amount": 650000, # 6500.00 GBP in cents
"payee_name": "META PLATFORMS IRELAND",
"notes": "Salary March",
},
{
"id": "tx-002",
"date": "2026-02-28",
"amount": 620000,
"payee_name": "Facebook UK Ltd",
"notes": None,
},
{
"id": "tx-003",
"date": "2026-03-15",
"amount": 12000,
"payee_name": "Tesco",
"notes": "groceries",
},
{
"id": "tx-004",
"date": "2026-03-01",
"amount": -5000, # outgoing
"payee_name": "META (refund out)",
"notes": None,
},
]
}
@respx.mock
async def test_sync_meta_deposits_inserts_matches(
session_factory: async_sessionmaker[Any],
) -> None:
respx.get(f"{BASE_URL}/v1/budgets/{BUDGET_ID}/accounts").mock(
return_value=httpx.Response(200, json=_mock_accounts()))
respx.get(
f"{BASE_URL}/v1/budgets/{BUDGET_ID}/accounts/account-1/transactions").mock(
return_value=httpx.Response(200, json=_mock_transactions()))
async with ActualBudgetClient(BASE_URL, "k", "pwd", BUDGET_ID) as client:
result = await sync_meta_deposits(client, session_factory)
assert result.accounts_scanned == 1
assert result.transactions_fetched == 4
assert result.meta_deposits_matched == 2
assert result.inserted == 2
async with session_factory() as session:
rows = (await session.execute(
select(ExternalMetaDeposit).order_by(
ExternalMetaDeposit.deposit_date))).scalars().all()
assert len(rows) == 2
assert rows[0].actualbudget_tx_id == "tx-002"
assert rows[0].deposit_date == date(2026, 2, 28)
assert rows[0].amount == Decimal("6200.00")
assert rows[0].payee == "Facebook UK Ltd"
assert rows[1].actualbudget_tx_id == "tx-001"
assert rows[1].amount == Decimal("6500.00")
@respx.mock
async def test_sync_meta_deposits_is_idempotent(
session_factory: async_sessionmaker[Any],
) -> None:
"""Running twice inserts each tx only once."""
respx.get(f"{BASE_URL}/v1/budgets/{BUDGET_ID}/accounts").mock(
return_value=httpx.Response(200, json=_mock_accounts()))
respx.get(
f"{BASE_URL}/v1/budgets/{BUDGET_ID}/accounts/account-1/transactions").mock(
return_value=httpx.Response(200, json=_mock_transactions()))
async with ActualBudgetClient(BASE_URL, "k", "pwd", BUDGET_ID) as client:
first = await sync_meta_deposits(client, session_factory)
second = await sync_meta_deposits(client, session_factory)
assert first.inserted == 2
assert second.inserted == 0
assert second.skipped_existing == 2
async with session_factory() as session:
count = len((await session.execute(
select(ExternalMetaDeposit.id))).scalars().all())
assert count == 2
@respx.mock
async def test_sync_surfaces_missing_endpoint_error(
session_factory: async_sessionmaker[Any],
) -> None:
"""404 on transactions endpoint must raise — triggers SQLite fallback."""
respx.get(f"{BASE_URL}/v1/budgets/{BUDGET_ID}/accounts").mock(
return_value=httpx.Response(200, json=_mock_accounts()))
respx.get(
f"{BASE_URL}/v1/budgets/{BUDGET_ID}/accounts/account-1/transactions").mock(
return_value=httpx.Response(404))
async with ActualBudgetClient(BASE_URL, "k", "pwd", BUDGET_ID) as client:
with pytest.raises(ActualBudgetError, match="endpoint not found"):
await sync_meta_deposits(client, session_factory)
async def _insert_existing_deposit(
session_factory: async_sessionmaker[Any],
tx_id: str,
) -> None:
async with session_factory() as session, session.begin():
session.add(
ExternalMetaDeposit(
actualbudget_tx_id=tx_id,
deposit_date=date(2026, 3, 28),
amount=Decimal("6500.00"),
payee="META",
memo=None,
synced_at=datetime.now(UTC),
))