sinks: read summary.imported as truth for partial-persist detection

The /import response returns activities=[input echo with errors annotated]
— its length equals input size regardless of actual persistence. The
summary{total,imported,skipped,duplicates} block is the authoritative
signal. When imported<total, raise with errorMessage + skipped + duplicates
counts.
This commit is contained in:
Viktor Barzin 2026-04-17 22:30:24 +00:00
parent 4e2da87637
commit 74b2179c83

View file

@ -221,20 +221,35 @@ class WealthfolioSink:
real = await self._request("POST", _IMPORT_REAL, json={"activities": valid_rows}) real = await self._request("POST", _IMPORT_REAL, json={"activities": valid_rows})
real.raise_for_status() real.raise_for_status()
raw = real.json() raw = real.json()
# Two observed response shapes:
# - {activities:[...], importRunId:"...", summary:{total,imported,skipped,...}}
# - bare list (older builds)
if isinstance(raw, dict) and "activities" in raw: if isinstance(raw, dict) and "activities" in raw:
got = raw["activities"] got = raw["activities"]
assert isinstance(got, list) summary = raw.get("summary") if isinstance(raw.get("summary"), dict) else None
elif isinstance(raw, list): elif isinstance(raw, list):
got = raw got = raw
summary = None
else: else:
got = [] got = []
# Silent-drop detection: if we sent N valid rows but got 0 back, something summary = None
# is silently rejecting them (usually a date-format or asset-resolution # Summary.imported is THE truth. The `activities` field echoes input
# quirk that check() didn't catch). Raise so the pipeline records failure # with errors annotated — its length equals input even when zero
# instead of marking the rows as synced when they never landed. # actually persisted.
if valid_rows and not got: if summary is not None:
# Also surface any per-row `errors` or `warnings` from the check step imported_n = int(summary.get("imported", 0))
# — those are often the best hint about why /import dropped them. total_n = int(summary.get("total", len(valid_rows)))
if imported_n < total_n:
err_msg = summary.get("errorMessage") or "no errorMessage"
skipped = int(summary.get("skipped", 0))
dupes = int(summary.get("duplicates", 0))
raise ImportValidationError(
f"Wealthfolio /import persisted {imported_n}/{total_n} "
f"(skipped={skipped} duplicates={dupes}). "
f"errorMessage: {err_msg}"
)
# Legacy silent-drop guard for no-summary responses.
elif valid_rows and not got:
first_warn = next( first_warn = next(
(r.get("warnings") for r in checked if isinstance(r, dict) and r.get("warnings")), (r.get("warnings") for r in checked if isinstance(r, dict) and r.get("warnings")),
None, None,
@ -242,6 +257,6 @@ class WealthfolioSink:
raise ImportValidationError( raise ImportValidationError(
f"Wealthfolio /import silently dropped all {len(valid_rows)} rows. " f"Wealthfolio /import silently dropped all {len(valid_rows)} rows. "
f"First checked row: {checked[0] if checked else 'none'}. " f"First checked row: {checked[0] if checked else 'none'}. "
f"First warning (if any): {first_warn}" f"First warning: {first_warn}"
) )
return got return got