broker-sync/broker_sync/pipeline.py
Viktor Barzin 80ca009373 Match Wealthfolio accounts by providerAccountId, remap accountId on import
Context: Wealthfolio 3.2 generates its own UUIDs on POST /accounts, ignoring any
`id` we supply. Our logical Account.id lives on as `providerAccountId`, which
WF preserves verbatim.

Live run created six duplicate accounts because ensure_account looked up by
our `id`, never found it, and POSTed a new account on every attempt. Deleted
the duplicates manually via DELETE /accounts/{id}.

This change:
- ensure_account now returns Wealthfolio's UUID; matches existing via
  (provider, providerAccountId)
- pipeline remaps activity.account_id to the WF UUID at submission time
  but keeps dedup keyed on our stable id (WF resets must not blow away
  the whole dedup history)
- test updates to the new account-shape + dedup key expectations

poetry run pytest -q    70 passed
poetry run mypy         clean
poetry run ruff check   clean
2026-04-17 20:44:32 +00:00

146 lines
4.6 KiB
Python

"""Orchestrator: one-shot sync of a Provider's activities into Wealthfolio."""
from __future__ import annotations
import logging
from collections.abc import AsyncIterator
from dataclasses import dataclass
from datetime import datetime
from broker_sync.dedup import SyncRecordStore
from broker_sync.models import Account, Activity
from broker_sync.providers.base import Provider
from broker_sync.sinks.wealthfolio import WealthfolioSink
log = logging.getLogger(__name__)
# Wealthfolio's CSV import accepts arbitrary sizes but we batch to keep
# any single-row validation error from failing the whole run.
_BATCH_SIZE = 200
@dataclass
class SyncResult:
    """Summary counters for one provider → Wealthfolio sync run."""

    # Provider.name this run covered.
    provider: str
    # Total activities yielded by provider.fetch().
    fetched: int
    # Subset of fetched that the dedup store had not seen before.
    new_after_dedup: int
    # Rows submitted in batches whose import call succeeded.
    imported: int
    failed: int  # rows that import_activities returned errors for
async def sync_provider_to_wealthfolio(
    *,
    provider: Provider,
    sink: WealthfolioSink,
    dedup: SyncRecordStore,
    since: datetime | None = None,
    before: datetime | None = None,
) -> SyncResult:
    """Fetch a provider's activities, dedup them, and import into Wealthfolio.

    Caller owns sink lifecycle (including `login()` and `close()`).
    """
    wf_account_ids = await _ensure_accounts(sink, provider.accounts())
    totals = {"fetched": 0, "new": 0, "imported": 0, "failed": 0}
    # Each pending entry pairs our stable account id (dedup key) with the
    # activity as it will be submitted (remapped to Wealthfolio's UUID).
    pending: list[tuple[str, Activity]] = []

    async def drain() -> None:
        # Flush the current batch and fold its counts into the totals.
        ok, bad = await _flush_batch(sink, dedup, provider.name, pending)
        totals["imported"] += ok
        totals["failed"] += bad
        pending.clear()

    async for activity in provider.fetch(since=since, before=before):
        totals["fetched"] += 1
        if dedup.has_seen(provider.name, activity.account_id, activity.external_id):
            continue
        totals["new"] += 1
        _tag_notes(activity, provider.name)
        stable_account_id = activity.account_id
        # Submit under Wealthfolio's UUID; keep dedup keyed on our id.
        mapped = wf_account_ids.get(stable_account_id)
        if mapped:
            activity.account_id = mapped
        pending.append((stable_account_id, activity))
        if len(pending) >= _BATCH_SIZE:
            await drain()
    if pending:
        await drain()
    log.info(
        "sync complete provider=%s fetched=%d new=%d imported=%d failed=%d",
        provider.name,
        totals["fetched"],
        totals["new"],
        totals["imported"],
        totals["failed"],
    )
    return SyncResult(
        provider=provider.name,
        fetched=totals["fetched"],
        new_after_dedup=totals["new"],
        imported=totals["imported"],
        failed=totals["failed"],
    )
async def _ensure_accounts(
sink: WealthfolioSink, accounts: list[Account]
) -> dict[str, str]:
"""Return {our_account_id: wealthfolio_uuid}."""
out: dict[str, str] = {}
for account in accounts:
out[account.id] = await sink.ensure_account(account)
return out
def _tag_notes(activity: Activity, provider_name: str) -> None:
"""Stamp the notes field with a dedup-friendly tag (belt-and-braces)."""
tag = f"sync:{provider_name}:{activity.external_id}"
if activity.notes:
if tag not in activity.notes:
activity.notes = f"{activity.notes} | {tag}"
else:
activity.notes = tag
async def _flush_batch(
sink: WealthfolioSink,
dedup: SyncRecordStore,
provider_name: str,
batch: list[tuple[str, Activity]],
) -> tuple[int, int]:
activities_only = [a for _, a in batch]
try:
created = await sink.import_activities(activities_only)
except Exception:
log.exception("Wealthfolio import failed for batch of %d", len(batch))
return 0, len(batch)
# Map returned Wealthfolio activity ids back to our external_ids.
by_external: dict[str, str | None] = {}
for row in created:
ext = row.get("external_id") if isinstance(row, dict) else None
wf_id = row.get("id") if isinstance(row, dict) else None
if ext:
by_external[str(ext)] = str(wf_id) if wf_id is not None else None
ok = 0
for original_account_id, a in batch:
wf_id = by_external.get(a.external_id)
dedup.record(
provider_name, original_account_id, a.external_id,
wealthfolio_activity_id=wf_id,
)
ok += 1
return ok, 0
async def collect(iterator: AsyncIterator[Activity]) -> list[Activity]:
    """Drain *iterator* into a list. Mainly a convenience for tests."""
    drained: list[Activity] = []
    async for item in iterator:
        drained.append(item)
    return drained