## Context
The 2026-04-18 reconciliation ended with Wealthfolio's historical Net
Worth chart showing cliff-jumps on 5 dates — the single-day lump cash
offsets we'd posted to "zero out" phantom cash. An operational fix
replaced those 6 lumps with 231 per-BUY/SELL matched DEPOSIT/WITHDRAWAL
rows (see code-r9n note). That made the chart smooth — but only for
today's data. Any future broker-sync run would re-introduce phantom
cash because providers emit BUY/SELL only; nothing on the cash side.
This commit bakes the match into the pipeline so **future syncs
self-balance cash at import time** and the chart stays smooth.
## This change
- broker_sync/pipeline.py
- New _matched_cash_flow(a): returns a DEPOSIT for a BUY (amount =
qty * unit_price + fee) or a WITHDRAWAL for a SELL (amount =
qty * unit_price - fee). Returns None for every other activity
type — DEPOSIT/WITHDRAWAL/DIVIDEND/etc. already touch cash
directly. The synthetic activity carries a deterministic
external_id `cash-flow-match:<buy|sell>:<original external_id>`
so SyncRecordStore dedup handles idempotency across runs.
- New _with_cash_flow_match(a): expand helper — returns [a] or
[a, match]. Pure, testable.
- sync_provider_to_wealthfolio loops over the expansion, so each
activity may now contribute up to two rows to the batch. `fetched`
still counts provider-side activities only; `new_after_dedup` +
`imported` + `failed` count expanded rows.
- tests/test_pipeline.py
- Updated two existing pipeline integration tests to reflect the
now-larger batch shape (3 BUYs become 6 rows after expansion).
- 5 new unit tests for the helpers: BUY → DEPOSIT with fee,
SELL → WITHDRAWAL net of fee, DEPOSIT/WITHDRAWAL/DIVIDEND pass
through, zero-amount trades skipped, _with_cash_flow_match
returns the right cardinality.
## What is NOT in this change
- Provider-level opt-out (e.g., Provider.emits_matching_cash_flow =
True). No current provider emits real cash flows alongside trades
(Trading212 only calls /orders, not /transactions), so the default
"always match" is safe. If we ever wire a provider that pulls real
bank-transfer dates, add the opt-out then.
- Retroactive cleanup of already-imported WF accounts — already done
operationally today.
## Verification
### Automated
$ poetry run pytest tests/test_pipeline.py -v
7 passed in 0.40s
$ poetry run pytest -q
133 passed, 1 skipped in 8.58s
$ poetry run mypy broker_sync/pipeline.py tests/test_pipeline.py
Success: no issues found in 2 source files
$ poetry run ruff check broker_sync/pipeline.py tests/test_pipeline.py
All checks passed!
### Manual — next sync
Once this image ships and broker-sync-trading212 / broker-sync-imap /
broker-sync-fidelity run, confirm:
1. kubectl -n broker-sync logs job/<next-run> → fetched=N new=2N
imported=2N failed=0 (doubled due to matches).
2. WF /api/v1/holdings?accountId=<uuid> → cash ≈ £0 for every currency
after import.
3. Net Worth chart has no new cliff-jumps.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
267 lines
9.3 KiB
Python
267 lines
9.3 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from collections.abc import AsyncIterator
|
|
from datetime import UTC, datetime
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
|
|
import httpx
|
|
|
|
from broker_sync.dedup import SyncRecordStore
|
|
from broker_sync.models import Account, AccountType, Activity, ActivityType
|
|
from broker_sync.pipeline import sync_provider_to_wealthfolio
|
|
from broker_sync.sinks.wealthfolio import WealthfolioSink
|
|
|
|
|
|
class _FakeProvider:
|
|
name = "fake"
|
|
|
|
def __init__(self, accounts: list[Account], activities: list[Activity]) -> None:
|
|
self._accounts = accounts
|
|
self._activities = activities
|
|
|
|
def accounts(self) -> list[Account]:
|
|
return list(self._accounts)
|
|
|
|
async def fetch(
|
|
self,
|
|
*,
|
|
since: datetime | None = None,
|
|
before: datetime | None = None,
|
|
) -> AsyncIterator[Activity]:
|
|
for a in self._activities:
|
|
yield a
|
|
|
|
|
|
def _buy(external_id: str, account_id: str = "fake-isa") -> Activity:
|
|
return Activity(
|
|
external_id=external_id,
|
|
account_id=account_id,
|
|
account_type=AccountType.ISA,
|
|
date=datetime(2026, 4, 1, tzinfo=UTC),
|
|
activity_type=ActivityType.BUY,
|
|
symbol="VUAG",
|
|
quantity=Decimal("1"),
|
|
unit_price=Decimal("100"),
|
|
currency="GBP",
|
|
)
|
|
|
|
|
|
def _sink(transport: httpx.MockTransport, session_path: Path) -> WealthfolioSink:
|
|
session_path.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
|
|
return WealthfolioSink(
|
|
base_url="https://wf.test",
|
|
username="x",
|
|
password="x",
|
|
session_path=session_path,
|
|
transport=transport,
|
|
)
|
|
|
|
|
|
async def test_pipeline_skips_dedup_then_imports_new(tmp_path: Path) -> None:
|
|
account = Account(
|
|
id="fake-isa",
|
|
name="Fake ISA",
|
|
account_type=AccountType.ISA,
|
|
currency="GBP",
|
|
provider="fake",
|
|
)
|
|
provider = _FakeProvider([account], [_buy("a"), _buy("b"), _buy("c")])
|
|
|
|
posted_batches: list[str] = []
|
|
|
|
async def handler(req: httpx.Request) -> httpx.Response:
|
|
if req.method == "GET" and req.url.path == "/api/v1/accounts":
|
|
# Return account with Wealthfolio-assigned UUID + our providerAccountId.
|
|
return httpx.Response(
|
|
200,
|
|
json=[{
|
|
"id": "wf-uuid-fake-isa",
|
|
"provider": "fake",
|
|
"providerAccountId": "fake-isa",
|
|
}],
|
|
)
|
|
if req.url.path == "/api/v1/activities/import/check":
|
|
body = json.loads(req.content)
|
|
# Echo each activity back marked valid (mimic Wealthfolio's
|
|
# hydrate step).
|
|
return httpx.Response(200,
|
|
json=[{
|
|
**a, "isValid": True,
|
|
"errors": None
|
|
} for a in body["activities"]])
|
|
if req.url.path == "/api/v1/activities/import":
|
|
body = req.content.decode()
|
|
posted_batches.append(body)
|
|
return httpx.Response(
|
|
200,
|
|
json={
|
|
"activities": [{
|
|
"id": f"wf-{i}",
|
|
"external_id": ext
|
|
} for i, ext in enumerate(["a", "b", "c"])]
|
|
},
|
|
)
|
|
return httpx.Response(500)
|
|
|
|
sink = _sink(httpx.MockTransport(handler), tmp_path / "wf-session.json")
|
|
dedup = SyncRecordStore(tmp_path / "sync.db")
|
|
# Seed one already-seen to confirm dedup.
|
|
dedup.record("fake", "fake-isa", "a", wealthfolio_activity_id="wf-old")
|
|
|
|
try:
|
|
result = await sync_provider_to_wealthfolio(
|
|
provider=provider,
|
|
sink=sink,
|
|
dedup=dedup,
|
|
)
|
|
finally:
|
|
await sink.close()
|
|
|
|
# 3 provider activities fetched, but pipeline expands each BUY into
|
|
# (BUY, matching DEPOSIT). "a" is already-seen → skipped; its match
|
|
# "cash-flow-match:buy:a" is NEW since it wasn't seeded.
|
|
assert result.fetched == 3
|
|
assert result.new_after_dedup == 5
|
|
assert result.imported == 5
|
|
assert result.failed == 0
|
|
assert len(posted_batches) == 1
|
|
body = posted_batches[0]
|
|
# Only the new rows (b, c + the 3 matches) — NOT the already-seen "a".
|
|
assert "sync:fake:a" not in body
|
|
assert "sync:fake:b" in body
|
|
assert "sync:fake:c" in body
|
|
# Matching DEPOSITs rode along with their trade.
|
|
assert "cash-flow-match:buy:a" in body
|
|
assert "cash-flow-match:buy:b" in body
|
|
assert "cash-flow-match:buy:c" in body
|
|
|
|
# All six external_ids are now in dedup after the run.
|
|
assert dedup.has_seen("fake", "fake-isa", "a")
|
|
assert dedup.has_seen("fake", "fake-isa", "b")
|
|
assert dedup.has_seen("fake", "fake-isa", "c")
|
|
assert dedup.has_seen("fake", "fake-isa", "cash-flow-match:buy:a")
|
|
assert dedup.has_seen("fake", "fake-isa", "cash-flow-match:buy:b")
|
|
assert dedup.has_seen("fake", "fake-isa", "cash-flow-match:buy:c")
|
|
|
|
|
|
async def test_pipeline_records_failure_when_import_rejects(tmp_path: Path) -> None:
|
|
account = Account(
|
|
id="fake-isa",
|
|
name="Fake ISA",
|
|
account_type=AccountType.ISA,
|
|
currency="GBP",
|
|
provider="fake",
|
|
)
|
|
provider = _FakeProvider([account], [_buy("a")])
|
|
|
|
async def handler(req: httpx.Request) -> httpx.Response:
|
|
if req.method == "GET" and req.url.path == "/api/v1/accounts":
|
|
return httpx.Response(
|
|
200,
|
|
json=[{
|
|
"id": "wf-uuid-fake-isa",
|
|
"provider": "fake",
|
|
"providerAccountId": "fake-isa",
|
|
}],
|
|
)
|
|
if req.url.path == "/api/v1/activities/import/check":
|
|
return httpx.Response(400, json={"errors": ["bad row"]})
|
|
return httpx.Response(500)
|
|
|
|
sink = _sink(httpx.MockTransport(handler), tmp_path / "wf-session.json")
|
|
dedup = SyncRecordStore(tmp_path / "sync.db")
|
|
|
|
try:
|
|
result = await sync_provider_to_wealthfolio(
|
|
provider=provider,
|
|
sink=sink,
|
|
dedup=dedup,
|
|
)
|
|
finally:
|
|
await sink.close()
|
|
|
|
# Pipeline expands 1 BUY into (BUY, matching DEPOSIT). Both are in the
|
|
# batch that /import/check rejects, so both are counted as failed.
|
|
assert result.fetched == 1
|
|
assert result.imported == 0
|
|
assert result.failed == 2
|
|
# NOT recorded in dedup so the next run retries both.
|
|
assert not dedup.has_seen("fake", "fake-isa", "a")
|
|
assert not dedup.has_seen("fake", "fake-isa", "cash-flow-match:buy:a")
|
|
|
|
|
|
# -- Cash-flow match helpers ---------------------------------------------
|
|
from broker_sync.pipeline import _matched_cash_flow, _with_cash_flow_match # noqa: E402
|
|
|
|
|
|
def _make_activity(
|
|
activity_type: ActivityType,
|
|
*,
|
|
quantity: str | None = "1",
|
|
unit_price: str | None = "100",
|
|
fee: str = "0",
|
|
amount: str | None = None,
|
|
external_id: str = "x",
|
|
) -> Activity:
|
|
return Activity(
|
|
external_id=external_id,
|
|
account_id="acct",
|
|
account_type=AccountType.ISA,
|
|
date=datetime(2026, 4, 1, tzinfo=UTC),
|
|
activity_type=activity_type,
|
|
currency="GBP",
|
|
quantity=Decimal(quantity) if quantity is not None else None,
|
|
unit_price=Decimal(unit_price) if unit_price is not None else None,
|
|
fee=Decimal(fee),
|
|
amount=Decimal(amount) if amount is not None else None,
|
|
)
|
|
|
|
|
|
def test_matched_cash_flow_for_buy_is_deposit_with_total_cost() -> None:
|
|
buy = _make_activity(
|
|
ActivityType.BUY, quantity="10", unit_price="200.50", fee="1.25",
|
|
external_id="buy-1",
|
|
)
|
|
match = _matched_cash_flow(buy)
|
|
assert match is not None
|
|
assert match.activity_type is ActivityType.DEPOSIT
|
|
assert match.amount == Decimal("2006.25") # 10*200.50 + 1.25
|
|
assert match.currency == "GBP"
|
|
assert match.account_id == buy.account_id
|
|
assert match.date == buy.date
|
|
assert match.external_id == "cash-flow-match:buy:buy-1"
|
|
|
|
|
|
def test_matched_cash_flow_for_sell_is_withdrawal_net_of_fee() -> None:
|
|
sell = _make_activity(
|
|
ActivityType.SELL, quantity="5", unit_price="300", fee="2.50",
|
|
external_id="sell-7",
|
|
)
|
|
match = _matched_cash_flow(sell)
|
|
assert match is not None
|
|
assert match.activity_type is ActivityType.WITHDRAWAL
|
|
assert match.amount == Decimal("1497.50") # 5*300 - 2.50
|
|
assert match.external_id == "cash-flow-match:sell:sell-7"
|
|
|
|
|
|
def test_matched_cash_flow_none_for_deposit_withdrawal_dividend() -> None:
|
|
dep = _make_activity(ActivityType.DEPOSIT, quantity=None, unit_price=None, amount="100")
|
|
wit = _make_activity(ActivityType.WITHDRAWAL, quantity=None, unit_price=None, amount="50")
|
|
div = _make_activity(ActivityType.DIVIDEND, quantity=None, unit_price=None, amount="5")
|
|
assert _matched_cash_flow(dep) is None
|
|
assert _matched_cash_flow(wit) is None
|
|
assert _matched_cash_flow(div) is None
|
|
|
|
|
|
def test_matched_cash_flow_skips_zero_amount_trades() -> None:
|
|
zero_buy = _make_activity(ActivityType.BUY, quantity="0", unit_price="100")
|
|
assert _matched_cash_flow(zero_buy) is None
|
|
|
|
|
|
def test_with_cash_flow_match_returns_pair_for_buy_single_for_deposit() -> None:
|
|
buy = _make_activity(ActivityType.BUY, external_id="buy-2")
|
|
dep = _make_activity(ActivityType.DEPOSIT, quantity=None, unit_price=None, amount="500")
|
|
assert len(_with_cash_flow_match(buy)) == 2
|
|
assert len(_with_cash_flow_match(dep)) == 1
|