sinks: feed /import/check enrichment into /import body

/import/check hydrates each ActivityImport with resolved assetId,
exchangeMic, quoteCcy, instrumentType, quoteMode. The /import endpoint
on Wealthfolio 3.2 does NOT re-resolve — passing an un-enriched row
returns 200 OK but silently drops the activity (activities=[] in the
response).

The first live run returned `imported=63 failed=0` but nothing reached
the database. Fixed by posting the hydrated rows from the check response
to /import instead of the original.

Requires the test to also return list-shaped check responses (matches
the upstream Json<Vec<ActivityImport>> signature on the Rust side).

poetry run pytest -q     70 passed
poetry run mypy          clean
poetry run ruff check    clean
This commit is contained in:
Viktor Barzin 2026-04-17 20:54:17 +00:00
parent 80ca009373
commit b363032e42
3 changed files with 47 additions and 25 deletions

View file

@ -195,9 +195,14 @@ class WealthfolioSink:
rows = [self._activity_to_import_row(a) for a in activities] rows = [self._activity_to_import_row(a) for a in activities]
if not rows: if not rows:
return [] return []
body = {"activities": rows}
check = await self._request("POST", _IMPORT_CHECK, json=body) # Step 1 — /import/check hydrates each row with resolved asset_id,
# exchange_mic, quote_ccy, instrument_type, quote_mode (and flags
# errors). The /import endpoint on Wealthfolio 3.2+ DOES NOT
# re-resolve — if we send the un-enriched row the activity is
# silently dropped (import returns 200 OK with activities=[] in
# the payload). We must feed check's output into import.
check = await self._request("POST", _IMPORT_CHECK, json={"activities": rows})
if check.status_code >= 400: if check.status_code >= 400:
try: try:
payload = check.json() payload = check.json()
@ -205,22 +210,25 @@ class WealthfolioSink:
payload = {"raw": check.text} payload = {"raw": check.text}
raise ImportValidationError(f"Wealthfolio /import/check rejected: {payload}") raise ImportValidationError(f"Wealthfolio /import/check rejected: {payload}")
# Dry-run may flag per-row errors inside a 200 response.
checked = check.json() checked = check.json()
if isinstance(checked, list): if not isinstance(checked, list):
bad = [r for r in checked if isinstance(r, dict) and r.get("errors")] raise ImportValidationError(
if bad: f"Wealthfolio /import/check returned non-list: {type(checked).__name__}"
# Show the first few to aid debugging without leaking everything. )
raise ImportValidationError(
f"Wealthfolio /import/check flagged {len(bad)} rows; "
f"first: {bad[0]}"
)
real = await self._request("POST", _IMPORT_REAL, json=body) invalid = [r for r in checked if isinstance(r, dict) and r.get("errors")]
if invalid:
raise ImportValidationError(
f"Wealthfolio /import/check flagged {len(invalid)} row(s); "
f"first: {invalid[0]}"
)
# Drop any row the server marked is_valid=false (shouldn't happen
# without errors, but defensive).
valid_rows = [r for r in checked if isinstance(r, dict) and r.get("isValid")]
real = await self._request("POST", _IMPORT_REAL, json={"activities": valid_rows})
real.raise_for_status() real.raise_for_status()
raw = real.json() raw = real.json()
# import_activities returns ImportActivitiesResult which is an object,
# but we also accept a list (older versions). Normalise.
if isinstance(raw, dict) and "activities" in raw: if isinstance(raw, dict) and "activities" in raw:
got = raw["activities"] got = raw["activities"]
assert isinstance(got, list) assert isinstance(got, list)

View file

@ -218,14 +218,24 @@ async def test_import_dry_run_then_real(tmp_path: Path) -> None:
async def handler(req: httpx.Request) -> httpx.Response: async def handler(req: httpx.Request) -> httpx.Response:
calls.append(req.url.path) calls.append(req.url.path)
if req.url.path == "/api/v1/activities/import/check": if req.url.path == "/api/v1/activities/import/check":
return httpx.Response(200, json={"ok": True, "rows": 1}) # /import/check hydrates and returns a list of ActivityImport.
return httpx.Response(200, json=[
{
"symbol": "VUAG",
"isValid": True,
"errors": None,
"assetId": "enriched-asset-uuid",
"exchangeMic": "XLON",
},
])
if req.url.path == "/api/v1/activities/import": if req.url.path == "/api/v1/activities/import":
return httpx.Response( return httpx.Response(
200, 200,
json=[{ json={
"id": "wf-1", "activities": [
"external_id": "t212:1" {"id": "wf-1", "external_id": "t212:1"},
}], ],
},
) )
return httpx.Response(500) return httpx.Response(500)

View file

@ -83,17 +83,21 @@ async def test_pipeline_skips_dedup_then_imports_new(tmp_path: Path) -> None:
}], }],
) )
if req.url.path == "/api/v1/activities/import/check": if req.url.path == "/api/v1/activities/import/check":
return httpx.Response(200, json={"ok": True}) body = json.loads(req.content)
# Echo each activity back marked valid (mimic Wealthfolio's
# hydrate step).
return httpx.Response(200, json=[
{**a, "isValid": True, "errors": None} for a in body["activities"]
])
if req.url.path == "/api/v1/activities/import": if req.url.path == "/api/v1/activities/import":
body = req.content.decode() body = req.content.decode()
posted_batches.append(body) posted_batches.append(body)
# Echo back external_ids so dedup.record gets the WF activity id.
return httpx.Response( return httpx.Response(
200, 200,
json=[{ json={"activities": [
"id": f"wf-{i}", {"id": f"wf-{i}", "external_id": ext}
"external_id": ext for i, ext in enumerate(["a", "b", "c"])
} for i, ext in enumerate(["a", "b", "c"])], ]},
) )
return httpx.Response(500) return httpx.Response(500)