Commit graph

5 commits

Author SHA1 Message Date
Viktor Barzin
6450201af0 pipeline: emit matching DEPOSIT/WITHDRAWAL for every BUY/SELL
## Context

The 2026-04-18 reconciliation ended with Wealthfolio's historical Net
Worth chart showing cliff-jumps on 5 dates — the single-day lump cash
offsets we'd posted to "zero out" phantom cash. An operational fix
replaced those 6 lumps with 231 per-BUY/SELL matched DEPOSIT/WITHDRAWAL
rows (see code-r9n note). That made the chart smooth — but only for
today's data. Any future broker-sync run would re-introduce phantom
cash because providers emit BUY/SELL only; nothing on the cash side.

This commit bakes the match into the pipeline so **future syncs
self-balance cash at import time** and the chart stays smooth.

## This change

- broker_sync/pipeline.py
  - New _matched_cash_flow(a): returns a DEPOSIT for a BUY (amount =
    qty * unit_price + fee) or a WITHDRAWAL for a SELL (amount =
    qty * unit_price - fee). Returns None for every other activity
    type — DEPOSIT/WITHDRAWAL/DIVIDEND/etc. already touch cash
    directly. The synthetic activity carries a deterministic
    external_id `cash-flow-match:<buy|sell>:<original external_id>`
    so SyncRecordStore dedup handles idempotency across runs.
  - New _with_cash_flow_match(a): expand helper — returns [a] or
    [a, match]. Pure, testable.
  - sync_provider_to_wealthfolio loops over the expansion, so each
    activity may now contribute up to two rows to the batch. `fetched`
    still counts provider-side activities only; `new_after_dedup` +
    `imported` + `failed` count expanded rows.
- tests/test_pipeline.py
  - Updated two existing pipeline integration tests to reflect the
    now-larger batch shape (3 BUYs become 6 rows after expansion).
  - 5 new unit tests for the helpers: BUY → DEPOSIT with fee,
    SELL → WITHDRAWAL net of fee, DEPOSIT/WITHDRAWAL/DIVIDEND pass
    through, zero-amount trades skipped, _with_cash_flow_match
    returns the right cardinality.

## What is NOT in this change

- Provider-level opt-out (e.g., Provider.emits_matching_cash_flow =
  True). No current provider emits real cash flows alongside trades
  (Trading212 only calls /orders, not /transactions), so the default
  "always match" is safe. If we ever wire a provider that pulls real
  bank-transfer dates, add the opt-out then.
- Retroactive cleanup of already-imported WF accounts — already done
  operationally today.

## Verification

### Automated

$ poetry run pytest tests/test_pipeline.py -v
7 passed in 0.40s

$ poetry run pytest -q
133 passed, 1 skipped in 8.58s

$ poetry run mypy broker_sync/pipeline.py tests/test_pipeline.py
Success: no issues found in 2 source files

$ poetry run ruff check broker_sync/pipeline.py tests/test_pipeline.py
All checks passed!

### Manual — next sync

Once this image ships and broker-sync-trading212 / broker-sync-imap /
broker-sync-fidelity run, confirm:
1. kubectl -n broker-sync logs job/<next-run> → fetched=N new=2N
   imported=2N failed=0 (doubled due to matches).
2. WF /api/v1/holdings?accountId=<uuid> → cash ≈ £0 for every currency
   after import.
3. Net Worth chart has no new cliff-jumps.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 19:12:49 +00:00
Viktor Barzin
6efd03570a Add imap-ingest CLI + ImapProvider: route emails to IE/Schwab parsers
Wires the IE + Schwab email parsers into an actual runnable sync. Walks
the IMAP mailbox, routes each message by sender domain:
  - *@investengine.com → invest_engine.parse_invest_engine_email
  - *@schwab.com       → schwab.parse_schwab_email
then pushes the resulting Activities through the shared pipeline.

