dot_files/dot_claude/hooks/memory-sync.py

#!/usr/bin/env python3
"""Claude Code hook: sync local SQLite ↔ remote PostgreSQL for claude-memory.
Used as SessionStart hook (full sync) and Stop hook (push pending).
Prints additional context as JSON on stdout for the hook system.
Sends macOS notification on sync failure.
"""

import json
import os
import sqlite3
import subprocess
import sys
import urllib.error
import urllib.request

DB_PATH = os.path.expanduser("~/.claude/claude-memory/memory/memory.db")
MAX_LEN = 800


def get_api_config():
    """Read the API base URL and key from the SSE entry in ~/.claude.json."""
    config_path = os.path.expanduser("~/.claude.json")
    try:
        with open(config_path) as f:
            d = json.load(f)
        mc = d.get("mcpServers", {})
        cm = mc.get("claude_memory", mc.get("claude-memory", {}))
        headers = cm.get("headers", {})
        auth = headers.get("Authorization", "")
        key = auth.replace("Bearer ", "").strip()
        url = cm.get("url", "").replace("/mcp/sse", "")
        return url, key
    except Exception:
        return None, None
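
# The lookups above expect roughly this shape in ~/.claude.json (inferred
# from the code; values are placeholders, not real endpoints or keys):
#
#   {
#     "mcpServers": {
#       "claude_memory": {
#         "url": "https://memory.example.com/mcp/sse",
#         "headers": {"Authorization": "Bearer <api-key>"}
#       }
#     }
#   }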


def api_request(api_base, api_key, method, path, body=None):
    """Make an authenticated JSON request and return the decoded response."""
    url = f"{api_base}{path}"
    data = json.dumps(body).encode() if body is not None else None
    req = urllib.request.Request(url, data=data, method=method, headers={
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    })
    with urllib.request.urlopen(req, timeout=15) as resp:
        return json.loads(resp.read().decode())
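
# Endpoints this file calls through api_request (summarized from the code
# below; the server contract itself is defined by claude-memory):
#   GET    /api/auth-check          connectivity and key check
#   GET    /api/memories/sync       full remote state plus server_time
#   POST   /api/memories            create one memory, returns {"id": ...}
#   DELETE /api/memories/{id}       remove one memory (404 if already gone)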


def notify(title, msg):
    """Send macOS notification."""
    try:
        subprocess.run([
            "osascript", "-e",
            f'display notification "{msg}" with title "Memory Sync" subtitle "{title}" sound name "Basso"',
        ], capture_output=True, timeout=5)
    except Exception:
        pass


def truncate(content):
    """Clamp content to MAX_LEN, leaving room for the truncation marker."""
    if len(content) <= MAX_LEN:
        return content
    return content[:MAX_LEN - 15] + " [truncated]"


def full_sync(api_base, api_key):
    """Full bidirectional sync. Returns (pushed, pulled, errors)."""
    if not os.path.exists(DB_PATH):
        return 0, 0, ["no_local_db"]
    conn = sqlite3.connect(DB_PATH, timeout=30.0)
    conn.row_factory = sqlite3.Row
    errors = []
    pushed = 0
    pulled = 0
    # Get remote state
    try:
        remote = api_request(api_base, api_key, "GET", "/api/memories/sync")
    except Exception as e:
        conn.close()
        return 0, 0, [f"fetch_failed:{e}"]
    remote_memories = remote.get("memories", [])
    remote_contents = {m["content"] for m in remote_memories}
    remote_ids = {m["id"] for m in remote_memories}
    server_time = remote.get("server_time")
    # Push pending deletes
    for op in conn.execute("SELECT id, payload FROM pending_ops WHERE op_type='delete'").fetchall():
        payload = json.loads(op["payload"])
        sid = payload.get("server_id")
        if sid:
            try:
                api_request(api_base, api_key, "DELETE", f"/api/memories/{sid}")
            except urllib.error.HTTPError as e:
                if e.code != 404:  # 404 means it is already gone remotely
                    errors.append(f"del:{sid}")
                    continue
        conn.execute("DELETE FROM pending_ops WHERE id = ?", (op["id"],))
        pushed += 1
    conn.commit()
    # Push pending stores
    for op in conn.execute("SELECT id, payload FROM pending_ops WHERE op_type='store'").fetchall():
        payload = json.loads(op["payload"])
        content = payload.get("content", "")
        local_id = payload.get("local_id")
        if content in remote_contents:
            # Already on the server; just drop the pending op.
            conn.execute("DELETE FROM pending_ops WHERE id = ?", (op["id"],))
            continue
        try:
            result = api_request(api_base, api_key, "POST", "/api/memories", {
                "content": truncate(content),
                "category": payload.get("category", "facts"),
                "tags": payload.get("tags", ""),
                "expanded_keywords": payload.get("expanded_keywords", ""),
                "importance": payload.get("importance", 0.5),
            })
            sid = result.get("id")
            if sid and local_id:
                conn.execute("UPDATE memories SET server_id = ? WHERE id = ?", (sid, local_id))
            remote_contents.add(content)
            pushed += 1
            conn.execute("DELETE FROM pending_ops WHERE id = ?", (op["id"],))
        except Exception:
            errors.append(f"store:{op['id']}")
    conn.commit()
    # Push orphans: local rows that never got a server_id
    for orphan in conn.execute(
        "SELECT id, content, category, tags, expanded_keywords, importance FROM memories WHERE server_id IS NULL"
    ).fetchall():
        if orphan["content"] in remote_contents:
            # Already on the server; just link the local row to the remote id.
            for m in remote_memories:
                if m["content"] == orphan["content"]:
                    conn.execute("UPDATE memories SET server_id = ? WHERE id = ?", (m["id"], orphan["id"]))
                    break
            continue
        try:
            result = api_request(api_base, api_key, "POST", "/api/memories", {
                "content": truncate(orphan["content"]),
                "category": orphan["category"],
                "tags": orphan["tags"],
                "expanded_keywords": orphan["expanded_keywords"],
                "importance": orphan["importance"],
            })
            sid = result.get("id")
            if sid:
                conn.execute("UPDATE memories SET server_id = ? WHERE id = ?", (sid, orphan["id"]))
            remote_contents.add(orphan["content"])
            pushed += 1
        except Exception:
            errors.append(f"orphan:{orphan['id']}")
    conn.commit()
    # Pull: re-fetch remote and do full resync. On failure, fall back to
    # the snapshot fetched at the top of this function.
    try:
        remote = api_request(api_base, api_key, "GET", "/api/memories/sync")
        remote_memories = remote.get("memories", [])
        remote_ids = {m["id"] for m in remote_memories}
        server_time = remote.get("server_time")
    except Exception:
        pass
    # Remove stale local rows that no longer exist remotely
    for row in conn.execute("SELECT id, server_id FROM memories WHERE server_id IS NOT NULL").fetchall():
        if row["server_id"] not in remote_ids:
            conn.execute("DELETE FROM memories WHERE id = ?", (row["id"],))
    # Drop rows that still failed to push; the server is the source of truth.
    conn.execute("DELETE FROM memories WHERE server_id IS NULL")
    # Upsert server records
    for mem in remote_memories:
        sid = mem["id"]
        existing = conn.execute("SELECT id FROM memories WHERE server_id = ?", (sid,)).fetchone()
        if existing:
            conn.execute(
                """UPDATE memories SET content=?, category=?, tags=?,
                   expanded_keywords=?, importance=?, is_sensitive=?,
                   updated_at=? WHERE server_id=?""",
                (mem["content"], mem["category"], mem.get("tags", ""),
                 mem.get("expanded_keywords", ""), mem["importance"],
                 1 if mem.get("is_sensitive") else 0,
                 mem.get("updated_at", ""), sid))
        else:
            conn.execute(
                """INSERT INTO memories (content, category, tags, expanded_keywords,
                   importance, is_sensitive, created_at, updated_at, server_id)
                   VALUES (?,?,?,?,?,?,?,?,?)""",
                (mem["content"], mem["category"], mem.get("tags", ""),
                 mem.get("expanded_keywords", ""), mem["importance"],
                 1 if mem.get("is_sensitive") else 0,
                 mem.get("created_at", ""), mem.get("updated_at", ""), sid))
            # Count only newly inserted rows so a no-op sync stays silent.
            pulled += 1
    if server_time:
        conn.execute("INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync_ts', ?)", (server_time,))
    conn.commit()
    conn.close()
    return pushed, pulled, errors
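
# Local SQLite schema that full_sync assumes (a sketch reverse-engineered
# from the queries above; column types are guesses, the authoritative DDL
# lives in claude-memory itself):
#
#   CREATE TABLE memories (
#       id INTEGER PRIMARY KEY, content TEXT, category TEXT, tags TEXT,
#       expanded_keywords TEXT, importance REAL, is_sensitive INTEGER,
#       created_at TEXT, updated_at TEXT, server_id TEXT);
#   CREATE TABLE pending_ops (id INTEGER PRIMARY KEY, op_type TEXT, payload TEXT);
#   CREATE TABLE sync_meta (key TEXT PRIMARY KEY, value TEXT);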


def main():
    api_base, api_key = get_api_config()
    if not api_base or not api_key:
        # No SSE config, so nothing to sync.
        sys.exit(0)
    # Quick connectivity and auth check before attempting a full sync.
    try:
        api_request(api_base, api_key, "GET", "/api/auth-check")
    except urllib.error.HTTPError as e:
        if e.code in (401, 403):
            notify("Auth Failed", f"Memory API key rejected (HTTP {e.code})")
            hook_output = {"additional_context": f"Memory sync auth failed (HTTP {e.code}). API key in ~/.claude.json may need updating."}
            print(json.dumps(hook_output))
            sys.exit(0)
        notify("API Error", f"Memory API returned HTTP {e.code}")
        sys.exit(0)
    except Exception:
        notify("Unreachable", "Cannot reach claude-memory API")
        hook_output = {"additional_context": "Memory sync failed: API unreachable. Local SQLite may be stale."}
        print(json.dumps(hook_output))
        sys.exit(0)
    pushed, pulled, errors = full_sync(api_base, api_key)
    if errors:
        notify("Sync Errors", f"{len(errors)} errors during memory sync")
        hook_output = {"additional_context": f"Memory sync completed with {len(errors)} errors. Pushed {pushed}, pulled {pulled}."}
        print(json.dumps(hook_output))
    elif pushed > 0 or pulled > 0:
        hook_output = {"additional_context": f"Memory sync: pushed {pushed}, pulled {pulled} memories."}
        print(json.dumps(hook_output))
    # If nothing changed, output nothing (silent success).


if __name__ == "__main__":
    main()
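
# Manual debugging run (a sketch; the path assumes this repo's layout):
#   python3 ~/.claude/hooks/memory-sync.py
# Every explicit exit path above uses status 0 so a failed sync never blocks
# the session; failures surface via notify() and the JSON printed on stdout.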