chat-completions: stream conversational turns (SSE token relay) for realtime voice
Some checks failed
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build (push) Has been cancelled
Build and Push / deploy (push) Has been cancelled
Build and Push / notify-failure (push) Has been cancelled

Adds stream=true support to POST /v1/chat/completions (it previously 400'd).
When streaming, it runs the no-tools `conversational` agent via
`claude -p --output-format stream-json --include-partial-messages --verbose`
and relays each content_block_delta as an OpenAI chat.completion.chunk SSE
event, ending with finish_reason=stop + [DONE]. Free CLI/subscription auth, no
tools, no API key.

Stateless by design: the full message history is flattened into the prompt
(prior assistant turns kept), so an OpenAI-style client that re-sends history
each turn — e.g. Pipecat's OpenAILLMService — can stream from us directly. The
non-streaming path (recruiter-triage workspace agent) is unchanged.

This is phase 1 of the Pipecat realtime full-duplex voice-agent rebuild for
portal-assistant (continuous audio, VAD endpointing, barge-in, ~seconds to
first words). New pure helpers (stream_argv/delta_text/openai_chunk/
synthesise_chat_prompt) are unit-tested; the SSE endpoint has a mocked-subprocess
integration test. 429 passing.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Viktor Barzin 2026-06-17 22:22:38 +00:00
parent 4e48214c0b
commit a29bffdda3
4 changed files with 304 additions and 8 deletions

View file

@ -96,3 +96,110 @@ async def run_turn(session_id: str, message: str, model: str) -> dict:
"reply": extract_reply(output_lines),
"stderr": stderr.decode(errors="replace"),
}
# ---------------------------------------------------------------------------
# Streaming (OpenAI-compatible) path — token-level deltas for the realtime
# voice agent. Pipecat's OpenAILLMService streams from /v1/chat/completions and
# re-sends the FULL history each turn, so this path is STATELESS: the whole
# dialogue goes in the prompt and we run a fresh CLI with stream-json to relay
# incremental tokens as OpenAI chat-completion SSE chunks. (run_turn above stays
# the session-based path for the non-streaming gateway.)
# ---------------------------------------------------------------------------
def stream_argv(prompt: str, model: str) -> list[str]:
    """Build the CLI argv for one STREAMING conversational turn.

    The turn is stateless: the whole conversation travels inside `prompt`, so
    neither --session-id nor --resume is passed. `--include-partial-messages`
    makes the CLI emit `content_block_delta` token events, and `--verbose` is
    required by the CLI for stream-json under --print. The conversational agent
    has no tools, hence no --dangerously-skip-permissions.

    Args:
        prompt: The flattened dialogue, passed as the final positional argument.
        model: Model name forwarded to `--model`.

    Returns:
        The argv list, starting with `claude -p` and ending with `prompt`.
    """
    argv = ["claude", "-p"]
    argv += ["--agent", CONVERSATIONAL_AGENT]
    argv += ["--model", model]
    argv += ["--output-format", "stream-json"]
    argv += ["--include-partial-messages", "--verbose"]
    argv.append(prompt)
    return argv
