pipeline: emit matching DEPOSIT/WITHDRAWAL for every BUY/SELL

## Context

The 2026-04-18 reconciliation ended with Wealthfolio's historical Net
Worth chart showing cliff-jumps on 5 dates — the single-day lump cash
offsets we'd posted to "zero out" phantom cash. An operational fix
replaced those 6 lumps with 231 per-BUY/SELL matched DEPOSIT/WITHDRAWAL
rows (see code-r9n note). That made the chart smooth — but only for
today's data. Any future broker-sync run would re-introduce phantom
cash because providers emit BUY/SELL only; nothing on the cash side.

This commit bakes the match into the pipeline so **future syncs
self-balance cash at import time** and the chart stays smooth.

## This change

- broker_sync/pipeline.py
  - New _matched_cash_flow(a): returns a DEPOSIT for a BUY (amount =
    qty * unit_price + fee) or a WITHDRAWAL for a SELL (amount =
    qty * unit_price - fee). Returns None for every other activity
    type — DEPOSIT/WITHDRAWAL/DIVIDEND/etc. already touch cash
    directly. The synthetic activity carries a deterministic
    external_id `cash-flow-match:<buy|sell>:<original external_id>`
    so SyncRecordStore dedup handles idempotency across runs.
  - New _with_cash_flow_match(a): expand helper — returns [a] or
    [a, match]. Pure, testable.
  - sync_provider_to_wealthfolio loops over the expansion, so each
    activity may now contribute up to two rows to the batch. `fetched`
    still counts provider-side activities only; `new_after_dedup` +
    `imported` + `failed` count expanded rows.
- tests/test_pipeline.py
  - Updated two existing pipeline integration tests to reflect the
    now-larger batch shape (3 BUYs become 6 rows after expansion).
  - 5 new unit tests for the helpers: BUY → DEPOSIT with fee,
    SELL → WITHDRAWAL net of fee, DEPOSIT/WITHDRAWAL/DIVIDEND pass
    through, zero-amount trades skipped, _with_cash_flow_match
    returns the right cardinality.

## What is NOT in this change

- Provider-level opt-out (e.g., Provider.emits_matching_cash_flow =
  True). No current provider emits real cash flows alongside trades
  (Trading212 only calls /orders, not /transactions), so the default
  "always match" is safe. If we ever wire a provider that pulls real
  bank-transfer dates, add the opt-out then.
- Retroactive cleanup of already-imported WF accounts — already done
  operationally today.

## Verification

### Automated

$ poetry run pytest tests/test_pipeline.py -v
7 passed in 0.40s

$ poetry run pytest -q
133 passed, 1 skipped in 8.58s

$ poetry run mypy broker_sync/pipeline.py tests/test_pipeline.py
Success: no issues found in 2 source files

$ poetry run ruff check broker_sync/pipeline.py tests/test_pipeline.py
All checks passed!

### Manual — next sync

Once this image ships and broker-sync-trading212 / broker-sync-imap /
broker-sync-fidelity run, confirm:
1. kubectl -n broker-sync logs job/<next-run> → fetched=N new=2N
   imported=2N failed=0 (doubled due to matches).
2. WF /api/v1/holdings?accountId=<uuid> → cash ≈ £0 for every currency
   after import.
3. Net Worth chart has no new cliff-jumps.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Viktor Barzin 2026-04-18 19:12:49 +00:00
parent 7c9be544dc
commit 6450201af0
2 changed files with 169 additions and 22 deletions

View file

