Add local-first-sqlite-sync-pattern skill
parent fbdf6e6930
commit 3ee8729ee8
1 changed file with 216 additions and 0 deletions
dot_claude/skills/local-first-sqlite-sync-pattern/SKILL.md (new file)

---
name: local-first-sqlite-sync-pattern
description: |
  Add an offline-resilient local SQLite cache with background sync to a service that
  currently makes direct HTTP calls to a remote API. Use when: (1) an MCP server or
  CLI tool makes synchronous HTTP calls that fail when the API is down, (2) you want
  reads to be instant (local) while writes eventually sync to a central server,
  (3) you need a pending_ops queue so writes survive API outages, or (4) you want
  backward-compatible mode detection (SQLite-only / hybrid / HTTP-only). Covers
  the SyncEngine pattern, server_id mapping, soft delete for sync, try-sync-then-queue
  writes, and incremental pull via updated_at timestamps.
author: Claude Code
version: 1.0.0
date: 2026-03-14
---

# Local-First SQLite Cache with Background Sync

## Problem

A service makes direct HTTP calls to a remote API for every operation. When the API
goes down (deploy, network issue, DB failure), every call fails immediately. There's
no caching, no retry, no graceful degradation.

## Context / Trigger Conditions

- MCP server, CLI tool, or local service that talks to a remote API
- Reads should be fast regardless of network conditions
- Writes should not be lost during API outages
- Multiple clients need to eventually converge on the same data
- You need backward compatibility with the existing direct-HTTP mode

## Solution

### 1. Three-Mode Detection (backward compatible)

```python
SYNC_DISABLED = os.environ.get("SYNC_DISABLE", "") == "1"
HYBRID_MODE = bool(API_KEY) and not SYNC_DISABLED   # local cache + sync
HTTP_ONLY = bool(API_KEY) and SYNC_DISABLED         # legacy direct HTTP
SQLITE_ONLY = not API_KEY                           # standalone local
```

This preserves existing behavior: no API key = local-only; API key + disable flag = direct HTTP.
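
The detection can also be factored into a small pure function for testing. This is a sketch, not part of the original code; it assumes the `API_KEY` and `SYNC_DISABLE` values shown above are passed in after being read from the environment:

```python
def detect_mode(api_key: str, sync_disable: str) -> str:
    """Resolve the operating mode from config values (illustrative sketch)."""
    if not api_key:
        return "sqlite_only"   # no key -> standalone local
    if sync_disable == "1":
        return "http_only"     # legacy direct HTTP
    return "hybrid"            # local cache + background sync
```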

### 2. Local SQLite Schema Additions

Add to the existing table:

```sql
ALTER TABLE items ADD COLUMN server_id INTEGER;
CREATE UNIQUE INDEX idx_items_server_id ON items(server_id);
```

Add sync infrastructure tables:

```sql
CREATE TABLE pending_ops (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    op_type TEXT NOT NULL,    -- 'store' or 'delete'
    payload TEXT NOT NULL,    -- JSON
    created_at TEXT NOT NULL
);

CREATE TABLE sync_meta (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL       -- e.g., last_sync_ts
);
```
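
These migrations need to run idempotently on every startup. A sketch of a startup helper (the name `ensure_sync_schema` is illustrative, not from the original code) that guards the `ALTER TABLE` with a `PRAGMA table_info` check, since SQLite has no `ADD COLUMN IF NOT EXISTS`:

```python
import sqlite3

def ensure_sync_schema(conn: sqlite3.Connection) -> None:
    """Apply the sync-related migrations idempotently (sketch)."""
    # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk)
    cols = {row[1] for row in conn.execute("PRAGMA table_info(items)")}
    if "server_id" not in cols:
        conn.execute("ALTER TABLE items ADD COLUMN server_id INTEGER")
    conn.execute(
        "CREATE UNIQUE INDEX IF NOT EXISTS idx_items_server_id ON items(server_id)"
    )
    conn.execute("""CREATE TABLE IF NOT EXISTS pending_ops (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        op_type TEXT NOT NULL,
        payload TEXT NOT NULL,
        created_at TEXT NOT NULL)""")
    conn.execute("""CREATE TABLE IF NOT EXISTS sync_meta (
        key TEXT PRIMARY KEY,
        value TEXT NOT NULL)""")
    conn.commit()
```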

### 3. SyncEngine (background thread)

Key design decisions:

- **Own SQLite connection** with a threading lock (SQLite connections are not safe to share across threads by default)
- **Daemon thread** so it dies when the main process exits
- **Initial sync is blocking** (runs before the first tool call to populate the cache)
- **Background sync** runs every N seconds (configurable, default 60s)

```python
class SyncEngine:
    def __init__(self, db_path, api_base_url, api_key, sync_interval=60):
        self.api_base_url = api_base_url
        self.api_key = api_key
        self.sync_interval = sync_interval
        self._conn = sqlite3.connect(db_path, check_same_thread=False)
        self._lock = threading.Lock()
        self._stop_event = threading.Event()

    def start(self):
        # 1. Initial sync (blocking) -- populate cache before first use
        try:
            self._sync_once()
        except Exception:
            pass  # Start in offline mode
        # 2. Background daemon thread
        self._thread = threading.Thread(target=self._sync_loop, daemon=True)
        self._thread.start()

    def _sync_loop(self):
        while not self._stop_event.is_set():
            self._stop_event.wait(self.sync_interval)
            if self._stop_event.is_set():
                break
            try:
                self._sync_once()  # push then pull
            except Exception:
                pass  # Transient failure; don't kill the thread, retry next cycle
```
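
`_sync_once` is referenced above but not shown; a minimal sketch is just push-then-pull, in that order, so queued local writes reach the server before the pull records the new `server_time`:

```python
class SyncEngine:  # sketch: only the _sync_once method shown
    def _sync_once(self):
        # Push first so queued local writes reach the server,
        # then pull so server-assigned ids and timestamps come back down.
        self._push_pending_ops()
        self._pull_changes()
```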

### 4. Write Pattern: Try-Sync-Then-Queue

For writes, don't just queue blindly -- try a synchronous push first for lower latency:

```python
def try_sync_store(self, local_id, **data):
    try:
        result = self._api_request("POST", "/api/items", data)
        # Success: record the server_id mapping locally
        with self._lock:
            self._conn.execute("UPDATE items SET server_id = ? WHERE id = ?",
                               (result["id"], local_id))
            self._conn.commit()
        return result["id"]
    except Exception:
        # Failure: queue for background sync
        self.enqueue_store(local_id, **data)
        return None
```

The caller always writes to local SQLite first, then calls `try_sync_store`:

```python
def store(self, args):
    result = self._sqlite_store(...)  # Always succeeds locally
    if HYBRID_MODE:
        self.sync_engine.try_sync_store(local_id, ...)
    return result
```

### 5. Server-Side: Soft Delete for Sync

Change `DELETE` to a soft delete so sync clients can detect deletions:

```sql
-- Instead of: DELETE FROM items WHERE id = $1
UPDATE items SET deleted_at = NOW(), updated_at = NOW() WHERE id = $1
```

Add `deleted_at IS NULL` to ALL read queries (list, recall, secret, etc.).

### 6. Incremental Sync Endpoint

```
GET /api/items/sync?since=<iso_timestamp>
```

- **With `since`**: Returns all items with `updated_at > since` (including soft-deleted)
- **Without `since`**: Returns a full dump of non-deleted items (initial sync)
- The response includes `server_time` for the client to use as the next `since` value

### 7. Pull Logic: Server Wins

```python
def _pull_changes(self):
    params = f"?since={self.last_sync_ts}" if self.last_sync_ts else ""
    result = self._api_request("GET", f"/api/items/sync{params}")

    with self._lock:
        for item in result["items"]:
            if item.get("deleted_at"):
                self._conn.execute("DELETE FROM items WHERE server_id = ?", (item["id"],))
            else:
                # UPSERT by server_id
                existing = self._conn.execute(
                    "SELECT id FROM items WHERE server_id = ?", (item["id"],)
                ).fetchone()
                if existing:
                    self._conn.execute("UPDATE items SET ... WHERE server_id = ?", ...)
                else:
                    self._conn.execute("INSERT INTO items (..., server_id) VALUES (...)", ...)
        self._conn.commit()

    self.last_sync_ts = result["server_time"]
```

### 8. Push Logic: Clear Queue on Success

```python
def _push_pending_ops(self):
    # Assumes self._conn.row_factory = sqlite3.Row so ops index by column name
    with self._lock:
        ops = self._conn.execute("SELECT * FROM pending_ops ORDER BY id").fetchall()
    for op in ops:
        if op["op_type"] == "store":
            self._api_request("POST", "/api/items", json.loads(op["payload"]))
        elif op["op_type"] == "delete":
            server_id = json.loads(op["payload"])["server_id"]
            self._api_request("DELETE", f"/api/items/{server_id}")
        # A failed request raises out of the loop: stop pushing, keep the
        # remaining ops queued, and retry on the next sync cycle.
        with self._lock:
            self._conn.execute("DELETE FROM pending_ops WHERE id = ?", (op["id"],))
            self._conn.commit()
```

Important: if a delete returns 404, treat it as success (the item was already deleted on the server).
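
One way to implement the 404 rule: assuming `_api_request` raises an error that exposes the HTTP status (the `ApiError` type and `push_delete` helper here are hypothetical, not from the original code), the delete push swallows 404 and surfaces everything else:

```python
class ApiError(Exception):
    """Hypothetical error raised by _api_request, carrying the HTTP status."""
    def __init__(self, status):
        super().__init__(f"HTTP {status}")
        self.status = status

def push_delete(api_request, server_id):
    """Return True when the pending delete op can be cleared from the queue."""
    try:
        api_request("DELETE", f"/api/items/{server_id}")
        return True
    except ApiError as e:
        if e.status == 404:
            return True   # already deleted on the server -> treat as success
        raise             # real failure -> keep queued, retry next cycle
```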

## Verification

1. **Hybrid mode**: Set the API key, start the server, store an item, and verify it appears in both
   local SQLite and the remote API
2. **Offline resilience**: Kill the API, then store/recall -- both should work via the local cache.
   Check that the `pending_ops` table has queued writes. Restart the API, wait 60s, and verify the queue drained.
3. **Sync correctness**: Store via the API directly, wait 60s, and verify the local cache has it.
   Delete on the server, wait 60s, and verify the local cache removed it.
4. **Backward compat**: No API key = pure SQLite (unchanged). API key + `SYNC_DISABLE=1` =
   pure HTTP (unchanged).

## Notes

- `_init_sqlite` must return both `(conn, db_path)` if the SyncEngine needs the resolved
  path to create its own connection
- Use `threading.Event.wait(interval)` instead of `time.sleep()` for the sync loop -- it
  allows clean shutdown via `_stop_event.set()`
- The SyncEngine's SQLite connection must use `check_same_thread=False` since it is accessed
  from the background thread
- Always add the `server_id` column migration to the SQLite init (not just the SyncEngine
  init) so the main connection's schema stays consistent
- For secrets or other sensitive data, prefer API-first reads with a local fallback rather than
  caching sensitive content locally
- The `updated_at` index on the server is essential for incremental sync performance:
  `CREATE INDEX idx_items_updated ON items(updated_at)`