broker-sync/broker_sync/sinks/wealthfolio.py
Viktor Barzin b363032e42 sinks: feed /import/check enrichment into /import body
/import/check hydrates each ActivityImport with resolved assetId,
exchangeMic, quoteCcy, instrumentType, quoteMode. The /import endpoint
on Wealthfolio 3.2 does NOT re-resolve — passing an un-enriched row
returns 200 OK but silently drops the activity (activities=[] in the
response).

The first live run returned `imported=63 failed=0` but nothing reached
the database. Fixed by posting the hydrated rows from the check response
to /import instead of the original.

Requires the test to also return list-shaped check responses (matches
the upstream Json<Vec<ActivityImport>> signature on the Rust side).

poetry run pytest -q     70 passed
poetry run mypy          clean
poetry run ruff check    clean
2026-04-17 20:54:17 +00:00

238 lines
8.5 KiB
Python

from __future__ import annotations
import json
from collections.abc import Iterable
from pathlib import Path
from typing import Any
import httpx
from broker_sync.models import Account, Activity
# Wealthfolio REST endpoint paths. They are passed as request paths to the
# sink's httpx client, which is constructed with the server's base_url.
_LOGIN_PATH = "/api/v1/auth/login"
_ACCOUNTS_PATH = "/api/v1/accounts"
# Validation/enrichment pass: its response rows are what get posted to the
# real import endpoint below.
_IMPORT_CHECK = "/api/v1/activities/import/check"
_IMPORT_REAL = "/api/v1/activities/import"
class WealthfolioError(Exception):
    """Base class for the Wealthfolio-specific errors defined in this module."""
class WealthfolioUnauthorizedError(WealthfolioError):
    """Raised when the login request itself is answered with HTTP 401.

    A 401 from any other endpoint does not raise this error: the sink
    logs in again and retries that request once instead.
    """
class ImportValidationError(WealthfolioError):
    """The `/activities/import/check` step rejected the batch.

    Raised when the check endpoint answers with a 4xx/5xx status, returns
    a body that is not a list, or flags at least one row with errors. In
    all of these cases the real import is never attempted.
    """
