From b363032e42b5b59ca46f233f7243a1ad9d59b76f Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Fri, 17 Apr 2026 20:54:17 +0000 Subject: [PATCH] sinks: feed /import/check enrichment into /import body MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit /import/check hydrates each ActivityImport with resolved assetId, exchangeMic, quoteCcy, instrumentType, quoteMode. The /import endpoint on Wealthfolio 3.2 does NOT re-resolve — passing an un-enriched row returns 200 OK but silently drops the activity (activities=[] in the response). The first live run returned `imported=63 failed=0` but nothing reached the database. Fixed by posting the hydrated rows from the check response to /import instead of the original. Requires the test to also return list-shaped check responses (matches the upstream Json> signature on the Rust side). poetry run pytest -q 70 passed poetry run mypy clean poetry run ruff check clean --- broker_sync/sinks/wealthfolio.py | 36 +++++++++++++++++++------------- tests/sinks/test_wealthfolio.py | 20 +++++++++++++----- tests/test_pipeline.py | 16 ++++++++------ 3 files changed, 47 insertions(+), 25 deletions(-) diff --git a/broker_sync/sinks/wealthfolio.py b/broker_sync/sinks/wealthfolio.py index adb5305..f82817f 100644 --- a/broker_sync/sinks/wealthfolio.py +++ b/broker_sync/sinks/wealthfolio.py @@ -195,9 +195,14 @@ class WealthfolioSink: rows = [self._activity_to_import_row(a) for a in activities] if not rows: return [] - body = {"activities": rows} - check = await self._request("POST", _IMPORT_CHECK, json=body) + # Step 1 — /import/check hydrates each row with resolved asset_id, + # exchange_mic, quote_ccy, instrument_type, quote_mode (and flags + # errors). The /import endpoint on Wealthfolio 3.2+ DOES NOT + # re-resolve — if we send the un-enriched row the activity is + # silently dropped (import returns 200 OK with activities=[] in + # the payload). We must feed check's output into import. + check = await self._request("POST", _IMPORT_CHECK, json={"activities": rows}) if check.status_code >= 400: try: payload = check.json() @@ -205,22 +210,25 @@ class WealthfolioSink: payload = {"raw": check.text} raise ImportValidationError(f"Wealthfolio /import/check rejected: {payload}") - # Dry-run may flag per-row errors inside a 200 response. checked = check.json() - if isinstance(checked, list): - bad = [r for r in checked if isinstance(r, dict) and r.get("errors")] - if bad: - # Show the first few to aid debugging without leaking everything. - raise ImportValidationError( - f"Wealthfolio /import/check flagged {len(bad)} rows; " - f"first: {bad[0]}" - ) + if not isinstance(checked, list): + raise ImportValidationError( + f"Wealthfolio /import/check returned non-list: {type(checked).__name__}" + ) - real = await self._request("POST", _IMPORT_REAL, json=body) + invalid = [r for r in checked if isinstance(r, dict) and r.get("errors")] + if invalid: + raise ImportValidationError( + f"Wealthfolio /import/check flagged {len(invalid)} row(s); " + f"first: {invalid[0]}" + ) + # Drop any row the server marked is_valid=false (shouldn't happen + # without errors, but defensive). + valid_rows = [r for r in checked if isinstance(r, dict) and r.get("isValid")] + + real = await self._request("POST", _IMPORT_REAL, json={"activities": valid_rows}) real.raise_for_status() raw = real.json() - # import_activities returns ImportActivitiesResult which is an object, - # but we also accept a list (older versions). Normalise. if isinstance(raw, dict) and "activities" in raw: got = raw["activities"] assert isinstance(got, list) diff --git a/tests/sinks/test_wealthfolio.py b/tests/sinks/test_wealthfolio.py index b5e2fb8..f554a19 100644 --- a/tests/sinks/test_wealthfolio.py +++ b/tests/sinks/test_wealthfolio.py @@ -218,14 +218,24 @@ async def test_import_dry_run_then_real(tmp_path: Path) -> None: async def handler(req: httpx.Request) -> httpx.Response: calls.append(req.url.path) if req.url.path == "/api/v1/activities/import/check": - return httpx.Response(200, json={"ok": True, "rows": 1}) + # /import/check hydrates and returns a list of ActivityImport. + return httpx.Response(200, json=[ + { + "symbol": "VUAG", + "isValid": True, + "errors": None, + "assetId": "enriched-asset-uuid", + "exchangeMic": "XLON", + }, + ]) if req.url.path == "/api/v1/activities/import": return httpx.Response( 200, - json=[{ - "id": "wf-1", - "external_id": "t212:1" - }], + json={ + "activities": [ + {"id": "wf-1", "external_id": "t212:1"}, + ], + }, ) return httpx.Response(500) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index a59b6cd..198e58b 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -83,17 +83,21 @@ async def test_pipeline_skips_dedup_then_imports_new(tmp_path: Path) -> None: }], ) if req.url.path == "/api/v1/activities/import/check": - return httpx.Response(200, json={"ok": True}) + body = json.loads(req.content) + # Echo each activity back marked valid (mimic Wealthfolio's + # hydrate step). + return httpx.Response(200, json=[ + {**a, "isValid": True, "errors": None} for a in body["activities"] + ]) if req.url.path == "/api/v1/activities/import": body = req.content.decode() posted_batches.append(body) - # Echo back external_ids so dedup.record gets the WF activity id. return httpx.Response( 200, - json=[{ - "id": f"wf-{i}", - "external_id": ext - } for i, ext in enumerate(["a", "b", "c"])], + json={"activities": [ + {"id": f"wf-{i}", "external_id": ext} + for i, ext in enumerate(["a", "b", "c"]) + ]}, ) return httpx.Response(500)