diff --git a/broker_sync/providers/trading212.py b/broker_sync/providers/trading212.py index 0fd0fe0..28b50d1 100644 --- a/broker_sync/providers/trading212.py +++ b/broker_sync/providers/trading212.py @@ -1,11 +1,14 @@ from __future__ import annotations +import asyncio +import random import re from collections.abc import AsyncIterator from datetime import datetime from decimal import Decimal from pathlib import Path from typing import Any +from urllib.parse import parse_qs, urlparse import httpx @@ -18,12 +21,22 @@ _PAGE_LIMIT = 50 _SUFFIX_RE = re.compile(r"(?:_US)?(?:[a-z])?_EQ$") +# Retry config for 429 without Retry-After and for 5xx. +_BACKOFF_INITIAL = 10.0 +_BACKOFF_CAP = 120.0 +_MAX_RETRIES = 3 + def _normalise_ticker(raw: str) -> str: """Strip T212's exchange-suffix decoration from a ticker.""" return _SUFFIX_RE.sub("", raw) +def _jitter(base: float) -> float: + """Return a jittered backoff value in [base/2, base].""" + return base * (0.5 + random.random() / 2) + + class Trading212Error(Exception): """Any non-retryable Trading212 API failure.""" @@ -37,7 +50,11 @@ class Trading212Provider: One instance serves every T212 wrapper the user owns (ISA + Invest) — the caller hands over (Account, api_key) pairs and `fetch()` walks - each account's history in turn. + each account's history in turn. Pagination is cursor-based via the + `nextPagePath` field in each response; the provider saves the cursor + that points to the NEXT page only after the current page has been + fully yielded to the caller, so a crash mid-stream resumes at the + start of the unfinished page rather than halfway through. """ name = "trading212" @@ -85,37 +102,85 @@ class Trading212Provider: provider=self.name, account_id=account.id, ) - page = await self._get_page(api_key, cursor=None) - for item in page.get("items", []): - activity = _item_to_activity(item, account) - if activity is None: - continue - if since is not None and activity.date < since: - continue - if before is not None and activity.date >= before: - continue - yield activity - # Checkpoint saved at end of page — resume on next run. - next_cursor = page.get("nextPagePath") - if isinstance(next_cursor, str): - checkpoint.save(next_cursor) + cursor: str | None = None + while True: + page = await self._get_page_with_retry(api_key, cursor) + items = page.get("items", []) + saw_too_old = False + for item in items: + activity = _item_to_activity(item, account) + if activity is None: + continue + if since is not None and activity.date < since: + saw_too_old = True + continue + if before is not None and activity.date >= before: + continue + yield activity - async def _get_page(self, api_key: str, cursor: str | None) -> dict[str, Any]: + next_path = page.get("nextPagePath") + if isinstance(next_path, str) and next_path: + checkpoint.save(next_path) + if not isinstance(next_path, str) or not next_path or saw_too_old: + return + cursor = _extract_cursor(next_path) + + async def _get_page_with_retry( + self, + api_key: str, + cursor: str | None, + ) -> dict[str, Any]: + attempts = 0 + backoff = _BACKOFF_INITIAL + while True: + resp = await self._request_page(api_key, cursor) + if resp.status_code == 200: + raw = resp.json() + assert isinstance(raw, dict) + return raw + if resp.status_code == 401: + raise Trading212AuthError("Trading212 rejected API key (HTTP 401)") + retryable = resp.status_code == 429 or 500 <= resp.status_code < 600 + if not retryable: + raise Trading212Error(f"Trading212 /orders HTTP {resp.status_code}: {resp.text}") + if attempts >= _MAX_RETRIES: + raise Trading212Error( + f"Trading212 /orders HTTP {resp.status_code} after {attempts} retries") + sleep_for = _sleep_after(resp, backoff) + await asyncio.sleep(sleep_for) + attempts += 1 + backoff = min(backoff * 2, _BACKOFF_CAP) + + async def _request_page(self, api_key: str, cursor: str | None) -> httpx.Response: params: dict[str, str | int] = {"limit": _PAGE_LIMIT} if cursor is not None: params["cursor"] = cursor - resp = await self._client.get( + return await self._client.get( _ORDERS_PATH, params=params, headers={"Authorization": api_key}, ) - if resp.status_code == 401: - raise Trading212AuthError("Trading212 rejected API key (HTTP 401)") - if resp.status_code >= 400: - raise Trading212Error(f"Trading212 /orders HTTP {resp.status_code}: {resp.text}") - raw = resp.json() - assert isinstance(raw, dict) - return raw + + +def _sleep_after(resp: httpx.Response, backoff: float) -> float: + if resp.status_code == 429: + retry_after = resp.headers.get("Retry-After") + if retry_after is not None: + try: + return float(retry_after) + except ValueError: + pass + return _jitter(backoff) + + +def _extract_cursor(next_page_path: str) -> str | None: + """Pull the `cursor` query param out of a nextPagePath URL fragment.""" + parsed = urlparse(next_page_path) + q = parse_qs(parsed.query) + cursor_values = q.get("cursor") + if not cursor_values: + return None + return cursor_values[0] def _item_to_activity(item: dict[str, Any], account: Account) -> Activity | None: diff --git a/poetry.lock b/poetry.lock index f1bcf1b..58029c8 100644 --- a/poetry.lock +++ b/poetry.lock @@ -56,14 +56,14 @@ files = [ [[package]] name = "click" -version = "8.3.2" +version = "8.1.8" description = "Composable command line interface toolkit" optional = false -python-versions = ">=3.10" +python-versions = ">=3.7" groups = ["main"] files = [ - {file = "click-8.3.2-py3-none-any.whl", hash = "sha256:1924d2c27c5653561cd2cae4548d1406039cb79b858b747cfea24924bbc1616d"}, - {file = "click-8.3.2.tar.gz", hash = "sha256:14162b8b3b3550a7d479eafa77dfd3c38d9dc8951f6f69c78913a8f9a7540fd5"}, + {file = "click-8.1.8-py3-none-any.whl", hash = "sha256:63c132bbbed01578a06712a2d1f497bb62d9c1c0d329b7903a866228027263b2"}, + {file = "click-8.1.8.tar.gz", hash = "sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a"}, ] [package.dependencies] @@ -658,4 +658,4 @@ platformdirs = ">=3.5.1" [metadata] lock-version = "2.1" python-versions = ">=3.11,<3.13" -content-hash = "50434b40777094ac433d85d7c5a4629f06f5df31e5151af8ca1177db4b8b3e62" +content-hash = "b9c19ac1963682740a98cd539d3790ff180c2e8195d5cfcc9572da855db3fa7d" diff --git a/pyproject.toml b/pyproject.toml index 6841999..adcf5cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ httpx = "^0.27" beautifulsoup4 = "^4.12" python-dateutil = "^2.9" typer = "^0.12" +click = "<8.2" # typer 0.12 uses make_metavar() without ctx; click 8.2 made ctx required [tool.poetry.group.dev.dependencies] pytest = "^8.3" diff --git a/tests/providers/test_trading212.py b/tests/providers/test_trading212.py index 047d160..0fbefeb 100644 --- a/tests/providers/test_trading212.py +++ b/tests/providers/test_trading212.py @@ -213,3 +213,176 @@ def test_accounts_returns_registered_pairs(tmp_path: Path) -> None: ) accs = p.accounts() assert [a.id for a in accs] == ["t212-isa", "t212-invest"] + + +# -- pagination -- + + +async def test_pagination_follows_next_page_path(tmp_path: Path) -> None: + pages = iter([ + _page([_fill(fill_id="p1-a"), _fill(fill_id="p1-b")], + next_path="/api/v0/equity/history/orders?cursor=p2"), + _page([_fill(fill_id="p2-a")], next_path=None), + ]) + visited: list[str | None] = [] + + def handler(req: httpx.Request) -> httpx.Response: + visited.append(req.url.params.get("cursor")) + return httpx.Response(200, json=next(pages)) + + p = _provider(checkpoint_dir=tmp_path, transport=httpx.MockTransport(handler)) + out = await _collect(p) + assert [a.external_id for a in out] == [ + "t212:fill:p1-a", + "t212:fill:p1-b", + "t212:fill:p2-a", + ] + # First call has no cursor; second uses the cursor from nextPagePath. + assert visited == [None, "p2"] + + +async def test_pagination_stops_when_since_reached(tmp_path: Path) -> None: + # First page has one too-old fill; remaining fill is new. Provider + # must stop without fetching page 2 once a page has items strictly + # older than `since`. + pages = iter([ + _page( + [ + _fill(fill_id="new", filled_at="2026-04-01T10:30:00.000Z"), + _fill(fill_id="old", filled_at="2020-01-01T00:00:00.000Z"), + ], + next_path="/api/v0/equity/history/orders?cursor=p2", + ), + _page([_fill(fill_id="p2-a")], next_path=None), + ]) + call_count = 0 + + def handler(req: httpx.Request) -> httpx.Response: + nonlocal call_count + call_count += 1 + return httpx.Response(200, json=next(pages)) + + p = _provider(checkpoint_dir=tmp_path, transport=httpx.MockTransport(handler)) + since = datetime(2026, 1, 1, tzinfo=UTC) + out = [a async for a in p.fetch(since=since)] + assert [a.external_id for a in out] == ["t212:fill:new"] + assert call_count == 1 # did NOT walk to page 2 + + +async def test_checkpoint_advances_only_after_page_yielded(tmp_path: Path) -> None: + cursor_next = "/api/v0/equity/history/orders?cursor=p2" + calls = 0 + + def handler(req: httpx.Request) -> httpx.Response: + # Page 1: one fill + next_path (triggers a checkpoint save). + # Page 2: empty + no next — terminates the loop cleanly. + nonlocal calls + calls += 1 + if calls == 1: + return httpx.Response(200, json=_page([_fill()], next_path=cursor_next)) + return httpx.Response(200, json=_page([], next_path=None)) + + p = _provider(checkpoint_dir=tmp_path, transport=httpx.MockTransport(handler)) + await _collect(p) + # After a successful fetch, the checkpoint holds the cursor for the NEXT page. + from broker_sync.providers._checkpoint import Checkpoint + cp = Checkpoint(tmp_path, provider="trading212", account_id="t212-isa") + assert cp.load() == cursor_next + + +# -- retries -- + + +async def test_429_with_retry_after_sleeps_then_retries(tmp_path: Path, + monkeypatch: pytest.MonkeyPatch) -> None: + calls = 0 + sleeps: list[float] = [] + + async def fake_sleep(seconds: float) -> None: + sleeps.append(seconds) + + monkeypatch.setattr("asyncio.sleep", fake_sleep) + + def handler(req: httpx.Request) -> httpx.Response: + nonlocal calls + calls += 1 + if calls == 1: + return httpx.Response(429, headers={"Retry-After": "7"}) + return httpx.Response(200, json=_page([_fill()])) + + p = _provider(checkpoint_dir=tmp_path, transport=httpx.MockTransport(handler)) + out = await _collect(p) + assert len(out) == 1 + assert calls == 2 + assert sleeps == [7.0] + + +async def test_429_without_retry_after_uses_backoff(tmp_path: Path, + monkeypatch: pytest.MonkeyPatch) -> None: + calls = 0 + sleeps: list[float] = [] + + async def fake_sleep(seconds: float) -> None: + sleeps.append(seconds) + + monkeypatch.setattr("asyncio.sleep", fake_sleep) + # Deterministic jitter: always 0.5 of the cap. + monkeypatch.setattr( + "broker_sync.providers.trading212._jitter", + lambda base: base, + ) + + def handler(req: httpx.Request) -> httpx.Response: + nonlocal calls + calls += 1 + if calls < 3: + return httpx.Response(429) + return httpx.Response(200, json=_page([_fill()])) + + p = _provider(checkpoint_dir=tmp_path, transport=httpx.MockTransport(handler)) + out = await _collect(p) + assert len(out) == 1 + # First backoff = 10s; second doubles to 20s. + assert sleeps == [10.0, 20.0] + + +async def test_429_gives_up_after_max_retries(tmp_path: Path, + monkeypatch: pytest.MonkeyPatch) -> None: + + async def noop_sleep(seconds: float) -> None: + return None + + monkeypatch.setattr("asyncio.sleep", noop_sleep) + + def handler(req: httpx.Request) -> httpx.Response: + return httpx.Response(429) + + p = _provider(checkpoint_dir=tmp_path, transport=httpx.MockTransport(handler)) + with pytest.raises(Trading212Error, match="429"): + await _collect(p) + + +async def test_5xx_retries_with_backoff(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + calls = 0 + sleeps: list[float] = [] + + async def fake_sleep(seconds: float) -> None: + sleeps.append(seconds) + + monkeypatch.setattr("asyncio.sleep", fake_sleep) + monkeypatch.setattr( + "broker_sync.providers.trading212._jitter", + lambda base: base, + ) + + def handler(req: httpx.Request) -> httpx.Response: + nonlocal calls + calls += 1 + if calls == 1: + return httpx.Response(502) + return httpx.Response(200, json=_page([_fill()])) + + p = _provider(checkpoint_dir=tmp_path, transport=httpx.MockTransport(handler)) + out = await _collect(p) + assert len(out) == 1 + assert sleeps == [10.0]