diff --git a/dot_claude/claude-memory/claude_memory/__init__.py b/dot_claude/claude-memory/claude_memory/__init__.py new file mode 100644 index 0000000..2865899 --- /dev/null +++ b/dot_claude/claude-memory/claude_memory/__init__.py @@ -0,0 +1,3 @@ +"""Claude Memory MCP — standalone memory server with multi-user support.""" + +__version__ = "1.0.0" diff --git a/dot_claude/claude-memory/claude_memory/mcp_server.py b/dot_claude/claude-memory/claude_memory/mcp_server.py new file mode 100644 index 0000000..1cb943d --- /dev/null +++ b/dot_claude/claude-memory/claude_memory/mcp_server.py @@ -0,0 +1,703 @@ +#!/usr/bin/env python3 +""" +Claude Memory MCP Server — standalone memory server with multi-user support. + +Supports three modes: + 1. SQLite-only: local file-based storage when no API key is configured + 2. Hybrid (default when API key set): local SQLite cache + background sync + 3. HTTP-only (legacy): direct HTTP to API, no local cache (MEMORY_SYNC_DISABLE=1) + +Uses only stdlib (urllib) — no pip install required. +""" + +import json +import logging +import os +import sqlite3 +import sys +import urllib.error +import urllib.request +from typing import Any + +logger = logging.getLogger(__name__) + +PROTOCOL_VERSION = "2024-11-05" +SERVER_NAME = "claude-memory" +SERVER_VERSION = "2.0.0" + +# API configuration — support both MEMORY_* (primary) and CLAUDE_MEMORY_* (fallback) env vars +API_BASE_URL = os.environ.get("MEMORY_API_URL") or os.environ.get("CLAUDE_MEMORY_API_URL", "http://localhost:8080") +API_KEY = os.environ.get("MEMORY_API_KEY") or os.environ.get("CLAUDE_MEMORY_API_KEY", "") + +# Mode detection +SYNC_DISABLED = os.environ.get("MEMORY_SYNC_DISABLE", "") == "1" +HYBRID_MODE = bool(API_KEY) and not SYNC_DISABLED +HTTP_ONLY = bool(API_KEY) and SYNC_DISABLED +SQLITE_ONLY = not API_KEY + + +def _api_request(method: str, path: str, body: dict[str, Any] | None = None) -> dict[str, Any]: + """Make an HTTP request to the memory API.""" + url = f"{API_BASE_URL}{path}" + data = json.dumps(body).encode() if body else None + req = urllib.request.Request( + url, + data=data, + method=method, + headers={ + "Authorization": f"Bearer {API_KEY}", + "Content-Type": "application/json", + }, + ) + try: + with urllib.request.urlopen(req, timeout=15) as resp: + result: dict[str, Any] = json.loads(resp.read().decode()) + return result + except urllib.error.HTTPError as e: + error_body = e.read().decode() if e.fp else str(e) + raise RuntimeError(f"API error {e.code}: {error_body}") from e + except urllib.error.URLError as e: + raise RuntimeError(f"API connection error: {e.reason}") from e + + +# ─── SQLite initialization ──────────────────────────────────────────────────── + +def _get_db_path(db_path: str | None = None) -> str: + """Resolve the SQLite database path.""" + if db_path is not None: + return db_path + + memory_home = os.path.expandvars( + os.path.expanduser(os.environ.get("MEMORY_HOME", "~/.claude/claude-memory")) + ) + db_path = os.environ.get( + "MEMORY_DB", + os.path.join(memory_home, "memory", "memory.db"), + ) + resolved = os.path.expandvars(os.path.expanduser(db_path)) + + # Migration fallback: if the new path doesn't exist but legacy metaclaw path does, use that + if not os.path.exists(resolved): + legacy_home = os.path.expanduser("~/.claude/metaclaw") + legacy_db = os.path.join(legacy_home, "memory", "memory.db") + if os.path.exists(legacy_db): + return legacy_db + + return resolved + + +SCHEMA_VERSION = 2 + + +def _migrate_sqlite(conn: sqlite3.Connection) -> None: + """Version-based SQLite schema migrations.""" + current = conn.execute("PRAGMA user_version").fetchone()[0] + if current < 1: + # Add server_id column for hybrid mode sync + cursor = conn.execute("PRAGMA table_info(memories)") + columns = {row["name"] for row in cursor.fetchall()} + if "server_id" not in columns: + conn.execute("ALTER TABLE memories ADD COLUMN server_id INTEGER") + conn.execute( + "CREATE UNIQUE INDEX IF NOT EXISTS idx_memories_server_id ON memories(server_id)" + ) + if current < 2: + # Ensure pending_ops has retry_count (sync.py also handles this, but belt-and-suspenders) + conn.execute(""" + CREATE TABLE IF NOT EXISTS pending_ops ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + op_type TEXT NOT NULL, + payload TEXT NOT NULL, + created_at TEXT NOT NULL, + retry_count INTEGER DEFAULT 0 + ) + """) + # Add retry_count if pending_ops already exists without it + cursor = conn.execute("PRAGMA table_info(pending_ops)") + po_columns = {row["name"] for row in cursor.fetchall()} + if "retry_count" not in po_columns: + conn.execute("ALTER TABLE pending_ops ADD COLUMN retry_count INTEGER DEFAULT 0") + conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION}") + conn.commit() + + +def _init_sqlite(db_path: str | None = None) -> tuple[sqlite3.Connection, str]: + """Initialize SQLite database.""" + from pathlib import Path + + db_path = _get_db_path(db_path) + Path(os.path.dirname(db_path)).mkdir(parents=True, exist_ok=True) + + conn = sqlite3.connect(db_path, timeout=30.0) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=30000") + cursor = conn.cursor() + cursor.execute(""" + CREATE TABLE IF NOT EXISTS memories ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + content TEXT NOT NULL, + category TEXT DEFAULT 'facts', + tags TEXT DEFAULT '', + expanded_keywords TEXT DEFAULT '', + importance REAL DEFAULT 0.5, + is_sensitive INTEGER DEFAULT 0, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """) + # Version-based schema migrations + _migrate_sqlite(conn) + + cursor.execute(""" + CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5( + content, category, tags, expanded_keywords, + content='memories', content_rowid='id' + ) + """) + cursor.execute(""" + CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN + INSERT INTO memories_fts(rowid, content, category, tags, expanded_keywords) + VALUES (new.id, new.content, new.category, new.tags, new.expanded_keywords); + END + """) + cursor.execute(""" + CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN + INSERT INTO memories_fts(memories_fts, rowid, content, category, tags, expanded_keywords) + VALUES ('delete', old.id, old.content, old.category, old.tags, old.expanded_keywords); + END + """) + cursor.execute(""" + CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN + INSERT INTO memories_fts(memories_fts, rowid, content, category, tags, expanded_keywords) + VALUES ('delete', old.id, old.content, old.category, old.tags, old.expanded_keywords); + INSERT INTO memories_fts(rowid, content, category, tags, expanded_keywords) + VALUES (new.id, new.content, new.category, new.tags, new.expanded_keywords); + END + """) + conn.commit() + return conn, db_path + + +# ─── Tool definitions ──────────────────────────────────────────────────────── + +TOOLS = [ + { + "name": "memory_store", + "description": "Store a fact or memory in persistent storage. Use this to remember important information about the user, their preferences, projects, decisions, or people they mention.", + "inputSchema": { + "type": "object", + "properties": { + "content": {"type": "string", "description": "The fact or memory to store"}, + "category": { + "type": "string", + "enum": ["facts", "preferences", "projects", "people", "decisions"], + "description": "Category for organizing the memory", + "default": "facts", + }, + "tags": {"type": "string", "description": "Comma-separated tags", "default": ""}, + "importance": { + "type": "number", + "description": "Importance 0.0-1.0", + "default": 0.5, + "minimum": 0.0, + "maximum": 1.0, + }, + "expanded_keywords": { + "type": "string", + "description": "REQUIRED. Space-separated semantically related search terms (MINIMUM 5 words). Generate keywords that someone might search for when this memory would be relevant. Include synonyms, related concepts, and adjacent topics.", + }, + "force_sensitive": { + "type": "boolean", + "description": "If true, mark this memory as sensitive regardless of auto-detection. Sensitive memories have their content encrypted at rest.", + "default": False, + }, + }, + "required": ["content", "expanded_keywords"], + }, + }, + { + "name": "memory_recall", + "description": "Retrieve relevant memories based on context. Uses full-text search to find stored memories.", + "inputSchema": { + "type": "object", + "properties": { + "context": {"type": "string", "description": "The context or topic to recall memories about"}, + "expanded_query": { + "type": "string", + "description": "REQUIRED. Space-separated semantically related search terms (MINIMUM 5 words).", + }, + "category": { + "type": "string", + "enum": ["facts", "preferences", "projects", "people", "decisions"], + "description": "Optional: filter results to a specific category", + }, + "sort_by": { + "type": "string", + "enum": ["importance", "relevance"], + "description": "Sort order", + "default": "importance", + }, + "limit": {"type": "integer", "description": "Max results", "default": 10}, + }, + "required": ["context", "expanded_query"], + }, + }, + { + "name": "memory_list", + "description": "List recent memories, optionally filtered by category.", + "inputSchema": { + "type": "object", + "properties": { + "category": { + "type": "string", + "enum": ["facts", "preferences", "projects", "people", "decisions"], + }, + "limit": {"type": "integer", "default": 20}, + }, + }, + }, + { + "name": "memory_delete", + "description": "Delete a memory by ID.", + "inputSchema": { + "type": "object", + "properties": { + "id": {"type": "integer", "description": "The ID of the memory to delete"}, + }, + "required": ["id"], + }, + }, + { + "name": "secret_get", + "description": "Retrieve the decrypted content of a sensitive memory. Only works for memories marked as sensitive.", + "inputSchema": { + "type": "object", + "properties": { + "id": {"type": "integer", "description": "The ID of the sensitive memory to retrieve"}, + }, + "required": ["id"], + }, + }, + { + "name": "memory_count", + "description": "Get memory counts by category from local cache and sync status. Useful for diagnostics.", + "inputSchema": { + "type": "object", + "properties": {}, + }, + }, +] + + +class MemoryServer: + """MCP server for persistent memory management.""" + + def __init__(self, sqlite_db_path: str | None = None) -> None: + self.sqlite_conn: sqlite3.Connection | None = None + self.sync_engine: Any = None + + if SQLITE_ONLY or HYBRID_MODE: + self.sqlite_conn, resolved_path = _init_sqlite(sqlite_db_path) + + if HYBRID_MODE: + from claude_memory.sync import SyncEngine + sync_interval = int(os.environ.get("MEMORY_SYNC_INTERVAL", "60")) + self.sync_engine = SyncEngine( + db_path=resolved_path, + api_base_url=API_BASE_URL, + api_key=API_KEY, + sync_interval=sync_interval, + ) + self.sync_engine.start() + + def __del__(self) -> None: + if self.sync_engine: + self.sync_engine.stop() + + # ── Tool methods ──────────────────────────────────────────────── + + def memory_store(self, args: dict[str, Any]) -> str: + content = args.get("content") + if not content: + raise ValueError("content is required") + category = args.get("category", "facts") + tags = args.get("tags", "") + importance = max(0.0, min(1.0, float(args.get("importance", 0.5)))) + expanded_keywords = args.get("expanded_keywords", "") + force_sensitive = bool(args.get("force_sensitive", False)) + + if HTTP_ONLY: + result = _api_request("POST", "/api/memories", { + "content": content, + "category": category, + "tags": tags, + "expanded_keywords": expanded_keywords, + "importance": importance, + "force_sensitive": force_sensitive, + }) + return f"Stored memory #{result['id']} in category '{result['category']}' with importance {result['importance']:.1f}" + + # SQLite-only or Hybrid: write to local SQLite first + result_text = self._sqlite_store(content, category, tags, importance, expanded_keywords, force_sensitive) + + if HYBRID_MODE and self.sync_engine: + # Extract local_id from result text + local_id = int(result_text.split("#")[1].split(" ")[0]) + self.sync_engine.try_sync_store( + local_id, content, category, tags, expanded_keywords, importance, force_sensitive + ) + + return result_text + + def memory_recall(self, args: dict[str, Any]) -> str: + context = args.get("context") + if not context: + raise ValueError("context is required") + expanded_query = args.get("expanded_query", "") + category = args.get("category") + sort_by = args.get("sort_by", "importance") + limit = args.get("limit", 10) + + if HTTP_ONLY: + result = _api_request("POST", "/api/memories/recall", { + "context": context, + "expanded_query": expanded_query, + "category": category, + "sort_by": sort_by, + "limit": limit, + }) + rows = result.get("memories", []) + if not rows: + filter_desc = f" in category '{category}'" if category else "" + return f"No memories found matching: {context}{filter_desc}" + + sort_desc = "by relevance" if sort_by == "relevance" else "by importance" + filter_desc = f" in '{category}'" if category else "" + results = [] + for row in rows: + results.append( + f"#{row['id']} [{row['category']}] (importance: {row['importance']:.1f}) {row['content']}" + f"\n Tags: {row.get('tags') or 'none'} | Stored: {row['created_at']}" + ) + return f"Found {len(rows)} memories{filter_desc} ({sort_desc}):\n\n" + "\n\n".join(results) + + # SQLite-only or Hybrid: always read from local cache + return self._sqlite_recall(context, expanded_query, category, sort_by, limit) + + def memory_list(self, args: dict[str, Any]) -> str: + category = args.get("category") + limit = args.get("limit", 20) + + if HTTP_ONLY: + params = f"?limit={limit}" + if category: + params += f"&category={category}" + result = _api_request("GET", f"/api/memories{params}") + rows = result.get("memories", []) + if not rows: + return f"No memories in category '{category}'" if category else "No memories stored yet" + + results = [] + for row in rows: + results.append( + f"#{row['id']} [{row['category']}] {row['content']}" + f"\n Importance: {row['importance']:.1f} | Tags: {row.get('tags') or 'none'} | Stored: {row['created_at']}" + ) + header = "Recent memories" + if category: + header += f" in '{category}'" + return header + f" ({len(rows)} shown):\n\n" + "\n\n".join(results) + + # SQLite-only or Hybrid: always read from local cache + return self._sqlite_list(category, limit) + + def memory_delete(self, args: dict[str, Any]) -> str: + memory_id = args.get("id") + if memory_id is None: + raise ValueError("id is required") + + if HTTP_ONLY: + result = _api_request("DELETE", f"/api/memories/{memory_id}") + return f"Deleted memory #{result['deleted']}: {result['preview']}..." + + # SQLite-only or Hybrid: delete from local SQLite + # In hybrid mode, also try to sync delete to server + server_id: int | None = None + if HYBRID_MODE and self.sync_engine and self.sqlite_conn: + cursor = self.sqlite_conn.cursor() + cursor.execute("SELECT server_id FROM memories WHERE id = ?", (memory_id,)) + row = cursor.fetchone() + server_id = row["server_id"] if row and row["server_id"] else None + + result_text = self._sqlite_delete(memory_id) + + if HYBRID_MODE and self.sync_engine and server_id: + self.sync_engine.try_sync_delete(server_id) + + return result_text + + def secret_get(self, args: dict[str, Any]) -> str: + memory_id = args.get("id") + if memory_id is None: + raise ValueError("id is required") + + if HTTP_ONLY or HYBRID_MODE: + # Secrets should be fetched from API when available + try: + result = _api_request("POST", f"/api/memories/{memory_id}/secret") + return f"#{result['id']} [{result['category']}] {result['content']}" + except Exception: + if HYBRID_MODE: + # Fall back to local SQLite + return self._sqlite_secret_get(memory_id) + raise + + return self._sqlite_secret_get(memory_id) + + def memory_count(self, args: dict[str, Any]) -> str: + if self.sync_engine: + counts = self.sync_engine.get_counts() + lines = [f"Local memories: {counts['total']}"] + for cat, n in counts["by_category"].items(): + lines.append(f" {cat}: {n}") + lines.append(f"Orphans (no server_id): {counts['orphans_no_server_id']}") + lines.append(f"Pending ops: {counts['pending_ops']}") + lines.append(f"Last sync: {counts['last_sync_ts'] or 'never'}") + lines.append(f"Auth failed: {counts['auth_failed']}") + lines.append(f"Last sync success: {counts['last_sync_success']}") + return "\n".join(lines) + + if self.sqlite_conn: + cursor = self.sqlite_conn.cursor() + cursor.execute("SELECT COUNT(*) as c FROM memories") + total = cursor.fetchone()["c"] + cursor.execute("SELECT category, COUNT(*) as c FROM memories GROUP BY category ORDER BY c DESC") + by_cat = cursor.fetchall() + lines = [f"Local memories (SQLite-only): {total}"] + for row in by_cat: + lines.append(f" {row['category']}: {row['c']}") + return "\n".join(lines) + + return "No storage available" + + # ── SQLite methods ────────────────────────────────────────────── + + def _sqlite_store(self, content: str, category: str, tags: str, importance: float, expanded_keywords: str, force_sensitive: bool = False) -> str: + from datetime import datetime, timezone + + assert self.sqlite_conn is not None + now = datetime.now(timezone.utc).isoformat() + is_sensitive = 1 if force_sensitive else 0 + cursor = self.sqlite_conn.cursor() + cursor.execute( + "INSERT INTO memories (content, category, tags, expanded_keywords, importance, is_sensitive, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (content, category, tags, expanded_keywords, importance, is_sensitive, now, now), + ) + self.sqlite_conn.commit() + return f"Stored memory #{cursor.lastrowid} in category '{category}' with importance {importance:.1f}" + + def _sqlite_recall(self, context: str, expanded_query: str, category: str | None, sort_by: str, limit: int) -> str: + assert self.sqlite_conn is not None + all_terms = f"{context} {expanded_query}".strip() + words = [w.replace(chr(34), "") for w in all_terms.split() if w] + and_query = " AND ".join(f'"{w}"' for w in words) + or_query = " OR ".join(f'"{w}"' for w in words) + + # Hybrid scoring: blend BM25 relevance with importance + # bm25() returns negative values (lower = better match), so negate it + order = ( + "(-bm25(memories_fts) * 0.7 + m.importance * 0.3) DESC" + if sort_by == "relevance" + else "(-bm25(memories_fts) * 0.4 + m.importance * 0.6) DESC" + ) + + base_select = ( + "SELECT m.id, m.content, m.category, m.tags, m.importance, m.created_at " + "FROM memories m JOIN memories_fts fts ON m.id = fts.rowid " + ) + cursor = self.sqlite_conn.cursor() + rows: list[Any] = [] + try: + # Try AND first for precise matches, fall back to OR for broader results + cat_filter = "AND m.category = ?" if category else "" + for fts_query in (and_query, or_query): + params = [fts_query, category, limit] if category else [fts_query, limit] + cursor.execute( + f"{base_select}WHERE memories_fts MATCH ? {cat_filter} ORDER BY {order} LIMIT ?", + tuple(p for p in params if p is not None), + ) + rows = cursor.fetchall() + if rows: + break + except sqlite3.OperationalError: + like = f"%{context}%" + if category: + cursor.execute( + "SELECT id, content, category, tags, importance, created_at FROM memories " + "WHERE (content LIKE ? OR tags LIKE ?) AND category = ? ORDER BY importance DESC LIMIT ?", + (like, like, category, limit), + ) + else: + cursor.execute( + "SELECT id, content, category, tags, importance, created_at FROM memories " + "WHERE content LIKE ? OR tags LIKE ? ORDER BY importance DESC LIMIT ?", + (like, like, limit), + ) + rows = cursor.fetchall() + + if not rows: + return f"No memories found matching: {context}" + + results = [] + for row in rows: + results.append( + f"#{row['id']} [{row['category']}] (importance: {row['importance']:.1f}) {row['content']}" + f"\n Tags: {row['tags'] or 'none'} | Stored: {row['created_at']}" + ) + return ( + f"Found {len(rows)} memories (by {'relevance' if sort_by == 'relevance' else 'importance'}):\n\n" + + "\n\n".join(results) + ) + + def _sqlite_list(self, category: str | None, limit: int) -> str: + assert self.sqlite_conn is not None + cursor = self.sqlite_conn.cursor() + if category: + cursor.execute( + "SELECT id, content, category, tags, importance, created_at FROM memories " + "WHERE category = ? ORDER BY created_at DESC LIMIT ?", + (category, limit), + ) + else: + cursor.execute( + "SELECT id, content, category, tags, importance, created_at FROM memories " + "ORDER BY created_at DESC LIMIT ?", + (limit,), + ) + rows = cursor.fetchall() + if not rows: + return f"No memories in category '{category}'" if category else "No memories stored yet" + + results = [] + for row in rows: + results.append( + f"#{row['id']} [{row['category']}] {row['content']}" + f"\n Importance: {row['importance']:.1f} | Tags: {row['tags'] or 'none'} | Stored: {row['created_at']}" + ) + header = "Recent memories" + (f" in '{category}'" if category else "") + return header + f" ({len(rows)} shown):\n\n" + "\n\n".join(results) + + def _sqlite_delete(self, memory_id: int) -> str: + assert self.sqlite_conn is not None + cursor = self.sqlite_conn.cursor() + cursor.execute("SELECT id, content FROM memories WHERE id = ?", (memory_id,)) + row = cursor.fetchone() + if not row: + return f"Memory #{memory_id} not found" + preview = row["content"][:50] + cursor.execute("DELETE FROM memories WHERE id = ?", (memory_id,)) + self.sqlite_conn.commit() + return f"Deleted memory #{memory_id}: {preview}..." + + def _sqlite_secret_get(self, memory_id: int) -> str: + assert self.sqlite_conn is not None + cursor = self.sqlite_conn.cursor() + cursor.execute( + "SELECT id, content, category, is_sensitive FROM memories WHERE id = ?", + (memory_id,), + ) + row = cursor.fetchone() + if not row: + return f"Memory #{memory_id} not found" + if not row["is_sensitive"]: + return f"Memory #{memory_id} is not marked as sensitive" + return f"#{row['id']} [{row['category']}] {row['content']}" + + # ── MCP protocol ───────────────────────────────────────────────── + + def handle_initialize(self, params: dict[str, Any]) -> dict[str, Any]: + return { + "protocolVersion": PROTOCOL_VERSION, + "capabilities": {"tools": {}}, + "serverInfo": {"name": SERVER_NAME, "version": SERVER_VERSION}, + } + + def handle_tools_list(self, params: dict[str, Any]) -> dict[str, Any]: + return {"tools": TOOLS} + + def handle_tools_call(self, params: dict[str, Any]) -> dict[str, Any]: + tool_name: str = params.get("name", "") + arguments: dict[str, Any] = params.get("arguments", {}) + try: + handler = { + "memory_store": self.memory_store, + "memory_recall": self.memory_recall, + "memory_list": self.memory_list, + "memory_delete": self.memory_delete, + "secret_get": self.secret_get, + "memory_count": self.memory_count, + }.get(tool_name) + if handler is None: + return {"content": [{"type": "text", "text": f"Unknown tool: {tool_name}"}], "isError": True} + result = handler(arguments) + return {"content": [{"type": "text", "text": result}]} + except Exception as e: + return {"content": [{"type": "text", "text": f"Error: {e!s}"}], "isError": True} + + def process_message(self, message: dict[str, Any]) -> dict[str, Any] | None: + method = message.get("method") + params = message.get("params", {}) + msg_id = message.get("id") + if msg_id is None: + return None + result = None + error = None + try: + if method == "initialize": + result = self.handle_initialize(params) + elif method == "tools/list": + result = self.handle_tools_list(params) + elif method == "tools/call": + result = self.handle_tools_call(params) + else: + error = {"code": -32601, "message": f"Method not found: {method}"} + except Exception as e: + error = {"code": -32603, "message": str(e)} + response: dict[str, Any] = {"jsonrpc": "2.0", "id": msg_id} + if error: + response["error"] = error + else: + response["result"] = result + return response + + def run(self) -> None: + try: + for line in sys.stdin: + line = line.strip() + if not line or line.startswith("Content-Length:"): + continue + try: + message = json.loads(line) + except json.JSONDecodeError: + continue + response = self.process_message(message) + if response is not None: + print(json.dumps(response), flush=True) + finally: + if self.sync_engine: + self.sync_engine.stop() + + +def main() -> None: + # Suppress all stderr output — MCP clients (e.g. Claude Code) may treat + # any stderr as a fatal error and refuse to load the server. + sys.stderr = open(os.devnull, "w") + logging.disable(logging.CRITICAL) + + server = MemoryServer() + server.run() + + +if __name__ == "__main__": + main() diff --git a/dot_claude/claude-memory/claude_memory/sync.py b/dot_claude/claude-memory/claude_memory/sync.py new file mode 100644 index 0000000..b4d1f99 --- /dev/null +++ b/dot_claude/claude-memory/claude_memory/sync.py @@ -0,0 +1,583 @@ +"""Background sync between local SQLite cache and remote API. + +Uses only stdlib — no pip install required. +""" + +import json +import logging +import sqlite3 +import threading +import urllib.error +import urllib.parse +import urllib.request +from typing import Any +from datetime import datetime, timezone +from pathlib import Path + +logger = logging.getLogger(__name__) + +# Max retries before an individual pending op is permanently skipped +MAX_OP_RETRIES = 5 + +# Full resync every N sync cycles (~10 min at 60s interval) +FULL_RESYNC_EVERY = 10 + + +class SyncEngine: + """Background sync between local SQLite cache and remote API.""" + + def __init__(self, db_path: str, api_base_url: str, api_key: str, sync_interval: int = 60): + self.db_path = db_path + self.api_base_url = api_base_url.rstrip("/") + self.api_key = api_key + self.sync_interval = sync_interval + + self._stop_event = threading.Event() + self._thread: threading.Thread | None = None + self._last_sync_success = False + self._auth_failed = False + + # Own connection for thread safety + Path(db_path).parent.mkdir(parents=True, exist_ok=True) + self._conn = sqlite3.connect(db_path, timeout=30.0, check_same_thread=False) + self._conn.row_factory = sqlite3.Row + self._conn.execute("PRAGMA journal_mode=WAL") + self._conn.execute("PRAGMA busy_timeout=30000") + self._lock = threading.Lock() + + self._init_sync_tables() + + def _init_sync_tables(self) -> None: + """Create sync-specific tables if they don't exist.""" + with self._lock: + self._conn.executescript(""" + CREATE TABLE IF NOT EXISTS pending_ops ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + op_type TEXT NOT NULL, + payload TEXT NOT NULL, + created_at TEXT NOT NULL, + retry_count INTEGER DEFAULT 0 + ); + + CREATE TABLE IF NOT EXISTS sync_meta ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL + ); + """) + # Add server_id column to memories if missing + cursor = self._conn.execute("PRAGMA table_info(memories)") + columns = {row["name"] for row in cursor.fetchall()} + if "server_id" not in columns: + self._conn.execute("ALTER TABLE memories ADD COLUMN server_id INTEGER") + self._conn.execute( + "CREATE UNIQUE INDEX IF NOT EXISTS idx_memories_server_id ON memories(server_id)" + ) + + # Add retry_count column to pending_ops if missing (migration) + cursor = self._conn.execute("PRAGMA table_info(pending_ops)") + po_columns = {row["name"] for row in cursor.fetchall()} + if "retry_count" not in po_columns: + self._conn.execute("ALTER TABLE pending_ops ADD COLUMN retry_count INTEGER DEFAULT 0") + + self._conn.commit() + + @property + def last_sync_ts(self) -> str | None: + with self._lock: + cursor = self._conn.execute( + "SELECT value FROM sync_meta WHERE key = 'last_sync_ts'" + ) + row = cursor.fetchone() + return row["value"] if row else None + + @last_sync_ts.setter + def last_sync_ts(self, value: str) -> None: + with self._lock: + self._conn.execute( + "INSERT OR REPLACE INTO sync_meta (key, value) VALUES ('last_sync_ts', ?)", + (value,), + ) + self._conn.commit() + + @property + def api_available(self) -> bool: + return self._last_sync_success + + def start(self) -> None: + """Start background sync thread. Runs a full resync on startup.""" + # Full sync on startup (blocking, before background thread) + try: + self._full_resync() + self._last_sync_success = True + self._auth_failed = False + except Exception as e: + logger.warning("Startup full sync failed: %s", e) + + self._thread = threading.Thread(target=self._sync_loop, daemon=True) + self._thread.start() + + def stop(self) -> None: + """Signal background thread to stop and wait.""" + self._stop_event.set() + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=5) + self._conn.close() + + def _sync_loop(self) -> None: + """Periodic sync loop running in background thread.""" + cycle = 0 + while not self._stop_event.is_set(): + self._stop_event.wait(self.sync_interval) + if self._stop_event.is_set(): + break + cycle += 1 + try: + # If auth previously failed, try a lightweight check first + if self._auth_failed: + if not self._check_auth(): + continue # Still failing, skip this cycle + + if cycle % FULL_RESYNC_EVERY == 0: + self._full_resync() + else: + self._sync_once() + self._last_sync_success = True + except Exception as e: + logger.warning("Sync cycle failed: %s", e) + self._last_sync_success = False + + def _check_auth(self) -> bool: + """Lightweight auth check. Returns True if auth is OK.""" + try: + self._api_request("GET", "/api/auth-check") + self._auth_failed = False + logger.info("Auth check passed — resuming sync") + return True + except urllib.error.HTTPError as e: + if e.code in (401, 403): + logger.warning( + "Auth still failing (HTTP %d) — API key mismatch. " + "Update MEMORY_API_KEY in ~/.claude.json", e.code + ) + return False + # Non-auth error (e.g. 500) — try the auth-check endpoint might not exist, + # fall back to /health + pass + except Exception: + pass + + # Fallback: try /health (unauthenticated) + try: + url = f"{self.api_base_url}/health" + req = urllib.request.Request(url, method="GET") + with urllib.request.urlopen(req, timeout=5): + pass + # Server is reachable but auth-check failed — auth is still broken + return False + except Exception: + # Server unreachable — not an auth problem + return False + + def _sync_once(self) -> None: + """Push pending ops, then pull remote changes. Both run independently.""" + push_ok = self._push_pending_ops() + pull_ok = self._pull_changes() + if not push_ok and not pull_ok: + raise RuntimeError("Both push and pull failed") + + def _full_resync(self) -> None: + """Full cache replacement from server — handles drift, deletes, schema changes.""" + # Step 1: Push orphaned local-only records (deduplicated) + self._push_orphans() + + # Step 2: Pull everything from server (no since filter = non-deleted only) + result = self._api_request("GET", "/api/memories/sync") + memories = result.get("memories", []) + server_time = result.get("server_time") + server_ids = {m["id"] for m in memories} + + with self._lock: + # Delete local records whose server_id no longer exists on server + local_rows = self._conn.execute( + "SELECT id, server_id FROM memories WHERE server_id IS NOT NULL" + ).fetchall() + for row in local_rows: + if row["server_id"] not in server_ids: + self._conn.execute("DELETE FROM memories WHERE id = ?", (row["id"],)) + + # Delete remaining orphans (already pushed or duplicates) + self._conn.execute("DELETE FROM memories WHERE server_id IS NULL") + + # Upsert all server records + for mem in memories: + server_id = mem["id"] + existing = self._conn.execute( + "SELECT id FROM memories WHERE server_id = ?", (server_id,) + ).fetchone() + + if existing: + self._conn.execute( + """UPDATE memories SET content=?, category=?, tags=?, + expanded_keywords=?, importance=?, is_sensitive=?, + updated_at=? WHERE server_id=?""", + ( + mem["content"], mem["category"], mem.get("tags", ""), + mem.get("expanded_keywords", ""), mem["importance"], + 1 if mem.get("is_sensitive") else 0, + mem.get("updated_at", ""), server_id, + ), + ) + else: + self._conn.execute( + """INSERT INTO memories (content, category, tags, expanded_keywords, + importance, is_sensitive, created_at, updated_at, server_id) + VALUES (?,?,?,?,?,?,?,?,?)""", + ( + mem["content"], mem["category"], mem.get("tags", ""), + mem.get("expanded_keywords", ""), mem["importance"], + 1 if mem.get("is_sensitive") else 0, + mem.get("created_at", ""), mem.get("updated_at", ""), server_id, + ), + ) + + self._conn.commit() + + if server_time: + self.last_sync_ts = server_time + + def _push_orphans(self) -> None: + """Push local-only records to server, skipping content duplicates.""" + with self._lock: + orphans = self._conn.execute( + "SELECT id, content, category, tags, expanded_keywords, importance " + "FROM memories WHERE server_id IS NULL" + ).fetchall() + + if not orphans: + return + + # Get all server content for dedup comparison + result = self._api_request("GET", "/api/memories/sync") + server_contents = {m["content"] for m in result.get("memories", [])} + + for orphan in orphans: + if orphan["content"] in server_contents: + continue # Skip duplicate + try: + resp = self._api_request("POST", "/api/memories", { + "content": orphan["content"], + "category": orphan["category"], + "tags": orphan["tags"], + "expanded_keywords": orphan["expanded_keywords"], + "importance": orphan["importance"], + }) + server_id = resp.get("id") + if server_id: + with self._lock: + self._conn.execute( + "UPDATE memories SET server_id=? WHERE id=?", + (server_id, orphan["id"]), + ) + self._conn.commit() + except Exception: + pass # Will be cleaned up by the full resync delete step + + def _api_request(self, method: str, path: str, body: dict[str, Any] | None = None) -> dict[str, Any]: + """Make an HTTP request to the memory API.""" + url = f"{self.api_base_url}{path}" + data = json.dumps(body).encode() if body else None + req = urllib.request.Request( + url, + data=data, + method=method, + headers={ + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + }, + ) + try: + with urllib.request.urlopen(req, timeout=15) as resp: + result: dict[str, Any] = json.loads(resp.read().decode()) + return result + except urllib.error.HTTPError as e: + if e.code in (401, 403): + self._auth_failed = True + logger.warning( + "Auth failed (HTTP %d) — API key may have rotated. " + "Update MEMORY_API_KEY in ~/.claude.json", e.code + ) + raise + + def _push_pending_ops(self) -> bool: + """Push queued operations to the API server. Returns True on success.""" + with self._lock: + cursor = self._conn.execute( + "SELECT id, op_type, payload, retry_count FROM pending_ops ORDER BY id" + ) + ops = cursor.fetchall() + + if not ops: + return True + + all_ok = True + for op in ops: + op_id = op["id"] + op_type = op["op_type"] + payload = json.loads(op["payload"]) + retry_count = op["retry_count"] or 0 + + # Skip ops that have exceeded retry limit + if retry_count >= MAX_OP_RETRIES: + logger.warning( + "Skipping op %d (%s) after %d retries — removing from queue", + op_id, op_type, retry_count, + ) + with self._lock: + self._conn.execute("DELETE FROM pending_ops WHERE id = ?", (op_id,)) + self._conn.commit() + continue + + try: + if op_type == "store": + result = self._api_request("POST", "/api/memories", payload) + server_id = result.get("id") + if server_id and payload.get("local_id"): + with self._lock: + self._conn.execute( + "UPDATE memories SET server_id = ? WHERE id = ?", + (server_id, payload["local_id"]), + ) + self._conn.commit() + elif op_type == "delete": + server_id = payload.get("server_id") + if server_id: + try: + self._api_request("DELETE", f"/api/memories/{server_id}") + except urllib.error.HTTPError as e: + if e.code == 404: + pass # Already deleted on server + else: + raise + + # Remove from pending queue on success + with self._lock: + self._conn.execute("DELETE FROM pending_ops WHERE id = ?", (op_id,)) + self._conn.commit() + + except urllib.error.HTTPError as e: + if e.code in (401, 403): + self._auth_failed = True + logger.warning("Auth failed (HTTP %d) — aborting push", e.code) + return False # Abort entire push — no point retrying with bad key + # Increment retry count for non-auth errors + with self._lock: + self._conn.execute( + "UPDATE pending_ops SET retry_count = retry_count + 1 WHERE id = ?", + (op_id,), + ) + self._conn.commit() + logger.warning("Failed to push op %d (%s): HTTP %d", op_id, op_type, e.code) + all_ok = False + except Exception as e: + with self._lock: + self._conn.execute( + "UPDATE pending_ops SET retry_count = retry_count + 1 WHERE id = ?", + (op_id,), + ) + self._conn.commit() + logger.warning("Failed to push op %d (%s): %s", op_id, op_type, e) + all_ok = False + + return all_ok + + def _pull_changes(self) -> bool: + """Pull changes from server since last sync. Returns True on success.""" + try: + params = "" + ts = self.last_sync_ts + if ts: + params = f"?since={urllib.parse.quote(ts, safe='')}" + + result = self._api_request("GET", f"/api/memories/sync{params}") + memories = result.get("memories", []) + server_time = result.get("server_time") + + with self._lock: + for mem in memories: + server_id = mem["id"] + deleted_at = mem.get("deleted_at") + + if deleted_at: + # Remove from local cache + self._conn.execute( + "DELETE FROM memories WHERE server_id = ?", (server_id,) + ) + else: + # Upsert by server_id (server wins) + existing = self._conn.execute( + "SELECT id FROM memories WHERE server_id = ?", (server_id,) + ).fetchone() + + if existing: + self._conn.execute( + """UPDATE memories SET content = ?, category = ?, tags = ?, + expanded_keywords = ?, importance = ?, is_sensitive = ?, + updated_at = ? WHERE server_id = ?""", + ( + mem["content"], + mem["category"], + mem.get("tags", ""), + mem.get("expanded_keywords", ""), + mem["importance"], + 1 if mem.get("is_sensitive") else 0, + mem.get("updated_at", datetime.now(timezone.utc).isoformat()), + server_id, + ), + ) + else: + self._conn.execute( + """INSERT INTO memories + (content, category, tags, expanded_keywords, importance, + is_sensitive, created_at, updated_at, server_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", + ( + mem["content"], + mem["category"], + mem.get("tags", ""), + mem.get("expanded_keywords", ""), + mem["importance"], + 1 if mem.get("is_sensitive") else 0, + mem.get("created_at", datetime.now(timezone.utc).isoformat()), + mem.get("updated_at", datetime.now(timezone.utc).isoformat()), + server_id, + ), + ) + self._conn.commit() + + if server_time: + self.last_sync_ts = server_time + return True + + except Exception as e: + logger.warning("Pull changes failed: %s", e) + return False + + def enqueue_store( + self, + local_id: int, + content: str, + category: str, + tags: str, + expanded_keywords: str, + importance: float, + force_sensitive: bool = False, + ) -> None: + """Queue a store operation for later sync.""" + payload = { + "local_id": local_id, + "content": content, + "category": category, + "tags": tags, + "expanded_keywords": expanded_keywords, + "importance": importance, + "force_sensitive": force_sensitive, + } + now = datetime.now(timezone.utc).isoformat() + with self._lock: + self._conn.execute( + "INSERT INTO pending_ops (op_type, payload, created_at) VALUES (?, ?, ?)", + ("store", json.dumps(payload), now), + ) + self._conn.commit() + + def enqueue_delete(self, server_id: int) -> None: + """Queue a delete operation for later sync.""" + payload = {"server_id": server_id} + now = datetime.now(timezone.utc).isoformat() + with self._lock: + self._conn.execute( + "INSERT INTO pending_ops (op_type, payload, created_at) VALUES (?, ?, ?)", + ("delete", json.dumps(payload), now), + ) + self._conn.commit() + + def try_sync_store( + self, + local_id: int, + content: str, + category: str, + tags: str, + expanded_keywords: str, + importance: float, + force_sensitive: bool = False, + ) -> int | None: + """Try to sync a store immediately. Returns server_id or None if failed.""" + if self._auth_failed: + self.enqueue_store( + local_id, content, category, tags, expanded_keywords, importance, force_sensitive + ) + return None + try: + result = self._api_request("POST", "/api/memories", { + "content": content, + "category": category, + "tags": tags, + "expanded_keywords": expanded_keywords, + "importance": importance, + "force_sensitive": force_sensitive, + }) + server_id = result.get("id") + if server_id: + with self._lock: + self._conn.execute( + "UPDATE memories SET server_id = ? WHERE id = ?", + (server_id, local_id), + ) + self._conn.commit() + return server_id + except Exception: + self.enqueue_store( + local_id, content, category, tags, expanded_keywords, importance, force_sensitive + ) + return None + + def try_sync_delete(self, server_id: int) -> bool: + """Try to sync a delete immediately. Returns True if successful.""" + if self._auth_failed: + self.enqueue_delete(server_id) + return False + try: + self._api_request("DELETE", f"/api/memories/{server_id}") + return True + except urllib.error.HTTPError as e: + if e.code == 404: + return True # Already deleted on server — not an error + self.enqueue_delete(server_id) + return False + except Exception: + self.enqueue_delete(server_id) + return False + + def get_counts(self) -> dict[str, Any]: + """Get memory counts for diagnostics.""" + with self._lock: + total = self._conn.execute("SELECT COUNT(*) as c FROM memories").fetchone()["c"] + by_cat = self._conn.execute( + "SELECT category, COUNT(*) as c FROM memories GROUP BY category ORDER BY c DESC" + ).fetchall() + orphans = self._conn.execute( + "SELECT COUNT(*) as c FROM memories WHERE server_id IS NULL" + ).fetchone()["c"] + pending = self._conn.execute( + "SELECT COUNT(*) as c FROM pending_ops" + ).fetchone()["c"] + + return { + "total": total, + "by_category": {row["category"]: row["c"] for row in by_cat}, + "orphans_no_server_id": orphans, + "pending_ops": pending, + "last_sync_ts": self.last_sync_ts, + "auth_failed": self._auth_failed, + "last_sync_success": self._last_sync_success, + } diff --git a/dot_local/bin/executable_claude-memory-mcp-wrapper b/dot_local/bin/executable_claude-memory-mcp-wrapper index 6d6718c..471cebe 100644 --- a/dot_local/bin/executable_claude-memory-mcp-wrapper +++ b/dot_local/bin/executable_claude-memory-mcp-wrapper @@ -3,5 +3,5 @@ # This avoids committing API keys in plaintext JSON configs. source ~/.oh-my-zsh/custom/secrets.zsh 2>/dev/null export MEMORY_API_URL="${MEMORY_API_URL:-https://claude-memory.viktorbarzin.me}" -export PYTHONPATH="/Users/viktorbarzin/code/claude-memory-mcp/src" -exec python3 /Users/viktorbarzin/code/claude-memory-mcp/src/claude_memory/mcp_server.py "$@" +export PYTHONPATH="$HOME/.claude/claude-memory" +exec python3 "$HOME/.claude/claude-memory/claude_memory/mcp_server.py" "$@"