Context
-------
Two live-integration bugs surfaced during the Phase 0.5 auth-spike
run against the restored production Wealthfolio.
1. Wealthfolio 3.2's LoginRequest schema is `{ password: String }` —
it rejects any request containing an unknown `username` field with
HTTP 400 (the response body is empty, which makes it hard to debug).
Upstream source:
https://github.com/afadil/wealthfolio/blob/main/apps/server/src/auth.rs#L86-L88
2. The Dockerfile referenced `/opt/poetry/bin/poetry`, but `pip install`
puts poetry on the normal PATH; POETRY_HOME only affects the
self-installer, not `pip install`. The GitHub Actions build therefore
failed with exit code 127 (command not found).
This change
-----------
- WealthfolioSink.login() sends `{password}` only; kept `username`
constructor arg as a stub for the day Wealthfolio adds multi-user.
- Dockerfile drops POETRY_HOME and uses `poetry` on PATH.
- Test: `_login_ok` now asserts body == {"password": "hunter2"}
("hunter2" is the XKCD placeholder — not a real credential).
Test plan
---------
## Automated
- poetry run pytest -q → 70 passed
- poetry run mypy broker_sync tests → Success: no issues found in 29 source files
- poetry run ruff check . → All checks passed!
## Manual Verification (executed live)
```
kubectl -n wealthfolio port-forward svc/wealthfolio 18080:80 &
WF_BASE_URL=http://localhost:18080 WF_USERNAME=admin \
WF_PASSWORD=<from-vault> \
poetry run broker-sync auth-spike
→ "Logged in. 1 account(s) visible."
```
173 lines
5.7 KiB
Python
173 lines
5.7 KiB
Python
from __future__ import annotations
|
|
|
|
import csv
|
|
import io
|
|
import json
|
|
from collections.abc import Iterable
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from broker_sync.models import Account, Activity
|
|
|
|
# Wealthfolio REST endpoints used by this module, relative to the client's
# base URL. All of them live under the same versioned API prefix.
_API_PREFIX = "/api/v1"
_LOGIN_PATH = f"{_API_PREFIX}/auth/login"  # /api/v1/auth/login
_ACCOUNTS_PATH = f"{_API_PREFIX}/accounts"  # /api/v1/accounts
_IMPORT_CHECK = f"{_API_PREFIX}/activities/import/check"  # validation pass
_IMPORT_REAL = f"{_API_PREFIX}/activities/import"  # the actual import
|
|
|
|
|
|
class WealthfolioError(Exception):
    """Root of this module's Wealthfolio-specific exception hierarchy."""
|
|
|
|
|
|
class WealthfolioUnauthorizedError(WealthfolioError):
    """The login request itself was rejected with 401 (e.g. bad credentials).

    This is not the same as a 401 coming back from any other endpoint:
    that case triggers one automatic re-login and a retry of the request
    instead of raising this error.
    """
|
|
|
|
|
|
class ImportValidationError(WealthfolioError):
    """`/activities/import/check` rejected the CSV; the real import is never attempted."""
