From 94330755d8e40abdd332841cd6fb341567897d38 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Wed, 8 Apr 2026 10:55:21 +0000 Subject: [PATCH] feat: add streamable-http MCP transport alongside SSE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SSE transport has reliability issues through Cloudflare/Traefik proxies (connections drop, causing init failures on reconnect). Streamable HTTP is stateless — each request carries its own session, avoiding persistent connection issues. New endpoint: POST/GET/DELETE /mcp/mcp (streamable-http) Existing: GET /mcp/sse + POST /mcp/messages/ (SSE, unchanged) Co-Authored-By: Claude Opus 4.6 (1M context) --- src/claude_memory/api/app.py | 49 +++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/src/claude_memory/api/app.py b/src/claude_memory/api/app.py index c912dd2..69096da 100644 --- a/src/claude_memory/api/app.py +++ b/src/claude_memory/api/app.py @@ -15,6 +15,7 @@ from fastapi.responses import Response from fastapi.staticfiles import StaticFiles from mcp.server.fastmcp import FastMCP from mcp.server.sse import SseServerTransport +from mcp.server.streamable_http import StreamableHTTPServerTransport from starlette.routing import Mount, Route from starlette.types import ASGIApp, Receive, Scope, Send @@ -1296,11 +1297,57 @@ class HandleSSE: ) +# Streamable HTTP transport — stateless, no persistent SSE connection needed. +# Each request carries its own init+tool call. More reliable through proxies. +class HandleStreamableHTTP: + """ASGI app for streamable-http MCP connections.""" + def __init__(self) -> None: + self._transport: StreamableHTTPServerTransport | None = None + + async def __call__(self, scope: Any, receive: Any, send: Any) -> None: + user_id = "default" + for name, value in scope.get("headers", []): + if name == b"authorization": + token = value.decode().removeprefix("Bearer ").strip() + resolved = _resolve_user_from_token(token) + if resolved: + user_id = resolved + break + _current_user.set(user_id) + + session_id = None + for name, value in scope.get("headers", []): + if name == b"mcp-session-id": + session_id = value.decode() + break + + transport = StreamableHTTPServerTransport( + mcp_session_id=session_id, + is_json_response_enabled=True, + ) + async with transport.connect() as (read_stream, write_stream): + import anyio + async with anyio.create_task_group() as tg: + async def run_server() -> None: + await mcp_server._mcp_server.run( + read_stream, write_stream, + mcp_server._mcp_server.create_initialization_options(), + ) + + tg.start_soon(run_server) + await transport.handle_request(scope, receive, send) + tg.cancel_scope.cancel() + + +streamable_handler = HandleStreamableHTTP() + # Static files for UI (before MCP mount) app.mount("/static", StaticFiles(directory=UI_DIR), name="static") -# Client connects to /mcp/sse, posts to /mcp/messages/ +# Client connects to /mcp/sse, posts to /mcp/messages/ (SSE transport) +# Client can also POST to /mcp/mcp (streamable-http transport) app.router.routes.insert(0, Mount("/mcp", routes=[ Route("/sse", endpoint=HandleSSE()), Mount("/messages", app=sse_transport.handle_post_message), + Route("/mcp", endpoint=streamable_handler, methods=["GET", "POST", "DELETE"]), ]))