From 882415464e3bee2d23d4d491dab0d8c9d82d430b Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Tue, 26 May 2026 22:26:24 +0000 Subject: [PATCH] wealthfolio: add compute_position_qty for broker reconciliation --- broker_sync/sinks/wealthfolio.py | 44 ++++++++++++++++++++++++ tests/sinks/test_wealthfolio.py | 59 ++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) diff --git a/broker_sync/sinks/wealthfolio.py b/broker_sync/sinks/wealthfolio.py index 7144f6f..51a2d41 100644 --- a/broker_sync/sinks/wealthfolio.py +++ b/broker_sync/sinks/wealthfolio.py @@ -315,6 +315,50 @@ class WealthfolioSink: total += amt return total + async def compute_position_qty(self, account_id: str) -> dict[str, Decimal]: + """Return per-symbol net position quantity (BUY/IN minus SELL/OUT) for + one account. Skips cash activities and unknown activity types. + + Used by the IBKR reconciliation step to compare against broker-reported + OpenPositions. + """ + qty_by_symbol: dict[str, Decimal] = {} + page = 1 + while True: + resp = await self._request( + "POST", _ACTIVITIES_SEARCH, + json={"accountIds": [account_id], "page": page, "pageSize": 500}, + ) + resp.raise_for_status() + payload = resp.json() + activities = payload.get("activities", []) if isinstance(payload, dict) else [] + if not activities: + break + for act in activities: + if not isinstance(act, dict): + continue + symbol = act.get("symbol") or "" + if not symbol or symbol.startswith("$CASH"): + continue + act_type = act.get("activityType") or "" + sign: int + if act_type in {"BUY", "ADD_HOLDING", "TRANSFER_IN"}: + sign = 1 + elif act_type in {"SELL", "REMOVE_HOLDING", "TRANSFER_OUT"}: + sign = -1 + else: + continue + try: + qty = Decimal(str(act.get("quantity") or 0)) + except Exception: + continue + qty_by_symbol[symbol] = qty_by_symbol.get(symbol, Decimal(0)) + sign * qty + total_pages = int(payload.get("totalPages") or 1) if isinstance(payload, dict) else 1 + if page >= total_pages: + break + page += 1 + return qty_by_symbol + # -- manual holdings snapshots -- async def push_manual_snapshots( diff --git a/tests/sinks/test_wealthfolio.py b/tests/sinks/test_wealthfolio.py index 436e52b..2b43681 100644 --- a/tests/sinks/test_wealthfolio.py +++ b/tests/sinks/test_wealthfolio.py @@ -373,3 +373,62 @@ async def test_push_manual_snapshots_short_circuits_on_empty( sink = _client(httpx.MockTransport(handler), sp) result = await sink.push_manual_snapshots(account_id="acct", snapshots=[]) assert result["snapshotsImported"] == 0 + + +# -- compute_position_qty (used by IBKR reconciliation) -- + + +@pytest.mark.asyncio +async def test_compute_position_qty_sums_buys_minus_sells(tmp_path: Path) -> None: + """Sums BUY/ADD_HOLDING/TRANSFER_IN minus SELL/REMOVE_HOLDING/TRANSFER_OUT + quantities per symbol, skipping cash activities.""" + sp = tmp_path / "s.json" + sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}})) + + page_1: dict[str, Any] = { + "activities": [ + {"symbol": "VUAG.L", "activityType": "BUY", "quantity": "10"}, + {"symbol": "VUAG.L", "activityType": "SELL", "quantity": "2"}, + {"symbol": "AAPL", "activityType": "BUY", "quantity": "5"}, + {"symbol": "$CASH-GBP", "activityType": "DEPOSIT", "quantity": "0", + "amount": "100"}, + # Unknown activity type — must be skipped, not crash. + {"symbol": "VUAG.L", "activityType": "DIVIDEND", "quantity": "0", + "amount": "0.5"}, + ], + "totalPages": 1, + } + + async def handler(req: httpx.Request) -> httpx.Response: + if req.url.path == "/api/v1/activities/search": + return httpx.Response(200, json=page_1) + raise AssertionError(f"unexpected request: {req.method} {req.url.path}") + + sink = _client(httpx.MockTransport(handler), sp) + result = await sink.compute_position_qty("acct-123") + assert result == {"VUAG.L": Decimal("8"), "AAPL": Decimal("5")} + + +@pytest.mark.asyncio +async def test_compute_position_qty_paginates(tmp_path: Path) -> None: + """Walks all pages until totalPages reached.""" + sp = tmp_path / "s.json" + sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}})) + + pages: dict[int, dict[str, Any]] = { + 1: {"activities": [{"symbol": "VUAG.L", "activityType": "BUY", + "quantity": "3"}], "totalPages": 2}, + 2: {"activities": [{"symbol": "VUAG.L", "activityType": "BUY", + "quantity": "4"}], "totalPages": 2}, + } + seen_pages: list[int] = [] + + async def handler(req: httpx.Request) -> httpx.Response: + body = json.loads(req.content) + seen_pages.append(body["page"]) + return httpx.Response(200, json=pages[body["page"]]) + + sink = _client(httpx.MockTransport(handler), sp) + result = await sink.compute_position_qty("acct-x") + assert sorted(seen_pages) == [1, 2] + assert result == {"VUAG.L": Decimal("7")}