broker-sync/broker_sync/pipeline.py

206 lines
7.1 KiB
Python

Add sync pipeline + trading212 CLI subcommand

Context
-------
Closes the gap between "Trading212 provider yields Activities" and "activities land in Wealthfolio with dedup". One generic pipeline function works for every provider (Phase 2 IMAP ingest and Phase 3 CSV drop will reuse it).

This change
-----------
- `broker_sync/pipeline.py`: sync_provider_to_wealthfolio() ensures accounts exist in Wealthfolio, fetches, dedups against the local SQLite store, batches into Wealthfolio's CSV import at 200 rows each, and records successful imports in the dedup store with the returned Wealthfolio activity id. Failed batches don't touch the dedup store, so the next run retries.
- Notes field stamped with `sync:<provider>:<external_id>` for human auditability. It is NOT used for dedup (the SQLite store owns that).
- `broker_sync/cli.py`: new `trading212` subcommand driven by T212_API_KEYS_JSON + WF_* + BROKER_SYNC_DATA_DIR env vars. Two modes: `steady` fetches the last 7 days; `backfill` pulls all history. Exits 0 on a clean run, 1 if any batch failed, 2 on config errors.
- Pipeline tests with MockTransport: dedup-skip-then-import happy path (verifies the imported CSV contains only the unseen rows and all three are recorded after the run); import-rejected path (verifies the failed row is NOT recorded, so the next run retries).

Test plan
---------
## Automated
- poetry run pytest -q → 70 passed
- poetry run mypy broker_sync tests → Success: no issues found in 29 source files
- poetry run ruff check . → All checks passed!
- poetry run broker-sync trading212 --help → shows all env vars + mode flag

## Manual Verification
Live smoke test blocked on:
1. Vault secret/broker-sync seeded (wf_base_url, wf_username, wf_password, trading212_api_keys).
2. Terraform stack applied (infra/stacks/broker-sync/, staged but not yet applied).
3. Image pushed to viktorbarzin/broker-sync on DockerHub via GHA.

Once those land:

    kubectl -n broker-sync create job t212-backfill \
      --from=cronjob/broker-sync-trading212 -- \
      broker-sync trading212 --mode=backfill
2026-04-17 19:45:43 +00:00
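The "record only on success" rule described in the commit message can be illustrated with a small synchronous sketch. This is not the project's `_flush_batch`: `InMemoryDedup`, its `record()` method, and `flush()` are hypothetical stand-ins (only `has_seen(provider, account_id, external_id)` appears in the real code), and the real store is SQLite-backed and the real sink is async.

```python
from typing import Callable


class InMemoryDedup:
    """Hypothetical stand-in for SyncRecordStore (the real one is SQLite-backed)."""

    def __init__(self) -> None:
        self._seen: dict[tuple[str, str, str], str] = {}

    def has_seen(self, provider: str, account_id: str, external_id: str) -> bool:
        return (provider, account_id, external_id) in self._seen

    def record(self, provider: str, account_id: str, external_id: str, wf_id: str) -> None:
        # Assumed method name; stores the id returned by the import.
        self._seen[(provider, account_id, external_id)] = wf_id


def flush(
    import_fn: Callable[[list[tuple[str, str]]], list[str]],
    dedup: InMemoryDedup,
    provider: str,
    batch: list[tuple[str, str]],
) -> tuple[int, int]:
    """Import a batch of (account_id, external_id) rows; return (imported, failed)."""
    try:
        created_ids = import_fn(batch)
    except Exception:
        # A failed batch leaves the dedup store untouched, so the next run retries it.
        return 0, len(batch)
    for (account_id, external_id), wf_id in zip(batch, created_ids):
        dedup.record(provider, account_id, external_id, wf_id)
    return len(batch), 0
```

Because nothing is recorded when the import raises, a rerun sees the same rows as unseen and submits them again, which is the retry behaviour the commit message describes.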
"""Orchestrator: one-shot sync of a Provider's activities into Wealthfolio."""
from __future__ import annotations
import logging
from collections.abc import AsyncIterator
from dataclasses import dataclass
from datetime import datetime
pipeline: emit matching DEPOSIT/WITHDRAWAL for every BUY/SELL

## Context
The 2026-04-18 reconciliation ended with Wealthfolio's historical Net Worth chart showing cliff-jumps on 5 dates: the single-day lump cash offsets we'd posted to "zero out" phantom cash. An operational fix replaced those 6 lumps with 231 per-BUY/SELL matched DEPOSIT/WITHDRAWAL rows (see code-r9n note). That made the chart smooth, but only for today's data. Any future broker-sync run would re-introduce phantom cash because providers emit BUY/SELL only; nothing on the cash side.

This commit bakes the match into the pipeline so **future syncs self-balance cash at import time** and the chart stays smooth.

## This change
- broker_sync/pipeline.py
  - New _matched_cash_flow(a): returns a DEPOSIT for a BUY (amount = qty * unit_price + fee) or a WITHDRAWAL for a SELL (amount = qty * unit_price - fee). Returns None for every other activity type; DEPOSIT/WITHDRAWAL/DIVIDEND/etc. already touch cash directly. The synthetic activity carries a deterministic external_id `cash-flow-match:<buy|sell>:<original external_id>` so SyncRecordStore dedup handles idempotency across runs.
  - New _with_cash_flow_match(a): expand helper that returns [a] or [a, match]. Pure, testable.
  - sync_provider_to_wealthfolio loops over the expansion, so each activity may now contribute up to two rows to the batch. `fetched` still counts provider-side activities only; `new_after_dedup` + `imported` + `failed` count expanded rows.
- tests/test_pipeline.py
  - Updated two existing pipeline integration tests to reflect the now-larger batch shape (3 BUYs become 6 rows after expansion).
  - 5 new unit tests for the helpers: BUY → DEPOSIT with fee, SELL → WITHDRAWAL net of fee, DEPOSIT/WITHDRAWAL/DIVIDEND pass through, zero-amount trades skipped, _with_cash_flow_match returns the right cardinality.

## What is NOT in this change
- Provider-level opt-out (e.g., Provider.emits_matching_cash_flow = True). No current provider emits real cash flows alongside trades (Trading212 only calls /orders, not /transactions), so the default "always match" is safe. If we ever wire a provider that pulls real bank-transfer dates, add the opt-out then.
- Retroactive cleanup of already-imported WF accounts, which was already done operationally today.

## Verification
### Automated
    $ poetry run pytest tests/test_pipeline.py -v
    7 passed in 0.40s
    $ poetry run pytest -q
    133 passed, 1 skipped in 8.58s
    $ poetry run mypy broker_sync/pipeline.py tests/test_pipeline.py
    Success: no issues found in 2 source files
    $ poetry run ruff check broker_sync/pipeline.py tests/test_pipeline.py
    All checks passed!

### Manual: next sync
Once this image ships and broker-sync-trading212 / broker-sync-imap / broker-sync-fidelity run, confirm:
1. kubectl -n broker-sync logs job/<next-run> → fetched=N new=2N imported=2N failed=0 (doubled due to matches).
2. WF /api/v1/holdings?accountId=<uuid> → cash ≈ £0 for every currency after import.
3. Net Worth chart has no new cliff-jumps.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 19:12:49 +00:00
from decimal import Decimal
from broker_sync.dedup import SyncRecordStore
from broker_sync.models import Account, Activity, ActivityType
from broker_sync.providers.base import Provider
from broker_sync.sinks.wealthfolio import WealthfolioSink
log = logging.getLogger(__name__)
# Wealthfolio's CSV import accepts arbitrary sizes but we batch to keep
# any single-row validation error from failing the whole run.
_BATCH_SIZE = 200
@dataclass
class SyncResult:
    provider: str
    fetched: int
    new_after_dedup: int
    imported: int
    failed: int  # rows that import_activities returned errors for
async def sync_provider_to_wealthfolio(
    *,
    provider: Provider,
    sink: WealthfolioSink,
    dedup: SyncRecordStore,
    since: datetime | None = None,
    before: datetime | None = None,
) -> SyncResult:
    """Run the fetch → dedup → import → record pipeline for one provider.

    Caller owns sink lifecycle (including `login()` and `close()`).
    """
    wf_account_ids = await _ensure_accounts(sink, provider.accounts())
    fetched = 0
    new_after_dedup = 0
    imported = 0
    failed = 0
    # Batches are (original_account_id, remapped_for_import Activity) pairs.
    # Dedup keys on our stable account_id; the import row uses Wealthfolio's UUID.
    batch: list[tuple[str, Activity]] = []
    async for activity in provider.fetch(since=since, before=before):
        fetched += 1
        # Expand each BUY/SELL into (original, matching DEPOSIT/WITHDRAWAL).
        # See `_matched_cash_flow`: without the match, WF's historical Net
        # Worth chart shows phantom spikes because BUYs consume cash that
        # was never "deposited" according to the activity log.
        for act in _with_cash_flow_match(activity):
            if dedup.has_seen(provider.name, act.account_id, act.external_id):
                continue
            new_after_dedup += 1
            _tag_notes(act, provider.name)
            original_account_id = act.account_id
            # Submit under Wealthfolio's UUID; keep dedup keyed on our id.
            wf_id = wf_account_ids.get(original_account_id)
            if wf_id:
                act.account_id = wf_id
            batch.append((original_account_id, act))
            if len(batch) >= _BATCH_SIZE:
                ok, bad = await _flush_batch(sink, dedup, provider.name, batch)
                imported += ok
                failed += bad
                batch = []
    if batch:
        ok, bad = await _flush_batch(sink, dedup, provider.name, batch)
        imported += ok
        failed += bad
    log.info(
        "sync complete provider=%s fetched=%d new=%d imported=%d failed=%d",
        provider.name,
        fetched,
        new_after_dedup,
        imported,
        failed,
    )
    return SyncResult(
        provider=provider.name,
        fetched=fetched,
        new_after_dedup=new_after_dedup,
        imported=imported,
        failed=failed,
    )
async def _ensure_accounts(sink: WealthfolioSink, accounts: list[Account]) -> dict[str, str]:
    """Return {our_account_id: wealthfolio_uuid}."""
    out: dict[str, str] = {}
    for account in accounts:
        out[account.id] = await sink.ensure_account(account)
    return out
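def _tag_notes(activity: Activity, provider_name: str) -> None:
    """Stamp the notes field with a `sync:<provider>:<external_id>` audit tag.

    The tag is belt-and-braces for human auditability; the dedup store,
    not this tag, decides what has already been imported.
    """
    tag = f"sync:{provider_name}:{activity.external_id}"
    if activity.notes:
        # Append only once so repeated calls stay idempotent.
        if tag not in activity.notes:
            activity.notes = f"{activity.notes} | {tag}"
    else:
        activity.notes = tag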
async def _flush_batch(
    sink: WealthfolioSink,
    dedup: SyncRecordStore,
    provider_name: str,
    batch: list[tuple[str, Activity]],
) -> tuple[int, int]:
    activities_only = [a for _, a in batch]
    try:
        created = await sink.import_activities(activities_only)
    except Exception:
        log.exception("Wealthfolio import failed for batch of %d", len(batch))
        return 0, len(batch)

    # Map returned Wealthfolio activity ids back to our external_ids.
    by_external: dict[str, str | None] = {}
    for row in created:
        ext = row.get("external_id") if isinstance(row, dict) else None
        wf_id = row.get("id") if isinstance(row, dict) else None
        if ext:
            by_external[str(ext)] = str(wf_id) if wf_id is not None else None

    ok = 0
    for original_account_id, a in batch:
        wf_id = by_external.get(a.external_id)
        dedup.record(
            provider_name,
            original_account_id,
            a.external_id,
            wealthfolio_activity_id=wf_id,
        )
        ok += 1
    return ok, 0


async def collect(iterator: AsyncIterator[Activity]) -> list[Activity]:
    """Tiny helper: drain an async iterator to a list. Mainly for tests."""
    return [a async for a in iterator]
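A minimal usage sketch for a `collect`-style helper, assuming a hypothetical async generator in place of a real provider stream. The `fake_provider_fetch` name and the integer items are illustrative only; the real helper drains `Activity` objects.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator


async def collect(iterator: AsyncIterator[int]) -> list[int]:
    """Same shape as the helper above, typed for ints to stay self-contained."""
    return [item async for item in iterator]


async def fake_provider_fetch() -> AsyncIterator[int]:
    """Hypothetical stand-in for a provider's async activity stream."""
    for value in (1, 2, 3):
        yield value


# Drain the whole stream into a list so a test can assert on it directly.
drained = asyncio.run(collect(fake_provider_fetch()))
print(drained)
```

Draining into a list is convenient in tests, but it holds every item in memory at once, which is presumably why the docstring flags it as mainly a test helper.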
pipeline: emit matching DEPOSIT/WITHDRAWAL for every BUY/SELL ## Context The 2026-04-18 reconciliation ended with Wealthfolio's historical Net Worth chart showing cliff-jumps on 5 dates — the single-day lump cash offsets we'd posted to "zero out" phantom cash. An operational fix replaced those 6 lumps with 231 per-BUY/SELL matched DEPOSIT/WITHDRAWAL rows (see code-r9n note). That made the chart smooth — but only for today's data. Any future broker-sync run would re-introduce phantom cash because providers emit BUY/SELL only; nothing on the cash side. This commit bakes the match into the pipeline so **future syncs self-balance cash at import time** and the chart stays smooth. ## This change - broker_sync/pipeline.py - New _matched_cash_flow(a): returns a DEPOSIT for a BUY (amount = qty * unit_price + fee) or a WITHDRAWAL for a SELL (amount = qty * unit_price - fee). Returns None for every other activity type — DEPOSIT/WITHDRAWAL/DIVIDEND/etc. already touch cash directly. The synthetic activity carries a deterministic external_id `cash-flow-match:<buy|sell>:<original external_id>` so SyncRecordStore dedup handles idempotency across runs. - New _with_cash_flow_match(a): expand helper — returns [a] or [a, match]. Pure, testable. - sync_provider_to_wealthfolio loops over the expansion, so each activity may now contribute up to two rows to the batch. `fetched` still counts provider-side activities only; `new_after_dedup` + `imported` + `failed` count expanded rows. - tests/test_pipeline.py - Updated two existing pipeline integration tests to reflect the now-larger batch shape (3 BUYs become 6 rows after expansion). - 5 new unit tests for the helpers: BUY → DEPOSIT with fee, SELL → WITHDRAWAL net of fee, DEPOSIT/WITHDRAWAL/DIVIDEND pass through, zero-amount trades skipped, _with_cash_flow_match returns the right cardinality. ## What is NOT in this change - Provider-level opt-out (e.g., Provider.emits_matching_cash_flow = True). 
No current provider emits real cash flows alongside trades (Trading212 only calls /orders, not /transactions), so the default "always match" is safe. If we ever wire a provider that pulls real bank-transfer dates, add the opt-out then. - Retroactive cleanup of already-imported WF accounts — already done operationally today. ## Verification ### Automated $ poetry run pytest tests/test_pipeline.py -v 7 passed in 0.40s $ poetry run pytest -q 133 passed, 1 skipped in 8.58s $ poetry run mypy broker_sync/pipeline.py tests/test_pipeline.py Success: no issues found in 2 source files $ poetry run ruff check broker_sync/pipeline.py tests/test_pipeline.py All checks passed! ### Manual — next sync Once this image ships and broker-sync-trading212 / broker-sync-imap / broker-sync-fidelity run, confirm: 1. kubectl -n broker-sync logs job/<next-run> → fetched=N new=2N imported=2N failed=0 (doubled due to matches). 2. WF /api/v1/holdings?accountId=<uuid> → cash ≈ £0 for every currency after import. 3. Net Worth chart has no new cliff-jumps. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 19:12:49 +00:00
# -- Cash-flow matching --------------------------------------------------
# BUY and SELL activities touch shares, not cash. Without an explicit
# DEPOSIT/WITHDRAWAL on the same day, WF models the account as having
# "phantom" cash debt — and its Net Worth chart shows cliff-jumps
# whenever a lump offset is applied after the fact.
#
# The pipeline emits a matching DEPOSIT (for BUY) or WITHDRAWAL (for SELL)
# right alongside each trade so the account's cash balance reconciles to
# ~0 at every point in time. There is no provider-level opt-out yet. If a
# provider that emits real cash flows is ever wired in (e.g. a Trading212
# "deposit" endpoint), add a flag such as
# `Provider.emits_matching_cash_flow = True` at that point. No provider
# needs it today (Trading212 only exposes BUY/SELL via the /orders endpoint).
def _matched_cash_flow(a: Activity) -> Activity | None:
    """Return the DEPOSIT/WITHDRAWAL that funds/receives the BUY/SELL `a`.

    Returns None for every other activity type; those already touch cash
    directly (DEPOSIT, WITHDRAWAL, DIVIDEND, FEE, TAX, TRANSFER_*,
    CONVERSION_*). Also returns None when quantity or unit price is missing,
    or when the computed amount is not positive.
    """
    if a.activity_type is ActivityType.BUY:
        if a.quantity is None or a.unit_price is None:
            return None
        # A BUY is funded by a deposit covering the gross cost plus the fee.
        amount = a.quantity * a.unit_price + (a.fee or Decimal(0))
        kind, tag = ActivityType.DEPOSIT, "buy"
    elif a.activity_type is ActivityType.SELL:
        if a.quantity is None or a.unit_price is None:
            return None
        # A SELL is drained by a withdrawal of the proceeds net of the fee.
        amount = a.quantity * a.unit_price - (a.fee or Decimal(0))
        kind, tag = ActivityType.WITHDRAWAL, "sell"
    else:
        return None
    if amount <= 0:
        return None
    return Activity(
        # Deterministic id so the dedup store keeps repeated runs idempotent.
        external_id=f"cash-flow-match:{tag}:{a.external_id}",
        account_id=a.account_id,
        account_type=a.account_type,
        date=a.date,
        activity_type=kind,
        currency=a.currency,
        amount=amount,
        notes=f"cash-flow-match:{tag}:{a.external_id}",
    )


def _with_cash_flow_match(a: Activity) -> list[Activity]:
    """Expand one activity into [original] or [original, matching cash flow]."""
    match = _matched_cash_flow(a)
    return [a] if match is None else [a, match]
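The matching arithmetic can be checked in isolation with a self-contained sketch. The `Trade` dataclass and the tuple return value here are simplified, hypothetical stand-ins for the real `Activity` model, and the trade ids are made up for illustration. Only the amount rule (BUY: `qty * unit_price + fee`, SELL: `qty * unit_price - fee`) and the `cash-flow-match:<tag>:<external_id>` id scheme come from the code above.

```python
from __future__ import annotations

from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class Trade:
    """Hypothetical, simplified stand-in for Activity (not the real model)."""

    external_id: str
    side: str  # "BUY", "SELL", or any other activity type
    quantity: Decimal
    unit_price: Decimal
    fee: Decimal = Decimal(0)


def matched_cash_flow(t: Trade) -> tuple[str, str, Decimal] | None:
    """Return (kind, external_id, amount) for the matching cash flow, or None."""
    gross = t.quantity * t.unit_price
    if t.side == "BUY":
        kind, tag, amount = "DEPOSIT", "buy", gross + t.fee
    elif t.side == "SELL":
        kind, tag, amount = "WITHDRAWAL", "sell", gross - t.fee
    else:
        return None  # other activity types already touch cash directly
    if amount <= 0:
        return None  # zero-amount trades produce no cash flow
    return kind, f"cash-flow-match:{tag}:{t.external_id}", amount


buy = Trade("t212-1", "BUY", Decimal("10"), Decimal("12.50"), Decimal("1.25"))
sell = Trade("t212-2", "SELL", Decimal("4"), Decimal("20"), Decimal("0.50"))
buy_match = matched_cash_flow(buy)    # DEPOSIT of 126.25
sell_match = matched_cash_flow(sell)  # WITHDRAWAL of 79.50
```

Using `Decimal` throughout avoids the binary floating-point rounding that would otherwise creep into money amounts, and the deterministic id means re-running the match for the same trade yields the same key.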