broker-sync/broker_sync/pipeline.py
Viktor Barzin 6450201af0 pipeline: emit matching DEPOSIT/WITHDRAWAL for every BUY/SELL
## Context

The 2026-04-18 reconciliation ended with Wealthfolio's historical Net
Worth chart showing cliff-jumps on 5 dates — the single-day lump cash
offsets we'd posted to "zero out" phantom cash. An operational fix
replaced those 6 lumps with 231 per-BUY/SELL matched DEPOSIT/WITHDRAWAL
rows (see code-r9n note). That made the chart smooth — but only for
today's data. Any future broker-sync run would re-introduce phantom
cash because providers emit BUY/SELL only; nothing on the cash side.

This commit bakes the match into the pipeline so **future syncs
self-balance cash at import time** and the chart stays smooth.
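To see why unmatched trades produce phantom cash, here is a minimal sketch of a running cash balance. It assumes Wealthfolio debits `qty * unit_price + fee` for a BUY and credits `qty * unit_price - fee` for a SELL, which are the same amounts the matching rows use. The trade list and the `cash_effect` / `running_cash` helpers are hypothetical illustrations, not broker-sync code.

```python
from decimal import Decimal

# Hypothetical trades: (kind, qty, unit_price, fee)
TRADES = [
    ("BUY", Decimal("10"), Decimal("12.50"), Decimal("1.50")),
    ("BUY", Decimal("4"), Decimal("30.00"), Decimal("0")),
    ("SELL", Decimal("5"), Decimal("14.00"), Decimal("1.00")),
]


def cash_effect(kind: str, qty: Decimal, price: Decimal, fee: Decimal) -> Decimal:
    """Assumed WF cash impact of one trade: a BUY spends cash, a SELL returns it."""
    if kind == "BUY":
        return -(qty * price + fee)
    return qty * price - fee


def running_cash(trades, matched: bool) -> list[Decimal]:
    """Cash balance after each trade, optionally with a matching cash flow."""
    balance = Decimal(0)
    history = []
    for kind, qty, price, fee in trades:
        effect = cash_effect(kind, qty, price, fee)
        balance += effect
        if matched:
            # A DEPOSIT (for a BUY) or WITHDRAWAL (for a SELL) of the same
            # amount cancels the trade's cash effect on the same day.
            balance -= effect
        history.append(balance)
    return history
```

Without matches the balance drifts to -177.50 of cash that was never deposited; with matches it is 0 after every trade, so no later lump offset is needed.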

## This change

- broker_sync/pipeline.py
  - New _matched_cash_flow(a): returns a DEPOSIT for a BUY (amount =
    qty * unit_price + fee) or a WITHDRAWAL for a SELL (amount =
    qty * unit_price - fee). Returns None for every other activity
    type, because DEPOSIT/WITHDRAWAL/DIVIDEND/etc. already touch cash
    directly, and for any trade that lacks qty or unit_price or whose
    amount is not positive. The synthetic activity carries a
    deterministic external_id
    `cash-flow-match:<buy|sell>:<original external_id>` so
    SyncRecordStore dedup handles idempotency across runs.
  - New _with_cash_flow_match(a): expand helper — returns [a] or
    [a, match]. Pure, testable.
  - sync_provider_to_wealthfolio loops over the expansion, so each
    activity may now contribute up to two rows to the batch. `fetched`
    still counts provider-side activities only; `new_after_dedup` +
    `imported` + `failed` count expanded rows.
- tests/test_pipeline.py
  - Updated two existing pipeline integration tests to reflect the
    now-larger batch shape (3 BUYs become 6 rows after expansion).
  - 5 new unit tests for the helpers: BUY → DEPOSIT with fee,
    SELL → WITHDRAWAL net of fee, DEPOSIT/WITHDRAWAL/DIVIDEND pass
    through, zero-amount trades skipped, _with_cash_flow_match
    returns the right cardinality.
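The helper contract described in this list can be sketched in isolation. This is a minimal mirror of the stated rules, assuming a simplified stand-in record: `Act`, `matched_cash_flow` and `with_match` are hypothetical and use plain strings for activity types instead of the real `Activity` / `ActivityType` models.

```python
from __future__ import annotations

from dataclasses import dataclass
from decimal import Decimal


@dataclass
class Act:
    """Hypothetical stand-in for broker_sync.models.Activity (subset of fields)."""
    external_id: str
    activity_type: str  # "BUY", "SELL", "DEPOSIT", "DIVIDEND", ...
    quantity: Decimal | None = None
    unit_price: Decimal | None = None
    fee: Decimal | None = None
    amount: Decimal | None = None


def matched_cash_flow(a: Act) -> Act | None:
    """Mirror of the rules listed above for _matched_cash_flow."""
    if a.activity_type not in ("BUY", "SELL"):
        return None  # DEPOSIT, WITHDRAWAL, DIVIDEND, ... already move cash
    if a.quantity is None or a.unit_price is None:
        return None
    gross = a.quantity * a.unit_price
    fee = a.fee or Decimal(0)
    if a.activity_type == "BUY":
        amount, kind, tag = gross + fee, "DEPOSIT", "buy"
    else:
        amount, kind, tag = gross - fee, "WITHDRAWAL", "sell"
    if amount <= 0:
        return None  # zero-amount trades are skipped
    return Act(
        external_id=f"cash-flow-match:{tag}:{a.external_id}",
        activity_type=kind,
        amount=amount,
    )


def with_match(a: Act) -> list[Act]:
    """Expand one activity into [a] or [a, match]."""
    m = matched_cash_flow(a)
    return [a] if m is None else [a, m]
```

For example, a BUY of 10 @ 12.50 with a 1.50 fee yields a DEPOSIT of 126.50 with id `cash-flow-match:buy:<id>`. Because that id is derived only from the trade's own id, every run produces the same key, which is what lets the dedup store skip it next time.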

## What is NOT in this change

- Provider-level opt-out (e.g., Provider.emits_matching_cash_flow =
  True). No current provider emits real cash flows alongside trades
  (Trading212 only calls /orders, not /transactions), so the default
  "always match" is safe. If we ever wire a provider that pulls real
  bank-transfer dates, add the opt-out then.
- Retroactive cleanup of already-imported WF accounts — already done
  operationally today.
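If that opt-out is ever needed, one possible shape is a class-level flag that the pipeline checks before expanding an activity. This is only a sketch: the flag name comes from the example in the first bullet, and the classes and the `should_synthesize_match` helper are hypothetical, not existing broker-sync code.

```python
class Provider:
    """Hypothetical provider base carrying the opt-out flag."""
    name = "base"
    # True for providers whose fetch() already yields real DEPOSIT/WITHDRAWAL rows.
    emits_matching_cash_flow: bool = False


class OrdersOnlyProvider(Provider):
    """Hypothetical provider that, like Trading212 today, emits only BUY/SELL."""
    name = "orders-only"


class BankTransferProvider(Provider):
    """Hypothetical provider that also pulls real bank-transfer dates."""
    name = "bank-transfers"
    emits_matching_cash_flow = True


def should_synthesize_match(provider: Provider) -> bool:
    """Pipeline-side guard: synthesize matches only when the provider doesn't."""
    return not provider.emits_matching_cash_flow
```

In the pipeline loop the expansion would then become something like `_with_cash_flow_match(activity) if should_synthesize_match(provider) else [activity]`, keeping "always match" as the default.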

## Verification

### Automated

$ poetry run pytest tests/test_pipeline.py -v
7 passed in 0.40s

$ poetry run pytest -q
133 passed, 1 skipped in 8.58s

$ poetry run mypy broker_sync/pipeline.py tests/test_pipeline.py
Success: no issues found in 2 source files

$ poetry run ruff check broker_sync/pipeline.py tests/test_pipeline.py
All checks passed!

### Manual — next sync

Once this image ships and broker-sync-trading212 / broker-sync-imap /
broker-sync-fidelity run, confirm:
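1. kubectl -n broker-sync logs job/<next-run> → fetched=N new=2N
   imported=2N failed=0 when all N fetched activities are new BUY/SELLs
   (each contributes the trade plus its match; other activity types
   add one row each).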
2. WF /api/v1/holdings?accountId=<uuid> → cash ≈ £0 for every currency
   after import.
3. Net Worth chart has no new cliff-jumps.
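Step 1 can be checked mechanically. A minimal sketch, assuming the job log contains the `sync complete ...` summary line that `sync_provider_to_wealthfolio` logs; `check_summary` is a hypothetical helper, not part of broker-sync. It checks invariants the pipeline code guarantees rather than exactly 2N, since non-trade activities and already-seen rows contribute fewer than two rows each.

```python
import re

# Matches the summary line logged at the end of sync_provider_to_wealthfolio.
SUMMARY = re.compile(
    r"sync complete provider=(?P<provider>\S+) fetched=(?P<fetched>\d+) "
    r"new=(?P<new>\d+) imported=(?P<imported>\d+) failed=(?P<failed>\d+)"
)


def check_summary(line: str) -> dict[str, int]:
    """Parse one summary line and check the counter invariants."""
    m = SUMMARY.search(line)
    if m is None:
        raise ValueError(f"no sync summary in: {line!r}")
    counts = {k: int(v) for k, v in m.groupdict().items() if k != "provider"}
    # Each fetched activity expands to at most two rows (trade + match).
    if counts["new"] > 2 * counts["fetched"]:
        raise ValueError(f"more new rows than expansion allows: {counts}")
    # Every new row ends up either imported or counted as failed.
    if counts["imported"] + counts["failed"] != counts["new"]:
        raise ValueError(f"imported + failed != new: {counts}")
    return counts
```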

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 19:12:49 +00:00


"""Orchestrator: one-shot sync of a Provider's activities into Wealthfolio."""

from __future__ import annotations

import logging
from collections.abc import AsyncIterator
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal

from broker_sync.dedup import SyncRecordStore
from broker_sync.models import Account, Activity, ActivityType
from broker_sync.providers.base import Provider
from broker_sync.sinks.wealthfolio import WealthfolioSink

log = logging.getLogger(__name__)

# Wealthfolio's CSV import accepts arbitrary sizes but we batch to keep
# any single-row validation error from failing the whole run.
_BATCH_SIZE = 200


@dataclass
class SyncResult:
    provider: str
    fetched: int
    new_after_dedup: int
    imported: int
    failed: int  # rows in batches whose import_activities call raised


async def sync_provider_to_wealthfolio(
    *,
    provider: Provider,
    sink: WealthfolioSink,
    dedup: SyncRecordStore,
    since: datetime | None = None,
    before: datetime | None = None,
) -> SyncResult:
    """Run the fetch → dedup → import → record pipeline for one provider.

    Caller owns sink lifecycle (including `login()` and `close()`).
    """
    wf_account_ids = await _ensure_accounts(sink, provider.accounts())

    fetched = 0
    new_after_dedup = 0
    imported = 0
    failed = 0
    # Batches are (original_account_id, remapped_for_import Activity) pairs.
    # Dedup keys on our stable account_id; the import row uses Wealthfolio's UUID.
    batch: list[tuple[str, Activity]] = []

    async for activity in provider.fetch(since=since, before=before):
        fetched += 1
        # Expand each BUY/SELL into (original, matching DEPOSIT/WITHDRAWAL).
        # See `_matched_cash_flow`: without the match, WF's historical Net
        # Worth chart shows phantom spikes because BUYs consume cash that
        # was never "deposited" according to the activity log.
        for act in _with_cash_flow_match(activity):
            if dedup.has_seen(provider.name, act.account_id, act.external_id):
                continue
            new_after_dedup += 1
            _tag_notes(act, provider.name)
            original_account_id = act.account_id
            # Submit under Wealthfolio's UUID; keep dedup keyed on our id.
            wf_id = wf_account_ids.get(original_account_id)
            if wf_id:
                act.account_id = wf_id
            batch.append((original_account_id, act))
            if len(batch) >= _BATCH_SIZE:
                ok, bad = await _flush_batch(sink, dedup, provider.name, batch)
                imported += ok
                failed += bad
                batch = []

    if batch:
        ok, bad = await _flush_batch(sink, dedup, provider.name, batch)
        imported += ok
        failed += bad

    log.info(
        "sync complete provider=%s fetched=%d new=%d imported=%d failed=%d",
        provider.name,
        fetched,
        new_after_dedup,
        imported,
        failed,
    )
    return SyncResult(
        provider=provider.name,
        fetched=fetched,
        new_after_dedup=new_after_dedup,
        imported=imported,
        failed=failed,
    )


async def _ensure_accounts(sink: WealthfolioSink, accounts: list[Account]) -> dict[str, str]:
    """Return {our_account_id: wealthfolio_uuid}."""
    out: dict[str, str] = {}
    for account in accounts:
        out[account.id] = await sink.ensure_account(account)
    return out


def _tag_notes(activity: Activity, provider_name: str) -> None:
    """Stamp the notes field with a dedup-friendly tag (belt-and-braces)."""
    tag = f"sync:{provider_name}:{activity.external_id}"
    if activity.notes:
        if tag not in activity.notes:
            activity.notes = f"{activity.notes} | {tag}"
    else:
        activity.notes = tag


async def _flush_batch(
    sink: WealthfolioSink,
    dedup: SyncRecordStore,
    provider_name: str,
    batch: list[tuple[str, Activity]],
) -> tuple[int, int]:
    activities_only = [a for _, a in batch]
    try:
        created = await sink.import_activities(activities_only)
    except Exception:
        log.exception("Wealthfolio import failed for batch of %d", len(batch))
        return 0, len(batch)

    # Map returned Wealthfolio activity ids back to our external_ids.
    by_external: dict[str, str | None] = {}
    for row in created:
        ext = row.get("external_id") if isinstance(row, dict) else None
        wf_id = row.get("id") if isinstance(row, dict) else None
        if ext:
            by_external[str(ext)] = str(wf_id) if wf_id is not None else None

    ok = 0
    for original_account_id, a in batch:
        wf_id = by_external.get(a.external_id)
        dedup.record(
            provider_name,
            original_account_id,
            a.external_id,
            wealthfolio_activity_id=wf_id,
        )
        ok += 1
    return ok, 0


async def collect(iterator: AsyncIterator[Activity]) -> list[Activity]:
    """Tiny helper: drain an async iterator to a list. Mainly for tests."""
    return [a async for a in iterator]


# -- Cash-flow matching --------------------------------------------------
# Providers emit BUY and SELL activities with no corresponding cash
# movement. WF debits cash for each BUY and credits it for each SELL, so
# without an explicit DEPOSIT/WITHDRAWAL on the same day it models the
# account as carrying "phantom" cash, and its Net Worth chart shows
# cliff-jumps whenever a lump offset is applied after the fact.
#
# The pipeline emits a matching DEPOSIT (for BUY) or WITHDRAWAL (for SELL)
# right alongside each trade so the account's cash balance reconciles to
# ~0 at every point in time. A provider that already emits real cash flows
# (e.g. a Trading212 "deposit" endpoint, if we ever wire it) would need an
# opt-out such as `Provider.emits_matching_cash_flow = True`. That flag does
# not exist yet because no provider needs it today (Trading212 only exposes
# BUY/SELL via the /orders endpoint).


def _matched_cash_flow(a: Activity) -> Activity | None:
    """Return the DEPOSIT/WITHDRAWAL that funds/receives the BUY/SELL `a`.

    Returns None for every other activity type, since those already touch
    cash directly (DEPOSIT, WITHDRAWAL, DIVIDEND, FEE, TAX, TRANSFER_*,
    CONVERSION_*). Also returns None for a BUY/SELL missing quantity or
    unit_price, or whose computed amount is not positive.
    """
    if a.activity_type is ActivityType.BUY:
        if a.quantity is None or a.unit_price is None:
            return None
        amount = a.quantity * a.unit_price + (a.fee or Decimal(0))
        kind, tag = ActivityType.DEPOSIT, "buy"
    elif a.activity_type is ActivityType.SELL:
        if a.quantity is None or a.unit_price is None:
            return None
        amount = a.quantity * a.unit_price - (a.fee or Decimal(0))
        kind, tag = ActivityType.WITHDRAWAL, "sell"
    else:
        return None

    if amount <= 0:
        return None

    return Activity(
        external_id=f"cash-flow-match:{tag}:{a.external_id}",
        account_id=a.account_id,
        account_type=a.account_type,
        date=a.date,
        activity_type=kind,
        currency=a.currency,
        amount=amount,
        notes=f"cash-flow-match:{tag}:{a.external_id}",
    )


def _with_cash_flow_match(a: Activity) -> list[Activity]:
    """Expand one activity into [original] or [original, matching cash flow]."""
    match = _matched_cash_flow(a)
    return [a] if match is None else [a, match]
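The idempotency claim rests on two pieces of the code above: `_flush_batch` records every row of a successful batch in the dedup store, and a match's `external_id` is derived from its trade's. A minimal sketch, assuming an in-memory store in place of `SyncRecordStore` and plain string ids in place of `Activity` objects (`MemoryDedup`, `expand` and `run` are hypothetical, and the batch size is shrunk from 200 to 2 to make batching visible), shows that a second run over the same provider data submits nothing:

```python
from __future__ import annotations

from collections.abc import Iterable

PROVIDER, ACCOUNT = "demo", "acct-1"  # hypothetical dedup keys


class MemoryDedup:
    """Hypothetical in-memory stand-in for SyncRecordStore."""

    def __init__(self) -> None:
        self._seen: set[tuple[str, str, str]] = set()

    def has_seen(self, provider: str, account: str, ext: str) -> bool:
        return (provider, account, ext) in self._seen

    def record(self, provider: str, account: str, ext: str) -> None:
        self._seen.add((provider, account, ext))


def expand(ext_id: str, kind: str) -> list[str]:
    """BUY/SELL ids gain a deterministic match id; other kinds pass through."""
    if kind in ("BUY", "SELL"):
        return [ext_id, f"cash-flow-match:{kind.lower()}:{ext_id}"]
    return [ext_id]


def run(dedup: MemoryDedup, rows: Iterable[tuple[str, str]], batch_size: int = 2) -> list[list[str]]:
    """Return the batches one sync would submit, recording each on success."""
    submitted: list[list[str]] = []
    batch: list[str] = []

    def flush() -> None:
        submitted.append(list(batch))
        for ext in batch:  # as _flush_batch does when the import succeeds
            dedup.record(PROVIDER, ACCOUNT, ext)
        batch.clear()

    for ext_id, kind in rows:
        for ext in expand(ext_id, kind):
            if dedup.has_seen(PROVIDER, ACCOUNT, ext):
                continue
            batch.append(ext)
            if len(batch) >= batch_size:
                flush()
    if batch:
        flush()
    return submitted
```

One consequence visible in the sketch: a trade and its match can land in different batches. Since a failed batch is not recorded in the dedup store, a failure can briefly leave a trade without its match until the next run retries the unrecorded rows.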