def delta_text(line: str) -> str | None:
"""Extract the incremental assistant text from one stream-json line.
Returns the text of a `content_block_delta` / `text_delta` event, or None
for any other event (system, message_start, content_block_stop, result) or
an unparseable line.
"""
line = line.strip()
if not line:
return None
try:
event = json.loads(line)
except json.JSONDecodeError:
return None
if not isinstance(event, dict) or event.get("type") != "stream_event":
return None
inner = event.get("event") or {}
if inner.get("type") != "content_block_delta":
return None
delta = inner.get("delta") or {}
if delta.get("type") == "text_delta":
return delta.get("text") or None
return None
def openai_chunk(
completion_id: str,
model: str,
created: int,
*,
role: str | None = None,
content: str | None = None,
finish_reason: str | None = None,
) -> str:
"""Format one OpenAI `chat.completion.chunk` as an SSE `data:` line.
ensure_ascii=False keeps Cyrillic (Bulgarian) intact on the wire.
"""
delta: dict[str, str] = {}
if role is not None:
delta["role"] = role
if content is not None:
delta["content"] = content
payload = {
"id": completion_id,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
}
return "data: " + json.dumps(payload, ensure_ascii=False) + "\n\n"
def synthesise_chat_prompt(messages) -> str:
    """Flatten OpenAI chat messages into one dialogue prompt, assistant turns included.

    The client re-sends the full history on every call, so replaying it here
    preserves multi-turn context without server-side state. Each message is a
    duck-typed object exposing `.role` and `.content`.

    All non-empty system messages are joined into a preamble (blank line
    between them). Non-empty user/assistant messages follow, in order, one per
    line as `User: ...` / `Assistant: ...`. Messages with any other role, or
    with empty content, are dropped.

    Returns:
        The preamble and the dialogue separated by a blank line, stripped of
        surrounding whitespace; an empty string if nothing qualifies.
    """
    speaker_prefix = {"user": "User: ", "assistant": "Assistant: "}

    preamble = [msg.content for msg in messages if msg.role == "system" and msg.content]
    dialogue = [
        speaker_prefix[msg.role] + msg.content
        for msg in messages
        if msg.role in speaker_prefix and msg.content
    ]

    sections = []
    if preamble:
        sections.append("\n\n".join(preamble))
    if dialogue:
        sections.append("\n".join(dialogue))
    return "\n\n".join(sections).strip()

View file

@ -2,6 +2,8 @@ import asyncio
import hmac
import json
import os
import shutil
import tempfile
import time
import uuid
from contextlib import asynccontextmanager
@ -10,7 +12,7 @@ from subprocess import PIPE
from typing import Any, Literal
from fastapi import FastAPI, HTTPException, Header
from fastapi.responses import JSONResponse
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
from app import conversational
@ -446,9 +448,6 @@ async def chat_completions(
):
verify_token(authorization)
if request.stream:
raise HTTPException(status_code=400, detail="streaming not supported")
model = request.model if request.model is not None else DEFAULT_MODEL
if model not in SUPPORTED_MODELS:
return JSONResponse(
@ -459,6 +458,64 @@ async def chat_completions(
},
)
# Streaming path (the realtime voice agent / Pipecat). Token-level deltas via
# the conversational (no-tools) agent in stream-json mode, relayed as
# OpenAI chat.completion.chunk SSE. Stateless: the full history is in the
# prompt (the client re-sends it each turn). No workspace clone — the
# conversational agent reads no files.
if request.stream:
if not _reserve_queue_slot():
return JSONResponse(
status_code=503,
content={"error": "execution failed", "detail": "queue full"},
)
prompt = conversational.synthesise_chat_prompt(request.messages)
completion_id = "chatcmpl-" + uuid.uuid4().hex[:24]
created = int(time.time())
spawn = asyncio.create_subprocess_exec # bound alias (keeps subprocess use tidy)
async def event_stream():
    """Relay one streaming CLI turn as OpenAI `chat.completion.chunk` SSE events.

    Yields, in order: a role chunk (`role="assistant"`), one content chunk per
    non-empty text delta parsed from the CLI's stdout, a final chunk with
    `finish_reason="stop"`, and the `data: [DONE]` sentinel.

    The CLI runs in a throwaway temp directory. If it produces output for
    longer than `conversational.CONVERSATIONAL_TIMEOUT_SECONDS`, relaying stops
    and the stream is still closed cleanly with the stop chunk. On every exit
    path a still-running child is killed and the temp directory is removed.
    """
    workspace = tempfile.mkdtemp(prefix="conv-stream-")
    proc = None
    try:
        # NOTE(review): _execution_slot is defined elsewhere; presumably it
        # bounds concurrent CLI runs — confirm it also releases the queue slot.
        async with _execution_slot():
            proc = await spawn(
                *conversational.stream_argv(prompt, model),
                cwd=workspace,
                stdout=PIPE,
                # stderr is never read on this path. Leaving it as a PIPE lets a
                # chatty child fill the pipe buffer and block mid-stream, so it
                # is discarded instead.
                stderr=asyncio.subprocess.DEVNULL,
            )
            assert proc.stdout is not None
            yield conversational.openai_chunk(
                completion_id, model, created, role="assistant"
            )
            try:
                async with asyncio.timeout(
                    conversational.CONVERSATIONAL_TIMEOUT_SECONDS
                ):
                    async for raw in proc.stdout:
                        text = conversational.delta_text(
                            raw.decode(errors="replace")
                        )
                        if text:
                            yield conversational.openai_chunk(
                                completion_id, model, created, content=text
                            )
            except asyncio.TimeoutError:
                pass  # wedged turn — close the stream cleanly
            yield conversational.openai_chunk(
                completion_id, model, created, finish_reason="stop"
            )
            yield "data: [DONE]\n\n"
    finally:
        # Also reached when the client disconnects and the generator is closed
        # early: never leave an orphaned CLI process or temp dir behind.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
                await proc.wait()
            except ProcessLookupError:
                pass
        shutil.rmtree(workspace, ignore_errors=True)
