claude-agent-service/tests/test_main.py
2026-05-07 17:07:12 +00:00

214 lines
8.1 KiB
Python

import asyncio
from unittest.mock import AsyncMock, patch, MagicMock
import pytest
from httpx import ASGITransport, AsyncClient
from app import main as app_main
from app.main import ExecuteRequest, app
def test_execute_request_default_timeout_is_45_minutes():
# The service-upgrade agent can take 20-45m for CAUTION-class bumps
# (multi-release changelog summarisation, Woodpecker CI polling, DB
# backup waits). 15m cuts off too many real runs — see beads code-cfy.
assert ExecuteRequest(prompt="p", agent="a").timeout_seconds == 2700
@pytest.fixture
def auth_header():
return {"Authorization": "Bearer test-token"}
@pytest.mark.asyncio
async def test_health_returns_ok():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.get("/health")
assert response.status_code == 200
assert response.json()["status"] == "ok"
@pytest.mark.asyncio
async def test_execute_rejects_missing_auth():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.post("/execute", json={
"prompt": "test",
"agent": "test-agent",
})
assert response.status_code == 401
@pytest.mark.asyncio
async def test_execute_rejects_wrong_token():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.post(
"/execute",
json={"prompt": "test", "agent": "test-agent"},
headers={"Authorization": "Bearer wrong-token"},
)
assert response.status_code == 401
@pytest.mark.asyncio
async def test_execute_rejects_missing_prompt(auth_header):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.post(
"/execute",
json={"agent": "test-agent"},
headers=auth_header,
)
assert response.status_code == 422
@pytest.mark.asyncio
async def test_execute_starts_job(auth_header):
mock_process = AsyncMock()
mock_process.stdout = AsyncMock()
mock_process.stdout.__aiter__ = MagicMock(return_value=iter([]))
mock_process.stderr = AsyncMock()
mock_process.stderr.read = AsyncMock(return_value=b"")
mock_process.wait = AsyncMock(return_value=0)
mock_process.returncode = 0
with patch("app.main.asyncio.create_subprocess_exec", return_value=mock_process):
with patch("app.main.run_git_sync", new_callable=AsyncMock):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.post(
"/execute",
json={"prompt": "test prompt", "agent": "test-agent"},
headers=auth_header,
)
assert response.status_code == 202
body = response.json()
assert "job_id" in body
assert body["status"] == "running"
@pytest.mark.asyncio
async def test_get_job_not_found(auth_header):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.get("/jobs/nonexistent", headers=auth_header)
assert response.status_code == 404
@pytest.mark.asyncio
async def test_execute_stores_metadata_on_job(auth_header):
mock_process = AsyncMock()
mock_process.stdout = AsyncMock()
mock_process.stdout.__aiter__ = MagicMock(return_value=iter([]))
mock_process.stderr = AsyncMock()
mock_process.stderr.read = AsyncMock(return_value=b"")
mock_process.wait = AsyncMock(return_value=0)
mock_process.returncode = 0
metadata = {"task_id": "code-xyz", "source": "beadboard"}
with patch("app.main.asyncio.create_subprocess_exec", return_value=mock_process):
with patch("app.main.run_git_sync", new_callable=AsyncMock):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.post(
"/execute",
json={
"prompt": "test prompt",
"agent": "beads-task-runner",
"metadata": metadata,
},
headers=auth_header,
)
assert response.status_code == 202
job_id = response.json()["job_id"]
job_response = await client.get(f"/jobs/{job_id}", headers=auth_header)
assert job_response.status_code == 200
assert job_response.json()["metadata"] == metadata
@pytest.mark.asyncio
async def test_execute_respects_sequential_lock(auth_header):
hold_event = asyncio.Event()
release_event = asyncio.Event()
async def slow_subprocess(*args, **kwargs):
mock = AsyncMock()
mock.stdout = AsyncMock()
async def slow_iter():
hold_event.set()
await release_event.wait()
return
yield # noqa: F841 - unreachable yield makes this an async generator
mock.stdout.__aiter__ = MagicMock(side_effect=slow_iter)
mock.stderr = AsyncMock()
mock.stderr.read = AsyncMock(return_value=b"")
mock.wait = AsyncMock(return_value=0)
mock.returncode = 0
return mock
with patch("app.main.asyncio.create_subprocess_exec", side_effect=slow_subprocess):
with patch("app.main.run_git_sync", new_callable=AsyncMock):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
task1 = asyncio.create_task(client.post(
"/execute",
json={"prompt": "first", "agent": "agent1"},
headers=auth_header,
))
await hold_event.wait()
response2 = await client.post(
"/execute",
json={"prompt": "second", "agent": "agent2"},
headers=auth_header,
)
assert response2.status_code == 409
release_event.set()
response1 = await task1
assert response1.status_code == 202
@pytest.mark.asyncio
async def test_execute_rejects_empty_api_token_header():
# When the service is booted without an API_BEARER_TOKEN (misconfiguration),
# every request must be rejected — including requests with an empty Bearer
# header. Without the guard, hmac.compare_digest("", "") would return True.
with patch.object(app_main, "API_TOKEN", ""):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.post(
"/execute",
json={"prompt": "test", "agent": "test-agent"},
headers={"Authorization": "Bearer "},
)
assert response.status_code == 401
@pytest.mark.asyncio
async def test_execute_accepts_correct_bearer_token():
mock_process = AsyncMock()
mock_process.stdout = AsyncMock()
mock_process.stdout.__aiter__ = MagicMock(return_value=iter([]))
mock_process.stderr = AsyncMock()
mock_process.stderr.read = AsyncMock(return_value=b"")
mock_process.wait = AsyncMock(return_value=0)
mock_process.returncode = 0
with patch.object(app_main, "API_TOKEN", "secret"):
with patch("app.main.asyncio.create_subprocess_exec", return_value=mock_process):
with patch("app.main.run_git_sync", new_callable=AsyncMock):
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
response = await client.post(
"/execute",
json={"prompt": "test", "agent": "test-agent"},
headers={"Authorization": "Bearer secret"},
)
assert response.status_code == 202