diff --git a/broker_sync/cli.py b/broker_sync/cli.py
index a98d124..7e09fe5 100644
--- a/broker_sync/cli.py
+++ b/broker_sync/cli.py
@@ -1,10 +1,19 @@
 from __future__ import annotations
 
+import asyncio
+import json
+import logging
 import os
 import sys
+from datetime import UTC, datetime, timedelta
+from pathlib import Path
+from typing import TYPE_CHECKING
 
 import typer
 
+if TYPE_CHECKING:
+    from broker_sync.models import Account
+
 app = typer.Typer(help="broker-sync: pull brokerage activity into Wealthfolio")
 
 
@@ -22,9 +31,7 @@ def auth_spike(
     wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
     session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"),
 ) -> None:
-    """Phase 0.5 — prove end-to-end auth + 1 activity import against live Wealthfolio."""
-    import asyncio
-
+    """Phase 0.5 — prove end-to-end auth against live Wealthfolio."""
     from broker_sync.sinks.wealthfolio import WealthfolioSink
 
     async def _run() -> None:
@@ -48,12 +55,119 @@ def auth_spike(
         sys.exit(1)
 
 
+@app.command("trading212")
+def trading212(
+    wf_base_url: str = typer.Option(..., envvar="WF_BASE_URL"),
+    wf_username: str = typer.Option(..., envvar="WF_USERNAME"),
+    wf_password: str = typer.Option(..., envvar="WF_PASSWORD"),
+    wf_session_path: str = typer.Option("/data/wealthfolio_session.json", envvar="WF_SESSION_PATH"),
+    t212_api_keys_json: str = typer.Option(..., envvar="T212_API_KEYS_JSON"),
+    data_dir: str = typer.Option("/data", envvar="BROKER_SYNC_DATA_DIR"),
+    mode: str = typer.Option("steady", help="steady = last-7-days; backfill = full history"),
+) -> None:
+    """Phase 1 — sync Trading212 accounts into Wealthfolio.
+
+    T212_API_KEYS_JSON is a JSON array of
+    {id, name, account_type, currency, api_key}
+    objects — one entry per T212 account (ISA, Invest).
+    """
+    from broker_sync.dedup import SyncRecordStore
+    from broker_sync.pipeline import sync_provider_to_wealthfolio
+    from broker_sync.providers.trading212 import Trading212Provider
+    from broker_sync.sinks.wealthfolio import WealthfolioSink
+
+    _setup_logging()
+    accounts = _parse_t212_accounts(t212_api_keys_json)
+    if not accounts:
+        typer.echo("No accounts configured in T212_API_KEYS_JSON — nothing to do.", err=True)
+        sys.exit(2)
+
+    data = Path(data_dir)
+    checkpoint_dir = data / "watermarks"
+    checkpoint_dir.mkdir(parents=True, exist_ok=True)
+
+    if mode == "steady":
+        since: datetime | None = datetime.now(UTC) - timedelta(days=7)
+    elif mode == "backfill":
+        since = None
+    else:
+        typer.echo(f"Unknown mode: {mode!r}. Use 'steady' or 'backfill'.", err=True)
+        sys.exit(2)
+
+    async def _run() -> None:
+        sink = WealthfolioSink(
+            base_url=wf_base_url,
+            username=wf_username,
+            password=wf_password,
+            session_path=wf_session_path,
+        )
+        provider = Trading212Provider(
+            accounts=accounts,
+            checkpoint_dir=checkpoint_dir,
+        )
+        dedup = SyncRecordStore(data / "sync.db")
+        try:
+            # Ensure cookie upfront so a first-run with no session file still works.
+            if not Path(wf_session_path).exists():
+                await sink.login()
+            result = await sync_provider_to_wealthfolio(
+                provider=provider,
+                sink=sink,
+                dedup=dedup,
+                since=since,
+            )
+        finally:
+            await provider.close()
+            await sink.close()
+
+        typer.echo(f"trading212: fetched={result.fetched} "
+                   f"new={result.new_after_dedup} "
+                   f"imported={result.imported} "
+                   f"failed={result.failed}")
+        if result.failed > 0:
+            sys.exit(1)
+
+    asyncio.run(_run())
+
+
+def _setup_logging() -> None:
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s %(levelname)s %(name)s %(message)s",
+    )
+
+
+def _parse_t212_accounts(raw: str) -> list[tuple[Account, str]]:
+    """Parse T212_API_KEYS_JSON into (Account, api_key) pairs."""
+    from broker_sync.models import Account, AccountType
+
+    parsed = json.loads(raw)
+    if not isinstance(parsed, list):
+        raise typer.BadParameter("T212_API_KEYS_JSON must be a JSON array")
+
+    pairs: list[tuple[Account, str]] = []
+    for entry in parsed:
+        if not isinstance(entry, dict):
+            raise typer.BadParameter("Each T212 entry must be an object")
+        try:
+            account = Account(
+                id=entry["id"],
+                name=entry["name"],
+                account_type=AccountType(entry["account_type"]),
+                currency=entry.get("currency", "GBP"),
+                provider="trading212",
+            )
+            api_key = entry["api_key"]
+        except KeyError as e:
+            raise typer.BadParameter(f"T212 entry missing required key: {e}") from None
+        pairs.append((account, api_key))
+    return pairs
+
+
 def main() -> None:
