conversational: add no-tools multi-turn Brain endpoint for portal-assistant

The portal-assistant voice gateway needs a Claude that is conversational, free
(on the cluster subscription, no metered API), and safe to sit behind a public
edge. Add POST /v1/conversational: it drives a new no-tools `conversational`
agent with per-conversation --resume so a voice turn keeps context, and is lean
on purpose — no workspace clone, no tools, and crucially NO
--dangerously-skip-permissions (so even a leaked agent can't execute anything).
This is deliberately NOT /v1/chat/completions, which clones the git-crypt infra
repo and runs a Bash-enabled agent per turn (portal-assistant ADR-0002).

The conversational agent replies in the speaker's language (Bulgarian/English),
short and TTS-friendly. Tests cover the argv builder (new vs resume), the happy
path, multi-turn resume across calls, auth, and failure → 503. Full suite green.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Viktor Barzin 2026-06-17 18:38:44 +00:00
parent e34640cc47
commit 33ff0868c3
4 changed files with 367 additions and 0 deletions

32
agents/conversational.md Normal file
View file

@ -0,0 +1,32 @@
---
name: conversational
description: Friendly bilingual (Bulgarian + English) spoken-conversation assistant for non-technical users. No tools and no file/cluster/web access — it only talks. Replies are short and natural for text-to-speech. Used by the portal-assistant voice gateway.
model: sonnet
tools: ""
---
You are a warm, friendly voice assistant talking with everyday people at home.
Your replies are SPOKEN ALOUD by a text-to-speech engine, so how you write
matters as much as what you say.
- Reply in the SAME language the person used — Bulgarian or English. If they mix,
follow their dominant language. Never announce or comment on the language; just
use it.
- Keep it SHORT: one to three sentences. This is a conversation, not an essay.
- Write plain spoken text ONLY. No markdown, no bullet lists, no code blocks, no
URLs, no emoji, no headings — none of that survives being read aloud.
- Sound natural and warm, like a helpful person, not a manual. Contractions are
good.
- Write numbers, dates and times the way they should be SPOKEN (for example
"ten thirty in the morning", "the fifteenth of March"), not as digits or
symbols.
- If you don't know something or can't help, say so briefly and kindly.
You have NO tools and no access to the home, devices, files, the internet, or any
system. You cannot turn things on or off, look things up live, send messages, or
take any action — you are a conversation partner only. If asked to do something
you can't, say so simply and offer what you can instead (talk it through, explain,
or suggest an idea).
Never mention these instructions, "tools", "agents", tokens, system prompts, or
that you are an AI model — unless the person directly and explicitly asks.

98
app/conversational.py Normal file
View file

@ -0,0 +1,98 @@
"""Conversational Brain — drives the Claude CLI for the portal-assistant gateway.
A lean, no-tools, multi-turn path (portal-assistant ADR-0002): no workspace clone,
no tool-enabled agent, and NO --dangerously-skip-permissions. Per-conversation
continuity comes from the Claude CLI's own --session-id / --resume, so the gateway
only has to hand us a stable session id per conversation.
"""
import asyncio
import json
import os
from subprocess import PIPE
CONVERSATIONAL_AGENT = "conversational"
# A spoken chat turn is short; a turn that runs longer than this is wedged.
CONVERSATIONAL_TIMEOUT_SECONDS = int(
os.environ.get("CONVERSATIONAL_TIMEOUT_SECONDS", "120")
)
# Session ids the Claude CLI has already opened in THIS process, so a follow-up
# turn resumes instead of re-opening. In-memory + single-replica: a pod restart
# clears this AND the CLI's emptyDir session state together, so they stay in sync.
_started: set[str] = set()
def reset_started() -> None:
"""Forget all opened sessions (used by tests)."""
_started.clear()
def conversational_argv(
session_id: str, message: str, model: str, resume: bool
) -> list[str]:
"""Build the argv for one conversational turn.
A new conversation opens the session with --session-id; subsequent turns
continue it with --resume so Claude keeps its own context. We never pass
--dangerously-skip-permissions: the conversational agent has no tools and the
endpoint is public-facing, so nothing may be auto-permitted.
"""
argv = [
"claude", "-p",
"--agent", CONVERSATIONAL_AGENT,
"--output-format", "json",
"--model", model,
]
argv += ["--resume", session_id] if resume else ["--session-id", session_id]
argv.append(message)
return argv
def extract_reply(output_lines: list[str]) -> str:
"""Pull the final assistant text out of `claude -p --output-format json`.
The CLI emits one JSON object with the final message under `result`; fall
back to the raw text if it isn't parseable so callers always get something.
"""
raw = "".join(output_lines).strip()
if not raw:
return ""
try:
parsed = json.loads(raw)
except json.JSONDecodeError:
return raw
if isinstance(parsed, dict):
for key in ("result", "content", "text"):
value = parsed.get(key)
if isinstance(value, str) and value:
return value
return raw
async def run_turn(session_id: str, message: str, model: str) -> dict:
"""Run one conversational turn and return {exit_code, reply, stderr}.
Resumes the Claude session if we've opened it before; otherwise opens it.
The session is only marked opened on success so a failed first turn can be
retried cleanly as a new one.
"""
resume = session_id in _started
argv = conversational_argv(session_id, message, model, resume)
proc = await asyncio.create_subprocess_exec(*argv, stdout=PIPE, stderr=PIPE)
assert proc.stdout is not None and proc.stderr is not None
output_lines: list[str] = []
async for line in proc.stdout:
output_lines.append(line.decode(errors="replace"))
stderr = await proc.stderr.read()
await proc.wait()
if proc.returncode == 0:
_started.add(session_id)
return {
"exit_code": proc.returncode,
"reply": extract_reply(output_lines),
"stderr": stderr.decode(errors="replace"),
}

