dot_files/dot_claude/claude-memory/executable_sync-memories.sh

#!/bin/bash
# Sync local SQLite ↔ remote PostgreSQL for claude-memory.
# Runs as a cron job — sends macOS notification on failure or drift.
# Usage: sync-memories.sh [--quiet]
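#
# Example crontab entry (interval and install path are illustrative only):
#   */15 * * * * "$HOME/.claude/claude-memory/sync-memories.sh" --quiet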
set -uo pipefail
DB_PATH="$HOME/.claude/claude-memory/memory/memory.db"
API_BASE="https://claude-memory.viktorbarzin.me"
LOCK_FILE="/tmp/claude-memory-sync.lock"
LOG_FILE="$HOME/.claude/claude-memory/sync.log"
QUIET="${1:-}"
# Prevent concurrent runs
if [ -f "$LOCK_FILE" ]; then
pid=$(cat "$LOCK_FILE" 2>/dev/null || echo "")
if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
exit 0
fi
fi
echo $$ > "$LOCK_FILE"
trap 'rm -f "$LOCK_FILE"' EXIT
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" >> "$LOG_FILE"
    [ "$QUIET" != "--quiet" ] && echo "$*" || true
}
notify_error() {
    local title="$1"
    local msg="$2"
    log "ERROR: $title: $msg"
    osascript -e "display notification \"$msg\" with title \"Memory Sync\" subtitle \"$title\" sound name \"Basso\"" 2>/dev/null || true
}
notify_info() {
    local msg="$1"
    log "INFO: $msg"
    [ "$QUIET" != "--quiet" ] && osascript -e "display notification \"$msg\" with title \"Memory Sync\" sound name \"Pop\"" 2>/dev/null || true
}
# Read API key from claude.json
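# Expected shape (the server key may be claude_memory or claude-memory):
#   {"mcpServers": {"claude_memory": {"headers": {"Authorization": "Bearer <key>"}}}}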
API_KEY=$(python3 -c "
import json, os
with open(os.path.expanduser('~/.claude.json')) as f:
    d = json.load(f)
mc = d.get('mcpServers', {})
cm = mc.get('claude_memory', mc.get('claude-memory', {}))
h = cm.get('headers', {})
auth = h.get('Authorization', '')
print(auth.replace('Bearer ', ''))
" 2>/dev/null)
if [ -z "$API_KEY" ]; then
notify_error "Config Error" "No API key found in ~/.claude.json"
exit 1
fi
if [ ! -f "$DB_PATH" ]; then
notify_error "DB Missing" "Local SQLite not found at $DB_PATH"
exit 1
fi
# Run the sync — capture stdout (JSON result) and stderr (errors)
SYNC_OUTPUT=$(python3 - "$DB_PATH" "$API_BASE" "$API_KEY" << 'PYEOF'
import sqlite3, json, urllib.request, urllib.error, sys
from datetime import datetime, timezone
DB_PATH = sys.argv[1]
API_BASE = sys.argv[2]
API_KEY = sys.argv[3]
MAX_LEN = 800
errors = []
stats = {"pushed": 0, "pulled": 0, "deleted": 0, "skipped_dupes": 0, "truncated": 0}
def api_request(method, path, body=None):
    url = f"{API_BASE}{path}"
    data = json.dumps(body).encode() if body else None
    req = urllib.request.Request(url, data=data, method=method, headers={
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    })
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read().decode())
def truncate(content):
    if len(content) <= MAX_LEN:
        return content, False
    return content[:MAX_LEN - 15] + " [truncated]", True
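# With MAX_LEN = 800, a 900-char memory becomes its first 785 chars plus
# " [truncated]" (797 chars total); anything at or under 800 chars is unchanged.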
# Test connectivity + auth
try:
    api_request("GET", "/api/auth-check")
except urllib.error.HTTPError as e:
    if e.code in (401, 403):
        print(json.dumps({"error": f"auth_failed:{e.code}"}))
        sys.exit(0)
    print(json.dumps({"error": f"api_error:{e.code}"}))
    sys.exit(0)
except Exception as e:
    print(json.dumps({"error": f"unreachable:{e}"}))
    sys.exit(0)
conn = sqlite3.connect(DB_PATH, timeout=30.0)
conn.row_factory = sqlite3.Row
# Get remote state
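# Assumed response shape, inferred from the fields read below and in step 4:
#   {"memories": [{"id": ..., "content": ..., "category": ..., "importance": ...,
#                  "tags": ..., "expanded_keywords": ..., "is_sensitive": ...,
#                  "created_at": ..., "updated_at": ...}],
#    "server_time": "..."}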
try:
    remote = api_request("GET", "/api/memories/sync")
    remote_memories = remote.get("memories", [])
    remote_contents = {m["content"] for m in remote_memories}
    remote_ids = {m["id"] for m in remote_memories}
    server_time = remote.get("server_time")
except Exception as e:
    errors.append(f"fetch_remote:{e}")
    remote_memories = []
    remote_contents = set()
    remote_ids = set()
    server_time = None
# 1. Process pending deletes
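# A delete op's payload carries the remote row id, e.g. {"server_id": 123};
# ops without one are cleared without an API call.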
pending_deletes = conn.execute("SELECT id, payload FROM pending_ops WHERE op_type='delete'").fetchall()
for op in pending_deletes:
    payload = json.loads(op["payload"])
    server_id = payload.get("server_id")
    if server_id:
        try:
            api_request("DELETE", f"/api/memories/{server_id}")
            stats["deleted"] += 1
        except urllib.error.HTTPError as e:
            if e.code != 404:
                errors.append(f"delete:{server_id}:HTTP{e.code}")
                continue
    conn.execute("DELETE FROM pending_ops WHERE id = ?", (op["id"],))
conn.commit()
# 2. Process pending stores
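# Store payloads mirror a memory row (content, local_id, category, tags,
# expanded_keywords, importance); missing keys fall back to defaults below.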
pending_stores = conn.execute("SELECT id, payload FROM pending_ops WHERE op_type='store'").fetchall()
for op in pending_stores:
    payload = json.loads(op["payload"])
    content = payload.get("content", "")
    local_id = payload.get("local_id")
    if content in remote_contents:
        stats["skipped_dupes"] += 1
        conn.execute("DELETE FROM pending_ops WHERE id = ?", (op["id"],))
        continue
    content_to_send, was_truncated = truncate(content)
    if was_truncated:
        stats["truncated"] += 1
    try:
        result = api_request("POST", "/api/memories", {
            "content": content_to_send,
            "category": payload.get("category", "facts"),
            "tags": payload.get("tags", ""),
            "expanded_keywords": payload.get("expanded_keywords", ""),
            "importance": payload.get("importance", 0.5),
        })
        server_id = result.get("id")
        if server_id and local_id:
            conn.execute("UPDATE memories SET server_id = ? WHERE id = ?", (server_id, local_id))
        remote_contents.add(content_to_send)
        stats["pushed"] += 1
        conn.execute("DELETE FROM pending_ops WHERE id = ?", (op["id"],))
    except Exception as e:
        errors.append(f"store_pending:{op['id']}:{e}")
conn.commit()
# 3. Push orphans
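# Orphans are local rows with no server_id yet (e.g. memories stored while the
# API was unreachable). Matching on content first avoids re-creating rows that
# already made it to the server.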
orphans = conn.execute(
    "SELECT id, content, category, tags, expanded_keywords, importance FROM memories WHERE server_id IS NULL"
).fetchall()
for orphan in orphans:
    if orphan["content"] in remote_contents:
        stats["skipped_dupes"] += 1
        for m in remote_memories:
            if m["content"] == orphan["content"]:
                conn.execute("UPDATE memories SET server_id = ? WHERE id = ?", (m["id"], orphan["id"]))
                break
        continue
    content_to_send, was_truncated = truncate(orphan["content"])
    if was_truncated:
        stats["truncated"] += 1
    try:
        result = api_request("POST", "/api/memories", {
            "content": content_to_send,
            "category": orphan["category"],
            "tags": orphan["tags"],
            "expanded_keywords": orphan["expanded_keywords"],
            "importance": orphan["importance"],
        })
        server_id = result.get("id")
        if server_id:
            conn.execute("UPDATE memories SET server_id = ? WHERE id = ?", (server_id, orphan["id"]))
        remote_contents.add(content_to_send)
        stats["pushed"] += 1
    except Exception as e:
        errors.append(f"push_orphan:{orphan['id']}:{e}")
conn.commit()
# 4. Full resync pull: remote → local
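# From here the remote is treated as source of truth: local rows whose
# server_id no longer exists remotely are removed, leftover local-only rows
# are dropped, and every remote memory is upserted into SQLite.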
if remote_memories:
    try:
        remote = api_request("GET", "/api/memories/sync")
        remote_memories = remote.get("memories", [])
        remote_ids = {m["id"] for m in remote_memories}
        server_time = remote.get("server_time")
    except Exception as e:
        errors.append(f"refetch:{e}")
    local_rows = conn.execute("SELECT id, server_id FROM memories WHERE server_id IS NOT NULL").fetchall()
    for row in local_rows:
        if row["server_id"] not in remote_ids:
            conn.execute("DELETE FROM memories WHERE id = ?", (row["id"],))
    conn.execute("DELETE FROM memories WHERE server_id IS NULL")
    for mem in remote_memories:
        server_id = mem["id"]
        existing = conn.execute("SELECT id FROM memories WHERE server_id = ?", (server_id,)).fetchone()
        if existing:
            conn.execute(
                """UPDATE memories SET content=?, category=?, tags=?,
                   expanded_keywords=?, importance=?, is_sensitive=?,
                   updated_at=? WHERE server_id=?""",
                (mem["content"], mem["category"], mem.get("tags", ""),
                 mem.get("expanded_keywords", ""), mem["importance"],
                 1 if mem.get("is_sensitive") else 0,
                 mem.get("updated_at", ""), server_id))
        else:
            conn.execute(
                """INSERT INTO memories (content, category, tags, expanded_keywords,
                                         importance, is_sensitive, created_at, updated_at, server_id)
                   VALUES (?,?,?,?,?,?,?,?,?)""",
                (mem["content"], mem["category"], mem.get("tags", ""),
                 mem.get("expanded_keywords", ""), mem["importance"],
                 1 if mem.get("is_sensitive") else 0,
                 mem.get("created_at", ""), mem.get("updated_at", ""), server_id))
            stats["pulled"] += 1
    if server_time:
        conn.execute("INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync_ts', ?)", (server_time,))
    conn.commit()
# Final state
local_total = conn.execute("SELECT COUNT(*) as c FROM memories").fetchone()["c"]
remaining_ops = conn.execute("SELECT COUNT(*) as c FROM pending_ops").fetchone()["c"]
remaining_orphans = conn.execute("SELECT COUNT(*) as c FROM memories WHERE server_id IS NULL").fetchone()["c"]
conn.close()
remote_total = len(remote_memories)
print(json.dumps({
    "local": local_total,
    "remote": remote_total,
    "pushed": stats["pushed"],
    "pulled": stats["pulled"],
    "deleted": stats["deleted"],
    "truncated": stats["truncated"],
    "pending_ops": remaining_ops,
    "orphans": remaining_orphans,
    "errors": len(errors),
    "error_details": errors[:5],
    "in_sync": local_total == remote_total and remaining_ops == 0 and remaining_orphans == 0,
}))
PYEOF
)
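# Example SYNC_OUTPUT on a healthy run (values illustrative):
#   {"local": 42, "remote": 42, "pushed": 1, "pulled": 0, "deleted": 0,
#    "truncated": 0, "pending_ops": 0, "orphans": 0, "errors": 0,
#    "error_details": [], "in_sync": true}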
# Parse the JSON output
if [ -z "$SYNC_OUTPUT" ]; then
notify_error "Sync Failed" "No output from sync script"
exit 1
fi
# Check for top-level error
ERROR=$(echo "$SYNC_OUTPUT" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('error',''))" 2>/dev/null)
if [ -n "$ERROR" ]; then
case "$ERROR" in
auth_failed:*)
notify_error "Auth Failed" "API key rejected (${ERROR#auth_failed:}) — update ~/.claude.json"
;;
unreachable:*)
notify_error "API Unreachable" "Cannot reach claude-memory API"
;;
*)
notify_error "API Error" "$ERROR"
;;
esac
exit 1
fi
# Parse sync results
LOCAL=$(echo "$SYNC_OUTPUT" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('local',0))" 2>/dev/null)
REMOTE=$(echo "$SYNC_OUTPUT" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('remote',0))" 2>/dev/null)
PUSHED=$(echo "$SYNC_OUTPUT" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('pushed',0))" 2>/dev/null)
PULLED=$(echo "$SYNC_OUTPUT" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('pulled',0))" 2>/dev/null)
SYNC_ERRORS=$(echo "$SYNC_OUTPUT" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('errors',0))" 2>/dev/null)
IN_SYNC=$(echo "$SYNC_OUTPUT" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('in_sync',False))" 2>/dev/null)
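# python3 prints booleans as True/False, hence the string comparison below.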
if [ "$IN_SYNC" = "True" ]; then
if [ "$PUSHED" != "0" ] || [ "$PULLED" != "0" ]; then
notify_info "Synced: +${PUSHED} pushed, +${PULLED} pulled ($LOCAL total)"
else
log "OK: In sync ($LOCAL memories)"
fi
elif [ "$SYNC_ERRORS" != "0" ]; then
notify_error "Sync Errors" "${SYNC_ERRORS} errors, local=$LOCAL remote=$REMOTE"
else
notify_error "Drift" "Local=$LOCAL Remote=$REMOTE"
fi