-    # Entry point called by the console-script in pyproject.toml.
     app()
 
 
 if __name__ == "__main__":
-    # Guard env for readability when running under `python -m broker_sync.cli`.
     os.environ.setdefault("COLUMNS", "120")
     main()
diff --git a/broker_sync/pipeline.py b/broker_sync/pipeline.py
new file mode 100644
index 0000000..7f758ca
--- /dev/null
+++ b/broker_sync/pipeline.py
@@ -0,0 +1,133 @@
+"""Orchestrator: one-shot sync of a Provider's activities into Wealthfolio."""
+from __future__ import annotations
+
+import logging
+from collections.abc import AsyncIterator
+from dataclasses import dataclass
+from datetime import datetime
+
+from broker_sync.dedup import SyncRecordStore
+from broker_sync.models import Account, Activity
+from broker_sync.providers.base import Provider
+from broker_sync.sinks.wealthfolio import WealthfolioSink
+
+log = logging.getLogger(__name__)
+
+# Wealthfolio's CSV import accepts arbitrary sizes but we batch to keep
+# any single-row validation error from failing the whole run.
+_BATCH_SIZE = 200
+
+
+@dataclass
+class SyncResult:
+    provider: str
+    fetched: int
+    new_after_dedup: int
+    imported: int
+    failed: int  # rows that import_activities returned errors for
+
+
+async def sync_provider_to_wealthfolio(
+    *,
+    provider: Provider,
+    sink: WealthfolioSink,
+    dedup: SyncRecordStore,
+    since: datetime | None = None,
+    before: datetime | None = None,
+) -> SyncResult:
+    """Run the fetch → dedup → import → record pipeline for one provider.
+
+    Caller owns sink lifecycle (including `login()` and `close()`).
+    """
+    await _ensure_accounts(sink, provider.accounts())
+
+    fetched = 0
+    new_after_dedup = 0
+    imported = 0
+    failed = 0
+    batch: list[Activity] = []
+
+    async for activity in provider.fetch(since=since, before=before):
+        fetched += 1
+        if dedup.has_seen(provider.name, activity.account_id, activity.external_id):
+            continue
+        new_after_dedup += 1
+        _tag_notes(activity, provider.name)
+        batch.append(activity)
+        if len(batch) >= _BATCH_SIZE:
+            ok, bad = await _flush_batch(sink, dedup, provider.name, batch)
+            imported += ok
+            failed += bad
+            batch = []
+
+    if batch:
+        ok, bad = await _flush_batch(sink, dedup, provider.name, batch)
+        imported += ok
+        failed += bad
+
+    log.info(
+        "sync complete provider=%s fetched=%d new=%d imported=%d failed=%d",
+        provider.name,
+        fetched,
+        new_after_dedup,
+        imported,
+        failed,
+    )
+    return SyncResult(
+        provider=provider.name,
+        fetched=fetched,
+        new_after_dedup=new_after_dedup,
+        imported=imported,
+        failed=failed,
+    )
+
+
+async def _ensure_accounts(sink: WealthfolioSink, accounts: list[Account]) -> None:
+    for account in accounts:
+        await sink.ensure_account(account)
+
+
+def _tag_notes(activity: Activity, provider_name: str) -> None:
+    """Stamp the notes field with a dedup-friendly tag (belt-and-braces)."""
+    tag = f"sync:{provider_name}:{activity.external_id}"
+    if activity.notes:
+        if tag not in activity.notes:
+            activity.notes = f"{activity.notes} | {tag}"
+    else:
+        activity.notes = tag
+
+
+async def _flush_batch(
+    sink: WealthfolioSink,
+    dedup: SyncRecordStore,
+    provider_name: str,
+    batch: list[Activity],
+) -> tuple[int, int]:
+    try:
+        created = await sink.import_activities(batch)
+    except Exception:
+        log.exception("Wealthfolio import failed for batch of %d", len(batch))
+        return 0, len(batch)
+
+    # Map returned Wealthfolio activity ids back to our external_ids.
+    # Wealthfolio's response shape is under review — we defensively look
+    # for `external_id` on each returned row; rows the server doesn't echo
+    # are recorded with wealthfolio_activity_id=None (no positional fallback).
+    by_external: dict[str, str | None] = {}
+    for row in created:
+        ext = row.get("external_id") if isinstance(row, dict) else None
+        wf_id = row.get("id") if isinstance(row, dict) else None
+        if ext:
+            by_external[str(ext)] = str(wf_id) if wf_id is not None else None
+
+    ok = 0
+    for a in batch:
+        wf_id = by_external.get(a.external_id)
+        dedup.record(provider_name, a.account_id, a.external_id, wealthfolio_activity_id=wf_id)
+        ok += 1
+    return ok, 0
+
+
+async def collect(iterator: AsyncIterator[Activity]) -> list[Activity]:
+    """Tiny helper — drain an async iterator to a list. Mainly for tests."""
+    return [a async for a in iterator]
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
new file mode 100644
index 0000000..45738d5
--- /dev/null
+++ b/tests/test_pipeline.py
@@ -0,0 +1,159 @@
+from __future__ import annotations
+
+import json
+from collections.abc import AsyncIterator
+from datetime import UTC, datetime
+from decimal import Decimal
+from pathlib import Path
+
+import httpx
+
+from broker_sync.dedup import SyncRecordStore
+from broker_sync.models import Account, AccountType, Activity, ActivityType
+from broker_sync.pipeline import sync_provider_to_wealthfolio
+from broker_sync.sinks.wealthfolio import WealthfolioSink
+
+
+class _FakeProvider:
+    name = "fake"
+
+    def __init__(self, accounts: list[Account], activities: list[Activity]) -> None:
+        self._accounts = accounts
+        self._activities = activities
+
+    def accounts(self) -> list[Account]:
+        return list(self._accounts)
+
+    async def fetch(
+        self,
+        *,
+        since: datetime | None = None,
+        before: datetime | None = None,
+    ) -> AsyncIterator[Activity]:
+        for a in self._activities:
+            yield a
+
+
+def _buy(external_id: str, account_id: str = "fake-isa") -> Activity:
+    return Activity(
+        external_id=external_id,
+        account_id=account_id,
+        account_type=AccountType.ISA,
+        date=datetime(2026, 4, 1, tzinfo=UTC),
+        activity_type=ActivityType.BUY,
+        symbol="VUAG",
+        quantity=Decimal("1"),
+        unit_price=Decimal("100"),
+        currency="GBP",
+    )
+
+
+def _sink(transport: httpx.MockTransport, session_path: Path) -> WealthfolioSink:
+    session_path.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
+    return WealthfolioSink(
+        base_url="https://wf.test",
+        username="x",
+        password="x",
+        session_path=session_path,
+        transport=transport,
+    )
+
+
+async def test_pipeline_skips_dedup_then_imports_new(tmp_path: Path) -> None:
+    account = Account(
+        id="fake-isa",
+        name="Fake ISA",
+        account_type=AccountType.ISA,
+        currency="GBP",
+        provider="fake",
+    )
+    provider = _FakeProvider([account], [_buy("a"), _buy("b"), _buy("c")])
+
+    posted_batches: list[str] = []
+
+    async def handler(req: httpx.Request) -> httpx.Response:
+        if req.method == "GET" and req.url.path == "/api/v1/accounts":
+            return httpx.Response(200, json=[{"id": "fake-isa"}])
+        if req.url.path == "/api/v1/activities/import/check":
+            return httpx.Response(200, json={"ok": True})
+        if req.url.path == "/api/v1/activities/import":
+            # The httpx request body is multipart. We don't parse the multipart
+            # properly — we just scan for our dedup tags to confirm the
+            # pipeline pushed the rows it should have.
+            body = req.content.decode()
+            posted_batches.append(body)
+            # Echo back external_ids so dedup.record gets the WF activity id.
+            return httpx.Response(
+                200,
+                json=[{
+                    "id": f"wf-{i}",
+                    "external_id": ext
+                } for i, ext in enumerate(["a", "b", "c"])],
+            )
+        return httpx.Response(500)
+
+    sink = _sink(httpx.MockTransport(handler), tmp_path / "wf-session.json")
+    dedup = SyncRecordStore(tmp_path / "sync.db")
+    # Seed one already-seen to confirm dedup.
+    dedup.record("fake", "fake-isa", "a", wealthfolio_activity_id="wf-old")
+
+    try:
+        result = await sync_provider_to_wealthfolio(
+            provider=provider,
+            sink=sink,
+            dedup=dedup,
+        )
+    finally:
+        await sink.close()
+
+    assert result.fetched == 3
+    assert result.new_after_dedup == 2
+    assert result.imported == 2
+    assert result.failed == 0
+    assert len(posted_batches) == 1
+    body = posted_batches[0]
+    # Only the new rows (b, c) — NOT the already-seen "a".
+    assert "sync:fake:a" not in body
+    assert "sync:fake:b" in body
+    assert "sync:fake:c" in body
+
+    # All three external_ids are now in dedup after the run.
+    assert dedup.has_seen("fake", "fake-isa", "a")
+    assert dedup.has_seen("fake", "fake-isa", "b")
+    assert dedup.has_seen("fake", "fake-isa", "c")
+
+
+async def test_pipeline_records_failure_when_import_rejects(tmp_path: Path) -> None:
+    account = Account(
+        id="fake-isa",
+        name="Fake ISA",
+        account_type=AccountType.ISA,
+        currency="GBP",
+        provider="fake",
+    )
+    provider = _FakeProvider([account], [_buy("a")])
+
+    async def handler(req: httpx.Request) -> httpx.Response:
+        if req.method == "GET" and req.url.path == "/api/v1/accounts":
+            return httpx.Response(200, json=[{"id": "fake-isa"}])
+        if req.url.path == "/api/v1/activities/import/check":
+            return httpx.Response(400, json={"errors": ["bad row"]})
+        return httpx.Response(500)
+
+    sink = _sink(httpx.MockTransport(handler), tmp_path / "wf-session.json")
+    dedup = SyncRecordStore(tmp_path / "sync.db")
+
+    try:
+        result = await sync_provider_to_wealthfolio(
+            provider=provider,
+            sink=sink,
+            dedup=dedup,
+        )
+    finally:
+        await sink.close()
+
+    assert result.fetched == 1
+    assert result.imported == 0
+    assert result.failed == 1
+    # NOT recorded in dedup so the next run retries.
+    assert not dedup.has_seen("fake", "fake-isa", "a")