return StreamingResponse(event_stream(), media_type="text/event-stream")
prompt = _synthesise_prompt(request.messages)
if not _reserve_queue_slot():

View file

@ -171,3 +171,79 @@ async def test_conversational_returns_503_on_failure(auth_header):
)
assert r.status_code == 503
assert r.json()["error"] == "execution failed"
# --------------------------------------------------------------------------- #
# streaming helpers (OpenAI-compatible token relay for the realtime voice agent)
# --------------------------------------------------------------------------- #
from collections import namedtuple # noqa: E402
# Minimal duck-typed stand-in for a chat message in the helper tests below:
# it carries just the `.role` and `.content` attributes.
_Msg = namedtuple("_Msg", "role content")
def test_stream_argv_uses_stream_json_and_is_stateless():
    """stream_argv requests stream-json output and passes no session/permission flags."""
    cmd = conversational.stream_argv("hello", "sonnet")

    assert cmd[0:2] == ["claude", "-p"]
    expected_tokens = (
        "--agent",
        "conversational",
        "stream-json",
        "--include-partial-messages",
        "--verbose",
        "--model",
        "sonnet",
    )
    for token in expected_tokens:
        assert token in cmd
    assert cmd[-1] == "hello"

    # stateless + no tools
    forbidden_flags = ("--resume", "--session-id", "--dangerously-skip-permissions")
    for flag in forbidden_flags:
        assert flag not in cmd
def test_delta_text_extracts_content_block_delta():
    """A text_delta inside a content_block_delta stream event yields its text."""
    text_delta = {"type": "text_delta", "text": "Слон"}
    stream_event = {
        "type": "stream_event",
        "event": {"type": "content_block_delta", "delta": text_delta},
    }
    extracted = conversational.delta_text(json.dumps(stream_event))
    assert extracted == "Слон"
def test_delta_text_ignores_non_text_events():
    """Non-text events, the empty line and non-JSON input all map to None."""
    ignored_events = (
        {"type": "system"},
        {"type": "stream_event", "event": {"type": "message_start"}},
        {
            "type": "stream_event",
            "event": {
                "type": "content_block_delta",
                "delta": {"type": "input_json_delta", "partial_json": "{"},
            },
        },
        {"type": "result"},
    )
    raw_lines = [json.dumps(event) for event in ignored_events]
    raw_lines += ["", "not json"]
    for raw in raw_lines:
        assert conversational.delta_text(raw) is None
def test_openai_chunk_valid_sse_and_keeps_cyrillic():
    """A content chunk is a well-formed SSE data event carrying raw Cyrillic."""
    prefix = "data: "
    sse = conversational.openai_chunk("chatcmpl-x", "sonnet", 123, content="две")

    assert sse.startswith(prefix)
    assert sse.endswith("\n\n")

    decoded = json.loads(sse[len(prefix):].strip())
    first_choice = decoded["choices"][0]
    assert decoded["object"] == "chat.completion.chunk"
    assert first_choice["delta"]["content"] == "две"
    assert first_choice["finish_reason"] is None

    # The text must appear verbatim, not as \uXXXX escapes.
    assert "две" in sse
