move claude-memory MCP server to ~/.claude for global access

Previously lived in ~/code/claude-memory-mcp/src/, which was only
accessible from sessions in ~/code/. Now at ~/.claude/claude-memory/
so all Claude sessions can use it regardless of working directory.
Viktor Barzin 2026-03-18 22:29:00 +00:00
parent 68e7cde5c3
commit a748a63e00
4 changed files with 1291 additions and 2 deletions
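
Because the server speaks JSON-RPC over stdio, the relocated install can be sanity-checked with a one-shot handshake. A minimal sketch, assuming the layout introduced by this commit and the initialize handler in mcp_server.py below:

# Smoke test: launch the relocated server and send an MCP initialize request.
import json, os, subprocess

proc = subprocess.Popen(
    ["python3", os.path.expanduser("~/.claude/claude-memory/claude_memory/mcp_server.py")],
    stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True,
)
proc.stdin.write(json.dumps({"jsonrpc": "2.0", "id": 1, "method": "initialize", "params": {}}) + "\n")
proc.stdin.flush()
print(proc.stdout.readline())  # expect serverInfo name "claude-memory", version "2.0.0"
proc.terminate()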

claude_memory/__init__.py

@@ -0,0 +1,3 @@
"""Claude Memory MCP — standalone memory server with multi-user support."""
__version__ = "1.0.0"

claude_memory/mcp_server.py

@@ -0,0 +1,703 @@
#!/usr/bin/env python3
"""
Claude Memory MCP Server — standalone memory server with multi-user support.
Supports three modes:
1. SQLite-only: local file-based storage when no API key is configured
2. Hybrid (default when API key set): local SQLite cache + background sync
3. HTTP-only (legacy): direct HTTP to API, no local cache (MEMORY_SYNC_DISABLE=1)
Uses only stdlib (urllib) — no pip install required.
"""
import json
import logging
import os
import sqlite3
import sys
import urllib.error
import urllib.request
from typing import Any
logger = logging.getLogger(__name__)
PROTOCOL_VERSION = "2024-11-05"
SERVER_NAME = "claude-memory"
SERVER_VERSION = "2.0.0"
# API configuration — support both MEMORY_* (primary) and CLAUDE_MEMORY_* (fallback) env vars
API_BASE_URL = os.environ.get("MEMORY_API_URL") or os.environ.get("CLAUDE_MEMORY_API_URL", "http://localhost:8080")
API_KEY = os.environ.get("MEMORY_API_KEY") or os.environ.get("CLAUDE_MEMORY_API_KEY", "")
# Mode detection
SYNC_DISABLED = os.environ.get("MEMORY_SYNC_DISABLE", "") == "1"
HYBRID_MODE = bool(API_KEY) and not SYNC_DISABLED
HTTP_ONLY = bool(API_KEY) and SYNC_DISABLED
SQLITE_ONLY = not API_KEY
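# Illustrative resolution of the flags above:
#   MEMORY_API_KEY unset                        -> SQLITE_ONLY (local file only)
#   MEMORY_API_KEY set                          -> HYBRID_MODE (cache + background sync)
#   MEMORY_API_KEY set, MEMORY_SYNC_DISABLE=1   -> HTTP_ONLY (legacy, no cache)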
def _api_request(method: str, path: str, body: dict[str, Any] | None = None) -> dict[str, Any]:
"""Make an HTTP request to the memory API."""
url = f"{API_BASE_URL}{path}"
data = json.dumps(body).encode() if body else None
req = urllib.request.Request(
url,
data=data,
method=method,
headers={
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
},
)
try:
with urllib.request.urlopen(req, timeout=15) as resp:
result: dict[str, Any] = json.loads(resp.read().decode())
return result
except urllib.error.HTTPError as e:
error_body = e.read().decode() if e.fp else str(e)
raise RuntimeError(f"API error {e.code}: {error_body}") from e
except urllib.error.URLError as e:
raise RuntimeError(f"API connection error: {e.reason}") from e
# ─── SQLite initialization ────────────────────────────────────────────────────
def _get_db_path(db_path: str | None = None) -> str:
"""Resolve the SQLite database path."""
if db_path is not None:
return db_path
memory_home = os.path.expandvars(
os.path.expanduser(os.environ.get("MEMORY_HOME", "~/.claude/claude-memory"))
)
db_path = os.environ.get(
"MEMORY_DB",
os.path.join(memory_home, "memory", "memory.db"),
)
resolved = os.path.expandvars(os.path.expanduser(db_path))
# Migration fallback: if the new path doesn't exist but legacy metaclaw path does, use that
if not os.path.exists(resolved):
legacy_home = os.path.expanduser("~/.claude/metaclaw")
legacy_db = os.path.join(legacy_home, "memory", "memory.db")
if os.path.exists(legacy_db):
return legacy_db
return resolved
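# Resolution order: explicit argument, then $MEMORY_DB, then
# $MEMORY_HOME/memory/memory.db (default ~/.claude/claude-memory/memory/memory.db),
# falling back to the legacy ~/.claude/metaclaw copy if the new path is absent.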
SCHEMA_VERSION = 2
def _migrate_sqlite(conn: sqlite3.Connection) -> None:
"""Version-based SQLite schema migrations."""
current = conn.execute("PRAGMA user_version").fetchone()[0]
if current < 1:
# Add server_id column for hybrid mode sync
cursor = conn.execute("PRAGMA table_info(memories)")
columns = {row["name"] for row in cursor.fetchall()}
if "server_id" not in columns:
conn.execute("ALTER TABLE memories ADD COLUMN server_id INTEGER")
conn.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_memories_server_id ON memories(server_id)"
)
if current < 2:
# Ensure pending_ops has retry_count (sync.py also handles this, but belt-and-suspenders)
conn.execute("""
CREATE TABLE IF NOT EXISTS pending_ops (
id INTEGER PRIMARY KEY AUTOINCREMENT,
op_type TEXT NOT NULL,
payload TEXT NOT NULL,
created_at TEXT NOT NULL,
retry_count INTEGER DEFAULT 0
)
""")
# Add retry_count if pending_ops already exists without it
cursor = conn.execute("PRAGMA table_info(pending_ops)")
po_columns = {row["name"] for row in cursor.fetchall()}
if "retry_count" not in po_columns:
conn.execute("ALTER TABLE pending_ops ADD COLUMN retry_count INTEGER DEFAULT 0")
conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION}")
conn.commit()
def _init_sqlite(db_path: str | None = None) -> tuple[sqlite3.Connection, str]:
"""Initialize SQLite database."""
from pathlib import Path
db_path = _get_db_path(db_path)
Path(os.path.dirname(db_path)).mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(db_path, timeout=30.0)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=30000")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS memories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL,
category TEXT DEFAULT 'facts',
tags TEXT DEFAULT '',
expanded_keywords TEXT DEFAULT '',
importance REAL DEFAULT 0.5,
is_sensitive INTEGER DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
# Version-based schema migrations
_migrate_sqlite(conn)
cursor.execute("""
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
content, category, tags, expanded_keywords,
content='memories', content_rowid='id'
)
""")
cursor.execute("""
CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN
INSERT INTO memories_fts(rowid, content, category, tags, expanded_keywords)
VALUES (new.id, new.content, new.category, new.tags, new.expanded_keywords);
END
""")
cursor.execute("""
CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN
INSERT INTO memories_fts(memories_fts, rowid, content, category, tags, expanded_keywords)
VALUES ('delete', old.id, old.content, old.category, old.tags, old.expanded_keywords);
END
""")
cursor.execute("""
CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN
INSERT INTO memories_fts(memories_fts, rowid, content, category, tags, expanded_keywords)
VALUES ('delete', old.id, old.content, old.category, old.tags, old.expanded_keywords);
INSERT INTO memories_fts(rowid, content, category, tags, expanded_keywords)
VALUES (new.id, new.content, new.category, new.tags, new.expanded_keywords);
END
""")
conn.commit()
return conn, db_path
# ─── Tool definitions ────────────────────────────────────────────────────────
TOOLS = [
{
"name": "memory_store",
"description": "Store a fact or memory in persistent storage. Use this to remember important information about the user, their preferences, projects, decisions, or people they mention.",
"inputSchema": {
"type": "object",
"properties": {
"content": {"type": "string", "description": "The fact or memory to store"},
"category": {
"type": "string",
"enum": ["facts", "preferences", "projects", "people", "decisions"],
"description": "Category for organizing the memory",
"default": "facts",
},
"tags": {"type": "string", "description": "Comma-separated tags", "default": ""},
"importance": {
"type": "number",
"description": "Importance 0.0-1.0",
"default": 0.5,
"minimum": 0.0,
"maximum": 1.0,
},
"expanded_keywords": {
"type": "string",
"description": "REQUIRED. Space-separated semantically related search terms (MINIMUM 5 words). Generate keywords that someone might search for when this memory would be relevant. Include synonyms, related concepts, and adjacent topics.",
},
"force_sensitive": {
"type": "boolean",
"description": "If true, mark this memory as sensitive regardless of auto-detection. Sensitive memories have their content encrypted at rest.",
"default": False,
},
},
"required": ["content", "expanded_keywords"],
},
},
{
"name": "memory_recall",
"description": "Retrieve relevant memories based on context. Uses full-text search to find stored memories.",
"inputSchema": {
"type": "object",
"properties": {
"context": {"type": "string", "description": "The context or topic to recall memories about"},
"expanded_query": {
"type": "string",
"description": "REQUIRED. Space-separated semantically related search terms (MINIMUM 5 words).",
},
"category": {
"type": "string",
"enum": ["facts", "preferences", "projects", "people", "decisions"],
"description": "Optional: filter results to a specific category",
},
"sort_by": {
"type": "string",
"enum": ["importance", "relevance"],
"description": "Sort order",
"default": "importance",
},
"limit": {"type": "integer", "description": "Max results", "default": 10},
},
"required": ["context", "expanded_query"],
},
},
{
"name": "memory_list",
"description": "List recent memories, optionally filtered by category.",
"inputSchema": {
"type": "object",
"properties": {
"category": {
"type": "string",
"enum": ["facts", "preferences", "projects", "people", "decisions"],
},
"limit": {"type": "integer", "default": 20},
},
},
},
{
"name": "memory_delete",
"description": "Delete a memory by ID.",
"inputSchema": {
"type": "object",
"properties": {
"id": {"type": "integer", "description": "The ID of the memory to delete"},
},
"required": ["id"],
},
},
{
"name": "secret_get",
"description": "Retrieve the decrypted content of a sensitive memory. Only works for memories marked as sensitive.",
"inputSchema": {
"type": "object",
"properties": {
"id": {"type": "integer", "description": "The ID of the sensitive memory to retrieve"},
},
"required": ["id"],
},
},
{
"name": "memory_count",
"description": "Get memory counts by category from local cache and sync status. Useful for diagnostics.",
"inputSchema": {
"type": "object",
"properties": {},
},
},
]
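# Example tools/call params this server accepts (illustrative values only):
# {"name": "memory_store",
#  "arguments": {"content": "User prefers dark mode",
#                "expanded_keywords": "theme ui appearance color scheme"}}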
class MemoryServer:
"""MCP server for persistent memory management."""
def __init__(self, sqlite_db_path: str | None = None) -> None:
self.sqlite_conn: sqlite3.Connection | None = None
self.sync_engine: Any = None
if SQLITE_ONLY or HYBRID_MODE:
self.sqlite_conn, resolved_path = _init_sqlite(sqlite_db_path)
if HYBRID_MODE:
from claude_memory.sync import SyncEngine
sync_interval = int(os.environ.get("MEMORY_SYNC_INTERVAL", "60"))
self.sync_engine = SyncEngine(
db_path=resolved_path,
api_base_url=API_BASE_URL,
api_key=API_KEY,
sync_interval=sync_interval,
)
self.sync_engine.start()
def __del__(self) -> None:
if self.sync_engine:
self.sync_engine.stop()
# ── Tool methods ────────────────────────────────────────────────
def memory_store(self, args: dict[str, Any]) -> str:
content = args.get("content")
if not content:
raise ValueError("content is required")
category = args.get("category", "facts")
tags = args.get("tags", "")
importance = max(0.0, min(1.0, float(args.get("importance", 0.5))))
expanded_keywords = args.get("expanded_keywords", "")
force_sensitive = bool(args.get("force_sensitive", False))
if HTTP_ONLY:
result = _api_request("POST", "/api/memories", {
"content": content,
"category": category,
"tags": tags,
"expanded_keywords": expanded_keywords,
"importance": importance,
"force_sensitive": force_sensitive,
})
return f"Stored memory #{result['id']} in category '{result['category']}' with importance {result['importance']:.1f}"
# SQLite-only or Hybrid: write to local SQLite first
result_text = self._sqlite_store(content, category, tags, importance, expanded_keywords, force_sensitive)
if HYBRID_MODE and self.sync_engine:
# Extract local_id from result text
local_id = int(result_text.split("#")[1].split(" ")[0])
self.sync_engine.try_sync_store(
local_id, content, category, tags, expanded_keywords, importance, force_sensitive
)
return result_text
def memory_recall(self, args: dict[str, Any]) -> str:
context = args.get("context")
if not context:
raise ValueError("context is required")
expanded_query = args.get("expanded_query", "")
category = args.get("category")
sort_by = args.get("sort_by", "importance")
limit = args.get("limit", 10)
if HTTP_ONLY:
result = _api_request("POST", "/api/memories/recall", {
"context": context,
"expanded_query": expanded_query,
"category": category,
"sort_by": sort_by,
"limit": limit,
})
rows = result.get("memories", [])
if not rows:
filter_desc = f" in category '{category}'" if category else ""
return f"No memories found matching: {context}{filter_desc}"
sort_desc = "by relevance" if sort_by == "relevance" else "by importance"
filter_desc = f" in '{category}'" if category else ""
results = []
for row in rows:
results.append(
f"#{row['id']} [{row['category']}] (importance: {row['importance']:.1f}) {row['content']}"
f"\n Tags: {row.get('tags') or 'none'} | Stored: {row['created_at']}"
)
return f"Found {len(rows)} memories{filter_desc} ({sort_desc}):\n\n" + "\n\n".join(results)
# SQLite-only or Hybrid: always read from local cache
return self._sqlite_recall(context, expanded_query, category, sort_by, limit)
def memory_list(self, args: dict[str, Any]) -> str:
category = args.get("category")
limit = args.get("limit", 20)
if HTTP_ONLY:
params = f"?limit={limit}"
if category:
params += f"&category={category}"
result = _api_request("GET", f"/api/memories{params}")
rows = result.get("memories", [])
if not rows:
return f"No memories in category '{category}'" if category else "No memories stored yet"
results = []
for row in rows:
results.append(
f"#{row['id']} [{row['category']}] {row['content']}"
f"\n Importance: {row['importance']:.1f} | Tags: {row.get('tags') or 'none'} | Stored: {row['created_at']}"
)
header = "Recent memories"
if category:
header += f" in '{category}'"
return header + f" ({len(rows)} shown):\n\n" + "\n\n".join(results)
# SQLite-only or Hybrid: always read from local cache
return self._sqlite_list(category, limit)
def memory_delete(self, args: dict[str, Any]) -> str:
memory_id = args.get("id")
if memory_id is None:
raise ValueError("id is required")
if HTTP_ONLY:
result = _api_request("DELETE", f"/api/memories/{memory_id}")
return f"Deleted memory #{result['deleted']}: {result['preview']}..."
# SQLite-only or Hybrid: delete from local SQLite
# In hybrid mode, also try to sync delete to server
server_id: int | None = None
if HYBRID_MODE and self.sync_engine and self.sqlite_conn:
cursor = self.sqlite_conn.cursor()
cursor.execute("SELECT server_id FROM memories WHERE id = ?", (memory_id,))
row = cursor.fetchone()
server_id = row["server_id"] if row and row["server_id"] else None
result_text = self._sqlite_delete(memory_id)
if HYBRID_MODE and self.sync_engine and server_id:
self.sync_engine.try_sync_delete(server_id)
return result_text
def secret_get(self, args: dict[str, Any]) -> str:
memory_id = args.get("id")
if memory_id is None:
raise ValueError("id is required")
if HTTP_ONLY or HYBRID_MODE:
# Secrets should be fetched from API when available
try:
result = _api_request("POST", f"/api/memories/{memory_id}/secret")
return f"#{result['id']} [{result['category']}] {result['content']}"
except Exception:
if HYBRID_MODE:
# Fall back to local SQLite
return self._sqlite_secret_get(memory_id)
raise
return self._sqlite_secret_get(memory_id)
def memory_count(self, args: dict[str, Any]) -> str:
if self.sync_engine:
counts = self.sync_engine.get_counts()
lines = [f"Local memories: {counts['total']}"]
for cat, n in counts["by_category"].items():
lines.append(f" {cat}: {n}")
lines.append(f"Orphans (no server_id): {counts['orphans_no_server_id']}")
lines.append(f"Pending ops: {counts['pending_ops']}")
lines.append(f"Last sync: {counts['last_sync_ts'] or 'never'}")
lines.append(f"Auth failed: {counts['auth_failed']}")
lines.append(f"Last sync success: {counts['last_sync_success']}")
return "\n".join(lines)
if self.sqlite_conn:
cursor = self.sqlite_conn.cursor()
cursor.execute("SELECT COUNT(*) as c FROM memories")
total = cursor.fetchone()["c"]
cursor.execute("SELECT category, COUNT(*) as c FROM memories GROUP BY category ORDER BY c DESC")
by_cat = cursor.fetchall()
lines = [f"Local memories (SQLite-only): {total}"]
for row in by_cat:
lines.append(f" {row['category']}: {row['c']}")
return "\n".join(lines)
return "No storage available"
# ── SQLite methods ──────────────────────────────────────────────
def _sqlite_store(self, content: str, category: str, tags: str, importance: float, expanded_keywords: str, force_sensitive: bool = False) -> str:
from datetime import datetime, timezone
assert self.sqlite_conn is not None
now = datetime.now(timezone.utc).isoformat()
is_sensitive = 1 if force_sensitive else 0
cursor = self.sqlite_conn.cursor()
cursor.execute(
"INSERT INTO memories (content, category, tags, expanded_keywords, importance, is_sensitive, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(content, category, tags, expanded_keywords, importance, is_sensitive, now, now),
)
self.sqlite_conn.commit()
return f"Stored memory #{cursor.lastrowid} in category '{category}' with importance {importance:.1f}"
def _sqlite_recall(self, context: str, expanded_query: str, category: str | None, sort_by: str, limit: int) -> str:
assert self.sqlite_conn is not None
all_terms = f"{context} {expanded_query}".strip()
words = [w.replace(chr(34), "") for w in all_terms.split() if w]
and_query = " AND ".join(f'"{w}"' for w in words)
or_query = " OR ".join(f'"{w}"' for w in words)
# Hybrid scoring: blend BM25 relevance with importance
# bm25() returns negative values (lower = better match), so negate it
order = (
"(-bm25(memories_fts) * 0.7 + m.importance * 0.3) DESC"
if sort_by == "relevance"
else "(-bm25(memories_fts) * 0.4 + m.importance * 0.6) DESC"
)
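        # e.g. bm25 = -5.0, importance = 0.8: the relevance blend scores
        # -(-5.0) * 0.7 + 0.8 * 0.3 = 3.74, the importance blend
        # -(-5.0) * 0.4 + 0.8 * 0.6 = 2.48 (higher sorts first).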
base_select = (
"SELECT m.id, m.content, m.category, m.tags, m.importance, m.created_at "
"FROM memories m JOIN memories_fts fts ON m.id = fts.rowid "
)
cursor = self.sqlite_conn.cursor()
rows: list[Any] = []
try:
# Try AND first for precise matches, fall back to OR for broader results
cat_filter = "AND m.category = ?" if category else ""
for fts_query in (and_query, or_query):
params = [fts_query, category, limit] if category else [fts_query, limit]
cursor.execute(
f"{base_select}WHERE memories_fts MATCH ? {cat_filter} ORDER BY {order} LIMIT ?",
tuple(p for p in params if p is not None),
)
rows = cursor.fetchall()
if rows:
break
except sqlite3.OperationalError:
like = f"%{context}%"
if category:
cursor.execute(
"SELECT id, content, category, tags, importance, created_at FROM memories "
"WHERE (content LIKE ? OR tags LIKE ?) AND category = ? ORDER BY importance DESC LIMIT ?",
(like, like, category, limit),
)
else:
cursor.execute(
"SELECT id, content, category, tags, importance, created_at FROM memories "
"WHERE content LIKE ? OR tags LIKE ? ORDER BY importance DESC LIMIT ?",
(like, like, limit),
)
rows = cursor.fetchall()
if not rows:
return f"No memories found matching: {context}"
results = []
for row in rows:
results.append(
f"#{row['id']} [{row['category']}] (importance: {row['importance']:.1f}) {row['content']}"
f"\n Tags: {row['tags'] or 'none'} | Stored: {row['created_at']}"
)
return (
f"Found {len(rows)} memories (by {'relevance' if sort_by == 'relevance' else 'importance'}):\n\n"
+ "\n\n".join(results)
)
def _sqlite_list(self, category: str | None, limit: int) -> str:
assert self.sqlite_conn is not None
cursor = self.sqlite_conn.cursor()
if category:
cursor.execute(
"SELECT id, content, category, tags, importance, created_at FROM memories "
"WHERE category = ? ORDER BY created_at DESC LIMIT ?",
(category, limit),
)
else:
cursor.execute(
"SELECT id, content, category, tags, importance, created_at FROM memories "
"ORDER BY created_at DESC LIMIT ?",
(limit,),
)
rows = cursor.fetchall()
if not rows:
return f"No memories in category '{category}'" if category else "No memories stored yet"
results = []
for row in rows:
results.append(
f"#{row['id']} [{row['category']}] {row['content']}"
f"\n Importance: {row['importance']:.1f} | Tags: {row['tags'] or 'none'} | Stored: {row['created_at']}"
)
header = "Recent memories" + (f" in '{category}'" if category else "")
return header + f" ({len(rows)} shown):\n\n" + "\n\n".join(results)
def _sqlite_delete(self, memory_id: int) -> str:
assert self.sqlite_conn is not None
cursor = self.sqlite_conn.cursor()
cursor.execute("SELECT id, content FROM memories WHERE id = ?", (memory_id,))
row = cursor.fetchone()
if not row:
return f"Memory #{memory_id} not found"
preview = row["content"][:50]
cursor.execute("DELETE FROM memories WHERE id = ?", (memory_id,))
self.sqlite_conn.commit()
return f"Deleted memory #{memory_id}: {preview}..."
def _sqlite_secret_get(self, memory_id: int) -> str:
assert self.sqlite_conn is not None
cursor = self.sqlite_conn.cursor()
cursor.execute(
"SELECT id, content, category, is_sensitive FROM memories WHERE id = ?",
(memory_id,),
)
row = cursor.fetchone()
if not row:
return f"Memory #{memory_id} not found"
if not row["is_sensitive"]:
return f"Memory #{memory_id} is not marked as sensitive"
return f"#{row['id']} [{row['category']}] {row['content']}"
# ── MCP protocol ─────────────────────────────────────────────────
def handle_initialize(self, params: dict[str, Any]) -> dict[str, Any]:
return {
"protocolVersion": PROTOCOL_VERSION,
"capabilities": {"tools": {}},
"serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION},
}
def handle_tools_list(self, params: dict[str, Any]) -> dict[str, Any]:
return {"tools": TOOLS}
def handle_tools_call(self, params: dict[str, Any]) -> dict[str, Any]:
tool_name: str = params.get("name", "")
arguments: dict[str, Any] = params.get("arguments", {})
try:
handler = {
"memory_store": self.memory_store,
"memory_recall": self.memory_recall,
"memory_list": self.memory_list,
"memory_delete": self.memory_delete,
"secret_get": self.secret_get,
"memory_count": self.memory_count,
}.get(tool_name)
if handler is None:
return {"content": [{"type": "text", "text": f"Unknown tool: {tool_name}"}], "isError": True}
result = handler(arguments)
return {"content": [{"type": "text", "text": result}]}
except Exception as e:
return {"content": [{"type": "text", "text": f"Error: {e!s}"}], "isError": True}
def process_message(self, message: dict[str, Any]) -> dict[str, Any] | None:
method = message.get("method")
params = message.get("params", {})
msg_id = message.get("id")
if msg_id is None:
return None
result = None
error = None
try:
if method == "initialize":
result = self.handle_initialize(params)
elif method == "tools/list":
result = self.handle_tools_list(params)
elif method == "tools/call":
result = self.handle_tools_call(params)
else:
error = {"code": -32601, "message": f"Method not found: {method}"}
except Exception as e:
error = {"code": -32603, "message": str(e)}
response: dict[str, Any] = {"jsonrpc": "2.0", "id": msg_id}
if error:
response["error"] = error
else:
response["result"] = result
return response
def run(self) -> None:
try:
for line in sys.stdin:
line = line.strip()
if not line or line.startswith("Content-Length:"):
continue
try:
message = json.loads(line)
except json.JSONDecodeError:
continue
response = self.process_message(message)
if response is not None:
print(json.dumps(response), flush=True)
finally:
if self.sync_engine:
self.sync_engine.stop()
def main() -> None:
# Suppress all stderr output — MCP clients (e.g. Claude Code) may treat
# any stderr as a fatal error and refuse to load the server.
sys.stderr = open(os.devnull, "w")
logging.disable(logging.CRITICAL)
server = MemoryServer()
server.run()
if __name__ == "__main__":
main()

claude_memory/sync.py

@@ -0,0 +1,583 @@
"""Background sync between local SQLite cache and remote API.
Uses only stdlib — no pip install required.
"""
import json
import logging
import sqlite3
import threading
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
from datetime import datetime, timezone
from pathlib import Path
logger = logging.getLogger(__name__)
# Max retries before an individual pending op is permanently skipped
MAX_OP_RETRIES = 5
# Full resync every N sync cycles (~10 min at 60s interval)
FULL_RESYNC_EVERY = 10
class SyncEngine:
"""Background sync between local SQLite cache and remote API."""
def __init__(self, db_path: str, api_base_url: str, api_key: str, sync_interval: int = 60):
self.db_path = db_path
self.api_base_url = api_base_url.rstrip("/")
self.api_key = api_key
self.sync_interval = sync_interval
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
self._last_sync_success = False
self._auth_failed = False
# Own connection for thread safety
Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(db_path, timeout=30.0, check_same_thread=False)
self._conn.row_factory = sqlite3.Row
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.execute("PRAGMA busy_timeout=30000")
self._lock = threading.Lock()
self._init_sync_tables()
def _init_sync_tables(self) -> None:
"""Create sync-specific tables if they don't exist."""
with self._lock:
self._conn.executescript("""
CREATE TABLE IF NOT EXISTS pending_ops (
id INTEGER PRIMARY KEY AUTOINCREMENT,
op_type TEXT NOT NULL,
payload TEXT NOT NULL,
created_at TEXT NOT NULL,
retry_count INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS sync_meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
""")
# Add server_id column to memories if missing
cursor = self._conn.execute("PRAGMA table_info(memories)")
columns = {row["name"] for row in cursor.fetchall()}
if "server_id" not in columns:
self._conn.execute("ALTER TABLE memories ADD COLUMN server_id INTEGER")
self._conn.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_memories_server_id ON memories(server_id)"
)
# Add retry_count column to pending_ops if missing (migration)
cursor = self._conn.execute("PRAGMA table_info(pending_ops)")
po_columns = {row["name"] for row in cursor.fetchall()}
if "retry_count" not in po_columns:
self._conn.execute("ALTER TABLE pending_ops ADD COLUMN retry_count INTEGER DEFAULT 0")
self._conn.commit()
@property
def last_sync_ts(self) -> str | None:
with self._lock:
cursor = self._conn.execute(
"SELECT value FROM sync_meta WHERE key = 'last_sync_ts'"
)
row = cursor.fetchone()
return row["value"] if row else None
@last_sync_ts.setter
def last_sync_ts(self, value: str) -> None:
with self._lock:
self._conn.execute(
"INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync_ts', ?)",
(value,),
)
self._conn.commit()
@property
def api_available(self) -> bool:
return self._last_sync_success
def start(self) -> None:
"""Start background sync thread. Runs a full resync on startup."""
# Full sync on startup (blocking, before background thread)
try:
self._full_resync()
self._last_sync_success = True
self._auth_failed = False
except Exception as e:
logger.warning("Startup full sync failed: %s", e)
self._thread = threading.Thread(target=self._sync_loop, daemon=True)
self._thread.start()
def stop(self) -> None:
"""Signal background thread to stop and wait."""
self._stop_event.set()
if self._thread and self._thread.is_alive():
self._thread.join(timeout=5)
self._conn.close()
def _sync_loop(self) -> None:
"""Periodic sync loop running in background thread."""
cycle = 0
while not self._stop_event.is_set():
self._stop_event.wait(self.sync_interval)
if self._stop_event.is_set():
break
cycle += 1
try:
# If auth previously failed, try a lightweight check first
if self._auth_failed:
if not self._check_auth():
continue # Still failing, skip this cycle
if cycle % FULL_RESYNC_EVERY == 0:
self._full_resync()
else:
self._sync_once()
self._last_sync_success = True
except Exception as e:
logger.warning("Sync cycle failed: %s", e)
self._last_sync_success = False
def _check_auth(self) -> bool:
"""Lightweight auth check. Returns True if auth is OK."""
try:
self._api_request("GET", "/api/auth-check")
self._auth_failed = False
logger.info("Auth check passed — resuming sync")
return True
except urllib.error.HTTPError as e:
if e.code in (401, 403):
logger.warning(
"Auth still failing (HTTP %d) — API key mismatch. "
"Update MEMORY_API_KEY in ~/.claude.json", e.code
)
return False
            # Non-auth error (e.g. 500) — the auth-check endpoint might not
            # exist; fall back to the /health probe below.
pass
except Exception:
pass
# Fallback: try /health (unauthenticated)
try:
url = f"{self.api_base_url}/health"
req = urllib.request.Request(url, method="GET")
with urllib.request.urlopen(req, timeout=5):
pass
# Server is reachable but auth-check failed — auth is still broken
return False
except Exception:
# Server unreachable — not an auth problem
return False
def _sync_once(self) -> None:
"""Push pending ops, then pull remote changes. Both run independently."""
push_ok = self._push_pending_ops()
pull_ok = self._pull_changes()
if not push_ok and not pull_ok:
raise RuntimeError("Both push and pull failed")
def _full_resync(self) -> None:
"""Full cache replacement from server — handles drift, deletes, schema changes."""
# Step 1: Push orphaned local-only records (deduplicated)
self._push_orphans()
# Step 2: Pull everything from server (no since filter = non-deleted only)
result = self._api_request("GET", "/api/memories/sync")
memories = result.get("memories", [])
server_time = result.get("server_time")
server_ids = {m["id"] for m in memories}
with self._lock:
# Delete local records whose server_id no longer exists on server
local_rows = self._conn.execute(
"SELECT id, server_id FROM memories WHERE server_id IS NOT NULL"
).fetchall()
for row in local_rows:
if row["server_id"] not in server_ids:
self._conn.execute("DELETE FROM memories WHERE id = ?", (row["id"],))
# Delete remaining orphans (already pushed or duplicates)
self._conn.execute("DELETE FROM memories WHERE server_id IS NULL")
# Upsert all server records
for mem in memories:
server_id = mem["id"]
existing = self._conn.execute(
"SELECT id FROM memories WHERE server_id = ?", (server_id,)
).fetchone()
if existing:
self._conn.execute(
"""UPDATE memories SET content=?, category=?, tags=?,
expanded_keywords=?, importance=?, is_sensitive=?,
updated_at=? WHERE server_id=?""",
(
mem["content"], mem["category"], mem.get("tags", ""),
mem.get("expanded_keywords", ""), mem["importance"],
1 if mem.get("is_sensitive") else 0,
mem.get("updated_at", ""), server_id,
),
)
else:
self._conn.execute(
"""INSERT INTO memories (content, category, tags, expanded_keywords,
importance, is_sensitive, created_at, updated_at, server_id)
VALUES (?,?,?,?,?,?,?,?,?)""",
(
mem["content"], mem["category"], mem.get("tags", ""),
mem.get("expanded_keywords", ""), mem["importance"],
1 if mem.get("is_sensitive") else 0,
mem.get("created_at", ""), mem.get("updated_at", ""), server_id,
),
)
self._conn.commit()
if server_time:
self.last_sync_ts = server_time
def _push_orphans(self) -> None:
"""Push local-only records to server, skipping content duplicates."""
with self._lock:
orphans = self._conn.execute(
"SELECT id, content, category, tags, expanded_keywords, importance "
"FROM memories WHERE server_id IS NULL"
).fetchall()
if not orphans:
return
# Get all server content for dedup comparison
result = self._api_request("GET", "/api/memories/sync")
server_contents = {m["content"] for m in result.get("memories", [])}
for orphan in orphans:
if orphan["content"] in server_contents:
continue # Skip duplicate
try:
resp = self._api_request("POST", "/api/memories", {
"content": orphan["content"],
"category": orphan["category"],
"tags": orphan["tags"],
"expanded_keywords": orphan["expanded_keywords"],
"importance": orphan["importance"],
})
server_id = resp.get("id")
if server_id:
with self._lock:
self._conn.execute(
"UPDATE memories SET server_id=? WHERE id=?",
(server_id, orphan["id"]),
)
self._conn.commit()
except Exception:
pass # Will be cleaned up by the full resync delete step
def _api_request(self, method: str, path: str, body: dict[str, Any] | None = None) -> dict[str, Any]:
"""Make an HTTP request to the memory API."""
url = f"{self.api_base_url}{path}"
data = json.dumps(body).encode() if body else None
req = urllib.request.Request(
url,
data=data,
method=method,
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
)
try:
with urllib.request.urlopen(req, timeout=15) as resp:
result: dict[str, Any] = json.loads(resp.read().decode())
return result
except urllib.error.HTTPError as e:
if e.code in (401, 403):
self._auth_failed = True
logger.warning(
"Auth failed (HTTP %d) — API key may have rotated. "
"Update MEMORY_API_KEY in ~/.claude.json", e.code
)
raise
def _push_pending_ops(self) -> bool:
"""Push queued operations to the API server. Returns True on success."""
with self._lock:
cursor = self._conn.execute(
"SELECT id, op_type, payload, retry_count FROM pending_ops ORDER BY id"
)
ops = cursor.fetchall()
if not ops:
return True
all_ok = True
for op in ops:
op_id = op["id"]
op_type = op["op_type"]
payload = json.loads(op["payload"])
retry_count = op["retry_count"] or 0
# Skip ops that have exceeded retry limit
if retry_count >= MAX_OP_RETRIES:
logger.warning(
"Skipping op %d (%s) after %d retries — removing from queue",
op_id, op_type, retry_count,
)
with self._lock:
self._conn.execute("DELETE FROM pending_ops WHERE id = ?", (op_id,))
self._conn.commit()
continue
try:
if op_type == "store":
result = self._api_request("POST", "/api/memories", payload)
server_id = result.get("id")
if server_id and payload.get("local_id"):
with self._lock:
self._conn.execute(
"UPDATE memories SET server_id = ? WHERE id = ?",
(server_id, payload["local_id"]),
)
self._conn.commit()
elif op_type == "delete":
server_id = payload.get("server_id")
if server_id:
try:
self._api_request("DELETE", f"/api/memories/{server_id}")
except urllib.error.HTTPError as e:
if e.code == 404:
pass # Already deleted on server
else:
raise
# Remove from pending queue on success
with self._lock:
self._conn.execute("DELETE FROM pending_ops WHERE id = ?", (op_id,))
self._conn.commit()
except urllib.error.HTTPError as e:
if e.code in (401, 403):
self._auth_failed = True
logger.warning("Auth failed (HTTP %d) — aborting push", e.code)
return False # Abort entire push — no point retrying with bad key
# Increment retry count for non-auth errors
with self._lock:
self._conn.execute(
"UPDATE pending_ops SET retry_count = retry_count + 1 WHERE id = ?",
(op_id,),
)
self._conn.commit()
logger.warning("Failed to push op %d (%s): HTTP %d", op_id, op_type, e.code)
all_ok = False
except Exception as e:
with self._lock:
self._conn.execute(
"UPDATE pending_ops SET retry_count = retry_count + 1 WHERE id = ?",
(op_id,),
)
self._conn.commit()
logger.warning("Failed to push op %d (%s): %s", op_id, op_type, e)
all_ok = False
return all_ok
def _pull_changes(self) -> bool:
"""Pull changes from server since last sync. Returns True on success."""
try:
params = ""
ts = self.last_sync_ts
if ts:
params = f"?since={urllib.parse.quote(ts, safe='')}"
result = self._api_request("GET", f"/api/memories/sync{params}")
memories = result.get("memories", [])
server_time = result.get("server_time")
with self._lock:
for mem in memories:
server_id = mem["id"]
deleted_at = mem.get("deleted_at")
if deleted_at:
# Remove from local cache
self._conn.execute(
"DELETE FROM memories WHERE server_id = ?", (server_id,)
)
else:
# Upsert by server_id (server wins)
existing = self._conn.execute(
"SELECT id FROM memories WHERE server_id = ?", (server_id,)
).fetchone()
if existing:
self._conn.execute(
"""UPDATE memories SET content = ?, category = ?, tags = ?,
expanded_keywords = ?, importance = ?, is_sensitive = ?,
updated_at = ? WHERE server_id = ?""",
(
mem["content"],
mem["category"],
mem.get("tags", ""),
mem.get("expanded_keywords", ""),
mem["importance"],
1 if mem.get("is_sensitive") else 0,
mem.get("updated_at", datetime.now(timezone.utc).isoformat()),
server_id,
),
)
else:
self._conn.execute(
"""INSERT INTO memories
(content, category, tags, expanded_keywords, importance,
is_sensitive, created_at, updated_at, server_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
mem["content"],
mem["category"],
mem.get("tags", ""),
mem.get("expanded_keywords", ""),
mem["importance"],
1 if mem.get("is_sensitive") else 0,
mem.get("created_at", datetime.now(timezone.utc).isoformat()),
mem.get("updated_at", datetime.now(timezone.utc).isoformat()),
server_id,
),
)
self._conn.commit()
if server_time:
self.last_sync_ts = server_time
return True
except Exception as e:
logger.warning("Pull changes failed: %s", e)
return False
def enqueue_store(
self,
local_id: int,
content: str,
category: str,
tags: str,
expanded_keywords: str,
importance: float,
force_sensitive: bool = False,
) -> None:
"""Queue a store operation for later sync."""
payload = {
"local_id": local_id,
"content": content,
"category": category,
"tags": tags,
"expanded_keywords": expanded_keywords,
"importance": importance,
"force_sensitive": force_sensitive,
}
now = datetime.now(timezone.utc).isoformat()
with self._lock:
self._conn.execute(
"INSERT INTO pending_ops (op_type, payload, created_at) VALUES (?, ?, ?)",
("store", json.dumps(payload), now),
)
self._conn.commit()
def enqueue_delete(self, server_id: int) -> None:
"""Queue a delete operation for later sync."""
payload = {"server_id": server_id}
now = datetime.now(timezone.utc).isoformat()
with self._lock:
self._conn.execute(
"INSERT INTO pending_ops (op_type, payload, created_at) VALUES (?, ?, ?)",
("delete", json.dumps(payload), now),
)
self._conn.commit()
def try_sync_store(
self,
local_id: int,
content: str,
category: str,
tags: str,
expanded_keywords: str,
importance: float,
force_sensitive: bool = False,
) -> int | None:
"""Try to sync a store immediately. Returns server_id or None if failed."""
if self._auth_failed:
self.enqueue_store(
local_id, content, category, tags, expanded_keywords, importance, force_sensitive
)
return None
try:
result = self._api_request("POST", "/api/memories", {
"content": content,
"category": category,
"tags": tags,
"expanded_keywords": expanded_keywords,
"importance": importance,
"force_sensitive": force_sensitive,
})
server_id = result.get("id")
if server_id:
with self._lock:
self._conn.execute(
"UPDATE memories SET server_id = ? WHERE id = ?",
(server_id, local_id),
)
self._conn.commit()
return server_id
except Exception:
self.enqueue_store(
local_id, content, category, tags, expanded_keywords, importance, force_sensitive
)
return None
def try_sync_delete(self, server_id: int) -> bool:
"""Try to sync a delete immediately. Returns True if successful."""
if self._auth_failed:
self.enqueue_delete(server_id)
return False
try:
self._api_request("DELETE", f"/api/memories/{server_id}")
return True
except urllib.error.HTTPError as e:
if e.code == 404:
return True # Already deleted on server — not an error
self.enqueue_delete(server_id)
return False
except Exception:
self.enqueue_delete(server_id)
return False
def get_counts(self) -> dict[str, Any]:
"""Get memory counts for diagnostics."""
with self._lock:
total = self._conn.execute("SELECT COUNT(*) as c FROM memories").fetchone()["c"]
by_cat = self._conn.execute(
"SELECT category, COUNT(*) as c FROM memories GROUP BY category ORDER BY c DESC"
).fetchall()
orphans = self._conn.execute(
"SELECT COUNT(*) as c FROM memories WHERE server_id IS NULL"
).fetchone()["c"]
pending = self._conn.execute(
"SELECT COUNT(*) as c FROM pending_ops"
).fetchone()["c"]
return {
"total": total,
"by_category": {row["category"]: row["c"] for row in by_cat},
"orphans_no_server_id": orphans,
"pending_ops": pending,
"last_sync_ts": self.last_sync_ts,
"auth_failed": self._auth_failed,
"last_sync_success": self._last_sync_success,
}
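
For illustration, the engine can also be driven outside the MCP server once the memories schema exists. A minimal sketch, with placeholder URL and key:

# Hypothetical standalone use of SyncEngine; URL and key are placeholders.
from claude_memory.mcp_server import _init_sqlite
from claude_memory.sync import SyncEngine

conn, db_path = _init_sqlite("/tmp/memory.db")  # creates the memories schema first
engine = SyncEngine(db_path, "https://claude-memory.example", "example-key", sync_interval=60)
engine.start()               # blocking full resync, then background thread
print(engine.get_counts())   # totals, orphans, pending ops, last-sync status
engine.stop()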

launcher wrapper script

@@ -3,5 +3,5 @@
 # This avoids committing API keys in plaintext JSON configs.
 source ~/.oh-my-zsh/custom/secrets.zsh 2>/dev/null
 export MEMORY_API_URL="${MEMORY_API_URL:-https://claude-memory.viktorbarzin.me}"
-export PYTHONPATH="/Users/viktorbarzin/code/claude-memory-mcp/src"
-exec python3 /Users/viktorbarzin/code/claude-memory-mcp/src/claude_memory/mcp_server.py "$@"
+export PYTHONPATH="$HOME/.claude/claude-memory"
+exec python3 "$HOME/.claude/claude-memory/claude_memory/mcp_server.py" "$@"