broker-sync/broker_sync/sinks/wealthfolio.py
Viktor Barzin e7da408a85 Add WealthfolioSink with CSV import + cookie reuse
Context
-------
This is the Phase 0.5 deliverable — the hardest-to-validate unknown
in the plan. Wealthfolio auth is JWT HttpOnly cookie with a 5-req/min
login rate limit. CronJob pods are ephemeral, so we persist cookies
to disk between runs (shared PVC in production).

Plan stress-test also flagged: use the CSV import path, not per-row
JSON POST. Wealthfolio's UI uses /activities/import and its dedup
logic is battle-tested; CSVs double as audit artefacts we can replay.

This change
-----------
- WealthfolioSink (httpx async): login with username/password, persists
  cookie dict to session_path on disk, attaches it as a Cookie header
  on subsequent calls.
- 401 on a non-login endpoint triggers a single re-login + retry.
- ensure_account() is idempotent — GETs the account list first, only
  POSTs /accounts if id is missing.
- import_activities() always runs /activities/import/check first; any
  non-2xx there raises ImportValidationError and we never touch the
  real import endpoint. Protects against half-written state when the
  broker emits a symbol Wealthfolio doesn't know.
- httpx.MockTransport-based tests cover: login persistence, 401 on
  login raises UnauthorizedError, session reuse from disk, 401 retry
  path, ensure_account idempotency + creation, import dry-run-then-real
  sequencing, halt on check failure.

Not yet covered (deferred):
- Multi-process file lock on session_path (single-process enough for
  now; Phase 1 adds it when multiple CronJobs run concurrently).
- 429 jittered backoff (TBD when Wealthfolio actually rate-limits us).

Test plan
---------
## Automated
- poetry run pytest -q  →  31 passed
- poetry run mypy broker_sync tests  →  Success: no issues found in 17 source files
- poetry run ruff check .  →  All checks passed!

## Manual Verification
Live auth spike against https://wealthfolio.viktorbarzin.me deferred
until the password is seeded into Vault at secret/broker-sync/wealthfolio
in a follow-up commit (needs Viktor's Vault session).
2026-04-17 19:22:34 +00:00

172 lines
5.5 KiB
Python

from __future__ import annotations
import csv
import io
import json
from collections.abc import Iterable
from pathlib import Path
from typing import Any
import httpx
from broker_sync.models import Account, Activity
_LOGIN_PATH = "/api/v1/auth/login"
_ACCOUNTS_PATH = "/api/v1/accounts"
_IMPORT_CHECK = "/api/v1/activities/import/check"
_IMPORT_REAL = "/api/v1/activities/import"
class WealthfolioError(Exception):
pass
class WealthfolioUnauthorizedError(WealthfolioError):
"""Raised when login itself fails (bad creds or Wealthfolio down).
Distinct from a 401 on a random endpoint — those trigger an
automatic re-login attempt.
"""
class ImportValidationError(WealthfolioError):
"""`/activities/import/check` returned a non-2xx. We never reach the real import."""
class WealthfolioSink:
"""Push canonical Activities to Wealthfolio via its CSV import endpoint.
Auth is JWT-cookie via POST /api/v1/auth/login. Cookies are persisted
to disk so CronJob pods can reuse them across runs (Wealthfolio's
/auth/login is 5-req/min rate-limited).
Not multi-process safe — file locking is added in Phase 1 when we
fan out to multiple CronJobs.
"""
def __init__(
self,
*,
base_url: str,
username: str,
password: str,
session_path: Path | str,
transport: httpx.AsyncBaseTransport | None = None,
) -> None:
self._username = username
self._password = password
self._session_path = Path(session_path)
self._client = httpx.AsyncClient(
base_url=base_url.rstrip("/"),
timeout=30.0,
transport=transport,
)
async def close(self) -> None:
await self._client.aclose()
# -- session --
def _load_cookies(self) -> dict[str, str]:
if not self._session_path.exists():
return {}
raw = json.loads(self._session_path.read_text())
got = raw.get("cookies", {})
assert isinstance(got, dict)
return got
def _save_cookies(self, cookies: dict[str, str]) -> None:
self._session_path.parent.mkdir(parents=True, exist_ok=True)
self._session_path.write_text(json.dumps({"cookies": cookies}))
async def login(self) -> None:
resp = await self._client.post(
_LOGIN_PATH,
json={
"username": self._username,
"password": self._password
},
)
if resp.status_code == 401:
raise WealthfolioUnauthorizedError("Wealthfolio /auth/login returned 401")
resp.raise_for_status()
cookies = {k: v for k, v in resp.cookies.items()}
if not cookies:
raise WealthfolioError("/auth/login returned 2xx but no Set-Cookie")
self._save_cookies(cookies)
@staticmethod
def _cookie_header(cookies: dict[str, str]) -> str:
return "; ".join(f"{k}={v}" for k, v in cookies.items())
async def _request(self, method: str, path: str, **kw: Any) -> httpx.Response:
cookies = self._load_cookies()
headers = dict(kw.pop("headers", {}) or {})
if cookies:
headers["Cookie"] = self._cookie_header(cookies)
resp = await self._client.request(method, path, headers=headers, **kw)
if resp.status_code == 401 and path != _LOGIN_PATH:
await self.login()
cookies = self._load_cookies()
headers["Cookie"] = self._cookie_header(cookies)
resp = await self._client.request(method, path, headers=headers, **kw)
return resp
# -- accounts --
async def list_accounts(self) -> list[dict[str, Any]]:
resp = await self._request("GET", _ACCOUNTS_PATH)
resp.raise_for_status()
raw = resp.json()
assert isinstance(raw, list)
return raw
async def ensure_account(self, account: Account) -> None:
existing = await self.list_accounts()
if any(a.get("id") == account.id for a in existing):
return
resp = await self._request(
"POST",
_ACCOUNTS_PATH,
json={
"id": account.id,
"name": account.name,
"account_type": str(account.account_type),
"currency": account.currency,
"provider": account.provider,
},
)
resp.raise_for_status()
# -- activity import --
@staticmethod
def _activities_csv(activities: Iterable[Activity]) -> str:
activities = list(activities)
if not activities:
return ""
rows = [a.to_wealthfolio_csv_row() for a in activities]
buf = io.StringIO()
w = csv.DictWriter(buf, fieldnames=list(rows[0].keys()))
w.writeheader()
w.writerows(rows)
return buf.getvalue()
async def import_activities(self, activities: Iterable[Activity]) -> list[dict[str, Any]]:
csv_text = self._activities_csv(activities)
files = {"file": ("activities.csv", csv_text, "text/csv")}
check = await self._request("POST", _IMPORT_CHECK, files=files)
if check.status_code >= 400:
try:
payload = check.json()
except Exception:
payload = {"raw": check.text}
raise ImportValidationError(f"Wealthfolio /import/check rejected: {payload}")
# Re-send the same CSV to the real endpoint.
real = await self._request("POST", _IMPORT_REAL, files=files)
real.raise_for_status()
raw = real.json()
assert isinstance(raw, list)
return raw