add memory sync hook + cron script for local SQLite ↔ remote PostgreSQL sync

This commit is contained in:
Viktor Barzin 2026-03-22 22:47:29 +02:00
parent e77e3567d8
commit de205cb692
No known key found for this signature in database
GPG key ID: 0EB088298288D958
3 changed files with 582 additions and 6 deletions

View file

@ -0,0 +1,315 @@
#!/bin/bash
# Sync local SQLite ↔ remote PostgreSQL for claude-memory.
# Runs as a cron job — sends macOS notification on failure or drift.
# Usage: sync-memories.sh [--quiet]
set -uo pipefail

readonly DB_PATH="$HOME/.claude/claude-memory/memory/memory.db"
readonly API_BASE="https://claude-memory.viktorbarzin.me"
readonly LOCK_FILE="/tmp/claude-memory-sync.lock"
readonly LOG_FILE="$HOME/.claude/claude-memory/sync.log"
QUIET="${1:-}"

# Prevent concurrent runs. The lock is created atomically via noclobber
# (set -C) in a subshell: the original check-then-write had a TOCTOU race
# where two cron runs could both pass the existence check and proceed.
if ! (set -C; echo $$ > "$LOCK_FILE") 2>/dev/null; then
  pid=$(cat "$LOCK_FILE" 2>/dev/null || echo "")
  if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
    exit 0  # another sync is still running
  fi
  # Lock holder is dead — take over the stale lock.
  echo $$ > "$LOCK_FILE"
fi
trap 'rm -f "$LOCK_FILE"' EXIT
# Append a timestamped line to the log file; echo it unless --quiet.
log() {
  local stamp
  stamp=$(date '+%Y-%m-%d %H:%M:%S')
  printf '[%s] %s\n' "$stamp" "$*" >> "$LOG_FILE"
  if [ "$QUIET" != "--quiet" ]; then
    printf '%s\n' "$*"
  fi
}
# Log an error and raise a macOS notification (best-effort; osascript
# failures are swallowed so cron never breaks on a headless box).
# $1 = short title, $2 = message body
notify_error() {
  local title="$1"
  local msg="$2"
  # Fix: original logged "$title$msg" with no separator, producing
  # run-together lines like "ERROR: Config ErrorNo API key found…".
  log "ERROR: $title: $msg"
  osascript -e "display notification \"$msg\" with title \"Memory Sync\" subtitle \"$title\" sound name \"Basso\"" 2>/dev/null || true
}
# Log an informational message; also pop a notification unless --quiet.
notify_info() {
  local msg="$1"
  log "INFO: $msg"
  if [ "$QUIET" != "--quiet" ]; then
    osascript -e "display notification \"$msg\" with title \"Memory Sync\" sound name \"Pop\"" 2>/dev/null || true
  fi
}
# Read API key from claude.json: pull the bearer token configured for the
# claude-memory MCP server (either naming variant) out of ~/.claude.json.
API_KEY=$(python3 - << 'PYEOF' 2>/dev/null
import json, os
with open(os.path.expanduser('~/.claude.json')) as fh:
    cfg = json.load(fh)
servers = cfg.get('mcpServers', {})
entry = servers.get('claude_memory', servers.get('claude-memory', {}))
token = entry.get('headers', {}).get('Authorization', '')
print(token.replace('Bearer ', ''))
PYEOF
)
if [ -z "$API_KEY" ]; then
  notify_error "Config Error" "No API key found in ~/.claude.json"
  exit 1
fi
if [ ! -f "$DB_PATH" ]; then
  notify_error "DB Missing" "Local SQLite not found at $DB_PATH"
  exit 1
