Per-fund snapshot import landed quantities but dropped cost basis, and it needed a separate quote-push path that we never identified. Snapshotting also collided with WF's own TOTAL aggregation and zeroed the Fidelity cash balance.

Simpler plan: each monthly scrape emits a single DEPOSIT (or a WITHDRAWAL on a market drop) sized to the delta between the live PlanViewer pot value and Wealthfolio's running total. The dav_corrected PG view continues to subtract these offsets from net_contribution so the dashboard Growth/ROI math stays right.

- New gains_offset_delta_activity() — current_gain - prior_offset.
- New WealthfolioSink.cumulative_amount_with_notes_prefix() — sums the existing fidelity-planviewer:unrealised-gains-offset DEPOSITs in WF so we know what has already been emitted.
- CLI runs sync_provider_to_wealthfolio first (cash flows), then computes and emits the delta via import_activities.
- 4 new provider tests for the delta logic; full suite (144 + 1 skipped) green; mypy + ruff clean.

The old fidelity_holdings_to_snapshot helper and the push_manual_snapshots sink method stay for future use but are no longer called.
396 lines
15 KiB
Python
396 lines
15 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from collections.abc import Iterable
|
|
from dataclasses import dataclass
|
|
from datetime import UTC, date
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from broker_sync.models import Account, Activity
|
|
|
|
# Wealthfolio REST endpoint paths. They are passed as request paths to the
# httpx client, which is constructed with the server's base URL.

# POST: password login; the response cookies become the persisted session.
_LOGIN_PATH = "/api/v1/auth/login"
# GET lists accounts; POST creates one.
_ACCOUNTS_PATH = "/api/v1/accounts"
# POST: validation/enrichment pass whose output is fed into the real import.
_IMPORT_CHECK = "/api/v1/activities/import/check"
# POST: the import that actually persists activities.
_IMPORT_REAL = "/api/v1/activities/import"
# POST: manual holdings snapshot import.
_SNAPSHOTS_IMPORT = "/api/v1/snapshots/import"
# POST: paged activity search, used to sum previously emitted offsets.
_ACTIVITIES_SEARCH = "/api/v1/activities/search"
|
|
|
|
|
|
class WealthfolioError(Exception):
    """Base class for every error this module raises about Wealthfolio."""
|
|
|
|
|
|
class WealthfolioUnauthorizedError(WealthfolioError):
    """Raised when the login request itself is answered with HTTP 401.

    This typically means the configured password was rejected. It is
    distinct from a 401 on any other endpoint — those trigger an automatic
    re-login attempt followed by one retry of the original request.
    """
|
|
|
|
|
|
class ImportValidationError(WealthfolioError):
    """An activity import was rejected or did not persist every row.

    Raised when `/activities/import/check` returns a non-2xx status, returns
    something other than a list, or flags rows with errors (in those cases
    the real import is never attempted). Also raised after the real import
    when its summary reports fewer rows imported than submitted, or when a
    summary-less response comes back with no activities at all.
    """
