Add SyncRecordStore for authoritative dedup
Context ------- Wealthfolio's activity `notes` field is user-editable via the UI, so using it as the dedup key would let a single note-edit in Wealthfolio cause the next sync to create a duplicate. Stress-testing the plan flagged this as the top structural risk. This change ----------- - SQLite-backed store at `/data/broker_sync.db` in production; keyed on (provider, account, external_id) so each provider's id space is scoped to its own account. - `INSERT OR IGNORE` makes record() idempotent — second call with the same key is a no-op and preserves the original wealthfolio_activity_id plus first_seen timestamp. - `filter_new()` is the integration point: provider fetches activities, hands them to the store, gets back only the unseen subset to submit to the Wealthfolio sink. - Wealthfolio activity id returned by the API is persisted alongside each record so the HMRC FX reconciliation job can later PATCH the original activity rather than creating a new one. Test plan --------- ## Automated - poetry run pytest tests/test_dedup.py -v → 6 passed - poetry run mypy broker_sync tests → Success: no issues found in 6 source files - poetry run ruff check . → All checks passed! ## Manual Verification Not applicable for this layer — full end-to-end verification happens once a provider + sink land (Phase 1 Trading212 and the auth spike).
This commit is contained in:
parent
a2aa7ec486
commit
a66ef189f6
3 changed files with 149 additions and 3 deletions
80
broker_sync/dedup.py
Normal file
80
broker_sync/dedup.py
Normal file
|
|
@ -0,0 +1,80 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
from collections.abc import Iterable
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
|
||||
from broker_sync.models import Activity
|
||||
|
||||
_SCHEMA = """
|
||||
CREATE TABLE IF NOT EXISTS sync_record (
|
||||
provider TEXT NOT NULL,
|
||||
account TEXT NOT NULL,
|
||||
external_id TEXT NOT NULL,
|
||||
wealthfolio_activity_id TEXT,
|
||||
first_seen TEXT NOT NULL,
|
||||
PRIMARY KEY (provider, account, external_id)
|
||||
);
|
||||
"""
|
||||
|
||||
|
||||
class SyncRecordStore:
|
||||
"""Authoritative local dedup store.
|
||||
|
||||
Wealthfolio's `notes` field is user-editable, so we cannot rely on it
|
||||
for dedup. This SQLite-backed store is the source of truth for whether
|
||||
a (provider, account, external_id) tuple has been imported.
|
||||
"""
|
||||
|
||||
def __init__(self, db_path: Path | str) -> None:
|
||||
self._path = Path(db_path)
|
||||
self._path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with self._conn() as c:
|
||||
c.executescript(_SCHEMA)
|
||||
|
||||
def _conn(self) -> sqlite3.Connection:
|
||||
c = sqlite3.connect(self._path)
|
||||
c.row_factory = sqlite3.Row
|
||||
return c
|
||||
|
||||
def has_seen(self, provider: str, account: str, external_id: str) -> bool:
|
||||
with self._conn() as c:
|
||||
row = c.execute(
|
||||
"SELECT 1 FROM sync_record "
|
||||
"WHERE provider=? AND account=? AND external_id=?",
|
||||
(provider, account, external_id),
|
||||
).fetchone()
|
||||
return row is not None
|
||||
|
||||
def record(
|
||||
self,
|
||||
provider: str,
|
||||
account: str,
|
||||
external_id: str,
|
||||
wealthfolio_activity_id: str | None,
|
||||
first_seen: datetime | None = None,
|
||||
) -> None:
|
||||
ts = (first_seen or datetime.now(UTC)).isoformat()
|
||||
with self._conn() as c:
|
||||
c.execute(
|
||||
"INSERT OR IGNORE INTO sync_record "
|
||||
"(provider, account, external_id, wealthfolio_activity_id, first_seen) "
|
||||
"VALUES (?, ?, ?, ?, ?)",
|
||||
(provider, account, external_id, wealthfolio_activity_id, ts),
|
||||
)
|
||||
c.commit()
|
||||
|
||||
def get(self, provider: str, account: str, external_id: str) -> dict[str, str | None] | None:
|
||||
with self._conn() as c:
|
||||
row = c.execute(
|
||||
"SELECT wealthfolio_activity_id, first_seen FROM sync_record "
|
||||
"WHERE provider=? AND account=? AND external_id=?",
|
||||
(provider, account, external_id),
|
||||
).fetchone()
|
||||
if row is None:
|
||||
return None
|
||||
return dict(row)
|
||||
|
||||
def filter_new(self, provider: str, activities: Iterable[Activity]) -> list[Activity]:
|
||||
return [a for a in activities if not self.has_seen(provider, a.account_id, a.external_id)]
|
||||
|
|
@ -78,9 +78,8 @@ class Activity:
|
|||
notes: str | None = None
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.activity_type in _QTY_PRICE_TYPES and (
|
||||
self.quantity is None or self.unit_price is None
|
||||
):
|
||||
if self.activity_type in _QTY_PRICE_TYPES and (self.quantity is None
|
||||
or self.unit_price is None):
|
||||
raise ValueError(f"{self.activity_type} requires quantity and unit_price")
|
||||
if self.activity_type in _AMOUNT_TYPES and self.amount is None:
|
||||
raise ValueError(f"{self.activity_type} requires amount")
|
||||
|
|
|
|||
67
tests/test_dedup.py
Normal file
67
tests/test_dedup.py
Normal file
|
|
@ -0,0 +1,67 @@
|
|||
from datetime import UTC, datetime
|
||||
from decimal import Decimal
|
||||
from pathlib import Path
|
||||
|
||||
from broker_sync.dedup import SyncRecordStore
|
||||
from broker_sync.models import AccountType, Activity, ActivityType
|
||||
|
||||
|
||||
def _buy(external_id: str) -> Activity:
|
||||
return Activity(
|
||||
external_id=external_id,
|
||||
account_id="t212-isa",
|
||||
account_type=AccountType.ISA,
|
||||
date=datetime(2026, 1, 1, tzinfo=UTC),
|
||||
activity_type=ActivityType.BUY,
|
||||
symbol="VUAG",
|
||||
quantity=Decimal("1"),
|
||||
unit_price=Decimal("100"),
|
||||
currency="GBP",
|
||||
)
|
||||
|
||||
|
||||
def test_store_schema_is_idempotent(tmp_path: Path) -> None:
|
||||
db = tmp_path / "s.db"
|
||||
SyncRecordStore(db) # creates schema
|
||||
SyncRecordStore(db) # second open must not raise
|
||||
assert db.exists()
|
||||
|
||||
|
||||
def test_has_seen_returns_false_for_new(tmp_path: Path) -> None:
|
||||
s = SyncRecordStore(tmp_path / "s.db")
|
||||
assert s.has_seen("trading212", "t212-isa", "order-1") is False
|
||||
|
||||
|
||||
def test_record_then_has_seen(tmp_path: Path) -> None:
|
||||
s = SyncRecordStore(tmp_path / "s.db")
|
||||
s.record("trading212", "t212-isa", "order-1", wealthfolio_activity_id="wf-42")
|
||||
assert s.has_seen("trading212", "t212-isa", "order-1") is True
|
||||
# Same (provider, account, external_id) from a different caller is still seen.
|
||||
assert s.has_seen("trading212", "t212-isa", "order-1") is True
|
||||
|
||||
|
||||
def test_record_is_idempotent(tmp_path: Path) -> None:
|
||||
s = SyncRecordStore(tmp_path / "s.db")
|
||||
s.record("trading212", "t212-isa", "order-1", wealthfolio_activity_id="wf-42")
|
||||
s.record("trading212", "t212-isa", "order-1", wealthfolio_activity_id="wf-43")
|
||||
# Second insert must not raise. Original first_seen / wealthfolio id preserved.
|
||||
stored = s.get("trading212", "t212-isa", "order-1")
|
||||
assert stored is not None
|
||||
assert stored["wealthfolio_activity_id"] == "wf-42"
|
||||
|
||||
|
||||
def test_scope_per_provider_and_account(tmp_path: Path) -> None:
|
||||
s = SyncRecordStore(tmp_path / "s.db")
|
||||
s.record("trading212", "t212-isa", "order-1", wealthfolio_activity_id="wf-1")
|
||||
# Different provider, same external_id — NOT seen.
|
||||
assert s.has_seen("invest-engine", "t212-isa", "order-1") is False
|
||||
# Different account, same external_id — NOT seen.
|
||||
assert s.has_seen("trading212", "t212-invest", "order-1") is False
|
||||
|
||||
|
||||
def test_filter_new_drops_seen(tmp_path: Path) -> None:
|
||||
s = SyncRecordStore(tmp_path / "s.db")
|
||||
s.record("trading212", "t212-isa", "a", wealthfolio_activity_id=None)
|
||||
activities = [_buy("a"), _buy("b"), _buy("c")]
|
||||
fresh = s.filter_new("trading212", activities)
|
||||
assert [a.external_id for a in fresh] == ["b", "c"]
|
||||
Loading…
Add table
Add a link
Reference in a new issue