fi
# Run the sync — capture stdout (JSON result) and stderr (errors)
SYNC_OUTPUT=$(python3 - "$DB_PATH" "$API_BASE" "$API_KEY" << 'PYEOF'
# Bidirectional sync: push local pending ops and orphans, then mirror the
# remote state back into SQLite. Always prints exactly one JSON object on
# stdout (either an {"error": ...} or a stats summary) and exits 0, so the
# enclosing shell can parse it unconditionally.
import sqlite3, json, urllib.request, urllib.error, sys
from datetime import datetime, timezone

DB_PATH = sys.argv[1]
API_BASE = sys.argv[2]
API_KEY = sys.argv[3]
MAX_LEN = 800  # server-side content limit; longer memories get truncated

errors = []
stats = {"pushed": 0, "pulled": 0, "deleted": 0, "skipped_dupes": 0, "truncated": 0}

def api_request(method, path, body=None):
    # Authenticated JSON request against the memory API; raises on HTTP errors.
    url = f"{API_BASE}{path}"
    data = json.dumps(body).encode() if body else None
    req = urllib.request.Request(url, data=data, method=method, headers={
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    })
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read().decode())

def truncate(content):
    # Returns (content clamped to MAX_LEN, was_truncated flag).
    if len(content) <= MAX_LEN:
        return content, False
    return content[:MAX_LEN - 15] + " [truncated]", True

# Test connectivity + auth
try:
    api_request("GET", "/api/auth-check")
except urllib.error.HTTPError as e:
    if e.code in (401, 403):
        print(json.dumps({"error": f"auth_failed:{e.code}"}))
        sys.exit(0)
    print(json.dumps({"error": f"api_error:{e.code}"}))
    sys.exit(0)
except Exception as e:
    print(json.dumps({"error": f"unreachable:{e}"}))
    sys.exit(0)

conn = sqlite3.connect(DB_PATH, timeout=30.0)
conn.row_factory = sqlite3.Row

# Get remote state
try:
    remote = api_request("GET", "/api/memories/sync")
    remote_memories = remote.get("memories", [])
    remote_contents = {m["content"] for m in remote_memories}
    remote_ids = {m["id"] for m in remote_memories}
    server_time = remote.get("server_time")
except Exception as e:
    errors.append(f"fetch_remote:{e}")
    remote_memories = []
    remote_contents = set()
    remote_ids = set()
    server_time = None

# 1. Process pending deletes
pending_deletes = conn.execute("SELECT id, payload FROM pending_ops WHERE op_type='delete'").fetchall()
for op in pending_deletes:
    payload = json.loads(op["payload"])
    server_id = payload.get("server_id")
    if server_id:
        try:
            api_request("DELETE", f"/api/memories/{server_id}")
            stats["deleted"] += 1
        except urllib.error.HTTPError as e:
            # 404 means already gone remotely — treat as success and clear the op.
            if e.code != 404:
                errors.append(f"delete:{server_id}:HTTP{e.code}")
                continue
    conn.execute("DELETE FROM pending_ops WHERE id = ?", (op["id"],))
conn.commit()

# 2. Process pending stores
pending_stores = conn.execute("SELECT id, payload FROM pending_ops WHERE op_type='store'").fetchall()
for op in pending_stores:
    payload = json.loads(op["payload"])
    content = payload.get("content", "")
    local_id = payload.get("local_id")
    if content in remote_contents:
        # Identical content already exists remotely — drop the op, no duplicate.
        stats["skipped_dupes"] += 1
        conn.execute("DELETE FROM pending_ops WHERE id = ?", (op["id"],))
        continue
    content_to_send, was_truncated = truncate(content)
    if was_truncated:
        stats["truncated"] += 1
    try:
        result = api_request("POST", "/api/memories", {
            "content": content_to_send,
            "category": payload.get("category", "facts"),
            "tags": payload.get("tags", ""),
            "expanded_keywords": payload.get("expanded_keywords", ""),
            "importance": payload.get("importance", 0.5),
        })
        server_id = result.get("id")
        if server_id and local_id:
            conn.execute("UPDATE memories SET server_id = ? WHERE id = ?", (server_id, local_id))
        remote_contents.add(content_to_send)
        stats["pushed"] += 1
        conn.execute("DELETE FROM pending_ops WHERE id = ?", (op["id"],))
    except Exception as e:
        errors.append(f"store_pending:{op['id']}:{e}")
