afk: wire the T3 adapter to the REAL orchestration contract + fix priority
Some checks failed
Build and Push / lint-and-test (push) Has been cancelled
Build and Push / build (push) Has been cancelled
Build and Push / deploy (push) Has been cancelled
Build and Push / notify-failure (push) Has been cancelled

The T3 dispatch adapter was written against a guessed wire shape that the test
fake accepted but the live t3-afk server 400s — so the previously-green suite did
NOT mean the loop was actually wired to T3. Reverse-engineered the real contract
from the v0.0.27 binary, verified it live against t3-afk (including multi-turn),
and rewrote the adapter to match:

- dispatch sends BARE commands keyed by `type` (not a `command` string), with
  client-minted threadId/commandId/messageId + createdAt; the server replies
  {sequence}, so dispatch returns the id it generated (never one parsed back).
- a thread lives in a project (workspaceRoot = the repo checkout the agent runs
  in), so dispatch ensures the repo's project (snapshot -> project.create iff
  absent) before thread.create + thread.turn.start.
- add send_turn() for follow-up turns on an existing thread — multi-turn context
  retention is verified live (turn 2 recalled turn 1).
- watcher reads thread liveness from latestTurn.state (completed->idle,
  running/in_progress/pending->running, errored->error), not a non-existent
  top-level `status` field.

Guard against recurrence: the test fake now REJECTS any command lacking a `type`
discriminator (the original bug fails loudly), plus an opt-in live smoke test
(tests/test_afk_t3_live.py) so "green" can mean "wired to T3".

Also align dispatch_policy to lower-priority-value-first (P0 before P1), matching
tracker conventions and Issue.priority's own docstring — it had deliberately
diverged to higher-first. Loop still ships DISABLED (kill switch on, empty
allowlist). 416 tests pass.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Viktor Barzin 2026-06-15 22:27:00 +00:00
parent 2ef0db9a96
commit e34640cc47
8 changed files with 555 additions and 272 deletions

View file

