Add WealthfolioSink with CSV import + cookie reuse
Context ------- This is the Phase 0.5 deliverable — the hardest-to-validate unknown in the plan. Wealthfolio auth is JWT HttpOnly cookie with a 5-req/min login rate limit. CronJob pods are ephemeral, so we persist cookies to disk between runs (shared PVC in production). Plan stress-test also flagged: use the CSV import path, not per-row JSON POST. Wealthfolio's UI uses /activities/import and its dedup logic is battle-tested; CSVs double as audit artefacts we can replay. This change ----------- - WealthfolioSink (httpx async): login with username/password, persists cookie dict to session_path on disk, attaches it as a Cookie header on subsequent calls. - 401 on a non-login endpoint triggers a single re-login + retry. - ensure_account() is idempotent — GETs the account list first, only POSTs /accounts if id is missing. - import_activities() always runs /activities/import/check first; any non-2xx there raises ImportValidationError and we never touch the real import endpoint. Protects against half-written state when the broker emits a symbol Wealthfolio doesn't know. - httpx.MockTransport-based tests cover: login persistence, 401 on login raises UnauthorizedError, session reuse from disk, 401 retry path, ensure_account idempotency + creation, import dry-run-then-real sequencing, halt on check failure. Not yet covered (deferred): - Multi-process file lock on session_path (single-process enough for now; Phase 1 adds it when multiple CronJobs run concurrently). - 429 jittered backoff (TBD when Wealthfolio actually rate-limits us). Test plan --------- ## Automated - poetry run pytest -q → 31 passed - poetry run mypy broker_sync tests → Success: no issues found in 17 source files - poetry run ruff check . → All checks passed! ## Manual Verification Live auth spike against https://wealthfolio.viktorbarzin.me deferred until the password is seeded into Vault at secret/broker-sync/wealthfolio in a follow-up commit (needs Viktor's Vault session).
This commit is contained in:
parent
f306dc9605
commit
e7da408a85
3 changed files with 411 additions and 0 deletions
172
broker_sync/sinks/wealthfolio.py
Normal file
172
broker_sync/sinks/wealthfolio.py
Normal file
|
|
@ -0,0 +1,172 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import csv
|
||||
import io
|
||||
import json
|
||||
from collections.abc import Iterable
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
from broker_sync.models import Account, Activity
|
||||
|
||||
_LOGIN_PATH = "/api/v1/auth/login"
|
||||
_ACCOUNTS_PATH = "/api/v1/accounts"
|
||||
_IMPORT_CHECK = "/api/v1/activities/import/check"
|
||||
_IMPORT_REAL = "/api/v1/activities/import"
|
||||
|
||||
|
||||
class WealthfolioError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class WealthfolioUnauthorizedError(WealthfolioError):
|
||||
"""Raised when login itself fails (bad creds or Wealthfolio down).
|
||||
|
||||
Distinct from a 401 on a random endpoint — those trigger an
|
||||
automatic re-login attempt.
|
||||
"""
|
||||
|
||||
|
||||
class ImportValidationError(WealthfolioError):
|
||||
"""`/activities/import/check` returned a non-2xx. We never reach the real import."""
|
||||
|
||||
|
||||
class WealthfolioSink:
|
||||
"""Push canonical Activities to Wealthfolio via its CSV import endpoint.
|
||||
|
||||
Auth is JWT-cookie via POST /api/v1/auth/login. Cookies are persisted
|
||||
to disk so CronJob pods can reuse them across runs (Wealthfolio's
|
||||
/auth/login is 5-req/min rate-limited).
|
||||
|
||||
Not multi-process safe — file locking is added in Phase 1 when we
|
||||
fan out to multiple CronJobs.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
base_url: str,
|
||||
username: str,
|
||||
password: str,
|
||||
session_path: Path | str,
|
||||
transport: httpx.AsyncBaseTransport | None = None,
|
||||
) -> None:
|
||||
self._username = username
|
||||
self._password = password
|
||||
self._session_path = Path(session_path)
|
||||
self._client = httpx.AsyncClient(
|
||||
base_url=base_url.rstrip("/"),
|
||||
timeout=30.0,
|
||||
transport=transport,
|
||||
)
|
||||
|
||||
async def close(self) -> None:
|
||||
await self._client.aclose()
|
||||
|
||||
# -- session --
|
||||
|
||||
def _load_cookies(self) -> dict[str, str]:
|
||||
if not self._session_path.exists():
|
||||
return {}
|
||||
raw = json.loads(self._session_path.read_text())
|
||||
got = raw.get("cookies", {})
|
||||
assert isinstance(got, dict)
|
||||
return got
|
||||
|
||||
def _save_cookies(self, cookies: dict[str, str]) -> None:
|
||||
self._session_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
self._session_path.write_text(json.dumps({"cookies": cookies}))
|
||||
|
||||
async def login(self) -> None:
|
||||
resp = await self._client.post(
|
||||
_LOGIN_PATH,
|
||||
json={
|
||||
"username": self._username,
|
||||
"password": self._password
|
||||
},
|
||||
)
|
||||
if resp.status_code == 401:
|
||||
raise WealthfolioUnauthorizedError("Wealthfolio /auth/login returned 401")
|
||||
resp.raise_for_status()
|
||||
cookies = {k: v for k, v in resp.cookies.items()}
|
||||
if not cookies:
|
||||
raise WealthfolioError("/auth/login returned 2xx but no Set-Cookie")
|
||||
self._save_cookies(cookies)
|
||||
|
||||
@staticmethod
|
||||
def _cookie_header(cookies: dict[str, str]) -> str:
|
||||
return "; ".join(f"{k}={v}" for k, v in cookies.items())
|
||||
|
||||
async def _request(self, method: str, path: str, **kw: Any) -> httpx.Response:
|
||||
cookies = self._load_cookies()
|
||||
headers = dict(kw.pop("headers", {}) or {})
|
||||
if cookies:
|
||||
headers["Cookie"] = self._cookie_header(cookies)
|
||||
resp = await self._client.request(method, path, headers=headers, **kw)
|
||||
if resp.status_code == 401 and path != _LOGIN_PATH:
|
||||
await self.login()
|
||||
cookies = self._load_cookies()
|
||||
headers["Cookie"] = self._cookie_header(cookies)
|
||||
resp = await self._client.request(method, path, headers=headers, **kw)
|
||||
return resp
|
||||
|
||||
# -- accounts --
|
||||
|
||||
async def list_accounts(self) -> list[dict[str, Any]]:
|
||||
resp = await self._request("GET", _ACCOUNTS_PATH)
|
||||
resp.raise_for_status()
|
||||
raw = resp.json()
|
||||
assert isinstance(raw, list)
|
||||
return raw
|
||||
|
||||
async def ensure_account(self, account: Account) -> None:
|
||||
existing = await self.list_accounts()
|
||||
if any(a.get("id") == account.id for a in existing):
|
||||
return
|
||||
resp = await self._request(
|
||||
"POST",
|
||||
_ACCOUNTS_PATH,
|
||||
json={
|
||||
"id": account.id,
|
||||
"name": account.name,
|
||||
"account_type": str(account.account_type),
|
||||
"currency": account.currency,
|
||||
"provider": account.provider,
|
||||
},
|
||||
)
|
||||
resp.raise_for_status()
|
||||
|
||||
# -- activity import --
|
||||
|
||||
@staticmethod
|
||||
def _activities_csv(activities: Iterable[Activity]) -> str:
|
||||
activities = list(activities)
|
||||
if not activities:
|
||||
return ""
|
||||
rows = [a.to_wealthfolio_csv_row() for a in activities]
|
||||
buf = io.StringIO()
|
||||
w = csv.DictWriter(buf, fieldnames=list(rows[0].keys()))
|
||||
w.writeheader()
|
||||
w.writerows(rows)
|
||||
return buf.getvalue()
|
||||
|
||||
async def import_activities(self, activities: Iterable[Activity]) -> list[dict[str, Any]]:
|
||||
csv_text = self._activities_csv(activities)
|
||||
files = {"file": ("activities.csv", csv_text, "text/csv")}
|
||||
|
||||
check = await self._request("POST", _IMPORT_CHECK, files=files)
|
||||
if check.status_code >= 400:
|
||||
try:
|
||||
payload = check.json()
|
||||
except Exception:
|
||||
payload = {"raw": check.text}
|
||||
raise ImportValidationError(f"Wealthfolio /import/check rejected: {payload}")
|
||||
|
||||
# Re-send the same CSV to the real endpoint.
|
||||
real = await self._request("POST", _IMPORT_REAL, files=files)
|
||||
real.raise_for_status()
|
||||
raw = real.json()
|
||||
assert isinstance(raw, list)
|
||||
return raw
|
||||
0
tests/sinks/__init__.py
Normal file
0
tests/sinks/__init__.py
Normal file
239
tests/sinks/test_wealthfolio.py
Normal file
239
tests/sinks/test_wealthfolio.py
Normal file
|
|
@ -0,0 +1,239 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import UTC, datetime
|
||||
from decimal import Decimal
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from broker_sync.models import Account, AccountType, Activity, ActivityType
|
||||
from broker_sync.sinks.wealthfolio import (
|
||||
ImportValidationError,
|
||||
WealthfolioSink,
|
||||
WealthfolioUnauthorizedError,
|
||||
)
|
||||
|
||||
|
||||
def _buy() -> Activity:
|
||||
return Activity(
|
||||
external_id="t212:1",
|
||||
account_id="t212-isa",
|
||||
account_type=AccountType.ISA,
|
||||
date=datetime(2026, 4, 1, 10, 30, tzinfo=UTC),
|
||||
activity_type=ActivityType.BUY,
|
||||
symbol="VUAG",
|
||||
quantity=Decimal("1"),
|
||||
unit_price=Decimal("100"),
|
||||
currency="GBP",
|
||||
)
|
||||
|
||||
|
||||
def _client(handler: httpx.MockTransport, session_path: Path) -> WealthfolioSink:
|
||||
return WealthfolioSink(
|
||||
base_url="https://wf.test",
|
||||
username="viktor",
|
||||
password="hunter2",
|
||||
session_path=session_path,
|
||||
transport=handler,
|
||||
)
|
||||
|
||||
|
||||
def _login_ok(req: httpx.Request) -> httpx.Response:
|
||||
assert req.url.path == "/api/v1/auth/login"
|
||||
body = json.loads(req.content)
|
||||
assert body == {"username": "viktor", "password": "hunter2"}
|
||||
return httpx.Response(
|
||||
200,
|
||||
json={"ok": True},
|
||||
headers={"set-cookie": "wf_token=abc123; Path=/api; HttpOnly"},
|
||||
)
|
||||
|
||||
|
||||
# -- Login --
|
||||
|
||||
|
||||
async def test_login_persists_cookie(tmp_path: Path) -> None:
|
||||
|
||||
async def handler(req: httpx.Request) -> httpx.Response:
|
||||
return _login_ok(req)
|
||||
|
||||
sp = tmp_path / "session.json"
|
||||
sink = _client(httpx.MockTransport(handler), sp)
|
||||
await sink.login()
|
||||
|
||||
data = json.loads(sp.read_text())
|
||||
assert "wf_token" in data["cookies"]
|
||||
assert data["cookies"]["wf_token"] == "abc123"
|
||||
|
||||
|
||||
async def test_login_raises_on_401(tmp_path: Path) -> None:
|
||||
|
||||
async def handler(req: httpx.Request) -> httpx.Response:
|
||||
return httpx.Response(401, json={"error": "bad creds"})
|
||||
|
||||
sink = _client(httpx.MockTransport(handler), tmp_path / "s.json")
|
||||
with pytest.raises(WealthfolioUnauthorizedError):
|
||||
await sink.login()
|
||||
|
||||
|
||||
# -- Session reuse --
|
||||
|
||||
|
||||
async def test_session_reused_from_disk(tmp_path: Path) -> None:
|
||||
sp = tmp_path / "s.json"
|
||||
sp.write_text(json.dumps({"cookies": {"wf_token": "cached"}}))
|
||||
|
||||
calls: list[str] = []
|
||||
|
||||
async def handler(req: httpx.Request) -> httpx.Response:
|
||||
calls.append(req.url.path)
|
||||
assert "wf_token=cached" in req.headers.get("cookie", "")
|
||||
return httpx.Response(200, json=[])
|
||||
|
||||
sink = _client(httpx.MockTransport(handler), sp)
|
||||
await sink.list_accounts()
|
||||
assert calls == ["/api/v1/accounts"]
|
||||
|
||||
|
||||
async def test_401_triggers_single_reauth_and_retry(tmp_path: Path) -> None:
|
||||
sp = tmp_path / "s.json"
|
||||
sp.write_text(json.dumps({"cookies": {"wf_token": "stale"}}))
|
||||
|
||||
path_calls: list[tuple[str, str]] = []
|
||||
|
||||
async def handler(req: httpx.Request) -> httpx.Response:
|
||||
path_calls.append((req.method, req.url.path))
|
||||
if req.url.path == "/api/v1/accounts" and req.headers.get("cookie", "").endswith("stale"):
|
||||
return httpx.Response(401)
|
||||
if req.url.path == "/api/v1/auth/login":
|
||||
return _login_ok(req)
|
||||
# After login the stored cookie is fresh; second GET should succeed.
|
||||
return httpx.Response(200, json=[{"id": "a1"}])
|
||||
|
||||
sink = _client(httpx.MockTransport(handler), sp)
|
||||
out = await sink.list_accounts()
|
||||
assert out == [{"id": "a1"}]
|
||||
assert path_calls == [
|
||||
("GET", "/api/v1/accounts"),
|
||||
("POST", "/api/v1/auth/login"),
|
||||
("GET", "/api/v1/accounts"),
|
||||
]
|
||||
|
||||
|
||||
# -- Account ensure --
|
||||
|
||||
|
||||
async def test_ensure_account_no_op_if_exists(tmp_path: Path) -> None:
|
||||
sp = tmp_path / "s.json"
|
||||
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
|
||||
|
||||
posts: list[dict[str, Any]] = []
|
||||
|
||||
async def handler(req: httpx.Request) -> httpx.Response:
|
||||
if req.method == "GET" and req.url.path == "/api/v1/accounts":
|
||||
return httpx.Response(
|
||||
200,
|
||||
json=[{
|
||||
"id": "t212-isa",
|
||||
"name": "Trading212 ISA"
|
||||
}],
|
||||
)
|
||||
if req.method == "POST":
|
||||
posts.append(json.loads(req.content))
|
||||
return httpx.Response(500)
|
||||
|
||||
sink = _client(httpx.MockTransport(handler), sp)
|
||||
acc = Account(
|
||||
id="t212-isa",
|
||||
name="Trading212 ISA",
|
||||
account_type=AccountType.ISA,
|
||||
currency="GBP",
|
||||
provider="trading212",
|
||||
)
|
||||
await sink.ensure_account(acc)
|
||||
assert posts == [] # no create
|
||||
|
||||
|
||||
async def test_ensure_account_creates_if_missing(tmp_path: Path) -> None:
|
||||
sp = tmp_path / "s.json"
|
||||
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
|
||||
|
||||
posted: list[dict[str, Any]] = []
|
||||
|
||||
async def handler(req: httpx.Request) -> httpx.Response:
|
||||
if req.method == "GET" and req.url.path == "/api/v1/accounts":
|
||||
return httpx.Response(200, json=[])
|
||||
if req.method == "POST" and req.url.path == "/api/v1/accounts":
|
||||
posted.append(json.loads(req.content))
|
||||
return httpx.Response(200, json={"id": "t212-isa"})
|
||||
return httpx.Response(500)
|
||||
|
||||
sink = _client(httpx.MockTransport(handler), sp)
|
||||
acc = Account(
|
||||
id="t212-isa",
|
||||
name="Trading212 ISA",
|
||||
account_type=AccountType.ISA,
|
||||
currency="GBP",
|
||||
provider="trading212",
|
||||
)
|
||||
await sink.ensure_account(acc)
|
||||
assert len(posted) == 1
|
||||
assert posted[0]["id"] == "t212-isa"
|
||||
assert posted[0]["account_type"] == "ISA"
|
||||
assert posted[0]["currency"] == "GBP"
|
||||
|
||||
|
||||
# -- Activity import --
|
||||
|
||||
|
||||
async def test_import_dry_run_then_real(tmp_path: Path) -> None:
|
||||
sp = tmp_path / "s.json"
|
||||
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
|
||||
|
||||
calls: list[str] = []
|
||||
|
||||
async def handler(req: httpx.Request) -> httpx.Response:
|
||||
calls.append(req.url.path)
|
||||
if req.url.path == "/api/v1/activities/import/check":
|
||||
return httpx.Response(200, json={"ok": True, "rows": 1})
|
||||
if req.url.path == "/api/v1/activities/import":
|
||||
return httpx.Response(
|
||||
200,
|
||||
json=[{
|
||||
"id": "wf-1",
|
||||
"external_id": "t212:1"
|
||||
}],
|
||||
)
|
||||
return httpx.Response(500)
|
||||
|
||||
sink = _client(httpx.MockTransport(handler), sp)
|
||||
out = await sink.import_activities([_buy()])
|
||||
assert calls == [
|
||||
"/api/v1/activities/import/check",
|
||||
"/api/v1/activities/import",
|
||||
]
|
||||
assert out == [{"id": "wf-1", "external_id": "t212:1"}]
|
||||
|
||||
|
||||
async def test_import_halts_on_validation_failure(tmp_path: Path) -> None:
|
||||
sp = tmp_path / "s.json"
|
||||
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
|
||||
|
||||
calls: list[str] = []
|
||||
|
||||
async def handler(req: httpx.Request) -> httpx.Response:
|
||||
calls.append(req.url.path)
|
||||
if req.url.path == "/api/v1/activities/import/check":
|
||||
return httpx.Response(
|
||||
400,
|
||||
json={"errors": ["row 1: unknown symbol"]},
|
||||
)
|
||||
return httpx.Response(500)
|
||||
|
||||
sink = _client(httpx.MockTransport(handler), sp)
|
||||
with pytest.raises(ImportValidationError, match="unknown symbol"):
|
||||
await sink.import_activities([_buy()])
|
||||
assert calls == ["/api/v1/activities/import/check"] # real import never hit
|
||||
Loading…
Add table
Add a link
Reference in a new issue