conn.commit()

# 3. Push orphans (local rows that never got a server id)
orphans = conn.execute(
    "SELECT id, content, category, tags, expanded_keywords, importance FROM memories WHERE server_id IS NULL"
).fetchall()
for orphan in orphans:
    if orphan["content"] in remote_contents:
        # Content already remote — adopt its server id instead of re-pushing.
        stats["skipped_dupes"] += 1
        for m in remote_memories:
            if m["content"] == orphan["content"]:
                conn.execute("UPDATE memories SET server_id = ? WHERE id = ?", (m["id"], orphan["id"]))
                break
        continue
    content_to_send, was_truncated = truncate(orphan["content"])
    if was_truncated:
        stats["truncated"] += 1
    try:
        result = api_request("POST", "/api/memories", {
            "content": content_to_send,
            "category": orphan["category"],
            "tags": orphan["tags"],
            "expanded_keywords": orphan["expanded_keywords"],
            "importance": orphan["importance"],
        })
        server_id = result.get("id")
        if server_id:
            conn.execute("UPDATE memories SET server_id = ? WHERE id = ?", (server_id, orphan["id"]))
            remote_contents.add(content_to_send)
            stats["pushed"] += 1
    except Exception as e:
        errors.append(f"push_orphan:{orphan['id']}:{e}")
conn.commit()

# 4. Full resync pull: remote → local
if remote_memories:
    # NOTE(review): the refetch only runs when the FIRST fetch was non-empty.
    # If the remote started empty, rows pushed above are checked against a
    # stale (empty) remote_ids below, deleted locally, and only re-pulled on
    # the next run — confirm this is intended.
    try:
        remote = api_request("GET", "/api/memories/sync")
        remote_memories = remote.get("memories", [])
        remote_ids = {m["id"] for m in remote_memories}
        server_time = remote.get("server_time")
    except Exception as e:
        errors.append(f"refetch:{e}")
# Drop local rows whose server copy disappeared.
local_rows = conn.execute("SELECT id, server_id FROM memories WHERE server_id IS NOT NULL").fetchall()
for row in local_rows:
    if row["server_id"] not in remote_ids:
        conn.execute("DELETE FROM memories WHERE id = ?", (row["id"],))
# NOTE(review): this also removes orphans whose push failed above (they still
# have server_id NULL), losing them locally — confirm acceptable.
conn.execute("DELETE FROM memories WHERE server_id IS NULL")
for mem in remote_memories:
    server_id = mem["id"]
    existing = conn.execute("SELECT id FROM memories WHERE server_id = ?", (server_id,)).fetchone()
    if existing:
        conn.execute(
            """UPDATE memories SET content=?, category=?, tags=?,
            expanded_keywords=?, importance=?, is_sensitive=?,
            updated_at=? WHERE server_id=?""",
            (mem["content"], mem["category"], mem.get("tags", ""),
             mem.get("expanded_keywords", ""), mem["importance"],
             1 if mem.get("is_sensitive") else 0,
             mem.get("updated_at", ""), server_id))
    else:
        conn.execute(
            """INSERT INTO memories (content, category, tags, expanded_keywords,
            importance, is_sensitive, created_at, updated_at, server_id)
            VALUES (?,?,?,?,?,?,?,?,?)""",
            (mem["content"], mem["category"], mem.get("tags", ""),
             mem.get("expanded_keywords", ""), mem["importance"],
             1 if mem.get("is_sensitive") else 0,
             mem.get("created_at", ""), mem.get("updated_at", ""), server_id))
    stats["pulled"] += 1
if server_time:
    conn.execute("INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync_ts', ?)", (server_time,))
conn.commit()