@ -5,9 +5,10 @@ import logging
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from decimal import Decimal
from broker_sync.dedup import SyncRecordStore from broker_sync.dedup import SyncRecordStore
from broker_sync.models import Account, Activity from broker_sync.models import Account, Activity, ActivityType
from broker_sync.providers.base import Provider from broker_sync.providers.base import Provider
from broker_sync.sinks.wealthfolio import WealthfolioSink from broker_sync.sinks.wealthfolio import WealthfolioSink
@ -51,21 +52,26 @@ async def sync_provider_to_wealthfolio(
async for activity in provider.fetch(since=since, before=before): async for activity in provider.fetch(since=since, before=before):
fetched += 1 fetched += 1
if dedup.has_seen(provider.name, activity.account_id, activity.external_id): # Expand each BUY/SELL into (original, matching DEPOSIT/WITHDRAWAL).
continue # See `_matched_cash_flow` — without the match, WF's historical Net
new_after_dedup += 1 # Worth chart shows phantom spikes because BUYs consume cash that
_tag_notes(activity, provider.name) # was never "deposited" according to the activity log.
original_account_id = activity.account_id for act in _with_cash_flow_match(activity):
# Submit under Wealthfolio's UUID; keep dedup keyed on our id. if dedup.has_seen(provider.name, act.account_id, act.external_id):
wf_id = wf_account_ids.get(original_account_id) continue
if wf_id: new_after_dedup += 1
activity.account_id = wf_id _tag_notes(act, provider.name)
batch.append((original_account_id, activity)) original_account_id = act.account_id
if len(batch) >= _BATCH_SIZE: # Submit under Wealthfolio's UUID; keep dedup keyed on our id.
ok, bad = await _flush_batch(sink, dedup, provider.name, batch) wf_id = wf_account_ids.get(original_account_id)
imported += ok if wf_id:
failed += bad act.account_id = wf_id
batch = [] batch.append((original_account_id, act))
if len(batch) >= _BATCH_SIZE:
ok, bad = await _flush_batch(sink, dedup, provider.name, batch)
imported += ok
failed += bad
batch = []
if batch: if batch:
ok, bad = await _flush_batch(sink, dedup, provider.name, batch) ok, bad = await _flush_batch(sink, dedup, provider.name, batch)
@ -144,3 +150,56 @@ async def _flush_batch(
async def collect(iterator: AsyncIterator[Activity]) -> list[Activity]: async def collect(iterator: AsyncIterator[Activity]) -> list[Activity]:
"""Tiny helper — drain an async iterator to a list. Mainly for tests.""" """Tiny helper — drain an async iterator to a list. Mainly for tests."""
return [a async for a in iterator] return [a async for a in iterator]
# -- Cash-flow matching --------------------------------------------------
# BUY and SELL activities touch shares, not cash. Without an explicit
# DEPOSIT/WITHDRAWAL on the same day, WF models the account as having
# "phantom" cash debt — and its Net Worth chart shows cliff-jumps
# whenever a lump offset is applied after the fact.
#
# The pipeline emits a matching DEPOSIT (for BUY) or WITHDRAWAL (for SELL)
# right alongside each trade so the account's cash balance reconciles to
# ~0 at every point in time. Providers that already emit real cash flows
# (e.g. a Trading212 "deposit" endpoint, if we ever wire it) should set
# `Provider.emits_matching_cash_flow = True` to opt out — no provider
# does today (Trading212 only exposes BUY/SELL via the /orders endpoint).
def _matched_cash_flow(a: Activity) -> Activity | None:
"""Return the DEPOSIT/WITHDRAWAL that funds/receives the BUY/SELL `a`.
Returns None for every other activity type those already touch cash
directly (DEPOSIT, WITHDRAWAL, DIVIDEND, FEE, TAX, TRANSFER_*,
CONVERSION_*).
"""
if a.activity_type is ActivityType.BUY:
if a.quantity is None or a.unit_price is None:
return None
amount = a.quantity * a.unit_price + (a.fee or Decimal(0))
kind, tag = ActivityType.DEPOSIT, "buy"
elif a.activity_type is ActivityType.SELL:
if a.quantity is None or a.unit_price is None:
return None
amount = a.quantity * a.unit_price - (a.fee or Decimal(0))
kind, tag = ActivityType.WITHDRAWAL, "sell"
else:
return None
if amount <= 0:
return None
return Activity(
external_id=f"cash-flow-match:{tag}:{a.external_id}",
account_id=a.account_id,
account_type=a.account_type,
date=a.date,
activity_type=kind,
currency=a.currency,
amount=amount,
notes=f"cash-flow-match:{tag}:{a.external_id}",
)
def _with_cash_flow_match(a: Activity) -> list[Activity]:
"""Expand one activity into [original] or [original, matching cash flow]."""
match = _matched_cash_flow(a)
return [a] if match is None else [a, match]

View file

@ -119,21 +119,31 @@ async def test_pipeline_skips_dedup_then_imports_new(tmp_path: Path) -> None:
finally: finally:
await sink.close() await sink.close()
# 3 provider activities fetched, but pipeline expands each BUY into
# (BUY, matching DEPOSIT). "a" is already-seen → skipped; its match
# "cash-flow-match:buy:a" is NEW since it wasn't seeded.
assert result.fetched == 3 assert result.fetched == 3
assert result.new_after_dedup == 2 assert result.new_after_dedup == 5
assert result.imported == 2 assert result.imported == 5
assert result.failed == 0 assert result.failed == 0
assert len(posted_batches) == 1 assert len(posted_batches) == 1
body = posted_batches[0] body = posted_batches[0]
# Only the new rows (b, c) — NOT the already-seen "a". # Only the new rows (b, c + the 3 matches) — NOT the already-seen "a".
assert "sync:fake:a" not in body assert "sync:fake:a" not in body
assert "sync:fake:b" in body assert "sync:fake:b" in body
assert "sync:fake:c" in body assert "sync:fake:c" in body
# Matching DEPOSITs rode along with their trade.
assert "cash-flow-match:buy:a" in body
assert "cash-flow-match:buy:b" in body
assert "cash-flow-match:buy:c" in body
# All three external_ids are now in dedup after the run. # All six external_ids are now in dedup after the run.
assert dedup.has_seen("fake", "fake-isa", "a") assert dedup.has_seen("fake", "fake-isa", "a")
assert dedup.has_seen("fake", "fake-isa", "b") assert dedup.has_seen("fake", "fake-isa", "b")
assert dedup.has_seen("fake", "fake-isa", "c") assert dedup.has_seen("fake", "fake-isa", "c")
assert dedup.has_seen("fake", "fake-isa", "cash-flow-match:buy:a")
assert dedup.has_seen("fake", "fake-isa", "cash-flow-match:buy:b")
assert dedup.has_seen("fake", "fake-isa", "cash-flow-match:buy:c")
async def test_pipeline_records_failure_when_import_rejects(tmp_path: Path) -> None: async def test_pipeline_records_failure_when_import_rejects(tmp_path: Path) -> None:
@ -172,8 +182,86 @@ async def test_pipeline_records_failure_when_import_rejects(tmp_path: Path) -> N
finally: finally:
await sink.close() await sink.close()
# Pipeline expands 1 BUY into (BUY, matching DEPOSIT). Both are in the
# batch that /import/check rejects, so both are counted as failed.
assert result.fetched == 1 assert result.fetched == 1
assert result.imported == 0 assert result.imported == 0
assert result.failed == 1 assert result.failed == 2
# NOT recorded in dedup so the next run retries. # NOT recorded in dedup so the next run retries both.
assert not dedup.has_seen("fake", "fake-isa", "a") assert not dedup.has_seen("fake", "fake-isa", "a")
assert not dedup.has_seen("fake", "fake-isa", "cash-flow-match:buy:a")
# -- Cash-flow match helpers ---------------------------------------------
from broker_sync.pipeline import _matched_cash_flow, _with_cash_flow_match # noqa: E402
def _make_activity(
activity_type: ActivityType,
*,
quantity: str | None = "1",
unit_price: str | None = "100",
fee: str = "0",
amount: str | None = None,
external_id: str = "x",
) -> Activity:
return Activity(
external_id=external_id,
account_id="acct",
account_type=AccountType.ISA,
date=datetime(2026, 4, 1, tzinfo=UTC),
activity_type=activity_type,
currency="GBP",
quantity=Decimal(quantity) if quantity is not None else None,
unit_price=Decimal(unit_price) if unit_price is not None else None,
fee=Decimal(fee),
amount=Decimal(amount) if amount is not None else None,
)
def test_matched_cash_flow_for_buy_is_deposit_with_total_cost() -> None:
buy = _make_activity(
ActivityType.BUY, quantity="10", unit_price="200.50", fee="1.25",
external_id="buy-1",
)
match = _matched_cash_flow(buy)
assert match is not None
assert match.activity_type is ActivityType.DEPOSIT
assert match.amount == Decimal("2006.25") # 10*200.50 + 1.25
assert match.currency == "GBP"
assert match.account_id == buy.account_id
assert match.date == buy.date
assert match.external_id == "cash-flow-match:buy:buy-1"
def test_matched_cash_flow_for_sell_is_withdrawal_net_of_fee() -> None:
sell = _make_activity(
ActivityType.SELL, quantity="5", unit_price="300", fee="2.50",
external_id="sell-7",
)
match = _matched_cash_flow(sell)
assert match is not None
assert match.activity_type is ActivityType.WITHDRAWAL
assert match.amount == Decimal("1497.50") # 5*300 - 2.50
assert match.external_id == "cash-flow-match:sell:sell-7"
def test_matched_cash_flow_none_for_deposit_withdrawal_dividend() -> None:
dep = _make_activity(ActivityType.DEPOSIT, quantity=None, unit_price=None, amount="100")
wit = _make_activity(ActivityType.WITHDRAWAL, quantity=None, unit_price=None, amount="50")
div = _make_activity(ActivityType.DIVIDEND, quantity=None, unit_price=None, amount="5")
assert _matched_cash_flow(dep) is None
assert _matched_cash_flow(wit) is None
assert _matched_cash_flow(div) is None
def test_matched_cash_flow_skips_zero_amount_trades() -> None:
zero_buy = _make_activity(ActivityType.BUY, quantity="0", unit_price="100")
assert _matched_cash_flow(zero_buy) is None
def test_with_cash_flow_match_returns_pair_for_buy_single_for_deposit() -> None:
buy = _make_activity(ActivityType.BUY, external_id="buy-2")
dep = _make_activity(ActivityType.DEPOSIT, quantity=None, unit_price=None, amount="500")
assert len(_with_cash_flow_match(buy)) == 2
assert len(_with_cash_flow_match(dep)) == 1