def test_openai_chunk_role_and_finish():
    """The role chunk carries only the role; the stop chunk has an empty delta."""
    skip = len("data: ")

    role_sse = conversational.openai_chunk("id", "m", 1, role="assistant")
    role_choice = json.loads(role_sse[skip:].strip())["choices"][0]
    assert role_choice["delta"] == {"role": "assistant"}

    stop_sse = conversational.openai_chunk("id", "m", 1, finish_reason="stop")
    stop_choice = json.loads(stop_sse[skip:].strip())["choices"][0]
    assert stop_choice["finish_reason"] == "stop"
    assert stop_choice["delta"] == {}
def test_synthesise_chat_prompt_keeps_assistant_turns():
    """The flattened prompt keeps the system text and every user/assistant turn."""
    history = [
        _Msg(role="system", content="Be brief."),
        _Msg(role="user", content="Здравей"),
        _Msg(role="assistant", content="Здравей! Как си?"),
        _Msg(role="user", content="Добре, ти?"),
    ]
    prompt = conversational.synthesise_chat_prompt(history)

    for fragment in ("Be brief.", "User: Здравей", "Assistant: Здравей! Как си?"):
        assert fragment in prompt
    # The dialogue must end on the latest user turn.
    assert prompt.strip().endswith("User: Добре, ти?")

View file

@ -98,14 +98,15 @@ async def test_chat_completions_happy_path(auth_header):
@pytest.mark.asyncio
async def test_chat_completions_rejects_streaming(auth_header):
"""stream=true is not supported and must 400 with a clear message."""
async def test_chat_completions_streaming_rejects_unsupported_model(auth_header):
"""Streaming is supported now; model validation still runs first, so an
unsupported model 400s before any CLI is spawned."""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.post(
"/v1/chat/completions",
json={
"model": "haiku",
"model": "gpt-4",
"messages": [{"role": "user", "content": "hi"}],
"stream": True,
},
@ -113,7 +114,7 @@ async def test_chat_completions_rejects_streaming(auth_header):
)
assert response.status_code == 400
body = response.json()
assert "streaming not supported" in json.dumps(body).lower()
assert "unsupported model" in json.dumps(body).lower()
@pytest.mark.asyncio
@ -370,3 +371,58 @@ async def test_chat_completions_response_model_echoes_default_when_missing(auth_
)
assert status == 200
assert body["model"] == "sonnet"
def _delta_line(text: str) -> str:
    """Serialise one stream-json `content_block_delta` text event carrying `text`."""
    text_delta = {"type": "text_delta", "text": text}
    inner_event = {"type": "content_block_delta", "delta": text_delta}
    return json.dumps({"type": "stream_event", "event": inner_event})
@pytest.mark.asyncio
async def test_chat_completions_streaming_relays_token_sse(auth_header):
    """stream=true relays CLI stream-json token deltas as OpenAI SSE chunks."""
    cli_lines = [
        json.dumps({"type": "system"}),
        json.dumps({"type": "stream_event", "event": {"type": "message_start"}}),
        _delta_line("Две"),
        _delta_line(" точки."),
        json.dumps({"type": "result", "subtype": "success"}),
    ]
    fake_proc = _mock_subprocess_returning(
        "\n".join(cli_lines).encode(), returncode=0
    )
    request_body = {
        "model": "sonnet",
        "stream": True,
        "messages": [{"role": "user", "content": "Колко е?"}],
    }

    with patch("app.main.asyncio.create_subprocess_exec", return_value=fake_proc):
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.post(
                "/v1/chat/completions",
                json=request_body,
                headers=auth_header,
            )

    assert response.status_code == 200, response.text
    assert response.headers["content-type"].startswith("text/event-stream")
    body = response.text
    assert "chat.completion.chunk" in body
    assert body.rstrip().endswith("data: [DONE]")

    # Decode every SSE data event except the [DONE] sentinel.
    prefix = "data: "
    payloads = [
        json.loads(line[len(prefix):])
        for line in body.splitlines()
        if line.startswith(prefix) and line.strip() != "data: [DONE]"
    ]
    for payload in payloads:
        assert payload["object"] == "chat.completion.chunk"

    # Reassemble the streamed assistant content from the delta chunks.
    deltas = [payload["choices"][0]["delta"] for payload in payloads]
    assert any(delta.get("role") == "assistant" for delta in deltas)
    assert "".join(delta.get("content", "") for delta in deltas) == "Две точки."