# Final state
local_total = conn.execute("SELECT COUNT(*) as c FROM memories").fetchone()["c"]
remaining_ops = conn.execute("SELECT COUNT(*) as c FROM pending_ops").fetchone()["c"]
remaining_orphans = conn.execute("SELECT COUNT(*) as c FROM memories WHERE server_id IS NULL").fetchone()["c"]
conn.close()
remote_total = len(remote_memories)
print(json.dumps({
    "local": local_total,
    "remote": remote_total,
    "pushed": stats["pushed"],
    "pulled": stats["pulled"],
    "deleted": stats["deleted"],
    "truncated": stats["truncated"],
    "pending_ops": remaining_ops,
    "orphans": remaining_orphans,
    "errors": len(errors),
    "error_details": errors[:5],
    "in_sync": local_total == remote_total and remaining_ops == 0 and remaining_orphans == 0,
}))
PYEOF
)
# Parse the JSON output
if [ -z "$SYNC_OUTPUT" ]; then
  notify_error "Sync Failed" "No output from sync script"
  exit 1
fi
# Decode every field in ONE python process. The original forked python3
# seven times, re-parsing the same JSON document for each field. On parse
# failure we print nothing, leaving all vars empty (same failure mode as
# the original, where each extraction silently produced "").
PARSED=$(printf '%s' "$SYNC_OUTPUT" | python3 -c "
import sys, json
try:
    d = json.load(sys.stdin)
except Exception:
    sys.exit(1)
for key, default in (('error', ''), ('local', 0), ('remote', 0),
                     ('pushed', 0), ('pulled', 0), ('errors', 0),
                     ('in_sync', False)):
    print(d.get(key, default))
" 2>/dev/null)
{
  read -r ERROR
  read -r LOCAL
  read -r REMOTE
  read -r PUSHED
  read -r PULLED
  read -r SYNC_ERRORS
  read -r IN_SYNC
} <<< "$PARSED"
# Check for top-level error
if [ -n "$ERROR" ]; then
  case "$ERROR" in
    auth_failed:*)
      notify_error "Auth Failed" "API key rejected (${ERROR#auth_failed:}) — update ~/.claude.json"
      ;;
    unreachable:*)
      notify_error "API Unreachable" "Cannot reach claude-memory API"
      ;;
    *)
      notify_error "API Error" "$ERROR"
      ;;
  esac
  exit 1
fi
# Report the sync outcome: quiet log when in sync, notification otherwise.
if [ "$IN_SYNC" = "True" ]; then
  if [ "$PUSHED" != "0" ] || [ "$PULLED" != "0" ]; then
    notify_info "Synced: +${PUSHED} pushed, +${PULLED} pulled ($LOCAL total)"
  else
    log "OK: In sync ($LOCAL memories)"
  fi
elif [ "$SYNC_ERRORS" != "0" ]; then
  notify_error "Sync Errors" "${SYNC_ERRORS} errors, local=$LOCAL remote=$REMOTE"
else
  notify_error "Drift" "Local=$LOCAL Remote=$REMOTE"
fi

View file

@ -0,0 +1,241 @@
#!/usr/bin/env python3
"""Claude Code hook: sync local SQLite ↔ remote PostgreSQL for claude-memory.
Used as SessionStart hook (full sync) and Stop hook (push pending).
Outputs additional context to stderr for the hook system.
Sends macOS notification on sync failure.
"""
import json
import os
import sqlite3
import subprocess
import sys
import urllib.error
import urllib.request
from datetime import datetime, timezone
DB_PATH = os.path.expanduser("~/.claude/claude-memory/memory/memory.db")
MAX_LEN = 800
def get_api_config():
    """Read the claude-memory API base URL and bearer key from ~/.claude.json.

    Looks up the MCP server entry under either naming variant
    ("claude_memory" or "claude-memory") and strips the "/mcp/sse" suffix
    from its URL to obtain the REST base.

    Returns:
        (url, key) on success; (None, None) if the config is missing,
        unreadable, or malformed.
    """
    cfg_path = os.path.expanduser("~/.claude.json")
    try:
        with open(cfg_path) as fh:
            cfg = json.load(fh)
        servers = cfg.get("mcpServers", {})
        entry = servers.get("claude_memory", servers.get("claude-memory", {}))
        token = entry.get("headers", {}).get("Authorization", "")
        key = token.replace("Bearer ", "").strip()
        base = entry.get("url", "").replace("/mcp/sse", "")
        return base, key
    except Exception:
        return None, None
