broker-sync/broker_sync/sinks/wealthfolio.py
Viktor Barzin 80ca009373 Match Wealthfolio accounts by providerAccountId, remap accountId on import
Context: Wealthfolio 3.2 generates its own UUIDs on POST /accounts, ignoring any
`id` we supply. Our logical Account.id lives on as `providerAccountId`, which
WF preserves verbatim.

Live run created six duplicate accounts because ensure_account looked up by
our `id`, never found it, and POSTed a new account on every attempt. Deleted
the duplicates manually via DELETE /accounts/{id}.

This change:
- ensure_account now returns Wealthfolio's UUID; matches existing via
  (provider, providerAccountId)
- pipeline remaps activity.account_id to the WF UUID at submission time
  but keeps dedup keyed on our stable id (WF resets must not blow away
  the whole dedup history)
- test updates to the new account-shape + dedup key expectations

poetry run pytest -q    70 passed
poetry run mypy         clean
poetry run ruff check   clean
2026-04-17 20:44:32 +00:00

230 lines
8 KiB
Python

from __future__ import annotations
import json
from collections.abc import Iterable
from pathlib import Path
from typing import Any
import httpx
from broker_sync.models import Account, Activity
# Wealthfolio REST endpoints used by this module. They are passed as
# request paths to an httpx client that is constructed with a base_url.
_LOGIN_PATH = "/api/v1/auth/login"  # POSTed with the password to obtain cookies
_ACCOUNTS_PATH = "/api/v1/accounts"  # GET to list accounts, POST to create one
_IMPORT_CHECK = "/api/v1/activities/import/check"  # validation pass before import
_IMPORT_REAL = "/api/v1/activities/import"  # the actual import
class WealthfolioError(Exception):
    """Base class for the Wealthfolio-specific errors defined in this module."""
class WealthfolioUnauthorizedError(WealthfolioError):
    """The login request itself was refused (bad creds or Wealthfolio down).

    This is not the same thing as a 401 from some other endpoint: those
    are handled by attempting an automatic re-login first.
    """
class ImportValidationError(WealthfolioError):
    """Validation via `/activities/import/check` failed, so the real import is never attempted."""
