feat: add streamable-http MCP transport alongside SSE
SSE transport has reliability issues through Cloudflare/Traefik proxies (connections drop, causing init failures on reconnect). Streamable HTTP is stateless — each request carries its own session, avoiding persistent connection issues. New endpoint: POST/GET/DELETE /mcp/mcp (streamable-http) Existing: GET /mcp/sse + POST /mcp/messages/ (SSE, unchanged) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
4d7988b6ac
commit
94330755d8
1 changed files with 48 additions and 1 deletions
|
|
@ -15,6 +15,7 @@ from fastapi.responses import Response
|
|||
from fastapi.staticfiles import StaticFiles
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
from mcp.server.sse import SseServerTransport
|
||||
from mcp.server.streamable_http import StreamableHTTPServerTransport
|
||||
from starlette.routing import Mount, Route
|
||||
from starlette.types import ASGIApp, Receive, Scope, Send
|
||||
|
||||
|
|
@ -1296,11 +1297,57 @@ class HandleSSE:
|
|||
)
|
||||
|
||||
|
||||
# Streamable HTTP transport — stateless, no persistent SSE connection needed.
|
||||
# Each request carries its own init+tool call. More reliable through proxies.
|
||||
class HandleStreamableHTTP:
|
||||
"""ASGI app for streamable-http MCP connections."""
|
||||
def __init__(self) -> None:
|
||||
self._transport: StreamableHTTPServerTransport | None = None
|
||||
|
||||
async def __call__(self, scope: Any, receive: Any, send: Any) -> None:
|
||||
user_id = "default"
|
||||
for name, value in scope.get("headers", []):
|
||||
if name == b"authorization":
|
||||
token = value.decode().removeprefix("Bearer ").strip()
|
||||
resolved = _resolve_user_from_token(token)
|
||||
if resolved:
|
||||
user_id = resolved
|
||||
break
|
||||
_current_user.set(user_id)
|
||||
|
||||
session_id = None
|
||||
for name, value in scope.get("headers", []):
|
||||
if name == b"mcp-session-id":
|
||||
session_id = value.decode()
|
||||
break
|
||||
|
||||
transport = StreamableHTTPServerTransport(
|
||||
mcp_session_id=session_id,
|
||||
is_json_response_enabled=True,
|
||||
)
|
||||
async with transport.connect() as (read_stream, write_stream):
|
||||
import anyio
|
||||
async with anyio.create_task_group() as tg:
|
||||
async def run_server() -> None:
|
||||
await mcp_server._mcp_server.run(
|
||||
read_stream, write_stream,
|
||||
mcp_server._mcp_server.create_initialization_options(),
|
||||
)
|
||||
|
||||
tg.start_soon(run_server)
|
||||
await transport.handle_request(scope, receive, send)
|
||||
tg.cancel_scope.cancel()
|
||||
|
||||
|
||||
streamable_handler = HandleStreamableHTTP()
|
||||
|
||||
# Static files for UI (before MCP mount)
|
||||
app.mount("/static", StaticFiles(directory=UI_DIR), name="static")
|
||||
|
||||
# Client connects to /mcp/sse, posts to /mcp/messages/
|
||||
# Client connects to /mcp/sse, posts to /mcp/messages/ (SSE transport)
|
||||
# Client can also POST to /mcp/mcp (streamable-http transport)
|
||||
app.router.routes.insert(0, Mount("/mcp", routes=[
|
||||
Route("/sse", endpoint=HandleSSE()),
|
||||
Mount("/messages", app=sse_transport.handle_post_message),
|
||||
Route("/mcp", endpoint=streamable_handler, methods=["GET", "POST", "DELETE"]),
|
||||
]))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue