claude-agent-service/tests/test_afk_t3_live.py
Viktor Barzin e34640cc47
afk: wire the T3 adapter to the REAL orchestration contract + fix priority
The T3 dispatch adapter was written against a guessed wire shape that the test
fake accepted but the live t3-afk server 400s — so the previously-green suite did
NOT mean the loop was actually wired to T3. Reverse-engineered the real contract
from the v0.0.27 binary, verified it live against t3-afk (including multi-turn),
and rewrote the adapter to match:

- dispatch sends BARE commands keyed by `type` (not a `command` string), with
  client-minted threadId/commandId/messageId + createdAt; the server replies
  {sequence}, so dispatch returns the id it generated (never one parsed back).
- a thread lives in a project (workspaceRoot = the repo checkout the agent runs
  in), so dispatch ensures the repo's project (snapshot -> project.create iff
  absent) before thread.create + thread.turn.start.
- add send_turn() for follow-up turns on an existing thread — multi-turn context
  retention is verified live (turn 2 recalled turn 1).
- watcher reads thread liveness from latestTurn.state (completed->idle,
  running/in_progress/pending->running, errored->error), not a non-existent
  top-level `status` field.
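
The contract in the list above can be sketched roughly as follows. This is an illustration, not the adapter's actual code: the helper names `build_command` and `liveness`, the `extra` payload argument, the ISO-8601 `createdAt` format, always attaching `messageId`, and the `"unknown"` fallback are all assumptions. Only the field names (`type`, `threadId`, `commandId`, `messageId`, `createdAt`) and the state mapping come from this message.

```python
import uuid
from datetime import datetime, timezone

# State mapping as listed in the commit message (latestTurn.state -> liveness).
_LIVENESS = {
    "completed": "idle",
    "running": "running",
    "in_progress": "running",
    "pending": "running",
    "errored": "error",
}


def build_command(command_type, thread_id, extra=None):
    """Build a bare command keyed by `type`, with client-minted ids."""
    command = {
        "type": command_type,  # discriminator; there is no `command` string
        "threadId": thread_id,
        "commandId": str(uuid.uuid4()),
        # Assumption: which commands carry messageId is not stated; this
        # sketch always includes it.
        "messageId": str(uuid.uuid4()),
        # Assumption: ISO-8601 UTC timestamp; the real format is not stated.
        "createdAt": datetime.now(timezone.utc).isoformat(),
    }
    command.update(extra or {})  # hypothetical per-command payload
    return command


def liveness(thread):
    """Derive liveness from latestTurn.state, never a top-level `status`."""
    latest = thread.get("latestTurn") or {}
    # Assumption: missing or unrecognised states fall back to "unknown".
    return _LIVENESS.get(latest.get("state"), "unknown")
```

Because the ids are minted on the client, a caller can return `thread_id` directly after the server acknowledges with `{sequence}`, without parsing anything back.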

Guard against recurrence: the test fake now REJECTS any command lacking a `type`
discriminator (the original bug fails loudly), plus an opt-in live smoke test
(tests/test_afk_t3_live.py) so "green" can mean "wired to T3".
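
A strict fake of this kind might look something like the sketch below. It is not the fake from the test suite: `fake_dispatch` and the error body are hypothetical, and only the 400 on a malformed command and the `{sequence}` reply are taken from the description above.

```python
import itertools

_sequence = itertools.count(1)


def fake_dispatch(body):
    """Return (status_code, json_body) for one dispatched command."""
    command_type = body.get("type")
    if not isinstance(command_type, str) or not command_type:
        # The original bug (a `command` string instead of `type`) lands here.
        return 400, {"error": "command is missing a `type` discriminator"}
    return 200, {"sequence": next(_sequence)}
```

With a fake like this, a regression to the guessed wire shape fails the unit tests instead of passing silently.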

Also align dispatch_policy to lower-priority-value-first (P0 before P1), matching
tracker conventions and Issue.priority's own docstring — it had deliberately
diverged to higher-first. Loop still ships DISABLED (kill switch on, empty
allowlist). 416 tests pass.
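
As a small illustration of lower-priority-value-first ordering (a sketch only: this `Issue` dataclass is a stand-in for the real one, `dispatch_order` is a hypothetical name, and the tie-break on issue number is an assumption):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Issue:
    number: int
    priority: int  # lower value = more urgent (P0 before P1)


def dispatch_order(issues):
    """Sort so that P0 comes before P1; ties broken by issue number (assumed)."""
    return sorted(issues, key=lambda issue: (issue.priority, issue.number))
```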

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 22:27:00 +00:00

92 lines
3.6 KiB
Python

"""LIVE smoke test for ``app.afk.t3_client`` against a real T3 instance.

Skipped by default. The unit tests (``test_afk_t3_client``) pin the wire shape
against a contract-accurate fake; this file proves the *same code* actually talks
to a live T3. That is the guard that "green tests" mean "wired to T3", which the
earlier fake-only suite did NOT provide (it was green while the real server 400'd).

It is opt-in because the orchestration API is in-cluster (ClusterIP + an
Authentik-gated ingress), so it can't run in CI without cluster access. Run it
from inside the cluster, or via a port-forward, with a bearer minted on the pod::

    # bearer (on the t3-afk pod, as the node user):
    #   t3 auth session issue --token-only --base-dir /data/t3 --ttl 30m
    kubectl -n t3-afk port-forward deploy/t3-afk 3773:3773 &
    T3_AFK_BASE_URL=http://127.0.0.1:3773 T3_AFK_TOKEN=<bearer> \
      python3 -m pytest tests/test_afk_t3_live.py -v

The read-only snapshot check is always safe. The full dispatch round-trip
(create thread + turn + verify it appears, then delete it) only runs with
``T3_AFK_SMOKE_DISPATCH=1`` since it spends a (tiny) agent turn.
"""

import os
import time

import pytest

from app.afk import t3_client

_BASE_URL = os.environ.get("T3_AFK_BASE_URL")
_TOKEN = os.environ.get("T3_AFK_TOKEN")

pytestmark = pytest.mark.skipif(
    not (_BASE_URL and _TOKEN),
    reason="set T3_AFK_BASE_URL + T3_AFK_TOKEN to run the live T3 smoke test",
)


def _real_client():
    import httpx  # local import so the module imports fine without httpx installed

    return t3_client.T3Client(
        base_url=_BASE_URL,
        http=httpx.Client(timeout=30.0),
        bearer_provider=lambda: _TOKEN,
    )


def test_live_snapshot_has_the_real_shape():
    """A real snapshot parses and carries the keys the watcher/adapter depend on:
    ``threads`` + ``projects``, and any thread exposes ``latestTurn`` (the
    liveness source), not a top-level ``status``."""
    snap = _real_client().snapshot()
    assert isinstance(snap, dict)
    assert "threads" in snap and "projects" in snap
    for thread in snap["threads"]:
        assert "id" in thread
        # liveness lives under latestTurn.state (the contract this suite guards)
        assert "status" not in thread, "real threads have no top-level status field"


@pytest.mark.skipif(
    os.environ.get("T3_AFK_SMOKE_DISPATCH") != "1",
    reason="set T3_AFK_SMOKE_DISPATCH=1 to run the dispatch round-trip (spends a turn)",
)
def test_live_dispatch_round_trip_then_cleanup():
    """End-to-end against the real server: ``dispatch`` (ensure-project + create +
    turn) succeeds and the new thread shows up in the snapshot. Cleans up the
    thread it created so the cockpit isn't littered."""
    import httpx

    repo = "afk-smoke/roundtrip"
    client = _real_client()
    thread_id = client.dispatch(repo, 1, "Reply with just: ok. Do not use any tools.")
    assert isinstance(thread_id, str) and thread_id

    # The thread must appear in the fleet read-model (poll briefly, since
    # dispatch is accepted asynchronously).
    found = False
    for _ in range(10):
        if any(t.get("id") == thread_id for t in client.snapshot().get("threads", [])):
            found = True
            break
        time.sleep(1.0)
    assert found, f"dispatched thread {thread_id} never appeared in the snapshot"

    # Cleanup: delete the throwaway thread (raw command, not part of the adapter).
    httpx.post(
        f"{_BASE_URL.rstrip('/')}/api/orchestration/dispatch",
        headers={"Authorization": f"Bearer {_TOKEN}"},
        json={"type": "thread.delete", "commandId": t3_client._uuid(), "threadId": thread_id},
        timeout=30.0,
    ).raise_for_status()