class WealthfolioSink:
    """Push canonical Activities to Wealthfolio via its activities import endpoints.

    Auth is JWT-cookie via POST /api/v1/auth/login. Cookies are persisted
    to disk so CronJob pods can reuse them across runs (Wealthfolio's
    /auth/login is 5-req/min rate-limited).

    Not multi-process safe — file locking is added in Phase 1 when we
    fan out to multiple CronJobs.
    """

    def __init__(
        self,
        *,
        base_url: str,
        username: str,
        password: str,
        session_path: Path | str,
        transport: httpx.AsyncBaseTransport | None = None,
    ) -> None:
        """Configure the sink; no network or disk I/O happens here.

        Args:
            base_url: Wealthfolio root URL; a trailing slash is stripped.
            username: Stored but not sent by `login` (see the note there).
            password: Password posted to the login endpoint.
            session_path: File in which the session cookies are persisted.
            transport: Optional httpx transport handed to the AsyncClient.
        """
        self._username = username
        self._password = password
        self._session_path = Path(session_path)
        self._client = httpx.AsyncClient(
            base_url=base_url.rstrip("/"),
            timeout=30.0,
            transport=transport,
        )

    async def close(self) -> None:
        """Close the underlying httpx client."""
        await self._client.aclose()

    # -- session --

    def _load_cookies(self) -> dict[str, str]:
        """Return the persisted session cookies, or `{}` if none are usable.

        A missing, truncated, or malformed session file is treated as "no
        session". The next request then goes out without cookies, gets a
        401, and `_request` logs in again and rewrites the file — so a bad
        cache heals itself instead of failing every run until someone
        deletes it by hand.
        """
        try:
            raw = json.loads(self._session_path.read_text())
        except FileNotFoundError:
            return {}
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from
            # ValueError: the file exists but does not hold valid JSON text.
            return {}
        if not isinstance(raw, dict):
            return {}
        cookies = raw.get("cookies", {})
        if not isinstance(cookies, dict):
            return {}
        return cookies

    def _save_cookies(self, cookies: dict[str, str]) -> None:
        """Persist `cookies` to the session file as `{"cookies": {...}}`.

        The parent directory is created if needed. When the file does not
        exist yet it is created owner-only (0600, further limited by the
        umask) because it holds the auth cookie; the mode of an already
        existing file is left untouched.
        """
        self._session_path.parent.mkdir(parents=True, exist_ok=True)
        self._session_path.touch(mode=0o600, exist_ok=True)
        self._session_path.write_text(json.dumps({"cookies": cookies}))

    async def login(self) -> None:
        """Log in and persist the cookies returned by Wealthfolio.

        Raises:
            WealthfolioUnauthorizedError: the login endpoint answered 401.
            WealthfolioError: the login succeeded but set no cookies.
            httpx.HTTPStatusError: any other non-2xx login response.
        """
        # Wealthfolio 3.2's LoginRequest is `{ password: String }` only — a
        # username key is rejected as an unknown field (HTTP 400). The
        # `username` constructor arg is kept for a future Wealthfolio
        # release that may add multi-user support.
        resp = await self._client.post(
            _LOGIN_PATH,
            json={"password": self._password},
        )
        if resp.status_code == 401:
            raise WealthfolioUnauthorizedError("Wealthfolio /auth/login returned 401")
        resp.raise_for_status()
        cookies = dict(resp.cookies.items())
        if not cookies:
            raise WealthfolioError("/auth/login returned 2xx but no Set-Cookie")
        self._save_cookies(cookies)

    @staticmethod
    def _cookie_header(cookies: dict[str, str]) -> str:
        """Render a cookie mapping as a `Cookie` header value (`k=v; k2=v2`)."""
        return "; ".join(f"{k}={v}" for k, v in cookies.items())

    async def _request(self, method: str, path: str, **kw: Any) -> httpx.Response:
        """Send a request with the persisted cookies, re-logging in once on 401.

        Any `headers` passed by the caller are copied and extended with a
        `Cookie` header when cookies are available. If the response is a
        401 (and the request was not the login itself), `login` is called
        and the request is repeated exactly once with the fresh cookies.
        The final response is returned without status checking.
        """
        cookies = self._load_cookies()
        headers = dict(kw.pop("headers", {}) or {})
        if cookies:
            headers["Cookie"] = self._cookie_header(cookies)
        resp = await self._client.request(method, path, headers=headers, **kw)
        if resp.status_code == 401 and path != _LOGIN_PATH:
            await self.login()
            cookies = self._load_cookies()
            headers["Cookie"] = self._cookie_header(cookies)
            resp = await self._client.request(method, path, headers=headers, **kw)
        return resp

    # -- accounts --

    async def list_accounts(self) -> list[dict[str, Any]]:
        """Return the JSON list from GET /accounts.

        Raises:
            httpx.HTTPStatusError: non-2xx response.
            WealthfolioError: the response body is not a JSON list.
        """
        resp = await self._request("GET", _ACCOUNTS_PATH)
        resp.raise_for_status()
        raw = resp.json()
        # Explicit check rather than `assert`: asserts are stripped under -O.
        if not isinstance(raw, list):
            raise WealthfolioError(
                f"GET /accounts returned {type(raw).__name__}, expected a list"
            )
        return raw

    async def ensure_account(self, account: Account) -> str:
        """Idempotently create the account and return Wealthfolio's UUID for it.

        Wealthfolio generates its own UUIDs on POST /accounts, ignoring any
        `id` we supply. We identify accounts by (provider, providerAccountId)
        which Wealthfolio DOES preserve verbatim. Our own Account.id is
        used as the providerAccountId.

        Raises:
            httpx.HTTPStatusError: a list or create call returned non-2xx.
            WealthfolioError: a matching or newly created account carries
                no string `id`.
        """
        for existing in await self.list_accounts():
            if not isinstance(existing, dict):
                continue
            if (
                existing.get("provider") == account.provider
                and existing.get("providerAccountId") == account.id
            ):
                wf_id = existing.get("id")
                if not isinstance(wf_id, str):
                    raise WealthfolioError(
                        f"GET /accounts returned a matching account with no id: {existing}"
                    )
                return wf_id

        # NewAccount is camelCase with required booleans.
        # See apps/server/src/models.rs#NewAccount.
        resp = await self._request(
            "POST",
            _ACCOUNTS_PATH,
            json={
                "name": account.name,
                "accountType": str(account.account_type),
                "currency": account.currency,
                "isDefault": False,
                "isActive": True,
                "isArchived": False,
                "trackingMode": "TRANSACTIONS",
                "provider": account.provider,
                "providerAccountId": account.id,
            },
        )
        resp.raise_for_status()
        created = resp.json()
        wf_id = created.get("id") if isinstance(created, dict) else None
        if not isinstance(wf_id, str):
            raise WealthfolioError(
                f"POST /accounts returned no id: {created}"
            )
        return wf_id

    # -- activity import --

    @staticmethod
    def _activity_to_import_row(a: Activity) -> dict[str, Any]:
        """Match Wealthfolio's ActivityImport struct (camelCase JSON).

        Numeric fields are rendered with fixed-point formatting (no
        exponent). `quantity`, `unitPrice` and `amount` are included
        whenever they are not None; `fee` and `comment` only when truthy,
        so a zero fee or empty note is omitted from the row.
        """
        row: dict[str, Any] = {
            "date": a.date.isoformat(),
            "symbol": a.symbol or "$CASH",
            "activityType": str(a.activity_type),
            "currency": a.currency,
            "accountId": a.account_id,
            # Required booleans on ActivityImport (no defaults in Rust struct).
            "isDraft": False,
            "isValid": True,
        }
        if a.quantity is not None:
            row["quantity"] = format(a.quantity, "f")
        if a.unit_price is not None:
            row["unitPrice"] = format(a.unit_price, "f")
        if a.amount is not None:
            row["amount"] = format(a.amount, "f")
        if a.fee:
            row["fee"] = format(a.fee, "f")
        if a.notes:
            row["comment"] = a.notes
        return row

    async def import_activities(self, activities: Iterable[Activity]) -> list[dict[str, Any]]:
        """Validate the activities with a dry run, then import them.

        Returns the list of imported activities reported by Wealthfolio,
        or `[]` when there is nothing to send or the response has an
        unrecognised shape.

        Raises:
            ImportValidationError: the check endpoint returned >= 400 or
                flagged at least one row; the real import is not attempted.
            httpx.HTTPStatusError: the real import returned non-2xx.
            WealthfolioError: the import result's `activities` is not a list.
        """
        rows = [self._activity_to_import_row(a) for a in activities]
        if not rows:
            return []
        body = {"activities": rows}

        check = await self._request("POST", _IMPORT_CHECK, json=body)
        if check.status_code >= 400:
            try:
                payload = check.json()
            except ValueError:
                # Body was not JSON; fall back to the raw text.
                payload = {"raw": check.text}
            raise ImportValidationError(f"Wealthfolio /import/check rejected: {payload}")

        # Dry-run may flag per-row errors inside a 200 response.
        checked = check.json()
        if isinstance(checked, list):
            bad = [r for r in checked if isinstance(r, dict) and r.get("errors")]
            if bad:
                # Surface only the first offender to aid debugging without
                # leaking everything.
                raise ImportValidationError(
                    f"Wealthfolio /import/check flagged {len(bad)} rows; "
                    f"first: {bad[0]}"
                )

        real = await self._request("POST", _IMPORT_REAL, json=body)
        real.raise_for_status()
        raw = real.json()
        # import_activities returns ImportActivitiesResult which is an object,
        # but we also accept a list (older versions). Normalise.
        if isinstance(raw, dict) and "activities" in raw:
            imported = raw["activities"]
            if not isinstance(imported, list):
                raise WealthfolioError(
                    "POST /activities/import returned a non-list 'activities': "
                    f"{type(imported).__name__}"
                )
            return imported
        if isinstance(raw, list):
            return raw
        return []