@ -23,18 +23,19 @@ What it encapsulates (the dispatch predicate from the AFK pipeline design doc):
the issue is skipped.
* **One-agent-per-repo within the batch** because a repo hosts only one
in-flight agent, a single call returns at most ONE decision per repo: the
highest-priority eligible issue in that repo wins the slot. (A higher-priority
issue that is itself ineligible does not consume the slot the best
*eligible* candidate does.)
most-urgent eligible issue in that repo wins the slot. (A more-urgent issue
that is itself ineligible does not consume the slot the best *eligible*
candidate does.)
* **Priority ordering** the surviving per-repo winners are returned
highest-``priority``-first, with a deterministic tiebreaker (ascending issue
number) so the output is a total, stable order independent of input order.
lowest-``priority``-value-first (P0 before P1 before P2), with a deterministic
tiebreaker (ascending issue number) so the output is a total, stable order
independent of input order.
PRIORITY DIRECTION note the deliberate divergence: ``Issue.priority``'s
docstring in ``types`` says "lower runs first", but this module follows the
explicit dispatch-policy specification, which orders **higher priority first**.
The ordering lives here (the one place that consumes ``priority`` for dispatch),
so this module is the source of truth for the direction.
PRIORITY DIRECTION lower ``Issue.priority`` runs first, matching tracker
conventions (P0/P1 are more urgent than P2) and ``Issue.priority``'s own
docstring in ``types``. The ordering lives here (the one place that consumes
``priority`` for dispatch), so this module is the source of truth for the
direction.
Pure: it never mutates its inputs the caller's issue list, the config, and the
``in_flight_repos`` set are all left exactly as passed.
@ -51,7 +52,8 @@ def select_dispatchable(
Empty when the kill switch is on, the allowlist excludes everything, or no
issue clears every gate. At most one decision per repo; ordered
highest-priority-first, ties broken by ascending issue number.
lowest-priority-value-first (most urgent), ties broken by ascending issue
number.
"""
# Kill switch: master off-ramp, evaluated before any per-issue work.
if config.kill_switch:
@ -73,7 +75,7 @@ def select_dispatchable(
for issue in sorted(eligible, key=_dispatch_sort_key):
best_per_repo.setdefault(issue.repo, issue)
# Final order: the per-repo winners, highest priority first (total + stable).
# Final order: the per-repo winners, most urgent first (total + stable).
winners = sorted(best_per_repo.values(), key=_dispatch_sort_key)
return [DispatchDecision(issue=issue, reason=_reason(issue)) for issue in winners]
@ -101,11 +103,10 @@ def _is_eligible(
def _dispatch_sort_key(issue: Issue) -> tuple[int, int]:
"""Sort key giving a total, deterministic order: highest ``priority`` first
(negated so a plain ascending sort puts it on top), then lowest issue number
as the tiebreaker so equal-priority issues never depend on input/iteration
order."""
return (-issue.priority, issue.number)
"""Sort key giving a total, deterministic order: lowest ``priority`` value
first (P0 before P1 most urgent wins), then lowest issue number as the
tiebreaker so equal-priority issues never depend on input/iteration order."""
return (issue.priority, issue.number)
def _reason(issue: Issue) -> str:

View file

@ -1,33 +1,46 @@
"""Adapter for the in-cluster T3 Code instance — the AFK executor + cockpit.
The control plane keeps the brain; T3 runs the agent. This module is the thin
wire between them: it turns "implement issue N of repo R with this prompt" into
the TWO HTTP commands T3's orchestration API needs, and reads the fleet
snapshot the watcher polls. It owns no AFK behaviour the agent's standing
rules ride in as the ``ISSUE_IMPLEMENTER_PREAMBLE`` prepended to the turn
message, because T3's full-access ``claudeAgent`` runtime does NOT honour
``~/.claude/CLAUDE.md`` (see ``issue_implementer_prompt``).
wire between them, written against T3's **real** orchestration contract
(reverse-engineered from the v0.0.27 binary and verified live against t3-afk on
2026-06-15 an earlier version of this adapter was written against a guessed
shape that a fake test accepted but the real server 400s).
Two operations, both against the dedicated in-cluster T3 pod:
The contract, in three facts that shape everything here:
* ``dispatch(repo, issue, prompt) -> thread_id`` POSTs ``thread.create``
then ``thread.turn.start`` to ``/api/orchestration/dispatch``. The create
command selects the ``claudeAgent`` instance in ``full-access`` runtime mode
and returns a thread id; the turn command targets that thread and delivers
``ISSUE_IMPLEMENTER_PREAMBLE + prompt`` as ``message.text``. One dispatch =
one worktree-isolated worker.
* ``snapshot() -> dict`` GETs ``/api/orchestration/snapshot``, the full fleet
read-model. T3 has no outbound webhooks, so the watcher polls this for
per-thread ``running``/``idle``/``error`` status.
1. **Bare command envelope.** ``POST /api/orchestration/dispatch`` takes a
single command object whose discriminator is ``type`` (NOT a ``command``
string, NOT a wrapper). The body *is* the command.
2. **Client-authoritative IDs.** The CLIENT mints ``threadId`` / ``commandId``
/ ``messageId`` (UUIDs) and stamps ``createdAt`` (ISO-8601); the server
replies ``{"sequence": N}`` and does NOT echo the thread id. So ``dispatch``
returns the id it generated, never one parsed from the response.
3. **Threads live in a project.** A project's ``workspaceRoot`` is the repo
checkout the agent runs in (it ``cd``s there and commits there). So a repo
maps to a project; ``dispatch`` ensures that project exists before creating
the thread.
The HTTP transport and the bearer provider are **injected** (constructor
args), so the production wiring hands in an ``httpx.Client`` plus a Vault-backed
token reader, while tests hand in an in-memory fake nothing here ever opens a
socket on its own. The bearer is re-read from the provider on **every** request
because T3's ``orchestration:operate`` token expires hourly and is refreshed out
of band.
Operations (the methods ``poller`` / ``watcher`` call, plus a multi-turn helper):
* ``dispatch(repo, issue, prompt) -> thread_id`` ensure the repo's project,
then ``thread.create`` + ``thread.turn.start`` (``ISSUE_IMPLEMENTER_PREAMBLE
+ prompt`` as the user message). Returns the client-minted thread id.
* ``send_turn(thread_id, prompt) -> None`` a follow-up user turn on an
existing thread. Multi-turn context is retained (verified live), so this is
how a conversation continues without spawning a fresh thread.
* ``snapshot() -> dict`` the fleet read-model (``GET``); the watcher reads
per-thread ``latestTurn.state`` from it.
The HTTP transport, the bearer provider, the id factory, and the clock are all
**injected**, so production hands in an ``httpx.Client`` + a Vault-backed token
reader + ``uuid4`` + a UTC clock, while tests hand in deterministic fakes. The
bearer is re-read from the provider on **every** request because T3's
``orchestration:operate`` token rotates.
"""
import uuid
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Protocol
from .issue_implementer_prompt import ISSUE_IMPLEMENTER_PREAMBLE
@ -36,24 +49,69 @@ from .issue_implementer_prompt import ISSUE_IMPLEMENTER_PREAMBLE
_DISPATCH_PATH = "/api/orchestration/dispatch"
_SNAPSHOT_PATH = "/api/orchestration/snapshot"
# Pilot-baked dispatch envelope: which backend instance runs the thread and in
# which runtime mode. Constants (not config) — every AFK thread is identical.
# Pilot-baked execution envelope. ``claudeAgent`` is the embedded Claude Agent
# SDK instance; ``full-access`` is the unattended runtime (bypass-permissions);
# ``default`` interaction mode is normal turns (vs ``plan``). The model is the
# one the pilot validated — tunable via the constructor.
_INSTANCE_ID = "claudeAgent"
_DEFAULT_MODEL = "claude-sonnet-4-6"
_RUNTIME_MODE = "full-access"
_INTERACTION_MODE = "default"
# JSON shapes. Command bodies and the snapshot read-model are open string-keyed
# objects; ``object`` values keep us honest without a bare ``Any``.
type Json = dict[str, object]
class HttpResponse(Protocol):
"""The httpx-shaped response surface this adapter relies on.
def _uuid() -> str:
"""Default id factory: a fresh random UUID string (thread/command/message ids)."""
return str(uuid.uuid4())
Both ``httpx.Response`` and the test fake satisfy it: ``raise_for_status``
turns a non-2xx into an exception (so a failed ``thread.create`` aborts
before ``thread.turn.start`` ever fires) and ``json`` parses the body.
def _now_iso() -> str:
"""Default clock: the current instant as an ISO-8601 UTC timestamp."""
return datetime.now(timezone.utc).isoformat()
@dataclass(frozen=True)
class ProjectRef:
"""Where a repo's agent runs. ``project_id`` is the stable T3 project id (the
client mints it, deterministically per repo); ``workspace_root`` is the repo
checkout directory the project points at (the agent's cwd); ``title`` is the
human label shown in the cockpit."""
project_id: str
workspace_root: str
title: str
def default_project_resolver(workspace_base: str = "/data") -> "Callable[[str], ProjectRef]":
"""A repo -> :class:`ProjectRef` resolver with stable, deterministic ids.
``project_id`` is a UUID5 of the repo (so the same repo always resolves to the
same project across ticks and restarts ``dispatch``'s ensure-project step
is therefore idempotent); ``workspace_root`` is ``<workspace_base>/<slug>``
where the slug flattens ``owner/name`` to a single path segment. The checkout
itself (cloning the repo into ``workspace_root``) is an enrollment concern,
not this adapter's — the agent or a provisioning step populates it.
"""
def resolve(repo: str) -> ProjectRef:
slug = repo.replace("/", "__")
return ProjectRef(
project_id=str(uuid.uuid5(uuid.NAMESPACE_URL, f"afk-project:{repo}")),
workspace_root=f"{workspace_base.rstrip('/')}/{slug}",
title=repo,
)
return resolve
class HttpResponse(Protocol):
"""The httpx-shaped response surface this adapter relies on: ``raise_for_status``
turns a non-2xx into an exception (so a failed command aborts the sequence)
and ``json`` parses the body."""
def raise_for_status(self) -> object: ...
def json(self) -> Json: ...
@ -61,8 +119,8 @@ class HttpResponse(Protocol):
class HttpClient(Protocol):
"""Minimal injected transport: a JSON ``post`` and a ``get``, both taking
explicit headers. Deliberately a strict subset of ``httpx.Client`` so the
real client passes one straight through and tests pass a recorder."""
explicit headers. A strict subset of ``httpx.Client`` so the real client
passes straight through and tests pass a recorder."""
def post(self, url: str, json: Json, headers: dict[str, str]) -> HttpResponse: ...
@ -72,9 +130,11 @@ class HttpClient(Protocol):
class T3Client:
"""Dispatch/snapshot adapter for one in-cluster T3 instance.
``base_url`` is the T3 service root (a trailing slash is tolerated);
``http`` is the injected transport; ``bearer_provider`` returns the current
``orchestration:operate`` token, re-read per request for hourly rotation.
``base_url`` is the T3 service root (a trailing slash is tolerated); ``http``
is the injected transport; ``bearer_provider`` returns the current
``orchestration:operate`` token, re-read per request; ``project_resolver``
maps a repo to its :class:`ProjectRef`; ``id_factory`` / ``clock`` are
injected for deterministic tests (defaulting to ``uuid4`` / UTC now).
"""
def __init__(
@ -82,54 +142,113 @@ class T3Client:
base_url: str,
http: HttpClient,
bearer_provider: Callable[[], str],
project_resolver: Callable[[str], ProjectRef] | None = None,
*,
id_factory: Callable[[], str] = _uuid,
clock: Callable[[], str] = _now_iso,
model: str = _DEFAULT_MODEL,
) -> None:
self._base_url = base_url.rstrip("/")
self._http = http
self._bearer_provider = bearer_provider
self._project_for = project_resolver or default_project_resolver()
self._id = id_factory
self._now = clock
self._model = model
# ----------------------------------------------------------------- #
# Public API (the ``t3_client.T3Client`` contract).
# Public API (the ``t3_client.T3Client`` contract the poller/watcher use).
# ----------------------------------------------------------------- #
def dispatch(self, repo: str, issue: int, prompt: str) -> str:
"""Spawn one worker thread for ``issue`` of ``repo`` and return its id.
Two POSTs to ``/api/orchestration/dispatch``: ``thread.create`` (selects
the ``claudeAgent`` instance, ``full-access`` runtime) yields the thread
id; ``thread.turn.start`` then delivers ``ISSUE_IMPLEMENTER_PREAMBLE +
prompt`` to that thread. A failed create raises and short-circuits the
turn (we never fire a turn at a thread that wasn't created).
Ensures the repo's project exists, generates the thread id locally, then
POSTs ``thread.create`` followed by ``thread.turn.start`` (delivering
``ISSUE_IMPLEMENTER_PREAMBLE + prompt``). Any failed POST raises and
short-circuits the rest of the sequence. The returned id is the one this
method minted the server never sends it back.
"""
create_resp = self._post(
_DISPATCH_PATH,
{
"command": "thread.create",
"repo": repo,
"issue": issue,
"modelSelection": {"instanceId": _INSTANCE_ID},
"runtimeMode": _RUNTIME_MODE,
},
)
thread_id = self._thread_id_of(create_resp.json())
project = self._ensure_project(repo)
thread_id = self._id()
self._post(
_DISPATCH_PATH,
{
"command": "thread.turn.start",
"threadId": thread_id,
"message": {"text": ISSUE_IMPLEMENTER_PREAMBLE + prompt},
},
)
self._post(self._thread_create_command(thread_id, project))
self._post(self._turn_command(thread_id, ISSUE_IMPLEMENTER_PREAMBLE + prompt))
return thread_id
def send_turn(self, thread_id: str, prompt: str) -> None:
"""Deliver a follow-up user turn to an existing thread (multi-turn).
Used to continue a conversation the agent retains the thread's prior
context across turns. No preamble: the standing rules were already
delivered on the opening turn.
"""
self._post(self._turn_command(thread_id, prompt))
def snapshot(self) -> Json:
"""Return the parsed fleet read-model from ``/api/orchestration/snapshot``."""
return self._get(_SNAPSHOT_PATH).json()
# ----------------------------------------------------------------- #
# Internals.
# Command builders (the real wire shapes).
# ----------------------------------------------------------------- #
def _post(self, path: str, body: Json) -> HttpResponse:
resp = self._http.post(self._url(path), json=body, headers=self._headers())
def _ensure_project(self, repo: str) -> ProjectRef:
"""Make sure the repo's project exists, creating it if absent. Idempotent:
the resolver's project id is stable per repo, so a project already in the
snapshot is left untouched (no duplicate, no error)."""
project = self._project_for(repo)
existing = {
p.get("id") for p in self._get(_SNAPSHOT_PATH).json().get("projects", [])
}
if project.project_id not in existing:
self._post(
{
"type": "project.create",
"commandId": self._id(),
"projectId": project.project_id,
"title": project.title,
"workspaceRoot": project.workspace_root,
"createWorkspaceRootIfMissing": True,
"createdAt": self._now(),
}
)
return project
def _thread_create_command(self, thread_id: str, project: ProjectRef) -> Json:
return {
"type": "thread.create",
"commandId": self._id(),
"threadId": thread_id,
"projectId": project.project_id,
"title": project.title,
"modelSelection": {"instanceId": _INSTANCE_ID, "model": self._model},
"runtimeMode": _RUNTIME_MODE,
"interactionMode": _INTERACTION_MODE,
"branch": None,
"worktreePath": None,
"createdAt": self._now(),
}
def _turn_command(self, thread_id: str, text: str) -> Json:
return {
"type": "thread.turn.start",
"commandId": self._id(),
"threadId": thread_id,
"message": {
"messageId": self._id(),
"role": "user",
"text": text,
"attachments": [],
},
"runtimeMode": _RUNTIME_MODE,
"interactionMode": _INTERACTION_MODE,
"createdAt": self._now(),
}
# ----------------------------------------------------------------- #
# Transport internals.
# ----------------------------------------------------------------- #
def _post(self, command: Json) -> HttpResponse:
resp = self._http.post(self._url(_DISPATCH_PATH), json=command, headers=self._headers())
resp.raise_for_status()
return resp
@ -143,17 +262,3 @@ class T3Client:
def _headers(self) -> dict[str, str]:
return {"Authorization": f"Bearer {self._bearer_provider()}"}
@staticmethod
def _thread_id_of(create_response: Json) -> str:
"""Extract the new thread id from a ``thread.create`` reply.
T3 returns it as ``threadId``; we fail loudly on a malformed reply rather
than dispatch a turn at an empty/None id.
"""
thread_id = create_response.get("threadId")
if not isinstance(thread_id, str) or not thread_id:
raise ValueError(
f"thread.create response missing a usable threadId: {create_response!r}"
)
return thread_id

View file

@ -7,10 +7,11 @@ and the fix-forward bookkeeping), one ``tick``:
1. **assemble a ``RunState``** from the live edges + the run's bookkeeping:
* ``thread_status`` from ``t3_client.snapshot()``, by finding this run's
thread and mapping T3's ``running``/``idle``/``error`` to a
``ThreadStatus`` (missing thread, or any unrecognised status, folds to
``None`` "no status yet" the state machine WAITs; we never escalate
or close on a status we don't understand);
thread and mapping its ``latestTurn.state`` (``completed`` idle,
``running``/``in_progress``/``pending`` running, ``errored`` error)
to a ``ThreadStatus`` (missing thread, no turn yet, or any unrecognised
state folds to ``None`` "no status yet" the state machine WAITs; we
never escalate or close on a status we don't understand);
* ``ci_status`` ``ci_watcher.status(repo, commit)`` *only* when a commit
is pushed (no commit nothing to check ``None``);
* ``pushed`` / ``fix_forward_attempts`` / ``elapsed_seconds`` straight
@ -50,13 +51,22 @@ from .notifier import KIND_DONE, KIND_FROZEN, KIND_NEEDS_HUMAN
from .poller import T3Port as _DispatchPort # dispatch(repo, issue, prompt) -> id
from .types import Action, CIStatus, Config, Issue, Phase, RunState, ThreadStatus
# T3 snapshot status string -> ThreadStatus. Anything not in here (a status T3
# adds later, or a malformed entry) maps to None — "no usable status yet" — so
# the state machine waits rather than acting on something it can't interpret.
# T3 ``latestTurn.state`` -> ThreadStatus. The real snapshot reports a thread's
# liveness as the state of its latest turn (verified against t3-afk v0.0.27):
# ``completed`` == the turn finished cleanly (agent is idle, awaiting input);
# any not-yet-finished state (``running``/``in_progress``/``pending``/``queued``/
# ``pendingInit``) == still working; ``errored`` == the turn failed. Anything not
# in here (a state T3 adds later, or a malformed/absent entry) maps to None —
# "no usable status yet" — so the state machine waits rather than acting on
# something it can't interpret.
_THREAD_STATUS_BY_STRING: dict[str, ThreadStatus] = {
"completed": ThreadStatus.IDLE,
"running": ThreadStatus.RUNNING,
"idle": ThreadStatus.IDLE,
"error": ThreadStatus.ERROR,
"in_progress": ThreadStatus.RUNNING,
"pending": ThreadStatus.RUNNING,
"queued": ThreadStatus.RUNNING,
"pendingInit": ThreadStatus.RUNNING,
"errored": ThreadStatus.ERROR,
}
# Action -> the terminal doorbell kind to ring. Only the terminal actions appear;
@ -201,10 +211,13 @@ class Watcher:
def _thread_status(self, thread_id: str) -> ThreadStatus | None:
"""This thread's liveness from the fleet snapshot, or ``None`` when the
thread is absent or its status string is one we don't recognise."""
thread is absent, has no turn yet, or its ``latestTurn.state`` is one we
don't recognise. Liveness is the state of the thread's latest turn (the
real snapshot shape), not a top-level ``status`` field."""
for thread in self._t3.snapshot().get("threads", []):
if thread.get("id") == thread_id:
return _THREAD_STATUS_BY_STRING.get(thread.get("status"))
latest_turn = thread.get("latestTurn") or {}
return _THREAD_STATUS_BY_STRING.get(latest_turn.get("state"))
return None
# ----------------------------------------------------------------- #