"""Orchestrator: one-shot sync of a Provider's activities into Wealthfolio.""" from __future__ import annotations import logging from collections.abc import AsyncIterator from dataclasses import dataclass from datetime import datetime from broker_sync.dedup import SyncRecordStore from broker_sync.models import Account, Activity from broker_sync.providers.base import Provider from broker_sync.sinks.wealthfolio import WealthfolioSink log = logging.getLogger(__name__) # Wealthfolio's CSV import accepts arbitrary sizes but we batch to keep # any single-row validation error from failing the whole run. _BATCH_SIZE = 200 @dataclass class SyncResult: provider: str fetched: int new_after_dedup: int imported: int failed: int # rows that import_activities returned errors for async def sync_provider_to_wealthfolio( *, provider: Provider, sink: WealthfolioSink, dedup: SyncRecordStore, since: datetime | None = None, before: datetime | None = None, ) -> SyncResult: """Run the fetch → dedup → import → record pipeline for one provider. Caller owns sink lifecycle (including `login()` and `close()`). """ await _ensure_accounts(sink, provider.accounts()) fetched = 0 new_after_dedup = 0 imported = 0 failed = 0 batch: list[Activity] = [] async for activity in provider.fetch(since=since, before=before): fetched += 1 if dedup.has_seen(provider.name, activity.account_id, activity.external_id): continue new_after_dedup += 1 _tag_notes(activity, provider.name) batch.append(activity) if len(batch) >= _BATCH_SIZE: ok, bad = await _flush_batch(sink, dedup, provider.name, batch) imported += ok failed += bad batch = [] if batch: ok, bad = await _flush_batch(sink, dedup, provider.name, batch) imported += ok failed += bad log.info( "sync complete provider=%s fetched=%d new=%d imported=%d failed=%d", provider.name, fetched, new_after_dedup, imported, failed, ) return SyncResult( provider=provider.name, fetched=fetched, new_after_dedup=new_after_dedup, imported=imported, failed=failed, ) async def _ensure_accounts(sink: WealthfolioSink, accounts: list[Account]) -> None: for account in accounts: await sink.ensure_account(account) def _tag_notes(activity: Activity, provider_name: str) -> None: """Stamp the notes field with a dedup-friendly tag (belt-and-braces).""" tag = f"sync:{provider_name}:{activity.external_id}" if activity.notes: if tag not in activity.notes: activity.notes = f"{activity.notes} | {tag}" else: activity.notes = tag async def _flush_batch( sink: WealthfolioSink, dedup: SyncRecordStore, provider_name: str, batch: list[Activity], ) -> tuple[int, int]: try: created = await sink.import_activities(batch) except Exception: log.exception("Wealthfolio import failed for batch of %d", len(batch)) return 0, len(batch) # Map returned Wealthfolio activity ids back to our external_ids. # Wealthfolio's response shape is under review — we defensively look # for `external_id` on each returned row but fall back to positional # matching if the server doesn't echo it. by_external: dict[str, str | None] = {} for row in created: ext = row.get("external_id") if isinstance(row, dict) else None wf_id = row.get("id") if isinstance(row, dict) else None if ext: by_external[str(ext)] = str(wf_id) if wf_id is not None else None ok = 0 for a in batch: wf_id = by_external.get(a.external_id) dedup.record(provider_name, a.account_id, a.external_id, wealthfolio_activity_id=wf_id) ok += 1 return ok, 0 async def collect(iterator: AsyncIterator[Activity]) -> list[Activity]: """Tiny helper — drain an async iterator to a list. Mainly for tests.""" return [a async for a in iterator]