From 80ca00937338e127d626ec779b4d170f7420c21d Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Fri, 17 Apr 2026 20:44:32 +0000 Subject: [PATCH] Match Wealthfolio accounts by providerAccountId, remap accountId on import Context: Wealthfolio 3.2 generates its own UUIDs on POST /accounts, ignoring any `id` we supply. Our logical Account.id lives on as `providerAccountId`, which WF preserves verbatim. Live run created six duplicate accounts because ensure_account looked up by our `id`, never found it, and POSTed a new account on every attempt. Deleted the duplicates manually via DELETE /accounts/{id}. This change: - ensure_account now returns Wealthfolio's UUID; matches existing via (provider, providerAccountId) - pipeline remaps activity.account_id to the WF UUID at submission time but keeps dedup keyed on our stable id (WF resets must not blow away the whole dedup history) - test updates to the new account-shape + dedup key expectations poetry run pytest -q 70 passed poetry run mypy clean poetry run ruff check clean --- broker_sync/pipeline.py | 37 +++++++++++++++++++++----------- broker_sync/sinks/wealthfolio.py | 30 +++++++++++++++++++++----- tests/sinks/test_wealthfolio.py | 31 ++++++++++++++++++++------ tests/test_pipeline.py | 22 ++++++++++++++----- 4 files changed, 91 insertions(+), 29 deletions(-) diff --git a/broker_sync/pipeline.py b/broker_sync/pipeline.py index 7f758ca..12caca7 100644 --- a/broker_sync/pipeline.py +++ b/broker_sync/pipeline.py @@ -39,13 +39,15 @@ async def sync_provider_to_wealthfolio( Caller owns sink lifecycle (including `login()` and `close()`). """ - await _ensure_accounts(sink, provider.accounts()) + wf_account_ids = await _ensure_accounts(sink, provider.accounts()) fetched = 0 new_after_dedup = 0 imported = 0 failed = 0 - batch: list[Activity] = [] + # Batches are (original_account_id, remapped_for_import Activity) pairs. + # Dedup keys on our stable account_id; the import row uses Wealthfolio's UUID. + batch: list[tuple[str, Activity]] = [] async for activity in provider.fetch(since=since, before=before): fetched += 1 @@ -53,7 +55,12 @@ async def sync_provider_to_wealthfolio( continue new_after_dedup += 1 _tag_notes(activity, provider.name) - batch.append(activity) + original_account_id = activity.account_id + # Submit under Wealthfolio's UUID; keep dedup keyed on our id. + wf_id = wf_account_ids.get(original_account_id) + if wf_id: + activity.account_id = wf_id + batch.append((original_account_id, activity)) if len(batch) >= _BATCH_SIZE: ok, bad = await _flush_batch(sink, dedup, provider.name, batch) imported += ok @@ -82,9 +89,14 @@ async def sync_provider_to_wealthfolio( ) -async def _ensure_accounts(sink: WealthfolioSink, accounts: list[Account]) -> None: +async def _ensure_accounts( + sink: WealthfolioSink, accounts: list[Account] +) -> dict[str, str]: + """Return {our_account_id: wealthfolio_uuid}.""" + out: dict[str, str] = {} for account in accounts: - await sink.ensure_account(account) + out[account.id] = await sink.ensure_account(account) + return out def _tag_notes(activity: Activity, provider_name: str) -> None: @@ -101,18 +113,16 @@ async def _flush_batch( sink: WealthfolioSink, dedup: SyncRecordStore, provider_name: str, - batch: list[Activity], + batch: list[tuple[str, Activity]], ) -> tuple[int, int]: + activities_only = [a for _, a in batch] try: - created = await sink.import_activities(batch) + created = await sink.import_activities(activities_only) except Exception: log.exception("Wealthfolio import failed for batch of %d", len(batch)) return 0, len(batch) # Map returned Wealthfolio activity ids back to our external_ids. - # Wealthfolio's response shape is under review — we defensively look - # for `external_id` on each returned row but fall back to positional - # matching if the server doesn't echo it. by_external: dict[str, str | None] = {} for row in created: ext = row.get("external_id") if isinstance(row, dict) else None @@ -121,9 +131,12 @@ async def _flush_batch( by_external[str(ext)] = str(wf_id) if wf_id is not None else None ok = 0 - for a in batch: + for original_account_id, a in batch: wf_id = by_external.get(a.external_id) - dedup.record(provider_name, a.account_id, a.external_id, wealthfolio_activity_id=wf_id) + dedup.record( + provider_name, original_account_id, a.external_id, + wealthfolio_activity_id=wf_id, + ) ok += 1 return ok, 0 diff --git a/broker_sync/sinks/wealthfolio.py b/broker_sync/sinks/wealthfolio.py index 807263f..adb5305 100644 --- a/broker_sync/sinks/wealthfolio.py +++ b/broker_sync/sinks/wealthfolio.py @@ -120,17 +120,30 @@ class WealthfolioSink: assert isinstance(raw, list) return raw - async def ensure_account(self, account: Account) -> None: + async def ensure_account(self, account: Account) -> str: + """Idempotently create the account and return Wealthfolio's UUID for it. + + Wealthfolio generates its own UUIDs on POST /accounts, ignoring any + `id` we supply. We identify accounts by (provider, providerAccountId) + which Wealthfolio DOES preserve verbatim. Our own Account.id is + used as the providerAccountId. + """ existing = await self.list_accounts() - if any(a.get("id") == account.id for a in existing): - return - # Wealthfolio 3.2's NewAccount is camelCase with required booleans. + for a in existing: + if ( + a.get("provider") == account.provider + and a.get("providerAccountId") == account.id + ): + wf_id = a.get("id") + assert isinstance(wf_id, str) + return wf_id + + # NewAccount is camelCase with required booleans. # See apps/server/src/models.rs#NewAccount. resp = await self._request( "POST", _ACCOUNTS_PATH, json={ - "id": account.id, "name": account.name, "accountType": str(account.account_type), "currency": account.currency, @@ -143,6 +156,13 @@ class WealthfolioSink: }, ) resp.raise_for_status() + created = resp.json() + wf_id = created.get("id") + if not isinstance(wf_id, str): + raise WealthfolioError( + f"POST /accounts returned no id: {created}" + ) + return wf_id # -- activity import -- diff --git a/tests/sinks/test_wealthfolio.py b/tests/sinks/test_wealthfolio.py index 049f6aa..b5e2fb8 100644 --- a/tests/sinks/test_wealthfolio.py +++ b/tests/sinks/test_wealthfolio.py @@ -127,7 +127,7 @@ async def test_401_triggers_single_reauth_and_retry(tmp_path: Path) -> None: # -- Account ensure -- -async def test_ensure_account_no_op_if_exists(tmp_path: Path) -> None: +async def test_ensure_account_returns_existing_wf_id(tmp_path: Path) -> None: sp = tmp_path / "s.json" sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}})) @@ -135,11 +135,15 @@ async def test_ensure_account_no_op_if_exists(tmp_path: Path) -> None: async def handler(req: httpx.Request) -> httpx.Response: if req.method == "GET" and req.url.path == "/api/v1/accounts": + # Wealthfolio stores its own UUID for id; providerAccountId is + # what we gave it. return httpx.Response( 200, json=[{ - "id": "t212-isa", - "name": "Trading212 ISA" + "id": "uuid-wf-123", + "name": "Trading212 ISA", + "provider": "trading212", + "providerAccountId": "t212-isa", }], ) if req.method == "POST": @@ -154,7 +158,8 @@ async def test_ensure_account_no_op_if_exists(tmp_path: Path) -> None: currency="GBP", provider="trading212", ) - await sink.ensure_account(acc) + wf_id = await sink.ensure_account(acc) + assert wf_id == "uuid-wf-123" assert posts == [] # no create @@ -169,7 +174,14 @@ async def test_ensure_account_creates_if_missing(tmp_path: Path) -> None: return httpx.Response(200, json=[]) if req.method == "POST" and req.url.path == "/api/v1/accounts": posted.append(json.loads(req.content)) - return httpx.Response(200, json={"id": "t212-isa"}) + return httpx.Response( + 200, + json={ + "id": "uuid-new-456", + "provider": "trading212", + "providerAccountId": "t212-isa", + }, + ) return httpx.Response(500) sink = _client(httpx.MockTransport(handler), sp) @@ -180,9 +192,14 @@ async def test_ensure_account_creates_if_missing(tmp_path: Path) -> None: currency="GBP", provider="trading212", ) - await sink.ensure_account(acc) + wf_id = await sink.ensure_account(acc) + assert wf_id == "uuid-new-456" assert len(posted) == 1 - assert posted[0]["id"] == "t212-isa" + # We no longer send our logical id — Wealthfolio would ignore it. We + # DO send our id as providerAccountId so Wealthfolio preserves it for + # future matching. + assert "id" not in posted[0] + assert posted[0]["providerAccountId"] == "t212-isa" assert posted[0]["accountType"] == "ISA" assert posted[0]["currency"] == "GBP" assert posted[0]["isActive"] is True diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 45738d5..a59b6cd 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -73,13 +73,18 @@ async def test_pipeline_skips_dedup_then_imports_new(tmp_path: Path) -> None: async def handler(req: httpx.Request) -> httpx.Response: if req.method == "GET" and req.url.path == "/api/v1/accounts": - return httpx.Response(200, json=[{"id": "fake-isa"}]) + # Return account with Wealthfolio-assigned UUID + our providerAccountId. + return httpx.Response( + 200, + json=[{ + "id": "wf-uuid-fake-isa", + "provider": "fake", + "providerAccountId": "fake-isa", + }], + ) if req.url.path == "/api/v1/activities/import/check": return httpx.Response(200, json={"ok": True}) if req.url.path == "/api/v1/activities/import": - # The httpx request body is multipart. We don't parse the multipart - # properly — we just scan for our dedup tags to confirm the - # pipeline pushed the rows it should have. body = req.content.decode() posted_batches.append(body) # Echo back external_ids so dedup.record gets the WF activity id. @@ -135,7 +140,14 @@ async def test_pipeline_records_failure_when_import_rejects(tmp_path: Path) -> N async def handler(req: httpx.Request) -> httpx.Response: if req.method == "GET" and req.url.path == "/api/v1/accounts": - return httpx.Response(200, json=[{"id": "fake-isa"}]) + return httpx.Response( + 200, + json=[{ + "id": "wf-uuid-fake-isa", + "provider": "fake", + "providerAccountId": "fake-isa", + }], + ) if req.url.path == "/api/v1/activities/import/check": return httpx.Response(400, json={"errors": ["bad row"]}) return httpx.Response(500)