def api_request(api_base, api_key, method, path, body=None):
    """Issue an authenticated JSON request to the memory API.

    Args:
        api_base: base URL (no trailing slash expected).
        api_key: bearer token for the Authorization header.
        method: HTTP verb ("GET", "POST", "DELETE", ...).
        path: request path appended to api_base.
        body: optional dict serialized as the JSON request body.

    Returns:
        The decoded JSON response. Raises urllib.error.HTTPError / URLError
        on failure (15 s timeout).
    """
    payload = json.dumps(body).encode() if body else None
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    req = urllib.request.Request(
        f"{api_base}{path}", data=payload, method=method, headers=headers
    )
    with urllib.request.urlopen(req, timeout=15) as resp:
        return json.loads(resp.read().decode())
def notify(title, msg):
    """Send a best-effort macOS notification via osascript; never raises."""
    script = (
        f'display notification "{msg}" with title "Memory Sync" '
        f'subtitle "{title}" sound name "Basso"'
    )
    try:
        subprocess.run(
            ["osascript", "-e", script],
            capture_output=True,
            timeout=5,
        )
    except Exception:
        # Missing osascript (non-macOS) or timeout — notification is optional.
        pass
def truncate(content, max_len=None):
    """Clamp memory text to the API's content length limit.

    Generalized: the limit is now an optional parameter instead of being
    hard-wired to the module constant, keeping the old call sites working
    (`truncate(content)`) while allowing explicit overrides.

    Args:
        content: memory text about to be pushed to the API.
        max_len: optional limit override; defaults to module-level MAX_LEN.

    Returns:
        `content` unchanged when it fits; otherwise the first (limit - 15)
        characters with " [truncated]" appended.
    """
    limit = MAX_LEN if max_len is None else max_len
    if len(content) <= limit:
        return content
    return content[:limit - 15] + " [truncated]"