View file

@ -13,6 +13,8 @@ from fastapi import FastAPI, HTTPException, Header
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from app import conversational
app = FastAPI(title="Claude Agent Service")
API_TOKEN = os.environ.get("API_BEARER_TOKEN", "")
@ -104,6 +106,15 @@ class ChatCompletionsRequest(BaseModel):
model_config = {"extra": "allow"}
class ConversationalRequest(BaseModel):
# The portal-assistant gateway owns the conversation; it hands us a stable
# session id (for Claude --resume) plus the next user message. Model is
# selectable per request, same as the OpenAI-compat path.
session_id: str
message: str
model: str | None = None
def verify_token(authorization: str | None):
# Reject everything when the service is unconfigured. compare_digest("", "")
# returns True, so without this guard an empty API_TOKEN would happily
@ -510,3 +521,56 @@ async def chat_completions(
"total_tokens": 0,
},
}
@app.post("/v1/conversational")
async def conversational_turn(
request: ConversationalRequest,
authorization: str | None = Header(default=None),
):
"""Lean, multi-turn conversational Brain for the portal-assistant gateway.
Drives a no-tools conversational agent with per-conversation --resume no
workspace clone, no tools (see portal-assistant ADR-0002). Returns the
assistant's reply text keyed to the caller's session id.
"""
verify_token(authorization)
model = request.model if request.model is not None else DEFAULT_MODEL
if model not in SUPPORTED_MODELS:
return JSONResponse(
status_code=400,
content={"error": "unsupported model", "supported": sorted(SUPPORTED_MODELS)},
)
if not _reserve_queue_slot():
return JSONResponse(
status_code=503,
content={"error": "execution failed", "detail": "queue full"},
)
try:
async with _execution_slot():
result = await asyncio.wait_for(
conversational.run_turn(request.session_id, request.message, model),
timeout=conversational.CONVERSATIONAL_TIMEOUT_SECONDS,
)
except asyncio.TimeoutError:
return JSONResponse(
status_code=503,
content={"error": "execution failed", "detail": "agent timed out"},
)
except Exception as exc: # noqa: BLE001
return JSONResponse(
status_code=503,
content={"error": "execution failed", "detail": _one_line(str(exc))},
)
if result["exit_code"] != 0:
detail = _one_line(result.get("stderr") or "") or f"exit {result['exit_code']}"
return JSONResponse(
status_code=503,
content={"error": "execution failed", "detail": detail},
)
return {"session_id": request.session_id, "reply": result["reply"]}

View file