|
|
|
|
|
|
class WealthfolioSink:
    """Push canonical Activities to Wealthfolio via its CSV import endpoint.

    Auth is JWT-cookie via POST /api/v1/auth/login. Cookies are persisted
    to disk so CronJob pods can reuse them across runs (Wealthfolio's
    /auth/login is 5-req/min rate-limited).

    Not multi-process safe — file locking is added in Phase 1 when we
    fan out to multiple CronJobs.
    """

    def __init__(
        self,
        *,
        base_url: str,
        username: str,
        password: str,
        session_path: Path | str,
        transport: httpx.AsyncBaseTransport | None = None,
    ) -> None:
        """Store the credentials and create the underlying HTTP client.

        Args:
            base_url: Root URL of the Wealthfolio server; trailing slashes
                are stripped before it is handed to the client.
            username: Stored but not sent by ``login()`` (see the comment
                there).
            password: Password sent in the login request body.
            session_path: File in which the session cookies are persisted.
            transport: Optional httpx transport forwarded to the client
                (e.g. a mock transport in tests).
        """
        self._username = username
        self._password = password
        self._session_path = Path(session_path)
        self._client = httpx.AsyncClient(
            base_url=base_url.rstrip("/"),
            timeout=30.0,
            transport=transport,
        )

    async def close(self) -> None:
        """Close the underlying HTTP client."""
        await self._client.aclose()

    # -- session --

    def _load_cookies(self) -> dict[str, str]:
        """Return the cookies persisted at the session path.

        A missing, unreadable-as-JSON, or wrongly shaped session file is
        treated as "no session" and yields ``{}``: the next request then
        goes out without cookies, gets a 401, and ``_request`` logs in
        again, which rewrites the file. Other I/O errors (e.g. permission
        denied) still propagate.
        """
        try:
            raw = json.loads(self._session_path.read_text())
        except (FileNotFoundError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return {}
        got = raw.get("cookies") if isinstance(raw, dict) else None
        if not isinstance(got, dict):
            return {}
        # Only string values can be rendered into a Cookie header.
        return {name: value for name, value in got.items() if isinstance(value, str)}

    def _save_cookies(self, cookies: dict[str, str]) -> None:
        """Write ``cookies`` to the session file, creating parent directories."""
        self._session_path.parent.mkdir(parents=True, exist_ok=True)
        self._session_path.write_text(json.dumps({"cookies": cookies}))

    async def login(self) -> None:
        """Log in with the configured password and persist the session cookies.

        Raises:
            WealthfolioUnauthorizedError: The login endpoint answered 401.
            httpx.HTTPStatusError: Any other error status (raised by
                ``raise_for_status``).
            WealthfolioError: The response was successful but set no cookie.
        """
        # Wealthfolio 3.2's LoginRequest is `{ password: String }` only — a
        # username key is rejected as an unknown field (HTTP 400). The
        # `username` constructor arg is kept for a future Wealthfolio
        # release that may add multi-user support.
        resp = await self._client.post(
            _LOGIN_PATH,
            json={"password": self._password},
        )
        if resp.status_code == 401:
            raise WealthfolioUnauthorizedError("Wealthfolio /auth/login returned 401")
        resp.raise_for_status()
        cookies = dict(resp.cookies.items())
        if not cookies:
            raise WealthfolioError("/auth/login returned 2xx but no Set-Cookie")
        self._save_cookies(cookies)

    @staticmethod
    def _cookie_header(cookies: dict[str, str]) -> str:
        """Render ``cookies`` as a ``Cookie`` header value (``k=v; k2=v2``)."""
        return "; ".join(f"{k}={v}" for k, v in cookies.items())

    async def _request(self, method: str, path: str, **kw: Any) -> httpx.Response:
        """Send a request with the persisted cookies, re-logging in once on 401.

        Args:
            method: HTTP method.
            path: Request path, relative to the client's base URL.
            **kw: Forwarded to ``httpx.AsyncClient.request``; a ``headers``
                entry is copied and extended with the ``Cookie`` header.

        Returns:
            The response of the first attempt, or of the single retry when
            the first attempt returned 401. The status is not checked here.
        """
        cookies = self._load_cookies()
        headers = dict(kw.pop("headers", {}) or {})
        if cookies:
            headers["Cookie"] = self._cookie_header(cookies)
        resp = await self._client.request(method, path, headers=headers, **kw)
        if resp.status_code == 401 and path != _LOGIN_PATH:
            # Session expired or missing: log in again, then retry exactly once.
            await self.login()
            cookies = self._load_cookies()
            headers["Cookie"] = self._cookie_header(cookies)
            resp = await self._client.request(method, path, headers=headers, **kw)
        return resp

    # -- accounts --

    async def list_accounts(self) -> list[dict[str, Any]]:
        """Return the JSON list served by the accounts endpoint.

        Raises:
            httpx.HTTPStatusError: The endpoint returned an error status.
            WealthfolioError: The response body is not a JSON array.
        """
        resp = await self._request("GET", _ACCOUNTS_PATH)
        resp.raise_for_status()
        raw = resp.json()
        # Explicit check instead of `assert`, which is stripped under -O.
        if not isinstance(raw, list):
            raise WealthfolioError(
                f"Wealthfolio {_ACCOUNTS_PATH} returned {type(raw).__name__}, expected a JSON array"
            )
        return raw

    async def ensure_account(self, account: Account) -> None:
        """Create ``account`` in Wealthfolio unless one with the same id exists.

        Raises:
            httpx.HTTPStatusError: Listing or creating the account failed.
        """
        existing = await self.list_accounts()
        if any(a.get("id") == account.id for a in existing):
            return
        resp = await self._request(
            "POST",
            _ACCOUNTS_PATH,
            json={
                "id": account.id,
                "name": account.name,
                "account_type": str(account.account_type),
                "currency": account.currency,
                "provider": account.provider,
            },
        )
        resp.raise_for_status()

    # -- activity import --

    @staticmethod
    def _activities_csv(activities: Iterable[Activity]) -> str:
        """Serialise ``activities`` to CSV text; ``""`` when there are none.

        The header row is taken from the keys of the first activity's
        ``to_wealthfolio_csv_row()`` mapping.
        """
        activities = list(activities)
        if not activities:
            return ""
        rows = [a.to_wealthfolio_csv_row() for a in activities]
        buf = io.StringIO()
        w = csv.DictWriter(buf, fieldnames=list(rows[0].keys()))
        w.writeheader()
        w.writerows(rows)
        return buf.getvalue()

    async def import_activities(self, activities: Iterable[Activity]) -> list[dict[str, Any]]:
        """Validate and then import ``activities`` as a CSV upload.

        The CSV is first posted to the check endpoint; only if that is
        accepted is the same CSV posted to the real import endpoint. An
        empty ``activities`` returns ``[]`` without any request being sent.

        Returns:
            The JSON list returned by the real import endpoint.

        Raises:
            ImportValidationError: The check endpoint answered with a
                status >= 400.
            httpx.HTTPStatusError: The real import returned an error status.
            WealthfolioError: The import response is not a JSON array.
        """
        # Materialise once: the iterable may be a one-shot generator.
        batch = list(activities)
        if not batch:
            # Nothing to import — skip both round-trips instead of uploading
            # an empty CSV.
            return []
        csv_text = self._activities_csv(batch)
        files = {"file": ("activities.csv", csv_text, "text/csv")}

        check = await self._request("POST", _IMPORT_CHECK, files=files)
        if check.status_code >= 400:
            try:
                payload = check.json()
            except ValueError:
                # Body is not JSON; fall back to the raw text for the message.
                payload = {"raw": check.text}
            raise ImportValidationError(f"Wealthfolio /import/check rejected: {payload}")

        # Re-send the same CSV to the real endpoint.
        real = await self._request("POST", _IMPORT_REAL, files=files)
        real.raise_for_status()
        raw = real.json()
        # Explicit check instead of `assert`, which is stripped under -O.
        if not isinstance(raw, list):
            raise WealthfolioError(
                f"Wealthfolio {_IMPORT_REAL} returned {type(raw).__name__}, expected a JSON array"
            )
        return raw
|