class WealthfolioSink:
    """Push canonical Activities to Wealthfolio via its CSV import endpoint.

    Auth is JWT-cookie via POST /api/v1/auth/login. Cookies are persisted
    to disk so CronJob pods can reuse them across runs (Wealthfolio's
    /auth/login is 5-req/min rate-limited).

    Not multi-process safe — file locking is added in Phase 1 when we
    fan out to multiple CronJobs.
    """

    def __init__(
        self,
        *,
        base_url: str,
        username: str,
        password: str,
        session_path: Path | str,
        transport: httpx.AsyncBaseTransport | None = None,
    ) -> None:
        """Create the sink and its underlying HTTP client.

        Args:
            base_url: Root URL of the Wealthfolio server; a trailing slash
                is stripped.
            username: Stored but not sent by ``login`` (see the comment there).
            password: Password posted to the login endpoint.
            session_path: File in which session cookies are persisted.
            transport: Optional httpx transport override (e.g. a mock).
        """
        self._username = username
        self._password = password
        self._session_path = Path(session_path)
        self._client = httpx.AsyncClient(
            base_url=base_url.rstrip("/"),
            timeout=30.0,
            transport=transport,
        )

    async def close(self) -> None:
        """Close the underlying HTTP client."""
        await self._client.aclose()

    # -- session --

    def _load_cookies(self) -> dict[str, str]:
        """Return the persisted session cookies, or ``{}`` if none are usable.

        A missing, truncated, non-JSON or wrongly-shaped session file is
        treated as "no session" instead of raising. The request then goes
        out without a Cookie header, and if the server answers 401,
        ``_request`` logs in again, which rewrites the file.
        """
        try:
            raw = json.loads(self._session_path.read_text())
        except FileNotFoundError:
            return {}
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError
            # subclasses: the file exists but is corrupt (e.g. a write that
            # was interrupted part-way through).
            return {}
        if not isinstance(raw, dict):
            return {}
        got = raw.get("cookies", {})
        if not isinstance(got, dict):
            return {}
        return got

    def _save_cookies(self, cookies: dict[str, str]) -> None:
        """Persist ``cookies`` to the session file, creating parent dirs."""
        self._session_path.parent.mkdir(parents=True, exist_ok=True)
        self._session_path.write_text(json.dumps({"cookies": cookies}))

    async def login(self) -> None:
        """Log in with the configured password and persist the cookies.

        Raises:
            WealthfolioUnauthorizedError: the login endpoint returned 401.
            httpx.HTTPStatusError: any other 4xx/5xx from the login endpoint.
            WealthfolioError: login succeeded but no cookie was set.
        """
        # Wealthfolio 3.2's LoginRequest is `{ password: String }` only — a
        # username key is rejected as an unknown field (HTTP 400). The
        # `username` constructor arg is kept for a future Wealthfolio
        # release that may add multi-user support.
        resp = await self._client.post(
            _LOGIN_PATH,
            json={"password": self._password},
        )
        if resp.status_code == 401:
            raise WealthfolioUnauthorizedError("Wealthfolio /auth/login returned 401")
        resp.raise_for_status()
        cookies = dict(resp.cookies.items())
        if not cookies:
            raise WealthfolioError("/auth/login returned 2xx but no Set-Cookie")
        self._save_cookies(cookies)

    @staticmethod
    def _cookie_header(cookies: dict[str, str]) -> str:
        """Render ``cookies`` as the value of a ``Cookie`` request header."""
        return "; ".join(f"{k}={v}" for k, v in cookies.items())

    async def _request(self, method: str, path: str, **kw: Any) -> httpx.Response:
        """Send a request with the persisted cookies, re-logging in once on 401.

        The response is returned without status checking; callers decide
        how to treat error statuses. If the retry after re-login is also
        answered with 401, that second response is returned as-is.
        """
        cookies = self._load_cookies()
        headers = dict(kw.pop("headers", {}) or {})
        if cookies:
            headers["Cookie"] = self._cookie_header(cookies)
        resp = await self._client.request(method, path, headers=headers, **kw)
        if resp.status_code == 401 and path != _LOGIN_PATH:
            # Session missing or expired: log in, reload the freshly saved
            # cookies and replay the same request exactly once.
            await self.login()
            cookies = self._load_cookies()
            headers["Cookie"] = self._cookie_header(cookies)
            resp = await self._client.request(method, path, headers=headers, **kw)
        return resp

    # -- accounts --

    async def list_accounts(self) -> list[dict[str, Any]]:
        """Return the JSON list served by GET /accounts.

        Raises:
            httpx.HTTPStatusError: the endpoint returned 4xx/5xx.
            WealthfolioError: the response body is not a JSON list.
        """
        resp = await self._request("GET", _ACCOUNTS_PATH)
        resp.raise_for_status()
        raw = resp.json()
        # Explicit raise rather than `assert`: asserts vanish under `python -O`.
        if not isinstance(raw, list):
            raise WealthfolioError(
                f"GET /accounts returned non-list: {type(raw).__name__}"
            )
        return raw

    async def ensure_account(self, account: Account) -> str:
        """Idempotently create the account and return Wealthfolio's UUID for it.

        Wealthfolio generates its own UUIDs on POST /accounts, ignoring any
        `id` we supply. We identify accounts by (provider, providerAccountId)
        which Wealthfolio DOES preserve verbatim. Our own Account.id is
        used as the providerAccountId.

        Raises:
            httpx.HTTPStatusError: listing or creating returned 4xx/5xx.
            WealthfolioError: the matched or created account carries no
                string ``id``.
        """
        existing = await self.list_accounts()
        for a in existing:
            if (
                a.get("provider") == account.provider
                and a.get("providerAccountId") == account.id
            ):
                wf_id = a.get("id")
                if not isinstance(wf_id, str):
                    raise WealthfolioError(
                        f"GET /accounts returned a matching account without an id: {a}"
                    )
                return wf_id
        # NewAccount is camelCase with required booleans.
        # See apps/server/src/models.rs#NewAccount.
        resp = await self._request(
            "POST",
            _ACCOUNTS_PATH,
            json={
                "name": account.name,
                "accountType": str(account.account_type),
                "currency": account.currency,
                "isDefault": False,
                "isActive": True,
                "isArchived": False,
                "trackingMode": "TRANSACTIONS",
                "provider": account.provider,
                "providerAccountId": account.id,
            },
        )
        resp.raise_for_status()
        created = resp.json()
        # Guard the shape before calling .get so a non-object body yields a
        # WealthfolioError instead of an AttributeError.
        wf_id = created.get("id") if isinstance(created, dict) else None
        if not isinstance(wf_id, str):
            raise WealthfolioError(
                f"POST /accounts returned no id: {created}"
            )
        return wf_id

    # -- activity import --

    @staticmethod
    def _activity_to_import_row(a: Activity) -> dict[str, Any]:
        """Match Wealthfolio's ActivityImport struct (camelCase JSON).

        Optional numeric fields are emitted as fixed-point strings and only
        when set; a missing symbol becomes "$CASH"; a falsy fee (including
        zero) and empty notes are omitted.
        """
        row: dict[str, Any] = {
            "date": a.date.isoformat(),
            "symbol": a.symbol or "$CASH",
            "activityType": str(a.activity_type),
            "currency": a.currency,
            "accountId": a.account_id,
            # Required booleans on ActivityImport (no defaults in Rust struct).
            "isDraft": False,
            "isValid": True,
        }
        if a.quantity is not None:
            row["quantity"] = format(a.quantity, "f")
        if a.unit_price is not None:
            row["unitPrice"] = format(a.unit_price, "f")
        if a.amount is not None:
            row["amount"] = format(a.amount, "f")
        if a.fee:
            row["fee"] = format(a.fee, "f")
        if a.notes:
            row["comment"] = a.notes
        return row

    async def import_activities(
        self, activities: Iterable[Activity]
    ) -> list[dict[str, Any]]:
        """Validate ``activities`` via /import/check, then import them.

        Returns:
            The activities list from the import response (either the
            ``activities`` key of an object body, or a list body itself);
            ``[]`` for empty input or any other body shape.

        Raises:
            ImportValidationError: the check step returned 4xx/5xx, a
                non-list body, or flagged at least one row with errors.
            httpx.HTTPStatusError: the real import returned 4xx/5xx.
            WealthfolioError: the import response's ``activities`` value is
                not a list.
        """
        rows = [self._activity_to_import_row(a) for a in activities]
        if not rows:
            return []
        # Step 1 — /import/check hydrates each row with resolved asset_id,
        # exchange_mic, quote_ccy, instrument_type, quote_mode (and flags
        # errors). The /import endpoint on Wealthfolio 3.2+ DOES NOT
        # re-resolve — if we send the un-enriched row the activity is
        # silently dropped (import returns 200 OK with activities=[] in
        # the payload). We must feed check's output into import.
        check = await self._request("POST", _IMPORT_CHECK, json={"activities": rows})
        if check.status_code >= 400:
            try:
                payload = check.json()
            except ValueError:
                # Body was not JSON; fall back to the raw text for the message.
                payload = {"raw": check.text}
            raise ImportValidationError(f"Wealthfolio /import/check rejected: {payload}")
        checked = check.json()
        if not isinstance(checked, list):
            raise ImportValidationError(
                f"Wealthfolio /import/check returned non-list: {type(checked).__name__}"
            )
        invalid = [r for r in checked if isinstance(r, dict) and r.get("errors")]
        if invalid:
            raise ImportValidationError(
                f"Wealthfolio /import/check flagged {len(invalid)} row(s); "
                f"first: {invalid[0]}"
            )
        # Drop any row the server marked is_valid=false (shouldn't happen
        # without errors, but defensive).
        valid_rows = [r for r in checked if isinstance(r, dict) and r.get("isValid")]
        # Step 2 — post the hydrated rows (not the originals) to the real import.
        real = await self._request("POST", _IMPORT_REAL, json={"activities": valid_rows})
        real.raise_for_status()
        raw = real.json()
        if isinstance(raw, dict) and "activities" in raw:
            got = raw["activities"]
            # Explicit raise rather than `assert`: asserts vanish under `python -O`.
            if not isinstance(got, list):
                raise WealthfolioError(
                    f"Wealthfolio /import returned non-list activities: {type(got).__name__}"
                )
            return got
        if isinstance(raw, list):
            return raw
        return []