broker-sync imap-ingest — new CLI command taking IMAP_HOST/USER/PASSWORD/
DIRECTORY (mirrors the old wealthfolio-sync image's env shape so the
Terraform CronJob's existing env wiring works unchanged).

Verified: poetry run pytest -q → 109 passed + 1 skipped; mypy strict
clean (37 files); ruff + yapf clean.
2026-04-17 22:12:05 +00:00
Viktor Barzin
b363032e42 sinks: feed /import/check enrichment into /import body
/import/check hydrates each ActivityImport with resolved assetId,
exchangeMic, quoteCcy, instrumentType, quoteMode. The /import endpoint
on Wealthfolio 3.2 does NOT re-resolve — passing an un-enriched row
returns 200 OK but silently drops the activity (activities=[] in the
response).

The first live run returned `imported=63 failed=0` but nothing reached
the database. Fixed by posting the hydrated rows from the check response
to /import instead of the original.

Requires the test to also return list-shaped check responses (matches
the upstream Json<Vec<ActivityImport>> signature on the Rust side).

poetry run pytest -q     70 passed
poetry run mypy          clean
poetry run ruff check    clean
2026-04-17 20:54:17 +00:00
Viktor Barzin
80ca009373 Match Wealthfolio accounts by providerAccountId, remap accountId on import
Context: Wealthfolio 3.2 generates its own UUIDs on POST /accounts, ignoring any
`id` we supply. Our logical Account.id lives on as `providerAccountId`, which
WF preserves verbatim.

Live run created six duplicate accounts because ensure_account looked up by
our `id`, never found it, and POSTed a new account on every attempt. Deleted
the duplicates manually via DELETE /accounts/{id}.

This change:
- ensure_account now returns Wealthfolio's UUID; matches existing via
  (provider, providerAccountId)
- pipeline remaps activity.account_id to the WF UUID at submission time
  but keeps dedup keyed on our stable id (WF resets must not blow away
  the whole dedup history)
- test updates to the new account-shape + dedup key expectations

poetry run pytest -q    70 passed
poetry run mypy         clean
poetry run ruff check   clean
2026-04-17 20:44:32 +00:00
Viktor Barzin
6fc2ac5322 Add sync pipeline + trading212 CLI subcommand
Some checks are pending
CI / test (push) Waiting to run
CI / build (push) Blocked by required conditions
CI / deploy (push) Blocked by required conditions
Context
-------
Closes the gap between "Trading212 provider yields Activities" and
"activities land in Wealthfolio with dedup". One generic pipeline
function works for every provider (Phase 2 IMAP ingest and Phase 3
CSV drop will reuse it).

This change
-----------
- `broker_sync/pipeline.py` — sync_provider_to_wealthfolio():
  ensure accounts exist in Wealthfolio, fetch, dedup against the local
  SQLite store, batch into Wealthfolio's CSV import at 200 rows each,
  record successful imports in the dedup store with the returned
  Wealthfolio activity id. Failed batches don't touch the dedup store
  so the next run retries.
- Notes field stamped with `sync:<provider>:<external_id>` for human
  auditability — NOT used for dedup (the SQLite store owns that).
- `broker_sync/cli.py` — new `trading212` subcommand driven by
  T212_API_KEYS_JSON + WF_* + BROKER_SYNC_DATA_DIR env vars. Two modes:
  `steady` fetches last 7 days; `backfill` pulls all history. Exits 0
  on clean run, 1 if any batch failed, 2 on config errors.
- Pipeline tests with MockTransport: dedup-skip-then-import happy path
  (verifies imported CSV contains only the unseen rows and all three
  are recorded after the run); import-rejected path (verifies the
  failed row is NOT recorded so the next run retries).

Test plan
---------
## Automated
- poetry run pytest -q  →  70 passed
- poetry run mypy broker_sync tests  →  Success: no issues found in 29 source files
- poetry run ruff check .  →  All checks passed!
- poetry run broker-sync trading212 --help  →  shows all env vars + mode flag

## Manual Verification
Live smoke test blocked on:
1. Vault secret/broker-sync seeded (wf_base_url, wf_username, wf_password,
   trading212_api_keys).
2. Terraform stack applied (infra/stacks/broker-sync/ — staged, not yet applied).
3. Image pushed to viktorbarzin/broker-sync on DockerHub via GHA.

Once those land:
    kubectl -n broker-sync create job t212-backfill \
      --from=cronjob/broker-sync-trading212 -- \
      broker-sync trading212 --mode=backfill
2026-04-17 19:45:43 +00:00