diff --git a/broker_sync/sinks/wealthfolio.py b/broker_sync/sinks/wealthfolio.py new file mode 100644 index 0000000..42bd0b0 --- /dev/null +++ b/broker_sync/sinks/wealthfolio.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +import csv +import io +import json +from collections.abc import Iterable +from pathlib import Path +from typing import Any + +import httpx + +from broker_sync.models import Account, Activity + +_LOGIN_PATH = "/api/v1/auth/login" +_ACCOUNTS_PATH = "/api/v1/accounts" +_IMPORT_CHECK = "/api/v1/activities/import/check" +_IMPORT_REAL = "/api/v1/activities/import" + + +class WealthfolioError(Exception): + pass + + +class WealthfolioUnauthorizedError(WealthfolioError): + """Raised when login itself fails (bad creds or Wealthfolio down). + + Distinct from a 401 on a random endpoint — those trigger an + automatic re-login attempt. + """ + + +class ImportValidationError(WealthfolioError): + """`/activities/import/check` returned a non-2xx. We never reach the real import.""" + + +class WealthfolioSink: + """Push canonical Activities to Wealthfolio via its CSV import endpoint. + + Auth is JWT-cookie via POST /api/v1/auth/login. Cookies are persisted + to disk so CronJob pods can reuse them across runs (Wealthfolio's + /auth/login is 5-req/min rate-limited). + + Not multi-process safe — file locking is added in Phase 1 when we + fan out to multiple CronJobs. + """ + + def __init__( + self, + *, + base_url: str, + username: str, + password: str, + session_path: Path | str, + transport: httpx.AsyncBaseTransport | None = None, + ) -> None: + self._username = username + self._password = password + self._session_path = Path(session_path) + self._client = httpx.AsyncClient( + base_url=base_url.rstrip("/"), + timeout=30.0, + transport=transport, + ) + + async def close(self) -> None: + await self._client.aclose() + + # -- session -- + + def _load_cookies(self) -> dict[str, str]: + if not self._session_path.exists(): + return {} + raw = json.loads(self._session_path.read_text()) + got = raw.get("cookies", {}) + assert isinstance(got, dict) + return got + + def _save_cookies(self, cookies: dict[str, str]) -> None: + self._session_path.parent.mkdir(parents=True, exist_ok=True) + self._session_path.write_text(json.dumps({"cookies": cookies})) + + async def login(self) -> None: + resp = await self._client.post( + _LOGIN_PATH, + json={ + "username": self._username, + "password": self._password + }, + ) + if resp.status_code == 401: + raise WealthfolioUnauthorizedError("Wealthfolio /auth/login returned 401") + resp.raise_for_status() + cookies = {k: v for k, v in resp.cookies.items()} + if not cookies: + raise WealthfolioError("/auth/login returned 2xx but no Set-Cookie") + self._save_cookies(cookies) + + @staticmethod + def _cookie_header(cookies: dict[str, str]) -> str: + return "; ".join(f"{k}={v}" for k, v in cookies.items()) + + async def _request(self, method: str, path: str, **kw: Any) -> httpx.Response: + cookies = self._load_cookies() + headers = dict(kw.pop("headers", {}) or {}) + if cookies: + headers["Cookie"] = self._cookie_header(cookies) + resp = await self._client.request(method, path, headers=headers, **kw) + if resp.status_code == 401 and path != _LOGIN_PATH: + await self.login() + cookies = self._load_cookies() + headers["Cookie"] = self._cookie_header(cookies) + resp = await self._client.request(method, path, headers=headers, **kw) + return resp + + # -- accounts -- + + async def list_accounts(self) -> list[dict[str, Any]]: + resp = await self._request("GET", _ACCOUNTS_PATH) + resp.raise_for_status() + raw = resp.json() + assert isinstance(raw, list) + return raw + + async def ensure_account(self, account: Account) -> None: + existing = await self.list_accounts() + if any(a.get("id") == account.id for a in existing): + return + resp = await self._request( + "POST", + _ACCOUNTS_PATH, + json={ + "id": account.id, + "name": account.name, + "account_type": str(account.account_type), + "currency": account.currency, + "provider": account.provider, + }, + ) + resp.raise_for_status() + + # -- activity import -- + + @staticmethod + def _activities_csv(activities: Iterable[Activity]) -> str: + activities = list(activities) + if not activities: + return "" + rows = [a.to_wealthfolio_csv_row() for a in activities] + buf = io.StringIO() + w = csv.DictWriter(buf, fieldnames=list(rows[0].keys())) + w.writeheader() + w.writerows(rows) + return buf.getvalue() + + async def import_activities(self, activities: Iterable[Activity]) -> list[dict[str, Any]]: + csv_text = self._activities_csv(activities) + files = {"file": ("activities.csv", csv_text, "text/csv")} + + check = await self._request("POST", _IMPORT_CHECK, files=files) + if check.status_code >= 400: + try: + payload = check.json() + except Exception: + payload = {"raw": check.text} + raise ImportValidationError(f"Wealthfolio /import/check rejected: {payload}") + + # Re-send the same CSV to the real endpoint. + real = await self._request("POST", _IMPORT_REAL, files=files) + real.raise_for_status() + raw = real.json() + assert isinstance(raw, list) + return raw diff --git a/tests/sinks/__init__.py b/tests/sinks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/sinks/test_wealthfolio.py b/tests/sinks/test_wealthfolio.py new file mode 100644 index 0000000..56f9a52 --- /dev/null +++ b/tests/sinks/test_wealthfolio.py @@ -0,0 +1,239 @@ +from __future__ import annotations + +import json +from datetime import UTC, datetime +from decimal import Decimal +from pathlib import Path +from typing import Any + +import httpx +import pytest + +from broker_sync.models import Account, AccountType, Activity, ActivityType +from broker_sync.sinks.wealthfolio import ( + ImportValidationError, + WealthfolioSink, + WealthfolioUnauthorizedError, +) + + +def _buy() -> Activity: + return Activity( + external_id="t212:1", + account_id="t212-isa", + account_type=AccountType.ISA, + date=datetime(2026, 4, 1, 10, 30, tzinfo=UTC), + activity_type=ActivityType.BUY, + symbol="VUAG", + quantity=Decimal("1"), + unit_price=Decimal("100"), + currency="GBP", + ) + + +def _client(handler: httpx.MockTransport, session_path: Path) -> WealthfolioSink: + return WealthfolioSink( + base_url="https://wf.test", + username="viktor", + password="hunter2", + session_path=session_path, + transport=handler, + ) + + +def _login_ok(req: httpx.Request) -> httpx.Response: + assert req.url.path == "/api/v1/auth/login" + body = json.loads(req.content) + assert body == {"username": "viktor", "password": "hunter2"} + return httpx.Response( + 200, + json={"ok": True}, + headers={"set-cookie": "wf_token=abc123; Path=/api; HttpOnly"}, + ) + + +# -- Login -- + + +async def test_login_persists_cookie(tmp_path: Path) -> None: + + async def handler(req: httpx.Request) -> httpx.Response: + return _login_ok(req) + + sp = tmp_path / "session.json" + sink = _client(httpx.MockTransport(handler), sp) + await sink.login() + + data = json.loads(sp.read_text()) + assert "wf_token" in data["cookies"] + assert data["cookies"]["wf_token"] == "abc123" + + +async def test_login_raises_on_401(tmp_path: Path) -> None: + + async def handler(req: httpx.Request) -> httpx.Response: + return httpx.Response(401, json={"error": "bad creds"}) + + sink = _client(httpx.MockTransport(handler), tmp_path / "s.json") + with pytest.raises(WealthfolioUnauthorizedError): + await sink.login() + + +# -- Session reuse -- + + +async def test_session_reused_from_disk(tmp_path: Path) -> None: + sp = tmp_path / "s.json" + sp.write_text(json.dumps({"cookies": {"wf_token": "cached"}})) + + calls: list[str] = [] + + async def handler(req: httpx.Request) -> httpx.Response: + calls.append(req.url.path) + assert "wf_token=cached" in req.headers.get("cookie", "") + return httpx.Response(200, json=[]) + + sink = _client(httpx.MockTransport(handler), sp) + await sink.list_accounts() + assert calls == ["/api/v1/accounts"] + + +async def test_401_triggers_single_reauth_and_retry(tmp_path: Path) -> None: + sp = tmp_path / "s.json" + sp.write_text(json.dumps({"cookies": {"wf_token": "stale"}})) + + path_calls: list[tuple[str, str]] = [] + + async def handler(req: httpx.Request) -> httpx.Response: + path_calls.append((req.method, req.url.path)) + if req.url.path == "/api/v1/accounts" and req.headers.get("cookie", "").endswith("stale"): + return httpx.Response(401) + if req.url.path == "/api/v1/auth/login": + return _login_ok(req) + # After login the stored cookie is fresh; second GET should succeed. + return httpx.Response(200, json=[{"id": "a1"}]) + + sink = _client(httpx.MockTransport(handler), sp) + out = await sink.list_accounts() + assert out == [{"id": "a1"}] + assert path_calls == [ + ("GET", "/api/v1/accounts"), + ("POST", "/api/v1/auth/login"), + ("GET", "/api/v1/accounts"), + ] + + +# -- Account ensure -- + + +async def test_ensure_account_no_op_if_exists(tmp_path: Path) -> None: + sp = tmp_path / "s.json" + sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}})) + + posts: list[dict[str, Any]] = [] + + async def handler(req: httpx.Request) -> httpx.Response: + if req.method == "GET" and req.url.path == "/api/v1/accounts": + return httpx.Response( + 200, + json=[{ + "id": "t212-isa", + "name": "Trading212 ISA" + }], + ) + if req.method == "POST": + posts.append(json.loads(req.content)) + return httpx.Response(500) + + sink = _client(httpx.MockTransport(handler), sp) + acc = Account( + id="t212-isa", + name="Trading212 ISA", + account_type=AccountType.ISA, + currency="GBP", + provider="trading212", + ) + await sink.ensure_account(acc) + assert posts == [] # no create + + +async def test_ensure_account_creates_if_missing(tmp_path: Path) -> None: + sp = tmp_path / "s.json" + sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}})) + + posted: list[dict[str, Any]] = [] + + async def handler(req: httpx.Request) -> httpx.Response: + if req.method == "GET" and req.url.path == "/api/v1/accounts": + return httpx.Response(200, json=[]) + if req.method == "POST" and req.url.path == "/api/v1/accounts": + posted.append(json.loads(req.content)) + return httpx.Response(200, json={"id": "t212-isa"}) + return httpx.Response(500) + + sink = _client(httpx.MockTransport(handler), sp) + acc = Account( + id="t212-isa", + name="Trading212 ISA", + account_type=AccountType.ISA, + currency="GBP", + provider="trading212", + ) + await sink.ensure_account(acc) + assert len(posted) == 1 + assert posted[0]["id"] == "t212-isa" + assert posted[0]["account_type"] == "ISA" + assert posted[0]["currency"] == "GBP" + + +# -- Activity import -- + + +async def test_import_dry_run_then_real(tmp_path: Path) -> None: + sp = tmp_path / "s.json" + sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}})) + + calls: list[str] = [] + + async def handler(req: httpx.Request) -> httpx.Response: + calls.append(req.url.path) + if req.url.path == "/api/v1/activities/import/check": + return httpx.Response(200, json={"ok": True, "rows": 1}) + if req.url.path == "/api/v1/activities/import": + return httpx.Response( + 200, + json=[{ + "id": "wf-1", + "external_id": "t212:1" + }], + ) + return httpx.Response(500) + + sink = _client(httpx.MockTransport(handler), sp) + out = await sink.import_activities([_buy()]) + assert calls == [ + "/api/v1/activities/import/check", + "/api/v1/activities/import", + ] + assert out == [{"id": "wf-1", "external_id": "t212:1"}] + + +async def test_import_halts_on_validation_failure(tmp_path: Path) -> None: + sp = tmp_path / "s.json" + sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}})) + + calls: list[str] = [] + + async def handler(req: httpx.Request) -> httpx.Response: + calls.append(req.url.path) + if req.url.path == "/api/v1/activities/import/check": + return httpx.Response( + 400, + json={"errors": ["row 1: unknown symbol"]}, + ) + return httpx.Response(500) + + sink = _client(httpx.MockTransport(handler), sp) + with pytest.raises(ImportValidationError, match="unknown symbol"): + await sink.import_activities([_buy()]) + assert calls == ["/api/v1/activities/import/check"] # real import never hit