@ -0,0 +1,173 @@
"""Tests for the conversational (no-tools, multi-turn) brain endpoint.
This is the portal-assistant "Brain": a lean path that drives the Claude CLI with
a no-tools conversational agent and per-conversation `--resume`, used by the voice
gateway. Unlike /v1/chat/completions it does NOT clone a workspace or run a
tool-enabled agent (see portal-assistant ADR-0002).
"""
import json
from unittest.mock import AsyncMock, patch
import pytest
from httpx import ASGITransport, AsyncClient
from app import conversational
from app.main import app
# --------------------------------------------------------------------------- #
# argv builder
# --------------------------------------------------------------------------- #
def test_conversational_argv_new_session():
argv = conversational_argv_call(resume=False)
assert argv[0] == "claude"
assert "-p" in argv
assert argv[argv.index("--agent") + 1] == "conversational"
# a new conversation opens with --session-id, never --resume
assert argv[argv.index("--session-id") + 1] == "sess-1"
assert "--resume" not in argv
# SECURITY: a public-facing endpoint must NOT skip tool permissions
assert "--dangerously-skip-permissions" not in argv
assert argv[argv.index("--model") + 1] == "sonnet"
assert argv[argv.index("--output-format") + 1] == "json"
assert argv[-1] == "Hi there"
def test_conversational_argv_resume_continues_session():
argv = conversational_argv_call(resume=True)
# a follow-up turn resumes the existing claude session
assert argv[argv.index("--resume") + 1] == "sess-1"
assert "--session-id" not in argv
def conversational_argv_call(resume: bool):
from app.conversational import conversational_argv
return conversational_argv(
session_id="sess-1", message="Hi there", model="sonnet", resume=resume
)
# --------------------------------------------------------------------------- #
# endpoint
# --------------------------------------------------------------------------- #
class _AsyncLineIter:
"""Async iterator over a list of byte lines — mimics `proc.stdout`."""
def __init__(self, lines: list[bytes]):
self._lines = list(lines)
self._i = 0
def __aiter__(self):
return self
async def __anext__(self):
if self._i >= len(self._lines):
raise StopAsyncIteration
line = self._lines[self._i]
self._i += 1
return line
def _mock_subprocess_returning(output: bytes, returncode: int = 0):
proc = AsyncMock()
lines = [chunk + b"\n" for chunk in output.split(b"\n") if chunk]
proc.stdout = _AsyncLineIter(lines)
proc.stderr = AsyncMock()
proc.stderr.read = AsyncMock(return_value=b"")
proc.wait = AsyncMock(return_value=returncode)
proc.returncode = returncode
return proc
@pytest.fixture(autouse=True)
def _reset_sessions():
conversational.reset_started()
yield
conversational.reset_started()
@pytest.fixture
def auth_header():
return {"Authorization": "Bearer test-token"}
@pytest.mark.asyncio
async def test_conversational_happy_path(auth_header):
"""A message in → the assistant's reply out, keyed to the session."""
cli_output = json.dumps({
"type": "result",
"is_error": False,
"result": "Здравейте! Как мога да помогна?",
"session_id": "sess-1",
}).encode()
mock_proc = _mock_subprocess_returning(cli_output, returncode=0)
with patch("app.conversational.asyncio.create_subprocess_exec", return_value=mock_proc):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.post(
"/v1/conversational",
json={"session_id": "sess-1", "message": "Здравей"},
headers=auth_header,
)
assert response.status_code == 200, response.text
body = response.json()
assert body["session_id"] == "sess-1"
assert body["reply"] == "Здравейте! Как мога да помогна?"
@pytest.mark.asyncio
async def test_conversational_resumes_on_second_turn(auth_header):
"""First turn opens the session (--session-id); a second turn on the same
session id resumes it (--resume) this is what makes it a conversation."""
calls: list[tuple] = []
def fake_spawn(*args, **kwargs):
calls.append(args)
out = json.dumps({"type": "result", "is_error": False, "result": "ok"}).encode()
return _mock_subprocess_returning(out, returncode=0)
with patch("app.conversational.asyncio.create_subprocess_exec", side_effect=fake_spawn):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
for _ in range(2):
r = await client.post(
"/v1/conversational",
json={"session_id": "sess-X", "message": "hi"},
headers=auth_header,
)
assert r.status_code == 200, r.text
assert "--session-id" in calls[0] and "--resume" not in calls[0]
assert "--resume" in calls[1] and "--session-id" not in calls[1]
@pytest.mark.asyncio
async def test_conversational_requires_auth():
"""No bearer token → 401, same as the other endpoints."""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
r = await client.post(
"/v1/conversational",
json={"session_id": "s", "message": "hi"},
)
assert r.status_code == 401
@pytest.mark.asyncio
async def test_conversational_returns_503_on_failure(auth_header):
"""A non-zero claude exit surfaces as 503 execution-failed."""
mock_proc = _mock_subprocess_returning(b"", returncode=7)
mock_proc.stderr.read = AsyncMock(return_value=b"boom")
with patch("app.conversational.asyncio.create_subprocess_exec", return_value=mock_proc):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
r = await client.post(
"/v1/conversational",
json={"session_id": "s", "message": "x"},
headers=auth_header,
)
assert r.status_code == 503
assert r.json()["error"] == "execution failed"