broker-sync/tests/test_pipeline.py
Viktor Barzin b363032e42 sinks: feed /import/check enrichment into /import body
/import/check hydrates each ActivityImport with resolved assetId,
exchangeMic, quoteCcy, instrumentType, quoteMode. The /import endpoint
on Wealthfolio 3.2 does NOT re-resolve — passing an un-enriched row
returns 200 OK but silently drops the activity (activities=[] in the
response).

The first live run returned `imported=63 failed=0` but nothing reached
the database. Fixed by posting the hydrated rows from the check response
to /import instead of the original.

Requires the test to also return list-shaped check responses (matches
the upstream Json<Vec<ActivityImport>> signature on the Rust side).

poetry run pytest -q     70 passed
poetry run mypy          clean
poetry run ruff check    clean
2026-04-17 20:54:17 +00:00

175 lines
5.5 KiB
Python

from __future__ import annotations
import json
from collections.abc import AsyncIterator
from datetime import UTC, datetime
from decimal import Decimal
from pathlib import Path
import httpx
from broker_sync.dedup import SyncRecordStore
from broker_sync.models import Account, AccountType, Activity, ActivityType
from broker_sync.pipeline import sync_provider_to_wealthfolio
from broker_sync.sinks.wealthfolio import WealthfolioSink
class _FakeProvider:
name = "fake"
def __init__(self, accounts: list[Account], activities: list[Activity]) -> None:
self._accounts = accounts
self._activities = activities
def accounts(self) -> list[Account]:
return list(self._accounts)
async def fetch(
self,
*,
since: datetime | None = None,
before: datetime | None = None,
) -> AsyncIterator[Activity]:
for a in self._activities:
yield a
def _buy(external_id: str, account_id: str = "fake-isa") -> Activity:
return Activity(
external_id=external_id,
account_id=account_id,
account_type=AccountType.ISA,
date=datetime(2026, 4, 1, tzinfo=UTC),
activity_type=ActivityType.BUY,
symbol="VUAG",
quantity=Decimal("1"),
unit_price=Decimal("100"),
currency="GBP",
)
def _sink(transport: httpx.MockTransport, session_path: Path) -> WealthfolioSink:
session_path.write_text(json.dumps({"cookies": {"wf_token": "fresh"}}))
return WealthfolioSink(
base_url="https://wf.test",
username="x",
password="x",
session_path=session_path,
transport=transport,
)
async def test_pipeline_skips_dedup_then_imports_new(tmp_path: Path) -> None:
account = Account(
id="fake-isa",
name="Fake ISA",
account_type=AccountType.ISA,
currency="GBP",
provider="fake",
)
provider = _FakeProvider([account], [_buy("a"), _buy("b"), _buy("c")])
posted_batches: list[str] = []
async def handler(req: httpx.Request) -> httpx.Response:
if req.method == "GET" and req.url.path == "/api/v1/accounts":
# Return account with Wealthfolio-assigned UUID + our providerAccountId.
return httpx.Response(
200,
json=[{
"id": "wf-uuid-fake-isa",
"provider": "fake",
"providerAccountId": "fake-isa",
}],
)
if req.url.path == "/api/v1/activities/import/check":
body = json.loads(req.content)
# Echo each activity back marked valid (mimic Wealthfolio's
# hydrate step).
return httpx.Response(200, json=[
{**a, "isValid": True, "errors": None} for a in body["activities"]
])
if req.url.path == "/api/v1/activities/import":
body = req.content.decode()
posted_batches.append(body)
return httpx.Response(
200,
json={"activities": [
{"id": f"wf-{i}", "external_id": ext}
for i, ext in enumerate(["a", "b", "c"])
]},
)
return httpx.Response(500)
sink = _sink(httpx.MockTransport(handler), tmp_path / "wf-session.json")
dedup = SyncRecordStore(tmp_path / "sync.db")
# Seed one already-seen to confirm dedup.
dedup.record("fake", "fake-isa", "a", wealthfolio_activity_id="wf-old")
try:
result = await sync_provider_to_wealthfolio(
provider=provider,
sink=sink,
dedup=dedup,
)
finally:
await sink.close()
assert result.fetched == 3
assert result.new_after_dedup == 2
assert result.imported == 2
assert result.failed == 0
assert len(posted_batches) == 1
body = posted_batches[0]
# Only the new rows (b, c) — NOT the already-seen "a".
assert "sync:fake:a" not in body
assert "sync:fake:b" in body
assert "sync:fake:c" in body
# All three external_ids are now in dedup after the run.
assert dedup.has_seen("fake", "fake-isa", "a")
assert dedup.has_seen("fake", "fake-isa", "b")
assert dedup.has_seen("fake", "fake-isa", "c")
async def test_pipeline_records_failure_when_import_rejects(tmp_path: Path) -> None:
account = Account(
id="fake-isa",
name="Fake ISA",
account_type=AccountType.ISA,
currency="GBP",
provider="fake",
)
provider = _FakeProvider([account], [_buy("a")])
async def handler(req: httpx.Request) -> httpx.Response:
if req.method == "GET" and req.url.path == "/api/v1/accounts":
return httpx.Response(
200,
json=[{
"id": "wf-uuid-fake-isa",
"provider": "fake",
"providerAccountId": "fake-isa",
}],
)
if req.url.path == "/api/v1/activities/import/check":
return httpx.Response(400, json={"errors": ["bad row"]})
return httpx.Response(500)
sink = _sink(httpx.MockTransport(handler), tmp_path / "wf-session.json")
dedup = SyncRecordStore(tmp_path / "sync.db")
try:
result = await sync_provider_to_wealthfolio(
provider=provider,
sink=sink,
dedup=dedup,
)
finally:
await sink.close()
assert result.fetched == 1
assert result.imported == 0
assert result.failed == 1
# NOT recorded in dedup so the next run retries.
assert not dedup.has_seen("fake", "fake-isa", "a")