def full_sync(api_base, api_key):
    """Full bidirectional sync. Returns (pushed, pulled, errors).

    Order of operations: replay pending deletes, replay pending stores,
    push orphan rows (server_id IS NULL), then refetch the remote state
    and mirror it into SQLite (delete stale rows, upsert the rest).
    Network failures during individual pushes are collected into `errors`
    rather than aborting the whole sync.
    """
    if not os.path.exists(DB_PATH):
        return 0, 0, ["no_local_db"]
    conn = sqlite3.connect(DB_PATH, timeout=30.0)
    conn.row_factory = sqlite3.Row
    errors = []
    pushed = 0
    pulled = 0
    # Get remote state
    try:
        remote = api_request(api_base, api_key, "GET", "/api/memories/sync")
    except Exception as e:
        conn.close()
        return 0, 0, [f"fetch_failed:{e}"]
    remote_memories = remote.get("memories", [])
    remote_contents = {m["content"] for m in remote_memories}
    remote_ids = {m["id"] for m in remote_memories}
    server_time = remote.get("server_time")
    # Push pending deletes
    for op in conn.execute("SELECT id, payload FROM pending_ops WHERE op_type='delete'").fetchall():
        payload = json.loads(op["payload"])
        sid = payload.get("server_id")
        if sid:
            try:
                api_request(api_base, api_key, "DELETE", f"/api/memories/{sid}")
            except urllib.error.HTTPError as e:
                # 404 = already deleted remotely; anything else keeps the op queued.
                if e.code != 404:
                    errors.append(f"del:{sid}")
                    continue
        conn.execute("DELETE FROM pending_ops WHERE id = ?", (op["id"],))
        pushed += 1
    conn.commit()
    # Push pending stores
    for op in conn.execute("SELECT id, payload FROM pending_ops WHERE op_type='store'").fetchall():
        payload = json.loads(op["payload"])
        content = payload.get("content", "")
        local_id = payload.get("local_id")
        if content in remote_contents:
            # Duplicate of an existing remote memory — just clear the op.
            conn.execute("DELETE FROM pending_ops WHERE id = ?", (op["id"],))
            continue
        try:
            result = api_request(api_base, api_key, "POST", "/api/memories", {
                "content": truncate(content),
                "category": payload.get("category", "facts"),
                "tags": payload.get("tags", ""),
                "expanded_keywords": payload.get("expanded_keywords", ""),
                "importance": payload.get("importance", 0.5),
            })
            sid = result.get("id")
            if sid and local_id:
                conn.execute("UPDATE memories SET server_id = ? WHERE id = ?", (sid, local_id))
            remote_contents.add(content)
            pushed += 1
            conn.execute("DELETE FROM pending_ops WHERE id = ?", (op["id"],))
        except Exception as e:
            errors.append(f"store:{op['id']}")
    conn.commit()
    # Push orphans
    for orphan in conn.execute(
        "SELECT id, content, category, tags, expanded_keywords, importance FROM memories WHERE server_id IS NULL"
    ).fetchall():
        if orphan["content"] in remote_contents:
            # Content already remote — link the local row to that server id.
            for m in remote_memories:
                if m["content"] == orphan["content"]:
                    conn.execute("UPDATE memories SET server_id = ? WHERE id = ?", (m["id"], orphan["id"]))
                    break
            continue
        try:
            result = api_request(api_base, api_key, "POST", "/api/memories", {
                "content": truncate(orphan["content"]),
                "category": orphan["category"],
                "tags": orphan["tags"],
                "expanded_keywords": orphan["expanded_keywords"],
                "importance": orphan["importance"],
            })
            sid = result.get("id")
            if sid:
                conn.execute("UPDATE memories SET server_id = ? WHERE id = ?", (sid, orphan["id"]))
                remote_contents.add(orphan["content"])
                pushed += 1
        except Exception as e:
            errors.append(f"orphan:{orphan['id']}")
    conn.commit()
    # Pull: re-fetch remote and do full resync
    try:
        remote = api_request(api_base, api_key, "GET", "/api/memories/sync")
        remote_memories = remote.get("memories", [])
        remote_ids = {m["id"] for m in remote_memories}
        server_time = remote.get("server_time")
    except Exception:
        pass
    # Remove stale local
    for row in conn.execute("SELECT id, server_id FROM memories WHERE server_id IS NOT NULL").fetchall():
        if row["server_id"] not in remote_ids:
            conn.execute("DELETE FROM memories WHERE id = ?", (row["id"],))
    # NOTE(review): this also drops orphans whose push failed above (still
    # server_id NULL), losing them locally — confirm this is intended.
    conn.execute("DELETE FROM memories WHERE server_id IS NULL")
    # Upsert server records
    for mem in remote_memories:
        sid = mem["id"]
        existing = conn.execute("SELECT id FROM memories WHERE server_id = ?", (sid,)).fetchone()
        if existing:
            conn.execute(
                """UPDATE memories SET content=?, category=?, tags=?,
                expanded_keywords=?, importance=?, is_sensitive=?,
                updated_at=? WHERE server_id=?""",
                (mem["content"], mem["category"], mem.get("tags", ""),
                 mem.get("expanded_keywords", ""), mem["importance"],
                 1 if mem.get("is_sensitive") else 0,
                 mem.get("updated_at", ""), sid))
        else:
            conn.execute(
                """INSERT INTO memories (content, category, tags, expanded_keywords,
                importance, is_sensitive, created_at, updated_at, server_id)
                VALUES (?,?,?,?,?,?,?,?,?)""",
                (mem["content"], mem["category"], mem.get("tags", ""),
                 mem.get("expanded_keywords", ""), mem["importance"],
                 1 if mem.get("is_sensitive") else 0,
                 mem.get("created_at", ""), mem.get("updated_at", ""), sid))
        pulled += 1
    if server_time:
        conn.execute("INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync_ts', ?)", (server_time,))
    conn.commit()
    # NOTE(review): local_total is computed but never returned or logged —
    # looks like dead code; confirm before removing.
    local_total = conn.execute("SELECT COUNT(*) as c FROM memories").fetchone()["c"]
    conn.close()
    return pushed, pulled, errors
