From a29bffdda3cbacae9d7855f982562b7563f0c0dd Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Wed, 17 Jun 2026 22:22:38 +0000 Subject: [PATCH] chat-completions: stream conversational turns (SSE token relay) for realtime voice MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds stream=true support to POST /v1/chat/completions (it previously 400'd). When streaming, it runs the no-tools `conversational` agent via `claude -p --output-format stream-json --include-partial-messages --verbose` and relays each content_block_delta as an OpenAI chat.completion.chunk SSE event, ending with finish_reason=stop + [DONE]. Free CLI/subscription auth, no tools, no API key. Stateless by design: the full message history is flattened into the prompt (prior assistant turns kept), so an OpenAI-style client that re-sends history each turn — e.g. Pipecat's OpenAILLMService — can stream from us directly. The non-streaming path (recruiter-triage workspace agent) is unchanged. This is phase 1 of the Pipecat realtime full-duplex voice-agent rebuild for portal-assistant (continuous audio, VAD endpointing, barge-in, ~seconds to first words). New pure helpers (stream_argv/delta_text/openai_chunk/ synthesise_chat_prompt) are unit-tested; the SSE endpoint has a mocked-subprocess integration test. 429 passing. 
Co-Authored-By: Claude Opus 4.8 --- app/conversational.py | 107 +++++++++++++++++++++++++++++++++++ app/main.py | 65 +++++++++++++++++++-- tests/test_conversational.py | 76 +++++++++++++++++++++++++ tests/test_openai_compat.py | 64 +++++++++++++++++++-- 4 files changed, 304 insertions(+), 8 deletions(-) diff --git a/app/conversational.py b/app/conversational.py index 92da758..a7d21b7 100644 --- a/app/conversational.py +++ b/app/conversational.py @@ -96,3 +96,110 @@ async def run_turn(session_id: str, message: str, model: str) -> dict: "reply": extract_reply(output_lines), "stderr": stderr.decode(errors="replace"), } + + +# --------------------------------------------------------------------------- +# Streaming (OpenAI-compatible) path — token-level deltas for the realtime +# voice agent. Pipecat's OpenAILLMService streams from /v1/chat/completions and +# re-sends the FULL history each turn, so this path is STATELESS: the whole +# dialogue goes in the prompt and we run a fresh CLI with stream-json to relay +# incremental tokens as OpenAI chat-completion SSE chunks. (run_turn above stays +# the session-based path for the non-streaming gateway.) +# --------------------------------------------------------------------------- + + +def stream_argv(prompt: str, model: str) -> list[str]: + """Argv for a STREAMING conversational turn (token deltas via stream-json). + + Stateless — the full conversation is in `prompt` (no --session-id/--resume). + `--include-partial-messages` makes the CLI emit `content_block_delta` token + events; `--verbose` is required by the CLI for stream-json under --print. No + --dangerously-skip-permissions: the conversational agent has no tools. + """ + return [ + "claude", "-p", + "--agent", CONVERSATIONAL_AGENT, + "--model", model, + "--output-format", "stream-json", + "--include-partial-messages", + "--verbose", + prompt, + ] + + +def delta_text(line: str) -> str | None: + """Extract the incremental assistant text from one stream-json line. 
+ + Returns the text of a `content_block_delta` / `text_delta` event, or None + for any other event (system, message_start, content_block_stop, result) or + an unparseable line. + """ + line = line.strip() + if not line: + return None + try: + event = json.loads(line) + except json.JSONDecodeError: + return None + if not isinstance(event, dict) or event.get("type") != "stream_event": + return None + inner = event.get("event") or {} + if inner.get("type") != "content_block_delta": + return None + delta = inner.get("delta") or {} + if delta.get("type") == "text_delta": + return delta.get("text") or None + return None + + +def openai_chunk( + completion_id: str, + model: str, + created: int, + *, + role: str | None = None, + content: str | None = None, + finish_reason: str | None = None, +) -> str: + """Format one OpenAI `chat.completion.chunk` as an SSE `data:` line. + + ensure_ascii=False keeps Cyrillic (Bulgarian) intact on the wire. + """ + delta: dict[str, str] = {} + if role is not None: + delta["role"] = role + if content is not None: + delta["content"] = content + payload = { + "id": completion_id, + "object": "chat.completion.chunk", + "created": created, + "model": model, + "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}], + } + return "data: " + json.dumps(payload, ensure_ascii=False) + "\n\n" + + +def synthesise_chat_prompt(messages) -> str: + """Flatten OpenAI chat messages into a dialogue prompt for the conversational + agent, KEEPING prior assistant turns. + + Pipecat re-sends the full message history every call, so multi-turn context + is preserved here (statelessly) by replaying the dialogue. Each message is a + duck-typed object with `.role` and `.content`. System messages become a + preamble; user/assistant turns are rendered as a `User:`/`Assistant:` + dialogue ending on the latest user turn. 
+ """ + system = [m.content for m in messages if m.role == "system" and m.content] + turns = [] + for m in messages: + if m.role == "user" and m.content: + turns.append("User: " + m.content) + elif m.role == "assistant" and m.content: + turns.append("Assistant: " + m.content) + parts = [] + if system: + parts.append("\n\n".join(system)) + if turns: + parts.append("\n".join(turns)) + return "\n\n".join(parts).strip() diff --git a/app/main.py b/app/main.py index 37a3eb8..33c16f8 100644 --- a/app/main.py +++ b/app/main.py @@ -2,6 +2,8 @@ import asyncio import hmac import json import os +import shutil +import tempfile import time import uuid from contextlib import asynccontextmanager @@ -10,7 +12,7 @@ from subprocess import PIPE from typing import Any, Literal from fastapi import FastAPI, HTTPException, Header -from fastapi.responses import JSONResponse +from fastapi.responses import JSONResponse, StreamingResponse from pydantic import BaseModel, Field from app import conversational @@ -446,9 +448,6 @@ async def chat_completions( ): verify_token(authorization) - if request.stream: - raise HTTPException(status_code=400, detail="streaming not supported") - model = request.model if request.model is not None else DEFAULT_MODEL if model not in SUPPORTED_MODELS: return JSONResponse( @@ -459,6 +458,64 @@ async def chat_completions( }, ) + # Streaming path (the realtime voice agent / Pipecat). Token-level deltas via + # the conversational (no-tools) agent in stream-json mode, relayed as + # OpenAI chat.completion.chunk SSE. Stateless: the full history is in the + # prompt (the client re-sends it each turn). No workspace clone — the + # conversational agent reads no files. 
async def event_stream():
    """Relay one stateless conversational turn as OpenAI-style SSE chunks.

    Closes over `prompt`, `model`, `completion_id`, `created` and `spawn` from
    the enclosing request handler. While holding an execution slot it spawns
    the streaming CLI in a throwaway working directory, emits a role chunk,
    then one content chunk per text delta parsed from the CLI's stdout, and
    finally a `finish_reason="stop"` chunk followed by the `[DONE]` sentinel.

    The whole turn is bounded by CONVERSATIONAL_TIMEOUT_SECONDS; on expiry the
    stream is closed cleanly (stop + [DONE]) rather than erroring. The child
    process is killed if still running and the directory removed whenever the
    generator finishes or is closed early (e.g. client disconnect).
    """
    workspace = tempfile.mkdtemp(prefix="conv-stream-")
    proc = None
    try:
        async with _execution_slot():
            proc = await spawn(
                *conversational.stream_argv(prompt, model),
                cwd=workspace,
                stdout=PIPE,
                # stderr is never consumed on this path; an unread PIPE would
                # stall the CLI as soon as the OS pipe buffer fills up.
                stderr=asyncio.subprocess.DEVNULL,
                # StreamReader raises ValueError on lines over its limit
                # (64 KiB by default). NOTE(review): stream-json events are one
                # JSON object per line and presumably can exceed that for long
                # replies — confirm the headroom needed.
                limit=1024 * 1024,
            )
            assert proc.stdout is not None
            yield conversational.openai_chunk(
                completion_id, model, created, role="assistant"
            )
            # One deadline for the whole turn, enforced per read. The timeout
            # scope must not span a `yield`: a deadline firing while this
            # generator is suspended would cancel the consumer's task instead
            # of surfacing here as TimeoutError.
            deadline = (
                asyncio.get_running_loop().time()
                + conversational.CONVERSATIONAL_TIMEOUT_SECONDS
            )
            lines = aiter(proc.stdout)
            while True:
                try:
                    async with asyncio.timeout_at(deadline):
                        raw = await anext(lines, None)
                except asyncio.TimeoutError:
                    break  # wedged turn — close the stream cleanly
                if raw is None:
                    break  # EOF: the CLI closed stdout
                text = conversational.delta_text(raw.decode(errors="replace"))
                if text:
                    yield conversational.openai_chunk(
                        completion_id, model, created, content=text
                    )
            yield conversational.openai_chunk(
                completion_id, model, created, finish_reason="stop"
            )
            yield "data: [DONE]\n\n"
    finally:
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
                await proc.wait()
            except ProcessLookupError:
                pass  # the process exited between the check and the kill
        shutil.rmtree(workspace, ignore_errors=True)
# --------------------------------------------------------------------------- #
# streaming helpers (OpenAI-compatible token relay for the realtime voice agent)
# --------------------------------------------------------------------------- #
from collections import namedtuple  # noqa: E402

# Minimal stand-in for a chat message: only `.role` and `.content` are read.
_Msg = namedtuple("_Msg", "role content")


def test_stream_argv_uses_stream_json_and_is_stateless():
    """The streaming argv selects stream-json and carries no session/tool flags."""
    args = conversational.stream_argv("hello", "sonnet")
    assert args[:2] == ["claude", "-p"]
    expected_tokens = (
        "--agent",
        "conversational",
        "stream-json",
        "--include-partial-messages",
        "--verbose",
        "--model",
        "sonnet",
    )
    for token in expected_tokens:
        assert token in args
    assert args[-1] == "hello"
    # stateless + no tools
    for flag in ("--resume", "--session-id", "--dangerously-skip-permissions"):
        assert flag not in args


def test_delta_text_extracts_content_block_delta():
    """A text_delta event yields its text unchanged (Cyrillic included)."""
    text_delta = {"type": "text_delta", "text": "Слон"}
    event = {
        "type": "stream_event",
        "event": {"type": "content_block_delta", "delta": text_delta},
    }
    assert conversational.delta_text(json.dumps(event)) == "Слон"


def test_delta_text_ignores_non_text_events():
    """Non-text events, blank lines and junk all map to None."""
    tool_json_delta = {
        "type": "stream_event",
        "event": {
            "type": "content_block_delta",
            "delta": {"type": "input_json_delta", "partial_json": "{"},
        },
    }
    ignored_events = (
        {"type": "system"},
        {"type": "stream_event", "event": {"type": "message_start"}},
        tool_json_delta,
        {"type": "result"},
    )
    for event in ignored_events:
        assert conversational.delta_text(json.dumps(event)) is None
    for junk in ("", "not json"):
        assert conversational.delta_text(junk) is None


def test_openai_chunk_valid_sse_and_keeps_cyrillic():
    """A content chunk is a well-formed SSE line with raw (unescaped) Cyrillic."""
    sse = conversational.openai_chunk("chatcmpl-x", "sonnet", 123, content="две")
    assert sse.startswith("data: ") and sse.endswith("\n\n")
    parsed = json.loads(sse.removeprefix("data: ").strip())
    assert parsed["object"] == "chat.completion.chunk"
    choice = parsed["choices"][0]
    assert choice["delta"]["content"] == "две"
    assert choice["finish_reason"] is None
    assert "две" in sse  # not unicode-escaped


def test_openai_chunk_role_and_finish():
    """Role-only and finish-only chunks carry exactly the expected delta."""
    prefix_len = len("data: ")

    role_sse = conversational.openai_chunk("id", "m", 1, role="assistant")
    role_choice = json.loads(role_sse[prefix_len:].strip())["choices"][0]
    assert role_choice["delta"] == {"role": "assistant"}

    stop_sse = conversational.openai_chunk("id", "m", 1, finish_reason="stop")
    stop_choice = json.loads(stop_sse[prefix_len:].strip())["choices"][0]
    assert stop_choice["finish_reason"] == "stop" and stop_choice["delta"] == {}


def test_synthesise_chat_prompt_keeps_assistant_turns():
    """System preamble and every user/assistant turn survive flattening."""
    history = [
        _Msg(role="system", content="Be brief."),
        _Msg(role="user", content="Здравей"),
        _Msg(role="assistant", content="Здравей! Как си?"),
        _Msg(role="user", content="Добре, ти?"),
    ]
    prompt = conversational.synthesise_chat_prompt(history)
    for fragment in ("Be brief.", "User: Здравей", "Assistant: Здравей! Как си?"):
        assert fragment in prompt
    assert prompt.strip().endswith("User: Добре, ти?")
def _delta_line(text: str) -> str:
    """Return one stream-json line carrying `text` as a text_delta event."""
    text_delta = {"type": "text_delta", "text": text}
    event = {
        "type": "stream_event",
        "event": {"type": "content_block_delta", "delta": text_delta},
    }
    return json.dumps(event)


@pytest.mark.asyncio
async def test_chat_completions_streaming_relays_token_sse(auth_header):
    """stream=true relays CLI stream-json token deltas as OpenAI SSE chunks."""
    cli_lines = [
        json.dumps({"type": "system"}),
        json.dumps({"type": "stream_event", "event": {"type": "message_start"}}),
        _delta_line("Две"),
        _delta_line(" точки."),
        json.dumps({"type": "result", "subtype": "success"}),
    ]
    fake_proc = _mock_subprocess_returning("\n".join(cli_lines).encode(), returncode=0)
    request_body = {
        "model": "sonnet",
        "stream": True,
        "messages": [{"role": "user", "content": "Колко е?"}],
    }

    with patch("app.main.asyncio.create_subprocess_exec", return_value=fake_proc):
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.post(
                "/v1/chat/completions",
                json=request_body,
                headers=auth_header,
            )

    assert response.status_code == 200, response.text
    assert response.headers["content-type"].startswith("text/event-stream")
    body = response.text
    assert "chat.completion.chunk" in body
    assert body.rstrip().endswith("data: [DONE]")

    # Reassemble the streamed assistant content from the delta chunks.
    prefix = "data: "
    pieces = []
    role_seen = False
    for sse_line in body.splitlines():
        is_data = sse_line.startswith(prefix)
        if is_data and sse_line.strip() != "data: [DONE]":
            chunk = json.loads(sse_line[len(prefix):])
            assert chunk["object"] == "chat.completion.chunk"
            delta = chunk["choices"][0]["delta"]
            role_seen = role_seen or delta.get("role") == "assistant"
            pieces.append(delta.get("content", ""))
    assert role_seen
    assert "".join(pieces) == "Две точки."