|
|
|
|
|
|
class WealthfolioSink:
|
|
"""Push canonical Activities to Wealthfolio via its CSV import endpoint.
|
|
|
|
Auth is JWT-cookie via POST /api/v1/auth/login. Cookies are persisted
|
|
to disk so CronJob pods can reuse them across runs (Wealthfolio's
|
|
/auth/login is 5-req/min rate-limited).
|
|
|
|
Not multi-process safe — file locking is added in Phase 1 when we
|
|
fan out to multiple CronJobs.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
base_url: str,
|
|
username: str,
|
|
password: str,
|
|
session_path: Path | str,
|
|
transport: httpx.AsyncBaseTransport | None = None,
|
|
) -> None:
|
|
self._username = username
|
|
self._password = password
|
|
self._session_path = Path(session_path)
|
|
self._client = httpx.AsyncClient(
|
|
base_url=base_url.rstrip("/"),
|
|
timeout=30.0,
|
|
transport=transport,
|
|
)
|
|
|
|
async def close(self) -> None:
|
|
await self._client.aclose()
|
|
|
|
# -- session --
|
|
|
|
def _load_cookies(self) -> dict[str, str]:
|
|
if not self._session_path.exists():
|
|
return {}
|
|
raw = json.loads(self._session_path.read_text())
|
|
got = raw.get("cookies", {})
|
|
assert isinstance(got, dict)
|
|
return got
|
|
|
|
def _save_cookies(self, cookies: dict[str, str]) -> None:
|
|
self._session_path.parent.mkdir(parents=True, exist_ok=True)
|
|
self._session_path.write_text(json.dumps({"cookies": cookies}))
|
|
|
|
async def login(self) -> None:
|
|
# Wealthfolio 3.2's LoginRequest is `{ password: String }` only — a
|
|
# username key is rejected as an unknown field (HTTP 400). The
|
|
# `username` constructor arg is kept for a future Wealthfolio
|
|
# release that may add multi-user support.
|
|
resp = await self._client.post(
|
|
_LOGIN_PATH,
|
|
json={"password": self._password},
|
|
)
|
|
if resp.status_code == 401:
|
|
raise WealthfolioUnauthorizedError("Wealthfolio /auth/login returned 401")
|
|
resp.raise_for_status()
|
|
cookies = dict(resp.cookies.items())
|
|
if not cookies:
|
|
raise WealthfolioError("/auth/login returned 2xx but no Set-Cookie")
|
|
self._save_cookies(cookies)
|
|
|
|
@staticmethod
|
|
def _cookie_header(cookies: dict[str, str]) -> str:
|
|
return "; ".join(f"{k}={v}" for k, v in cookies.items())
|
|
|
|
async def _request(self, method: str, path: str, **kw: Any) -> httpx.Response:
|
|
cookies = self._load_cookies()
|
|
headers = dict(kw.pop("headers", {}) or {})
|
|
if cookies:
|
|
headers["Cookie"] = self._cookie_header(cookies)
|
|
resp = await self._client.request(method, path, headers=headers, **kw)
|
|
if resp.status_code == 401 and path != _LOGIN_PATH:
|
|
await self.login()
|
|
cookies = self._load_cookies()
|
|
headers["Cookie"] = self._cookie_header(cookies)
|
|
resp = await self._client.request(method, path, headers=headers, **kw)
|
|
return resp
|
|
|
|
# -- accounts --
|
|
|
|
async def list_accounts(self) -> list[dict[str, Any]]:
|
|
resp = await self._request("GET", _ACCOUNTS_PATH)
|
|
resp.raise_for_status()
|
|
raw = resp.json()
|
|
assert isinstance(raw, list)
|
|
return raw
|
|
|
|
async def ensure_account(self, account: Account) -> str:
|
|
"""Idempotently create the account and return Wealthfolio's UUID for it.
|
|
|
|
Wealthfolio generates its own UUIDs on POST /accounts, ignoring any
|
|
`id` we supply. We identify accounts by (provider, providerAccountId)
|
|
which Wealthfolio DOES preserve verbatim. Our own Account.id is
|
|
used as the providerAccountId.
|
|
"""
|
|
existing = await self.list_accounts()
|
|
for a in existing:
|
|
if (a.get("provider") == account.provider and a.get("providerAccountId") == account.id):
|
|
wf_id = a.get("id")
|
|
assert isinstance(wf_id, str)
|
|
return wf_id
|
|
|
|
# NewAccount is camelCase with required booleans.
|
|
# See apps/server/src/models.rs#NewAccount.
|
|
resp = await self._request(
|
|
"POST",
|
|
_ACCOUNTS_PATH,
|
|
json={
|
|
"name": account.name,
|
|
"accountType": str(account.account_type),
|
|
"currency": account.currency,
|
|
"isDefault": False,
|
|
"isActive": True,
|
|
"isArchived": False,
|
|
"trackingMode": "TRANSACTIONS",
|
|
"provider": account.provider,
|
|
"providerAccountId": account.id,
|
|
},
|
|
)
|
|
resp.raise_for_status()
|
|
created = resp.json()
|
|
wf_id = created.get("id")
|
|
if not isinstance(wf_id, str):
|
|
raise WealthfolioError(f"POST /accounts returned no id: {created}")
|
|
return wf_id
|
|
|
|
# -- activity import --
|
|
|
|
@staticmethod
|
|
def _activity_to_import_row(a: Activity) -> dict[str, Any]:
|
|
"""Match Wealthfolio's ActivityImport struct (camelCase JSON)."""
|
|
# WF /import rejects naive datetimes with "Invalid date" (even though
|
|
# /import/check accepts them) — coerce to UTC if tzinfo is missing.
|
|
date = a.date if a.date.tzinfo is not None else a.date.replace(tzinfo=UTC)
|
|
row: dict[str, Any] = {
|
|
"date": date.isoformat(),
|
|
"symbol": a.symbol or "$CASH",
|
|
"activityType": str(a.activity_type),
|
|
"currency": a.currency,
|
|
"accountId": a.account_id,
|
|
# Required booleans on ActivityImport (no defaults in Rust struct).
|
|
"isDraft": False,
|
|
"isValid": True,
|
|
}
|
|
if a.quantity is not None:
|
|
row["quantity"] = format(a.quantity, "f")
|
|
if a.unit_price is not None:
|
|
row["unitPrice"] = format(a.unit_price, "f")
|
|
if a.amount is not None:
|
|
row["amount"] = format(a.amount, "f")
|
|
if a.fee:
|
|
row["fee"] = format(a.fee, "f")
|
|
if a.notes:
|
|
row["comment"] = a.notes
|
|
return row
|
|
|
|
async def import_activities(self, activities: Iterable[Activity]) -> list[dict[str, Any]]:
|
|
rows = [self._activity_to_import_row(a) for a in activities]
|
|
if not rows:
|
|
return []
|
|
|
|
# Step 1 — /import/check hydrates each row with resolved asset_id,
|
|
# exchange_mic, quote_ccy, instrument_type, quote_mode (and flags
|
|
# errors). The /import endpoint on Wealthfolio 3.2+ DOES NOT
|
|
# re-resolve — if we send the un-enriched row the activity is
|
|
# silently dropped (import returns 200 OK with activities=[] in
|
|
# the payload). We must feed check's output into import.
|
|
check = await self._request("POST", _IMPORT_CHECK, json={"activities": rows})
|
|
if check.status_code >= 400:
|
|
try:
|
|
payload = check.json()
|
|
except Exception:
|
|
payload = {"raw": check.text}
|
|
raise ImportValidationError(f"Wealthfolio /import/check rejected: {payload}")
|
|
|
|
checked = check.json()
|
|
if not isinstance(checked, list):
|
|
raise ImportValidationError(
|
|
f"Wealthfolio /import/check returned non-list: {type(checked).__name__}")
|
|
|
|
invalid = [r for r in checked if isinstance(r, dict) and r.get("errors")]
|
|
if invalid:
|
|
raise ImportValidationError(f"Wealthfolio /import/check flagged {len(invalid)} row(s); "
|
|
f"first: {invalid[0]}")
|
|
# Drop any row the server marked is_valid=false (shouldn't happen
|
|
# without errors, but defensive).
|
|
valid_rows = [r for r in checked if isinstance(r, dict) and r.get("isValid")]
|
|
|
|
real = await self._request("POST", _IMPORT_REAL, json={"activities": valid_rows})
|
|
real.raise_for_status()
|
|
raw = real.json()
|
|
# Two observed response shapes:
|
|
# - {activities:[...], importRunId:"...", summary:{total,imported,skipped,...}}
|
|
# - bare list (older builds)
|
|
if isinstance(raw, dict) and "activities" in raw:
|
|
got = raw["activities"]
|
|
summary = raw.get("summary") if isinstance(raw.get("summary"), dict) else None
|
|
elif isinstance(raw, list):
|
|
got = raw
|
|
summary = None
|
|
else:
|
|
got = []
|
|
summary = None
|
|
# Summary.imported is THE truth. The `activities` field echoes input
|
|
# with errors annotated — its length equals input even when zero
|
|
# actually persisted.
|
|
if summary is not None:
|
|
imported_n = int(summary.get("imported", 0))
|
|
total_n = int(summary.get("total", len(valid_rows)))
|
|
if imported_n < total_n:
|
|
err_msg = summary.get("errorMessage") or "no errorMessage"
|
|
skipped = int(summary.get("skipped", 0))
|
|
dupes = int(summary.get("duplicates", 0))
|
|
raise ImportValidationError(f"Wealthfolio /import persisted {imported_n}/{total_n} "
|
|
f"(skipped={skipped} duplicates={dupes}). "
|
|
f"errorMessage: {err_msg}")
|
|
# Legacy silent-drop guard for no-summary responses.
|
|
elif valid_rows and not got:
|
|
first_warn = next(
|
|
(r.get("warnings") for r in checked if isinstance(r, dict) and r.get("warnings")),
|
|
None,
|
|
)
|
|
raise ImportValidationError(
|
|
f"Wealthfolio /import silently dropped all {len(valid_rows)} rows. "
|
|
f"First checked row: {checked[0] if checked else 'none'}. "
|
|
f"First warning: {first_warn}")
|
|
assert isinstance(got, list)
|
|
return [r for r in got if isinstance(r, dict)]
|
|
|
|
# -- activity lookups --
|
|
|
|
async def cumulative_amount_with_notes_prefix(
|
|
self,
|
|
account_id: str,
|
|
notes_prefix: str,
|
|
) -> Decimal:
|
|
"""Sum the amount of DEPOSIT/WITHDRAWAL activities whose notes start
|
|
with ``notes_prefix``, signed (deposits positive, withdrawals negative).
|
|
|
|
Used by the Fidelity provider to compute the delta gains-offset:
|
|
``current_gain - cumulative_existing_offset`` becomes the new
|
|
DEPOSIT to emit on each monthly run.
|
|
"""
|
|
try:
|
|
resp = await self._request(
|
|
"POST", _ACTIVITIES_SEARCH,
|
|
json={"accountIds": [account_id], "page": 1, "pageSize": 500},
|
|
)
|
|
except Exception:
|
|
return Decimal(0)
|
|
if resp.status_code >= 400:
|
|
return Decimal(0)
|
|
payload = resp.json()
|
|
rows = payload.get("data", payload) if isinstance(payload, dict) else payload
|
|
if not isinstance(rows, list):
|
|
return Decimal(0)
|
|
total = Decimal(0)
|
|
for r in rows:
|
|
if not isinstance(r, dict):
|
|
continue
|
|
notes = r.get("comment") or r.get("notes") or ""
|
|
if not isinstance(notes, str) or not notes.startswith(notes_prefix):
|
|
continue
|
|
amt_raw = r.get("amount")
|
|
if amt_raw is None:
|
|
continue
|
|
try:
|
|
amt = Decimal(str(amt_raw))
|
|
except Exception:
|
|
continue
|
|
atype = (r.get("activityType") or r.get("activity_type") or "").upper()
|
|
if atype == "WITHDRAWAL":
|
|
total -= amt
|
|
else:
|
|
total += amt
|
|
return total
|
|
|
|
# -- manual holdings snapshots --
|
|
|
|
async def push_manual_snapshots(
|
|
self,
|
|
account_id: str,
|
|
snapshots: list[ManualSnapshotPayload],
|
|
) -> dict[str, Any]:
|
|
"""Push manual holdings snapshots to /api/v1/snapshots/import.
|
|
|
|
Each snapshot carries a date + per-fund positions + cash balances.
|
|
Wealthfolio auto-creates any unknown asset symbol with
|
|
``kind=INVESTMENT, quoteMode=MANUAL, quoteCcy=<currency>`` and uses
|
|
the snapshot to derive holdings + valuation for that date — bypassing
|
|
the activity-ledger derivation entirely for the targeted day.
|
|
|
|
Used by the Fidelity provider since PlanViewer exposes current
|
|
fund units + price but no per-trade history. Re-imports for the
|
|
same (account, date) overwrite in place.
|
|
"""
|
|
if not snapshots:
|
|
return {"snapshotsImported": 0, "snapshotsFailed": 0, "errors": []}
|
|
body = {
|
|
"accountId": account_id,
|
|
"snapshots": [_snapshot_to_payload(s) for s in snapshots],
|
|
}
|
|
resp = await self._request("POST", _SNAPSHOTS_IMPORT, json=body)
|
|
if resp.status_code >= 400:
|
|
try:
|
|
payload = resp.json()
|
|
except Exception:
|
|
payload = {"raw": resp.text}
|
|
raise WealthfolioError(
|
|
f"Wealthfolio /snapshots/import rejected: {payload}")
|
|
result = resp.json()
|
|
assert isinstance(result, dict)
|
|
failed = int(result.get("snapshotsFailed", 0))
|
|
if failed > 0:
|
|
raise WealthfolioError(
|
|
f"Wealthfolio /snapshots/import: {failed} snapshot(s) failed; "
|
|
f"errors={result.get('errors')}")
|
|
return result
|
|
|
|
|
|
@dataclass(frozen=True)
class SnapshotPosition:
    """A per-fund position row in a Wealthfolio manual snapshot.

    Immutable. The three Decimal fields are serialised in fixed-point
    notation when the enclosing snapshot is converted to its wire format.
    """

    # Asset symbol, sent as "symbol".
    symbol: str
    # Sent as "quantity" (presumably units held — confirm against the provider).
    quantity: Decimal
    # Sent as "averageCost" (presumably per-unit cost — confirm against the provider).
    average_cost: Decimal
    # Sent as "totalCostBasis".
    total_cost_basis: Decimal
    # Sent as the position's own "currency".
    currency: str
|
|
|
|
|
|
@dataclass(frozen=True)
class ManualSnapshotPayload:
    """Sink-facing snapshot row. Mirrors the JSON shape WF expects.

    Immutable. One instance becomes one entry in the "snapshots" array of a
    snapshot import request.
    """

    # Snapshot day, serialised via isoformat().
    date: date
    # Snapshot-level currency, sent as "currency".
    currency: str
    # Per-fund rows, sent as "positions".
    positions: list[SnapshotPosition]
    # Cash amounts, sent as "cashBalances"
    # (keys presumably currency codes — confirm against the provider).
    cash_balances: dict[str, Decimal]
|
|
|
|
|
|
def _snapshot_to_payload(s: ManualSnapshotPayload) -> dict[str, Any]:
    """Convert one snapshot into the camelCase dict sent to the snapshot import.

    Decimal values are rendered as fixed-point strings (never scientific
    notation); the date is rendered with ``isoformat()``.
    """
    position_rows = []
    for pos in s.positions:
        entry = {
            "symbol": pos.symbol,
            "quantity": f"{pos.quantity:f}",
            "averageCost": f"{pos.average_cost:f}",
            "totalCostBasis": f"{pos.total_cost_basis:f}",
            "currency": pos.currency,
        }
        position_rows.append(entry)

    cash_rows = {}
    for key, amount in s.cash_balances.items():
        cash_rows[key] = f"{amount:f}"

    wire: dict[str, Any] = {}
    wire["date"] = s.date.isoformat()
    wire["currency"] = s.currency
    wire["positions"] = position_rows
    wire["cashBalances"] = cash_rows
    return wire
|