def main():
    """Hook entry point (SessionStart / Stop).

    Checks config and connectivity, runs a full sync, and prints a hook
    JSON object with `additional_context` on stdout only when there is
    something to report. Always exits 0 so a sync failure never blocks
    the Claude session.
    """
    api_base, api_key = get_api_config()
    if not api_base or not api_key:
        # No SSE config — nothing to sync
        sys.exit(0)
    # Quick connectivity + auth check
    try:
        api_request(api_base, api_key, "GET", "/api/auth-check")
    except urllib.error.HTTPError as e:
        if e.code in (401, 403):
            # Auth problem: tell the user via notification AND surface it
            # to the model as additional context.
            notify("Auth Failed", f"Memory API key rejected (HTTP {e.code})")
            hook_output = {"additional_context": f"Memory sync auth failed (HTTP {e.code}). API key in ~/.claude.json may need updating."}
            print(json.dumps(hook_output))
            sys.exit(0)
        notify("API Error", f"Memory API returned HTTP {e.code}")
        sys.exit(0)
    except Exception as e:
        notify("Unreachable", "Cannot reach claude-memory API")
        hook_output = {"additional_context": "Memory sync failed: API unreachable. Local SQLite may be stale."}
        print(json.dumps(hook_output))
        sys.exit(0)
    pushed, pulled, errors = full_sync(api_base, api_key)
    if errors:
        notify("Sync Errors", f"{len(errors)} errors during memory sync")
        hook_output = {"additional_context": f"Memory sync completed with {len(errors)} errors. Pushed {pushed}, pulled {pulled}."}
        print(json.dumps(hook_output))
    elif pushed > 0 or pulled > 0:
        hook_output = {"additional_context": f"Memory sync: pushed {pushed}, pulled {pulled} memories."}
        print(json.dumps(hook_output))
    # If nothing changed, output nothing (silent success)

if __name__ == "__main__":
    main()

View file

@ -52,6 +52,18 @@
]
}
],
"PostToolUse": [
{
"matcher": "Bash",
"hooks": [
{
"type": "command",
"command": "python3 /Users/viktorbarzin/.claude/hooks/post-commit-memory.py",
"timeout": 5
}
]
}
],
"PreCompact": [
{
"hooks": [
@ -71,6 +83,15 @@
"command": "node \"/Users/viktorbarzin/.claude/hooks/gsd-check-update.js\""
}
]
},
{
"hooks": [
{
"type": "command",
"command": "python3 /Users/viktorbarzin/.claude/hooks/memory-sync.py",
"timeout": 30
}
]
}
],
"Stop": [
@ -82,16 +103,14 @@
"async": true
}
]
}
],
"PostToolUse": [
},
{
"matcher": "Bash",
"hooks": [
{
"type": "command",
"command": "python3 /Users/viktorbarzin/.claude/hooks/post-commit-memory.py",
"timeout": 5
"command": "python3 /Users/viktorbarzin/.claude/hooks/memory-sync.py",
"timeout": 30,
"async": true
}
]
}
@ -146,6 +165,7 @@
"para-workspace@claude-templates": true,
"pyright-lsp@claude-plugins-official": true,
"ralph-loop@claude-plugins-official": true,
"sl@Meta": true,
"superpowers@claude-plugins-official": true,
"trajectory@Meta": true,
"typescript-lsp@claude-plugins-official": true