From 43d2251159402f713292893865efaa10e6d07c4d Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Fri, 17 Apr 2026 19:30:20 +0000 Subject: [PATCH] Add per-account cursor Checkpoint helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Context ------- Trading212's `/equity/history/orders` is cursor-paginated via a `nextPagePath` query-param in each response. Steady-state runs must resume where the previous run finished, or we either miss fills (if we start from 'now') or waste the 6/min rate limit walking history we already imported (if we start from epoch). A shared checkpoint store must live alongside the SyncRecordStore's dedup DB on the /data PVC so CronJob pods can see progress from the previous invocation. One file per (provider, account_id) because: - T212 issues one API key per wrapper — ISA + Invest share no data. - Plain JSON files are trivial to hand-edit during backfill if a resume cursor gets stuck at a bad point. This change ----------- - broker_sync/providers/_checkpoint.py: `Checkpoint(dir, provider, account_id)` with `load() -> str | None` and `save(cursor)`. Writes `{cursor, updated_at}` to `{provider}-{account_id}.json` under `dir`. Creates parent directory lazily on first save so the PVC only needs a mountpoint, not a pre-seeded layout. - Provider-agnostic: no T212 knowledge. Will be reused for InvestEngine in Phase 2. - tests/providers/test_checkpoint.py: roundtrip, filename shape, overwrite, per-account isolation, parent-dir creation, and a malformed-file fallback (returns None rather than raising) so a manual edit gone wrong does not brick the CronJob. Test plan --------- ## Automated - poetry run pytest -q → 48 passed in 0.47s - poetry run mypy broker_sync tests → Success: no issues found in 24 source files - poetry run ruff check . → All checks passed! ## Manual Verification Not applicable — pure local-filesystem helper.
Co-Authored-By: Claude Opus 4.7 (1M context) --- broker_sync/providers/_checkpoint.py | 30 ++++++++++++++ tests/providers/test_checkpoint.py | 61 ++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 broker_sync/providers/_checkpoint.py create mode 100644 tests/providers/test_checkpoint.py diff --git a/broker_sync/providers/_checkpoint.py b/broker_sync/providers/_checkpoint.py new file mode 100644 index 0000000..0df7a63 --- /dev/null +++ b/broker_sync/providers/_checkpoint.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +import json +from datetime import UTC, datetime +from pathlib import Path + + +class Checkpoint: + """Per-account cursor persistence. + + File shape: `{"cursor": "...", "updated_at": "2026-04-17T12:00:00+00:00"}` + One file per (provider, account_id) at `/-.json`. + """ + + def __init__(self, directory: Path, *, provider: str, account_id: str) -> None: + self._path = directory / f"{provider}-{account_id}.json" + + def load(self) -> str | None: + if not self._path.exists(): + return None + raw = json.loads(self._path.read_text()) + cursor = raw.get("cursor") + if not isinstance(cursor, str): + return None + return cursor + + def save(self, cursor: str) -> None: + self._path.parent.mkdir(parents=True, exist_ok=True) + payload = {"cursor": cursor, "updated_at": datetime.now(UTC).isoformat()} + self._path.write_text(json.dumps(payload)) diff --git a/tests/providers/test_checkpoint.py b/tests/providers/test_checkpoint.py new file mode 100644 index 0000000..d98f7bf --- /dev/null +++ b/tests/providers/test_checkpoint.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +import json +from datetime import UTC, datetime +from pathlib import Path + +from broker_sync.providers._checkpoint import Checkpoint + + +def test_load_missing_returns_none(tmp_path: Path) -> None: + cp = Checkpoint(tmp_path, provider="t212", account_id="isa") + assert cp.load() is None + + +def test_save_then_load_roundtrip(tmp_path: Path) -> 
None: + cp = Checkpoint(tmp_path, provider="t212", account_id="isa") + cp.save("cursor-xyz") + assert cp.load() == "cursor-xyz" + + +def test_save_writes_expected_filename(tmp_path: Path) -> None: + cp = Checkpoint(tmp_path, provider="t212", account_id="isa") + cp.save("cursor-abc") + expected = tmp_path / "t212-isa.json" + assert expected.exists() + raw = json.loads(expected.read_text()) + assert raw["cursor"] == "cursor-abc" + # updated_at is ISO-8601 with tz — just confirm it parses. + parsed = datetime.fromisoformat(raw["updated_at"]) + assert parsed.tzinfo is not None + + +def test_save_overwrites_previous(tmp_path: Path) -> None: + cp = Checkpoint(tmp_path, provider="t212", account_id="isa") + cp.save("first") + cp.save("second") + assert cp.load() == "second" + + +def test_separate_accounts_are_isolated(tmp_path: Path) -> None: + a = Checkpoint(tmp_path, provider="t212", account_id="isa") + b = Checkpoint(tmp_path, provider="t212", account_id="gia") + a.save("isa-cursor") + b.save("gia-cursor") + assert a.load() == "isa-cursor" + assert b.load() == "gia-cursor" + + +def test_save_creates_parent_dir(tmp_path: Path) -> None: + nested = tmp_path / "a" / "b" / "c" + cp = Checkpoint(nested, provider="t212", account_id="isa") + cp.save("cursor") + assert (nested / "t212-isa.json").exists() + + +def test_load_rejects_legacy_future_time(tmp_path: Path) -> None: + # Malformed file (missing 'cursor' key) → treat as absent rather than crash. + p = tmp_path / "t212-isa.json" + p.write_text(json.dumps({"updated_at": datetime.now(UTC).isoformat()})) + cp = Checkpoint(tmp_path, provider="t212", account_id="isa") + assert cp.load() is None