broker-sync/broker_sync/sinks/wealthfolio.py
Viktor Barzin ea881e272b sinks: match Wealthfolio NewAccount camelCase schema + required booleans
Wealthfolio 3.2's POST /api/v1/accounts was 422ing on live traffic — its
NewAccount struct uses camelCase field names and requires isDefault +
isActive as booleans. Reference:
https://github.com/afadil/wealthfolio/blob/main/apps/server/src/models.rs#L~145

Sends trackingMode=TRANSACTIONS so Wealthfolio computes holdings from
our imported activities (vs HOLDINGS mode which requires periodic
holdings snapshots). Populates providerAccountId so the broker account
is traceable back to our sync's id scheme.

Test plan:
  poetry run pytest -q   →  70 passed
  poetry run mypy        →  clean
  poetry run ruff check  →  clean

Live re-run of the backfill Job follows this commit's image rebuild.
2026-04-17 20:29:43 +00:00

180 lines
6 KiB
Python

from __future__ import annotations
import csv
import io
import json
from collections.abc import Iterable
from pathlib import Path
from typing import Any
import httpx
from broker_sync.models import Account, Activity
_LOGIN_PATH = "/api/v1/auth/login"
_ACCOUNTS_PATH = "/api/v1/accounts"
_IMPORT_CHECK = "/api/v1/activities/import/check"
_IMPORT_REAL = "/api/v1/activities/import"
class WealthfolioError(Exception):
pass
class WealthfolioUnauthorizedError(WealthfolioError):
"""Raised when login itself fails (bad creds or Wealthfolio down).
Distinct from a 401 on a random endpoint — those trigger an
automatic re-login attempt.
"""
class ImportValidationError(WealthfolioError):
"""`/activities/import/check` returned a non-2xx. We never reach the real import."""
class WealthfolioSink:
"""Push canonical Activities to Wealthfolio via its CSV import endpoint.
Auth is JWT-cookie via POST /api/v1/auth/login. Cookies are persisted
to disk so CronJob pods can reuse them across runs (Wealthfolio's
/auth/login is 5-req/min rate-limited).
Not multi-process safe — file locking is added in Phase 1 when we
fan out to multiple CronJobs.
"""
def __init__(
self,
*,
base_url: str,
username: str,
password: str,
session_path: Path | str,
transport: httpx.AsyncBaseTransport | None = None,
) -> None:
self._username = username
self._password = password
self._session_path = Path(session_path)
self._client = httpx.AsyncClient(
base_url=base_url.rstrip("/"),
timeout=30.0,
transport=transport,
)
async def close(self) -> None:
await self._client.aclose()
# -- session --
def _load_cookies(self) -> dict[str, str]:
if not self._session_path.exists():
return {}
raw = json.loads(self._session_path.read_text())
got = raw.get("cookies", {})
assert isinstance(got, dict)
return got
def _save_cookies(self, cookies: dict[str, str]) -> None:
self._session_path.parent.mkdir(parents=True, exist_ok=True)
self._session_path.write_text(json.dumps({"cookies": cookies}))
async def login(self) -> None:
# Wealthfolio 3.2's LoginRequest is `{ password: String }` only — a
# username key is rejected as an unknown field (HTTP 400). The
# `username` constructor arg is kept for a future Wealthfolio
# release that may add multi-user support.
resp = await self._client.post(
_LOGIN_PATH,
json={"password": self._password},
)
if resp.status_code == 401:
raise WealthfolioUnauthorizedError("Wealthfolio /auth/login returned 401")
resp.raise_for_status()
cookies = dict(resp.cookies.items())
if not cookies:
raise WealthfolioError("/auth/login returned 2xx but no Set-Cookie")
self._save_cookies(cookies)
@staticmethod
def _cookie_header(cookies: dict[str, str]) -> str:
return "; ".join(f"{k}={v}" for k, v in cookies.items())
async def _request(self, method: str, path: str, **kw: Any) -> httpx.Response:
cookies = self._load_cookies()
headers = dict(kw.pop("headers", {}) or {})
if cookies:
headers["Cookie"] = self._cookie_header(cookies)
resp = await self._client.request(method, path, headers=headers, **kw)
if resp.status_code == 401 and path != _LOGIN_PATH:
await self.login()
cookies = self._load_cookies()
headers["Cookie"] = self._cookie_header(cookies)
resp = await self._client.request(method, path, headers=headers, **kw)
return resp
# -- accounts --
async def list_accounts(self) -> list[dict[str, Any]]:
resp = await self._request("GET", _ACCOUNTS_PATH)
resp.raise_for_status()
raw = resp.json()
assert isinstance(raw, list)
return raw
async def ensure_account(self, account: Account) -> None:
existing = await self.list_accounts()
if any(a.get("id") == account.id for a in existing):
return
# Wealthfolio 3.2's NewAccount is camelCase with required booleans.
# See apps/server/src/models.rs#NewAccount.
resp = await self._request(
"POST",
_ACCOUNTS_PATH,
json={
"id": account.id,
"name": account.name,
"accountType": str(account.account_type),
"currency": account.currency,
"isDefault": False,
"isActive": True,
"isArchived": False,
"trackingMode": "TRANSACTIONS",
"provider": account.provider,
"providerAccountId": account.id,
},
)
resp.raise_for_status()
# -- activity import --
@staticmethod
def _activities_csv(activities: Iterable[Activity]) -> str:
activities = list(activities)
if not activities:
return ""
rows = [a.to_wealthfolio_csv_row() for a in activities]
buf = io.StringIO()
w = csv.DictWriter(buf, fieldnames=list(rows[0].keys()))
w.writeheader()
w.writerows(rows)
return buf.getvalue()
async def import_activities(self, activities: Iterable[Activity]) -> list[dict[str, Any]]:
csv_text = self._activities_csv(activities)
files = {"file": ("activities.csv", csv_text, "text/csv")}
check = await self._request("POST", _IMPORT_CHECK, files=files)
if check.status_code >= 400:
try:
payload = check.json()
except Exception:
payload = {"raw": check.text}
raise ImportValidationError(f"Wealthfolio /import/check rejected: {payload}")
# Re-send the same CSV to the real endpoint.
real = await self._request("POST", _IMPORT_REAL, files=files)
real.raise_for_status()
raw = real.json()
assert isinstance(raw, list)
return raw