wealthfolio: add compute_position_qty for broker reconciliation

This commit is contained in:
Viktor Barzin 2026-05-26 22:26:24 +00:00
parent 975c3b4bf7
commit 882415464e
2 changed files with 103 additions and 0 deletions

View file

@ -315,6 +315,50 @@ class WealthfolioSink:
total += amt
return total
async def compute_position_qty(self, account_id: str) -> dict[str, Decimal]:
"""Return per-symbol net position quantity (BUY/IN minus SELL/OUT) for
one account. Skips cash activities and unknown activity types.
Used by the IBKR reconciliation step to compare against broker-reported
OpenPositions.
"""
qty_by_symbol: dict[str, Decimal] = {}
page = 1
while True:
resp = await self._request(
"POST", _ACTIVITIES_SEARCH,
json={"accountIds": [account_id], "page": page, "pageSize": 500},
)
resp.raise_for_status()
payload = resp.json()
activities = payload.get("activities", []) if isinstance(payload, dict) else []
if not activities:
break
for act in activities:
if not isinstance(act, dict):
continue
symbol = act.get("symbol") or ""
if not symbol or symbol.startswith("$CASH"):
continue
act_type = act.get("activityType") or ""
sign: int
if act_type in {"BUY", "ADD_HOLDING", "TRANSFER_IN"}:
sign = 1
elif act_type in {"SELL", "REMOVE_HOLDING", "TRANSFER_OUT"}:
sign = -1
else:
continue
try:
qty = Decimal(str(act.get("quantity") or 0))
except Exception:
continue
qty_by_symbol[symbol] = qty_by_symbol.get(symbol, Decimal(0)) + sign * qty
total_pages = int(payload.get("totalPages") or 1) if isinstance(payload, dict) else 1
if page >= total_pages:
break
page += 1
return qty_by_symbol
# -- manual holdings snapshots --
async def push_manual_snapshots(

View file

@ -373,3 +373,62 @@ async def test_push_manual_snapshots_short_circuits_on_empty(
sink = _client(httpx.MockTransport(handler), sp)
result = await sink.push_manual_snapshots(account_id="acct", snapshots=[])
assert result["snapshotsImported"] == 0
# -- compute_position_qty (used by IBKR reconciliation) --
@pytest.mark.asyncio
async def test_compute_position_qty_sums_buys_minus_sells(tmp_path: Path) -> None:
"""Sums BUY/ADD_HOLDING/TRANSFER_IN minus SELL/REMOVE_HOLDING/TRANSFER_OUT
quantities per symbol, skipping cash activities."""
sp = tmp_path / "s.json"
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
page_1: dict[str, Any] = {
"activities": [
{"symbol": "VUAG.L", "activityType": "BUY", "quantity": "10"},
{"symbol": "VUAG.L", "activityType": "SELL", "quantity": "2"},
{"symbol": "AAPL", "activityType": "BUY", "quantity": "5"},
{"symbol": "$CASH-GBP", "activityType": "DEPOSIT", "quantity": "0",
"amount": "100"},
# Unknown activity type — must be skipped, not crash.
{"symbol": "VUAG.L", "activityType": "DIVIDEND", "quantity": "0",
"amount": "0.5"},
],
"totalPages": 1,
}
async def handler(req: httpx.Request) -> httpx.Response:
if req.url.path == "/api/v1/activities/search":
return httpx.Response(200, json=page_1)
raise AssertionError(f"unexpected request: {req.method} {req.url.path}")
sink = _client(httpx.MockTransport(handler), sp)
result = await sink.compute_position_qty("acct-123")
assert result == {"VUAG.L": Decimal("8"), "AAPL": Decimal("5")}
@pytest.mark.asyncio
async def test_compute_position_qty_paginates(tmp_path: Path) -> None:
"""Walks all pages until totalPages reached."""
sp = tmp_path / "s.json"
sp.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
pages: dict[int, dict[str, Any]] = {
1: {"activities": [{"symbol": "VUAG.L", "activityType": "BUY",
"quantity": "3"}], "totalPages": 2},
2: {"activities": [{"symbol": "VUAG.L", "activityType": "BUY",
"quantity": "4"}], "totalPages": 2},
}
seen_pages: list[int] = []
async def handler(req: httpx.Request) -> httpx.Response:
body = json.loads(req.content)
seen_pages.append(body["page"])
return httpx.Response(200, json=pages[body["page"]])
sink = _client(httpx.MockTransport(handler), sp)
result = await sink.compute_position_qty("acct-x")
assert sorted(seen_pages) == [1, 2]
assert result == {"VUAG.L": Decimal("7")}