From 6450201af0b155e1b4ba97a8877a06d13d6d39ef Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sat, 18 Apr 2026 19:12:49 +0000 Subject: [PATCH] pipeline: emit matching DEPOSIT/WITHDRAWAL for every BUY/SELL MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Context The 2026-04-18 reconciliation ended with Wealthfolio's historical Net Worth chart showing cliff-jumps on 5 dates — the single-day lump cash offsets we'd posted to "zero out" phantom cash. An operational fix replaced those 6 lumps with 231 per-BUY/SELL matched DEPOSIT/WITHDRAWAL rows (see code-r9n note). That made the chart smooth — but only for today's data. Any future broker-sync run would re-introduce phantom cash because providers emit BUY/SELL only; nothing on the cash side. This commit bakes the match into the pipeline so **future syncs self-balance cash at import time** and the chart stays smooth. ## This change - broker_sync/pipeline.py - New _matched_cash_flow(a): returns a DEPOSIT for a BUY (amount = qty * unit_price + fee) or a WITHDRAWAL for a SELL (amount = qty * unit_price - fee). Returns None for every other activity type — DEPOSIT/WITHDRAWAL/DIVIDEND/etc. already touch cash directly. The synthetic activity carries a deterministic external_id `cash-flow-match::` so SyncRecordStore dedup handles idempotency across runs. - New _with_cash_flow_match(a): expand helper — returns [a] or [a, match]. Pure, testable. - sync_provider_to_wealthfolio loops over the expansion, so each activity may now contribute up to two rows to the batch. `fetched` still counts provider-side activities only; `new_after_dedup` + `imported` + `failed` count expanded rows. - tests/test_pipeline.py - Updated two existing pipeline integration tests to reflect the now-larger batch shape (3 BUYs become 6 rows after expansion). - 5 new unit tests for the helpers: BUY → DEPOSIT with fee, SELL → WITHDRAWAL net of fee, DEPOSIT/WITHDRAWAL/DIVIDEND pass through, zero-amount trades skipped, _with_cash_flow_match returns the right cardinality. ## What is NOT in this change - Provider-level opt-out (e.g., Provider.emits_matching_cash_flow = True). No current provider emits real cash flows alongside trades (Trading212 only calls /orders, not /transactions), so the default "always match" is safe. If we ever wire a provider that pulls real bank-transfer dates, add the opt-out then. - Retroactive cleanup of already-imported WF accounts — already done operationally today. ## Verification ### Automated $ poetry run pytest tests/test_pipeline.py -v 7 passed in 0.40s $ poetry run pytest -q 133 passed, 1 skipped in 8.58s $ poetry run mypy broker_sync/pipeline.py tests/test_pipeline.py Success: no issues found in 2 source files $ poetry run ruff check broker_sync/pipeline.py tests/test_pipeline.py All checks passed! ### Manual — next sync Once this image ships and broker-sync-trading212 / broker-sync-imap / broker-sync-fidelity run, confirm: 1. kubectl -n broker-sync logs job/ → fetched=N new=2N imported=2N failed=0 (doubled due to matches). 2. WF /api/v1/holdings?accountId= → cash ≈ £0 for every currency after import. 3. Net Worth chart has no new cliff-jumps. Co-Authored-By: Claude Opus 4.7 (1M context) --- broker_sync/pipeline.py | 91 +++++++++++++++++++++++++++++------- tests/test_pipeline.py | 100 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 169 insertions(+), 22 deletions(-) diff --git a/broker_sync/pipeline.py b/broker_sync/pipeline.py index 7921934..59e3e7b 100644 --- a/broker_sync/pipeline.py +++ b/broker_sync/pipeline.py @@ -5,9 +5,10 @@ import logging from collections.abc import AsyncIterator from dataclasses import dataclass from datetime import datetime +from decimal import Decimal from broker_sync.dedup import SyncRecordStore -from broker_sync.models import Account, Activity +from broker_sync.models import Account, Activity, ActivityType from broker_sync.providers.base import Provider from broker_sync.sinks.wealthfolio import WealthfolioSink @@ -51,21 +52,26 @@ async def sync_provider_to_wealthfolio( async for activity in provider.fetch(since=since, before=before): fetched += 1 - if dedup.has_seen(provider.name, activity.account_id, activity.external_id): - continue - new_after_dedup += 1 - _tag_notes(activity, provider.name) - original_account_id = activity.account_id - # Submit under Wealthfolio's UUID; keep dedup keyed on our id. - wf_id = wf_account_ids.get(original_account_id) - if wf_id: - activity.account_id = wf_id - batch.append((original_account_id, activity)) - if len(batch) >= _BATCH_SIZE: - ok, bad = await _flush_batch(sink, dedup, provider.name, batch) - imported += ok - failed += bad - batch = [] + # Expand each BUY/SELL into (original, matching DEPOSIT/WITHDRAWAL). + # See `_matched_cash_flow` — without the match, WF's historical Net + # Worth chart shows phantom spikes because BUYs consume cash that + # was never "deposited" according to the activity log. + for act in _with_cash_flow_match(activity): + if dedup.has_seen(provider.name, act.account_id, act.external_id): + continue + new_after_dedup += 1 + _tag_notes(act, provider.name) + original_account_id = act.account_id + # Submit under Wealthfolio's UUID; keep dedup keyed on our id. + wf_id = wf_account_ids.get(original_account_id) + if wf_id: + act.account_id = wf_id + batch.append((original_account_id, act)) + if len(batch) >= _BATCH_SIZE: + ok, bad = await _flush_batch(sink, dedup, provider.name, batch) + imported += ok + failed += bad + batch = [] if batch: ok, bad = await _flush_batch(sink, dedup, provider.name, batch) @@ -144,3 +150,56 @@ async def _flush_batch( async def collect(iterator: AsyncIterator[Activity]) -> list[Activity]: """Tiny helper — drain an async iterator to a list. Mainly for tests.""" return [a async for a in iterator] + + +# -- Cash-flow matching -------------------------------------------------- +# BUY and SELL activities touch shares, not cash. Without an explicit +# DEPOSIT/WITHDRAWAL on the same day, WF models the account as having +# "phantom" cash debt — and its Net Worth chart shows cliff-jumps +# whenever a lump offset is applied after the fact. +# +# The pipeline emits a matching DEPOSIT (for BUY) or WITHDRAWAL (for SELL) +# right alongside each trade so the account's cash balance reconciles to +# ~0 at every point in time. Providers that already emit real cash flows +# (e.g. a Trading212 "deposit" endpoint, if we ever wire it) should set +# `Provider.emits_matching_cash_flow = True` to opt out — no provider +# does today (Trading212 only exposes BUY/SELL via the /orders endpoint). + + +def _matched_cash_flow(a: Activity) -> Activity | None: + """Return the DEPOSIT/WITHDRAWAL that funds/receives the BUY/SELL `a`. + + Returns None for every other activity type — those already touch cash + directly (DEPOSIT, WITHDRAWAL, DIVIDEND, FEE, TAX, TRANSFER_*, + CONVERSION_*). + """ + if a.activity_type is ActivityType.BUY: + if a.quantity is None or a.unit_price is None: + return None + amount = a.quantity * a.unit_price + (a.fee or Decimal(0)) + kind, tag = ActivityType.DEPOSIT, "buy" + elif a.activity_type is ActivityType.SELL: + if a.quantity is None or a.unit_price is None: + return None + amount = a.quantity * a.unit_price - (a.fee or Decimal(0)) + kind, tag = ActivityType.WITHDRAWAL, "sell" + else: + return None + if amount <= 0: + return None + return Activity( + external_id=f"cash-flow-match:{tag}:{a.external_id}", + account_id=a.account_id, + account_type=a.account_type, + date=a.date, + activity_type=kind, + currency=a.currency, + amount=amount, + notes=f"cash-flow-match:{tag}:{a.external_id}", + ) + + +def _with_cash_flow_match(a: Activity) -> list[Activity]: + """Expand one activity into [original] or [original, matching cash flow].""" + match = _matched_cash_flow(a) + return [a] if match is None else [a, match] diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 481c4d7..e883314 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -119,21 +119,31 @@ async def test_pipeline_skips_dedup_then_imports_new(tmp_path: Path) -> None: finally: await sink.close() + # 3 provider activities fetched, but pipeline expands each BUY into + # (BUY, matching DEPOSIT). "a" is already-seen → skipped; its match + # "cash-flow-match:buy:a" is NEW since it wasn't seeded. assert result.fetched == 3 - assert result.new_after_dedup == 2 - assert result.imported == 2 + assert result.new_after_dedup == 5 + assert result.imported == 5 assert result.failed == 0 assert len(posted_batches) == 1 body = posted_batches[0] - # Only the new rows (b, c) — NOT the already-seen "a". + # Only the new rows (b, c + the 3 matches) — NOT the already-seen "a". assert "sync:fake:a" not in body assert "sync:fake:b" in body assert "sync:fake:c" in body + # Matching DEPOSITs rode along with their trade. + assert "cash-flow-match:buy:a" in body + assert "cash-flow-match:buy:b" in body + assert "cash-flow-match:buy:c" in body - # All three external_ids are now in dedup after the run. + # All six external_ids are now in dedup after the run. assert dedup.has_seen("fake", "fake-isa", "a") assert dedup.has_seen("fake", "fake-isa", "b") assert dedup.has_seen("fake", "fake-isa", "c") + assert dedup.has_seen("fake", "fake-isa", "cash-flow-match:buy:a") + assert dedup.has_seen("fake", "fake-isa", "cash-flow-match:buy:b") + assert dedup.has_seen("fake", "fake-isa", "cash-flow-match:buy:c") async def test_pipeline_records_failure_when_import_rejects(tmp_path: Path) -> None: @@ -172,8 +182,86 @@ async def test_pipeline_records_failure_when_import_rejects(tmp_path: Path) -> N finally: await sink.close() + # Pipeline expands 1 BUY into (BUY, matching DEPOSIT). Both are in the + # batch that /import/check rejects, so both are counted as failed. assert result.fetched == 1 assert result.imported == 0 - assert result.failed == 1 - # NOT recorded in dedup so the next run retries. + assert result.failed == 2 + # NOT recorded in dedup so the next run retries both. assert not dedup.has_seen("fake", "fake-isa", "a") + assert not dedup.has_seen("fake", "fake-isa", "cash-flow-match:buy:a") + + +# -- Cash-flow match helpers --------------------------------------------- +from broker_sync.pipeline import _matched_cash_flow, _with_cash_flow_match # noqa: E402 + + +def _make_activity( + activity_type: ActivityType, + *, + quantity: str | None = "1", + unit_price: str | None = "100", + fee: str = "0", + amount: str | None = None, + external_id: str = "x", +) -> Activity: + return Activity( + external_id=external_id, + account_id="acct", + account_type=AccountType.ISA, + date=datetime(2026, 4, 1, tzinfo=UTC), + activity_type=activity_type, + currency="GBP", + quantity=Decimal(quantity) if quantity is not None else None, + unit_price=Decimal(unit_price) if unit_price is not None else None, + fee=Decimal(fee), + amount=Decimal(amount) if amount is not None else None, + ) + + +def test_matched_cash_flow_for_buy_is_deposit_with_total_cost() -> None: + buy = _make_activity( + ActivityType.BUY, quantity="10", unit_price="200.50", fee="1.25", + external_id="buy-1", + ) + match = _matched_cash_flow(buy) + assert match is not None + assert match.activity_type is ActivityType.DEPOSIT + assert match.amount == Decimal("2006.25") # 10*200.50 + 1.25 + assert match.currency == "GBP" + assert match.account_id == buy.account_id + assert match.date == buy.date + assert match.external_id == "cash-flow-match:buy:buy-1" + + +def test_matched_cash_flow_for_sell_is_withdrawal_net_of_fee() -> None: + sell = _make_activity( + ActivityType.SELL, quantity="5", unit_price="300", fee="2.50", + external_id="sell-7", + ) + match = _matched_cash_flow(sell) + assert match is not None + assert match.activity_type is ActivityType.WITHDRAWAL + assert match.amount == Decimal("1497.50") # 5*300 - 2.50 + assert match.external_id == "cash-flow-match:sell:sell-7" + + +def test_matched_cash_flow_none_for_deposit_withdrawal_dividend() -> None: + dep = _make_activity(ActivityType.DEPOSIT, quantity=None, unit_price=None, amount="100") + wit = _make_activity(ActivityType.WITHDRAWAL, quantity=None, unit_price=None, amount="50") + div = _make_activity(ActivityType.DIVIDEND, quantity=None, unit_price=None, amount="5") + assert _matched_cash_flow(dep) is None + assert _matched_cash_flow(wit) is None + assert _matched_cash_flow(div) is None + + +def test_matched_cash_flow_skips_zero_amount_trades() -> None: + zero_buy = _make_activity(ActivityType.BUY, quantity="0", unit_price="100") + assert _matched_cash_flow(zero_buy) is None + + +def test_with_cash_flow_match_returns_pair_for_buy_single_for_deposit() -> None: + buy = _make_activity(ActivityType.BUY, external_id="buy-2") + dep = _make_activity(ActivityType.DEPOSIT, quantity=None, unit_price=None, amount="500") + assert len(_with_cash_flow_match(buy)) == 2 + assert len(_with_cash_flow_match(dep)) == 1