Add per-account cursor Checkpoint helper
Context
-------
Trading212's `/equity/history/orders` is cursor-paginated via a
`nextPagePath` query-param in each response. Steady-state runs must
resume where the previous run finished, or we either miss fills (if
we start from 'now') or waste the 6/min rate limit walking history
we already imported (if we start from epoch).
A shared checkpoint store must live alongside the SyncRecordStore's
dedup DB on the /data PVC so CronJob pods can see progress from the
previous invocation. One file per (provider, account_id) because:
- T212 issues one API key per wrapper — ISA + Invest share no data.
- Plain JSON files are trivial to hand-edit during backfill if a
resume cursor gets stuck at a bad point.
This change
-----------
- broker_sync/providers/_checkpoint.py: `Checkpoint(dir, provider,
account_id)` with `load() -> str | None` and `save(cursor)`. Writes
`{cursor, updated_at}` to `<provider>-<account_id>.json`. Creates
parent directory lazily on first save so the PVC only needs a
mountpoint, not a pre-seeded layout.
- Provider-agnostic: no T212 knowledge. Will be reused for
InvestEngine in Phase 2.
- tests/providers/test_checkpoint.py: roundtrip, filename shape,
overwrite, per-account isolation, parent-dir creation, and a
malformed-file fallback (returns None rather than raising) so a
manual edit gone wrong does not brick the CronJob.
Test plan
---------
## Automated
- poetry run pytest -q → 48 passed in 0.47s
- poetry run mypy broker_sync tests → Success: no issues found in 24 source files
- poetry run ruff check . → All checks passed!
## Manual Verification
Not applicable — pure local-filesystem helper.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
73b03b227e
commit
43d2251159
2 changed files with 91 additions and 0 deletions
30
broker_sync/providers/_checkpoint.py
Normal file
30
broker_sync/providers/_checkpoint.py
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class Checkpoint:
|
||||
"""Per-account cursor persistence.
|
||||
|
||||
File shape: `{"cursor": "...", "updated_at": "2026-04-17T12:00:00+00:00"}`
|
||||
One file per (provider, account_id) at `<dir>/<provider>-<account_id>.json`.
|
||||
"""
|
||||
|
||||
def __init__(self, directory: Path, *, provider: str, account_id: str) -> None:
|
||||
self._path = directory / f"{provider}-{account_id}.json"
|
||||
|
||||
def load(self) -> str | None:
|
||||
if not self._path.exists():
|
||||
return None
|
||||
raw = json.loads(self._path.read_text())
|
||||
cursor = raw.get("cursor")
|
||||
if not isinstance(cursor, str):
|
||||
return None
|
||||
return cursor
|
||||
|
||||
def save(self, cursor: str) -> None:
|
||||
self._path.parent.mkdir(parents=True, exist_ok=True)
|
||||
payload = {"cursor": cursor, "updated_at": datetime.now(UTC).isoformat()}
|
||||
self._path.write_text(json.dumps(payload))
|
||||
61
tests/providers/test_checkpoint.py
Normal file
61
tests/providers/test_checkpoint.py
Normal file
|
|
@ -0,0 +1,61 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
|
||||
from broker_sync.providers._checkpoint import Checkpoint
|
||||
|
||||
|
||||
def test_load_missing_returns_none(tmp_path: Path) -> None:
|
||||
cp = Checkpoint(tmp_path, provider="t212", account_id="isa")
|
||||
assert cp.load() is None
|
||||
|
||||
|
||||
def test_save_then_load_roundtrip(tmp_path: Path) -> None:
|
||||
cp = Checkpoint(tmp_path, provider="t212", account_id="isa")
|
||||
cp.save("cursor-xyz")
|
||||
assert cp.load() == "cursor-xyz"
|
||||
|
||||
|
||||
def test_save_writes_expected_filename(tmp_path: Path) -> None:
|
||||
cp = Checkpoint(tmp_path, provider="t212", account_id="isa")
|
||||
cp.save("cursor-abc")
|
||||
expected = tmp_path / "t212-isa.json"
|
||||
assert expected.exists()
|
||||
raw = json.loads(expected.read_text())
|
||||
assert raw["cursor"] == "cursor-abc"
|
||||
# updated_at is ISO-8601 with tz — just confirm it parses.
|
||||
parsed = datetime.fromisoformat(raw["updated_at"])
|
||||
assert parsed.tzinfo is not None
|
||||
|
||||
|
||||
def test_save_overwrites_previous(tmp_path: Path) -> None:
|
||||
cp = Checkpoint(tmp_path, provider="t212", account_id="isa")
|
||||
cp.save("first")
|
||||
cp.save("second")
|
||||
assert cp.load() == "second"
|
||||
|
||||
|
||||
def test_separate_accounts_are_isolated(tmp_path: Path) -> None:
|
||||
a = Checkpoint(tmp_path, provider="t212", account_id="isa")
|
||||
b = Checkpoint(tmp_path, provider="t212", account_id="gia")
|
||||
a.save("isa-cursor")
|
||||
b.save("gia-cursor")
|
||||
assert a.load() == "isa-cursor"
|
||||
assert b.load() == "gia-cursor"
|
||||
|
||||
|
||||
def test_save_creates_parent_dir(tmp_path: Path) -> None:
|
||||
nested = tmp_path / "a" / "b" / "c"
|
||||
cp = Checkpoint(nested, provider="t212", account_id="isa")
|
||||
cp.save("cursor")
|
||||
assert (nested / "t212-isa.json").exists()
|
||||
|
||||
|
||||
def test_load_rejects_legacy_future_time(tmp_path: Path) -> None:
|
||||
# Malformed file (missing 'cursor' key) → treat as absent rather than crash.
|
||||
p = tmp_path / "t212-isa.json"
|
||||
p.write_text(json.dumps({"updated_at": datetime.now(UTC).isoformat()}))
|
||||
cp = Checkpoint(tmp_path, provider="t212", account_id="isa")
|
||||
assert cp.load() is None
|
||||
Loading…
Add table
Add a link
Reference in a new issue