diff --git a/app/afk/dispatch_policy.py b/app/afk/dispatch_policy.py index f2c8f0a..b8502fa 100644 --- a/app/afk/dispatch_policy.py +++ b/app/afk/dispatch_policy.py @@ -23,18 +23,19 @@ What it encapsulates (the dispatch predicate from the AFK pipeline design doc): the issue is skipped. * **One-agent-per-repo within the batch** — because a repo hosts only one in-flight agent, a single call returns at most ONE decision per repo: the - highest-priority eligible issue in that repo wins the slot. (A higher-priority - issue that is itself ineligible does not consume the slot — the best - *eligible* candidate does.) + most-urgent eligible issue in that repo wins the slot. (A more-urgent issue + that is itself ineligible does not consume the slot — the best *eligible* + candidate does.) * **Priority ordering** — the surviving per-repo winners are returned - highest-``priority``-first, with a deterministic tiebreaker (ascending issue - number) so the output is a total, stable order independent of input order. + lowest-``priority``-value-first (P0 before P1 before P2), with a deterministic + tiebreaker (ascending issue number) so the output is a total, stable order + independent of input order. -PRIORITY DIRECTION — note the deliberate divergence: ``Issue.priority``'s -docstring in ``types`` says "lower runs first", but this module follows the -explicit dispatch-policy specification, which orders **higher priority first**. -The ordering lives here (the one place that consumes ``priority`` for dispatch), -so this module is the source of truth for the direction. +PRIORITY DIRECTION — lower ``Issue.priority`` runs first, matching tracker +conventions (P0/P1 are more urgent than P2) and ``Issue.priority``'s own +docstring in ``types``. The ordering lives here (the one place that consumes +``priority`` for dispatch), so this module is the source of truth for the +direction. Pure: it never mutates its inputs — the caller's issue list, the config, and the ``in_flight_repos`` set are all left exactly as passed. @@ -51,7 +52,8 @@ def select_dispatchable( Empty when the kill switch is on, the allowlist excludes everything, or no issue clears every gate. At most one decision per repo; ordered - highest-priority-first, ties broken by ascending issue number. + lowest-priority-value-first (most urgent), ties broken by ascending issue + number. """ # Kill switch: master off-ramp, evaluated before any per-issue work. if config.kill_switch: @@ -73,7 +75,7 @@ def select_dispatchable( for issue in sorted(eligible, key=_dispatch_sort_key): best_per_repo.setdefault(issue.repo, issue) - # Final order: the per-repo winners, highest priority first (total + stable). + # Final order: the per-repo winners, most urgent first (total + stable). winners = sorted(best_per_repo.values(), key=_dispatch_sort_key) return [DispatchDecision(issue=issue, reason=_reason(issue)) for issue in winners] @@ -101,11 +103,10 @@ def _is_eligible( def _dispatch_sort_key(issue: Issue) -> tuple[int, int]: - """Sort key giving a total, deterministic order: highest ``priority`` first - (negated so a plain ascending sort puts it on top), then lowest issue number - as the tiebreaker so equal-priority issues never depend on input/iteration - order.""" - return (-issue.priority, issue.number) + """Sort key giving a total, deterministic order: lowest ``priority`` value + first (P0 before P1 — most urgent wins), then lowest issue number as the + tiebreaker so equal-priority issues never depend on input/iteration order.""" + return (issue.priority, issue.number) def _reason(issue: Issue) -> str: diff --git a/app/afk/t3_client.py b/app/afk/t3_client.py index cb32e7f..c7ffd00 100644 --- a/app/afk/t3_client.py +++ b/app/afk/t3_client.py @@ -1,33 +1,46 @@ """Adapter for the in-cluster T3 Code instance — the AFK executor + cockpit. The control plane keeps the brain; T3 runs the agent. This module is the thin -wire between them: it turns "implement issue N of repo R with this prompt" into -the TWO HTTP commands T3's orchestration API needs, and reads the fleet -snapshot the watcher polls. It owns no AFK behaviour — the agent's standing -rules ride in as the ``ISSUE_IMPLEMENTER_PREAMBLE`` prepended to the turn -message, because T3's full-access ``claudeAgent`` runtime does NOT honour -``~/.claude/CLAUDE.md`` (see ``issue_implementer_prompt``). +wire between them, written against T3's **real** orchestration contract +(reverse-engineered from the v0.0.27 binary and verified live against t3-afk on +2026-06-15 — an earlier version of this adapter was written against a guessed +shape that a fake test accepted but the real server 400s). -Two operations, both against the dedicated in-cluster T3 pod: +The contract, in three facts that shape everything here: - * ``dispatch(repo, issue, prompt) -> thread_id`` — POSTs ``thread.create`` - then ``thread.turn.start`` to ``/api/orchestration/dispatch``. The create - command selects the ``claudeAgent`` instance in ``full-access`` runtime mode - and returns a thread id; the turn command targets that thread and delivers - ``ISSUE_IMPLEMENTER_PREAMBLE + prompt`` as ``message.text``. One dispatch = - one worktree-isolated worker. - * ``snapshot() -> dict`` — GETs ``/api/orchestration/snapshot``, the full fleet - read-model. T3 has no outbound webhooks, so the watcher polls this for - per-thread ``running``/``idle``/``error`` status. + 1. **Bare command envelope.** ``POST /api/orchestration/dispatch`` takes a + single command object whose discriminator is ``type`` (NOT a ``command`` + string, NOT a wrapper). The body *is* the command. + 2. **Client-authoritative IDs.** The CLIENT mints ``threadId`` / ``commandId`` + / ``messageId`` (UUIDs) and stamps ``createdAt`` (ISO-8601); the server + replies ``{"sequence": N}`` and does NOT echo the thread id. So ``dispatch`` + returns the id it generated, never one parsed from the response. + 3. **Threads live in a project.** A project's ``workspaceRoot`` is the repo + checkout the agent runs in (it ``cd``s there and commits there). So a repo + maps to a project; ``dispatch`` ensures that project exists before creating + the thread. -The HTTP transport and the bearer provider are **injected** (constructor -args), so the production wiring hands in an ``httpx.Client`` plus a Vault-backed -token reader, while tests hand in an in-memory fake — nothing here ever opens a -socket on its own. The bearer is re-read from the provider on **every** request -because T3's ``orchestration:operate`` token expires hourly and is refreshed out -of band. +Operations (the methods ``poller`` / ``watcher`` call, plus a multi-turn helper): + + * ``dispatch(repo, issue, prompt) -> thread_id`` — ensure the repo's project, + then ``thread.create`` + ``thread.turn.start`` (``ISSUE_IMPLEMENTER_PREAMBLE + + prompt`` as the user message). Returns the client-minted thread id. + * ``send_turn(thread_id, prompt) -> None`` — a follow-up user turn on an + existing thread. Multi-turn context is retained (verified live), so this is + how a conversation continues without spawning a fresh thread. + * ``snapshot() -> dict`` — the fleet read-model (``GET``); the watcher reads + per-thread ``latestTurn.state`` from it. + +The HTTP transport, the bearer provider, the id factory, and the clock are all +**injected**, so production hands in an ``httpx.Client`` + a Vault-backed token +reader + ``uuid4`` + a UTC clock, while tests hand in deterministic fakes. The +bearer is re-read from the provider on **every** request because T3's +``orchestration:operate`` token rotates. """ +import uuid from collections.abc import Callable +from dataclasses import dataclass +from datetime import datetime, timezone from typing import Protocol from .issue_implementer_prompt import ISSUE_IMPLEMENTER_PREAMBLE @@ -36,24 +49,69 @@ from .issue_implementer_prompt import ISSUE_IMPLEMENTER_PREAMBLE _DISPATCH_PATH = "/api/orchestration/dispatch" _SNAPSHOT_PATH = "/api/orchestration/snapshot" -# Pilot-baked dispatch envelope: which backend instance runs the thread and in -# which runtime mode. Constants (not config) — every AFK thread is identical. +# Pilot-baked execution envelope. ``claudeAgent`` is the embedded Claude Agent +# SDK instance; ``full-access`` is the unattended runtime (bypass-permissions); +# ``default`` interaction mode is normal turns (vs ``plan``). The model is the +# one the pilot validated — tunable via the constructor. _INSTANCE_ID = "claudeAgent" +_DEFAULT_MODEL = "claude-sonnet-4-6" _RUNTIME_MODE = "full-access" +_INTERACTION_MODE = "default" # JSON shapes. Command bodies and the snapshot read-model are open string-keyed # objects; ``object`` values keep us honest without a bare ``Any``. type Json = dict[str, object] -class HttpResponse(Protocol): - """The httpx-shaped response surface this adapter relies on. +def _uuid() -> str: + """Default id factory: a fresh random UUID string (thread/command/message ids).""" + return str(uuid.uuid4()) - Both ``httpx.Response`` and the test fake satisfy it: ``raise_for_status`` - turns a non-2xx into an exception (so a failed ``thread.create`` aborts - before ``thread.turn.start`` ever fires) and ``json`` parses the body. + +def _now_iso() -> str: + """Default clock: the current instant as an ISO-8601 UTC timestamp.""" + return datetime.now(timezone.utc).isoformat() + + +@dataclass(frozen=True) +class ProjectRef: + """Where a repo's agent runs. ``project_id`` is the stable T3 project id (the + client mints it, deterministically per repo); ``workspace_root`` is the repo + checkout directory the project points at (the agent's cwd); ``title`` is the + human label shown in the cockpit.""" + + project_id: str + workspace_root: str + title: str + + +def default_project_resolver(workspace_base: str = "/data") -> "Callable[[str], ProjectRef]": + """A repo -> :class:`ProjectRef` resolver with stable, deterministic ids. + + ``project_id`` is a UUID5 of the repo (so the same repo always resolves to the + same project across ticks and restarts — ``dispatch``'s ensure-project step + is therefore idempotent); ``workspace_root`` is ``/`` + where the slug flattens ``owner/name`` to a single path segment. The checkout + itself (cloning the repo into ``workspace_root``) is an enrollment concern, + not this adapter's — the agent or a provisioning step populates it. """ + def resolve(repo: str) -> ProjectRef: + slug = repo.replace("/", "__") + return ProjectRef( + project_id=str(uuid.uuid5(uuid.NAMESPACE_URL, f"afk-project:{repo}")), + workspace_root=f"{workspace_base.rstrip('/')}/{slug}", + title=repo, + ) + + return resolve + + +class HttpResponse(Protocol): + """The httpx-shaped response surface this adapter relies on: ``raise_for_status`` + turns a non-2xx into an exception (so a failed command aborts the sequence) + and ``json`` parses the body.""" + def raise_for_status(self) -> object: ... def json(self) -> Json: ... @@ -61,8 +119,8 @@ class HttpResponse(Protocol): class HttpClient(Protocol): """Minimal injected transport: a JSON ``post`` and a ``get``, both taking - explicit headers. Deliberately a strict subset of ``httpx.Client`` so the - real client passes one straight through and tests pass a recorder.""" + explicit headers. A strict subset of ``httpx.Client`` so the real client + passes straight through and tests pass a recorder.""" def post(self, url: str, json: Json, headers: dict[str, str]) -> HttpResponse: ... @@ -72,9 +130,11 @@ class HttpClient(Protocol): class T3Client: """Dispatch/snapshot adapter for one in-cluster T3 instance. - ``base_url`` is the T3 service root (a trailing slash is tolerated); - ``http`` is the injected transport; ``bearer_provider`` returns the current - ``orchestration:operate`` token, re-read per request for hourly rotation. + ``base_url`` is the T3 service root (a trailing slash is tolerated); ``http`` + is the injected transport; ``bearer_provider`` returns the current + ``orchestration:operate`` token, re-read per request; ``project_resolver`` + maps a repo to its :class:`ProjectRef`; ``id_factory`` / ``clock`` are + injected for deterministic tests (defaulting to ``uuid4`` / UTC now). """ def __init__( @@ -82,54 +142,113 @@ class T3Client: base_url: str, http: HttpClient, bearer_provider: Callable[[], str], + project_resolver: Callable[[str], ProjectRef] | None = None, + *, + id_factory: Callable[[], str] = _uuid, + clock: Callable[[], str] = _now_iso, + model: str = _DEFAULT_MODEL, ) -> None: self._base_url = base_url.rstrip("/") self._http = http self._bearer_provider = bearer_provider + self._project_for = project_resolver or default_project_resolver() + self._id = id_factory + self._now = clock + self._model = model # ----------------------------------------------------------------- # - # Public API (the ``t3_client.T3Client`` contract). + # Public API (the ``t3_client.T3Client`` contract the poller/watcher use). # ----------------------------------------------------------------- # def dispatch(self, repo: str, issue: int, prompt: str) -> str: """Spawn one worker thread for ``issue`` of ``repo`` and return its id. - Two POSTs to ``/api/orchestration/dispatch``: ``thread.create`` (selects - the ``claudeAgent`` instance, ``full-access`` runtime) yields the thread - id; ``thread.turn.start`` then delivers ``ISSUE_IMPLEMENTER_PREAMBLE + - prompt`` to that thread. A failed create raises and short-circuits the - turn (we never fire a turn at a thread that wasn't created). + Ensures the repo's project exists, generates the thread id locally, then + POSTs ``thread.create`` followed by ``thread.turn.start`` (delivering + ``ISSUE_IMPLEMENTER_PREAMBLE + prompt``). Any failed POST raises and + short-circuits the rest of the sequence. The returned id is the one this + method minted — the server never sends it back. """ - create_resp = self._post( - _DISPATCH_PATH, - { - "command": "thread.create", - "repo": repo, - "issue": issue, - "modelSelection": {"instanceId": _INSTANCE_ID}, - "runtimeMode": _RUNTIME_MODE, - }, - ) - thread_id = self._thread_id_of(create_resp.json()) + project = self._ensure_project(repo) + thread_id = self._id() - self._post( - _DISPATCH_PATH, - { - "command": "thread.turn.start", - "threadId": thread_id, - "message": {"text": ISSUE_IMPLEMENTER_PREAMBLE + prompt}, - }, - ) + self._post(self._thread_create_command(thread_id, project)) + self._post(self._turn_command(thread_id, ISSUE_IMPLEMENTER_PREAMBLE + prompt)) return thread_id + def send_turn(self, thread_id: str, prompt: str) -> None: + """Deliver a follow-up user turn to an existing thread (multi-turn). + + Used to continue a conversation — the agent retains the thread's prior + context across turns. No preamble: the standing rules were already + delivered on the opening turn. + """ + self._post(self._turn_command(thread_id, prompt)) + def snapshot(self) -> Json: """Return the parsed fleet read-model from ``/api/orchestration/snapshot``.""" return self._get(_SNAPSHOT_PATH).json() # ----------------------------------------------------------------- # - # Internals. + # Command builders (the real wire shapes). # ----------------------------------------------------------------- # - def _post(self, path: str, body: Json) -> HttpResponse: - resp = self._http.post(self._url(path), json=body, headers=self._headers()) + def _ensure_project(self, repo: str) -> ProjectRef: + """Make sure the repo's project exists, creating it if absent. Idempotent: + the resolver's project id is stable per repo, so a project already in the + snapshot is left untouched (no duplicate, no error).""" + project = self._project_for(repo) + existing = { + p.get("id") for p in self._get(_SNAPSHOT_PATH).json().get("projects", []) + } + if project.project_id not in existing: + self._post( + { + "type": "project.create", + "commandId": self._id(), + "projectId": project.project_id, + "title": project.title, + "workspaceRoot": project.workspace_root, + "createWorkspaceRootIfMissing": True, + "createdAt": self._now(), + } + ) + return project + + def _thread_create_command(self, thread_id: str, project: ProjectRef) -> Json: + return { + "type": "thread.create", + "commandId": self._id(), + "threadId": thread_id, + "projectId": project.project_id, + "title": project.title, + "modelSelection": {"instanceId": _INSTANCE_ID, "model": self._model}, + "runtimeMode": _RUNTIME_MODE, + "interactionMode": _INTERACTION_MODE, + "branch": None, + "worktreePath": None, + "createdAt": self._now(), + } + + def _turn_command(self, thread_id: str, text: str) -> Json: + return { + "type": "thread.turn.start", + "commandId": self._id(), + "threadId": thread_id, + "message": { + "messageId": self._id(), + "role": "user", + "text": text, + "attachments": [], + }, + "runtimeMode": _RUNTIME_MODE, + "interactionMode": _INTERACTION_MODE, + "createdAt": self._now(), + } + + # ----------------------------------------------------------------- # + # Transport internals. + # ----------------------------------------------------------------- # + def _post(self, command: Json) -> HttpResponse: + resp = self._http.post(self._url(_DISPATCH_PATH), json=command, headers=self._headers()) resp.raise_for_status() return resp @@ -143,17 +262,3 @@ class T3Client: def _headers(self) -> dict[str, str]: return {"Authorization": f"Bearer {self._bearer_provider()}"} - - @staticmethod - def _thread_id_of(create_response: Json) -> str: - """Extract the new thread id from a ``thread.create`` reply. - - T3 returns it as ``threadId``; we fail loudly on a malformed reply rather - than dispatch a turn at an empty/None id. - """ - thread_id = create_response.get("threadId") - if not isinstance(thread_id, str) or not thread_id: - raise ValueError( - f"thread.create response missing a usable threadId: {create_response!r}" - ) - return thread_id diff --git a/app/afk/watcher.py b/app/afk/watcher.py index c9c9cc0..b036dbe 100644 --- a/app/afk/watcher.py +++ b/app/afk/watcher.py @@ -7,10 +7,11 @@ and the fix-forward bookkeeping), one ``tick``: 1. **assemble a ``RunState``** from the live edges + the run's bookkeeping: * ``thread_status`` — from ``t3_client.snapshot()``, by finding this run's - thread and mapping T3's ``running``/``idle``/``error`` to a - ``ThreadStatus`` (missing thread, or any unrecognised status, folds to - ``None`` → "no status yet" → the state machine WAITs; we never escalate - or close on a status we don't understand); + thread and mapping its ``latestTurn.state`` (``completed`` → idle, + ``running``/``in_progress``/``pending`` → running, ``errored`` → error) + to a ``ThreadStatus`` (missing thread, no turn yet, or any unrecognised + state folds to ``None`` → "no status yet" → the state machine WAITs; we + never escalate or close on a status we don't understand); * ``ci_status`` — ``ci_watcher.status(repo, commit)`` *only* when a commit is pushed (no commit ⇒ nothing to check ⇒ ``None``); * ``pushed`` / ``fix_forward_attempts`` / ``elapsed_seconds`` — straight @@ -50,13 +51,22 @@ from .notifier import KIND_DONE, KIND_FROZEN, KIND_NEEDS_HUMAN from .poller import T3Port as _DispatchPort # dispatch(repo, issue, prompt) -> id from .types import Action, CIStatus, Config, Issue, Phase, RunState, ThreadStatus -# T3 snapshot status string -> ThreadStatus. Anything not in here (a status T3 -# adds later, or a malformed entry) maps to None — "no usable status yet" — so -# the state machine waits rather than acting on something it can't interpret. +# T3 ``latestTurn.state`` -> ThreadStatus. The real snapshot reports a thread's +# liveness as the state of its latest turn (verified against t3-afk v0.0.27): +# ``completed`` == the turn finished cleanly (agent is idle, awaiting input); +# any not-yet-finished state (``running``/``in_progress``/``pending``/``queued``/ +# ``pendingInit``) == still working; ``errored`` == the turn failed. Anything not +# in here (a state T3 adds later, or a malformed/absent entry) maps to None — +# "no usable status yet" — so the state machine waits rather than acting on +# something it can't interpret. _THREAD_STATUS_BY_STRING: dict[str, ThreadStatus] = { + "completed": ThreadStatus.IDLE, "running": ThreadStatus.RUNNING, - "idle": ThreadStatus.IDLE, - "error": ThreadStatus.ERROR, + "in_progress": ThreadStatus.RUNNING, + "pending": ThreadStatus.RUNNING, + "queued": ThreadStatus.RUNNING, + "pendingInit": ThreadStatus.RUNNING, + "errored": ThreadStatus.ERROR, } # Action -> the terminal doorbell kind to ring. Only the terminal actions appear; @@ -201,10 +211,13 @@ class Watcher: def _thread_status(self, thread_id: str) -> ThreadStatus | None: """This thread's liveness from the fleet snapshot, or ``None`` when the - thread is absent or its status string is one we don't recognise.""" + thread is absent, has no turn yet, or its ``latestTurn.state`` is one we + don't recognise. Liveness is the state of the thread's latest turn (the + real snapshot shape), not a top-level ``status`` field.""" for thread in self._t3.snapshot().get("threads", []): if thread.get("id") == thread_id: - return _THREAD_STATUS_BY_STRING.get(thread.get("status")) + latest_turn = thread.get("latestTurn") or {} + return _THREAD_STATUS_BY_STRING.get(latest_turn.get("state")) return None # ----------------------------------------------------------------- # diff --git a/tests/test_afk_dispatch_policy.py b/tests/test_afk_dispatch_policy.py index ee8cd83..3fc8b0b 100644 --- a/tests/test_afk_dispatch_policy.py +++ b/tests/test_afk_dispatch_policy.py @@ -7,11 +7,10 @@ touches a real T3 server, tracker, or cluster. The suite walks the full dispatchability matrix — trust gate, allowlist, per-repo lock, blocked_by, kill switch — plus the priority ordering and the one-agent-per-repo invariant. -Ordering contract under test: **higher ``priority`` first** (per the AFK module -spec), with a deterministic tiebreaker so the output is stable regardless of -input order. NOTE: ``Issue.priority``'s own docstring says "lower runs first"; -this module follows the explicit dispatch-policy spec instead — see the module -docstring in ``dispatch_policy.py``. +Ordering contract under test: **lower ``priority`` value first** (P0 before P1 +before P2 — most urgent wins), matching tracker conventions and +``Issue.priority``'s own docstring, with a deterministic tiebreaker (ascending +issue number) so the output is stable regardless of input order. """ import itertools @@ -183,13 +182,13 @@ def test_all_repos_in_flight_dispatches_nothing(make_issue, make_config): # same repo, even when both are eligible and the repo is not yet in-flight. # --------------------------------------------------------------------------- # def test_at_most_one_decision_per_repo(make_issue, make_config): - lo = make_issue(number=1, repo="infra", priority=1) - hi = make_issue(number=2, repo="infra", priority=9) + urgent = make_issue(number=1, repo="infra", priority=1) + minor = make_issue(number=2, repo="infra", priority=9) decisions = dispatch_policy.select_dispatchable( - [lo, hi], make_config(allowlist=["infra"]), set() + [urgent, minor], make_config(allowlist=["infra"]), set() ) assert len(decisions) == 1 - assert decisions[0].issue.number == 2 # the higher-priority one wins the slot + assert decisions[0].issue.number == 1 # most urgent (lowest value) wins the slot def test_one_decision_per_repo_across_many_repos(make_issue, make_config): @@ -202,20 +201,21 @@ def test_one_decision_per_repo_across_many_repos(make_issue, make_config): decisions = dispatch_policy.select_dispatchable( issues, make_config(allowlist=["infra", "realestate-crawler"]), set() ) - # One per repo, each the repo's highest-priority eligible issue. - assert _selected_set(decisions) == {11, 20} + # One per repo, each the repo's most urgent (lowest-value) eligible issue: + # infra -> #10 (p1 < p5); realestate-crawler -> #21 (p2 < p3). + assert _selected_set(decisions) == {10, 21} repos = [d.issue.repo for d in decisions] assert len(repos) == len(set(repos)) # no repo appears twice def test_ineligible_higher_priority_does_not_consume_repo_slot(make_issue, make_config): - """A higher-priority issue that is itself ineligible (e.g. blocked) must not - suppress a lower-priority *eligible* issue in the same repo — the slot goes - to the best ELIGIBLE candidate, not merely the highest-priority one.""" - blocked_hi = make_issue(number=1, repo="infra", priority=9, blocked_by=[99]) - ready_lo = make_issue(number=2, repo="infra", priority=1) + """A more-urgent issue that is itself ineligible (e.g. blocked) must not + suppress a less-urgent *eligible* issue in the same repo — the slot goes to + the best ELIGIBLE candidate, not merely the most urgent one.""" + blocked_urgent = make_issue(number=1, repo="infra", priority=1, blocked_by=[99]) + ready_minor = make_issue(number=2, repo="infra", priority=9) decisions = dispatch_policy.select_dispatchable( - [blocked_hi, ready_lo], make_config(allowlist=["infra"]), set() + [blocked_urgent, ready_minor], make_config(allowlist=["infra"]), set() ) assert _selected_numbers(decisions) == [2] @@ -249,18 +249,18 @@ def test_blocked_filters_only_blocked(make_issue, make_config): # --------------------------------------------------------------------------- # -# Priority ordering — higher priority first, deterministic tiebreaker. +# Priority ordering — lower priority value first, deterministic tiebreaker. # --------------------------------------------------------------------------- # -def test_higher_priority_first(make_issue, make_config): - lo = make_issue(number=1, repo="infra", priority=1) - mid = make_issue(number=2, repo="realestate-crawler", priority=5) - hi = make_issue(number=3, repo="SparkyFitness", priority=9) +def test_lower_priority_value_first(make_issue, make_config): + p1 = make_issue(number=1, repo="infra", priority=1) + p5 = make_issue(number=2, repo="realestate-crawler", priority=5) + p9 = make_issue(number=3, repo="SparkyFitness", priority=9) decisions = dispatch_policy.select_dispatchable( - [lo, hi, mid], + [p1, p9, p5], make_config(allowlist=["infra", "realestate-crawler", "SparkyFitness"]), set(), ) - assert _selected_numbers(decisions) == [3, 2, 1] # 9, 5, 1 + assert _selected_numbers(decisions) == [1, 2, 3] # priorities 1, 5, 9 def test_ordering_independent_of_input_order(make_issue, make_config): @@ -274,7 +274,7 @@ def test_ordering_independent_of_input_order(make_issue, make_config): ] allow = ["infra", "realestate-crawler", "SparkyFitness", "health"] config = make_config(allowlist=allow) - expected = [20, 30, 10, 40] # priorities 8,5,2,1 + expected = [40, 10, 30, 20] # priorities 1,2,5,8 (most urgent first) for perm in itertools.permutations(base): issues = [make_issue(number=n, repo=r, priority=p) for (r, n, p) in perm] @@ -305,7 +305,7 @@ def test_negative_and_zero_priorities_order_correctly(make_issue, make_config): make_config(allowlist=["infra", "realestate-crawler", "SparkyFitness"]), set(), ) - assert _selected_numbers(decisions) == [3, 2, 1] # 3 > 0 > -5 + assert _selected_numbers(decisions) == [1, 2, 3] # -5 < 0 < 3 (most urgent first) # --------------------------------------------------------------------------- # diff --git a/tests/test_afk_poller.py b/tests/test_afk_poller.py index d9d68d3..c88ff0c 100644 --- a/tests/test_afk_poller.py +++ b/tests/test_afk_poller.py @@ -176,14 +176,15 @@ def test_custom_in_progress_label_drives_the_lock(fake_tracker, fake_t3, make_is # --------------------------------------------------------------------------- # # One dispatch per repo per tick (the policy's one-agent-per-repo invariant, -# observed through the poller): highest-priority eligible issue wins the slot. +# observed through the poller): the most urgent (lowest-value) eligible issue +# wins the slot. # --------------------------------------------------------------------------- # def test_one_dispatch_per_repo_per_tick(fake_tracker, fake_t3, make_issue): fake_tracker.seed( "infra", [ - make_issue(number=1, repo="infra", priority=1), - make_issue(number=2, repo="infra", priority=9), # highest priority + make_issue(number=1, repo="infra", priority=1), # most urgent (lowest value) + make_issue(number=2, repo="infra", priority=9), make_issue(number=3, repo="infra", priority=5), ], ) @@ -191,8 +192,8 @@ def test_one_dispatch_per_repo_per_tick(fake_tracker, fake_t3, make_issue): _poller(fake_tracker, fake_t3).run_once(config) - assert _dispatched_pairs(fake_t3) == {("infra", 2)} - assert _added_in_progress(fake_tracker) == {("infra", 2)} + assert _dispatched_pairs(fake_t3) == {("infra", 1)} + assert _added_in_progress(fake_tracker) == {("infra", 1)} # --------------------------------------------------------------------------- # diff --git a/tests/test_afk_t3_client.py b/tests/test_afk_t3_client.py index e969c29..08d7b19 100644 --- a/tests/test_afk_t3_client.py +++ b/tests/test_afk_t3_client.py @@ -1,30 +1,34 @@ """Tests for ``app.afk.t3_client`` — the in-cluster T3 dispatch/snapshot adapter. -Everything here runs against an in-memory FAKE HTTP transport (``FakeHttp``); -no test touches a real T3 server, GitHub/Forgejo, or the cluster. The fake -records every request and replays staged responses, so the assertions pin the -wire contract the control plane depends on: +Everything runs against an in-memory FAKE HTTP transport; no test touches a real +T3 server. These assertions pin the **real** orchestration wire contract +(reverse-engineered from T3 v0.0.27 and verified live against t3-afk on +2026-06-15) — deliberately strict, because the previous version of this adapter +passed a laxer fake while 400-ing the real server. The fake therefore *rejects* +a command without a ``type`` discriminator, so a regression to the old +``{"command": "..."}` shape fails loudly here. - * ``dispatch`` issues exactly TWO POSTs to ``/api/orchestration/dispatch`` — - ``thread.create`` then ``thread.turn.start`` — carrying - ``modelSelection.instanceId == "claudeAgent"`` and ``runtimeMode == - "full-access"``, with ``ISSUE_IMPLEMENTER_PREAMBLE`` PREPENDED to - ``message.text`` and the thread id from the first response threaded into the - second. - * each request carries the ``Authorization: Bearer `` header from the - injected bearer provider (re-read per call, so token refresh is honoured). - * ``snapshot`` GETs ``/api/orchestration/snapshot`` and returns the parsed body. +Pinned facts: + * the dispatch body is a BARE command keyed by ``type`` (not ``command``); + * the CLIENT mints ``threadId``/``commandId``/``messageId`` + ``createdAt``; + ``dispatch`` returns the id it generated (the server replies ``{sequence}``); + * a thread lives in a project, so ``dispatch`` ensures the repo's project + (snapshot GET → ``project.create`` iff absent) before ``thread.create``; + * ``ISSUE_IMPLEMENTER_PREAMBLE`` is prepended to the opening turn's text; + * ``send_turn`` posts a follow-up turn (no preamble) on an existing thread; + * every request carries ``Authorization: Bearer ``, re-read per call. """ import pytest from app.afk import t3_client from app.afk.issue_implementer_prompt import ISSUE_IMPLEMENTER_PREAMBLE +_MODEL = "claude-sonnet-4-6" + # --------------------------------------------------------------------------- # -# Fake HTTP transport — httpx-shaped (``post``/``get`` → response with -# ``.json()`` + ``.raise_for_status()``), so the real client can hand the -# adapter a plain ``httpx.Client`` while tests hand it this recorder. +# Fake HTTP transport — httpx-shaped, but it ENFORCES the command envelope so a +# malformed command (the old bug) raises instead of silently passing. # --------------------------------------------------------------------------- # class FakeResponse: def __init__(self, payload: dict, status_code: int = 200) -> None: @@ -40,209 +44,222 @@ class FakeResponse: class FakeHttp: - """Records each POST/GET and replays queued responses in order. + """Records each POST/GET; GETs replay staged snapshots (default: no projects, + so ``dispatch`` creates one). POST bodies are validated as real commands.""" - ``post`` pops from ``post_responses`` (FIFO); ``get`` pops from - ``get_responses``. Each recorded call captures the url, json body, and - headers so tests can assert the two-command dispatch shape and the bearer. - """ - - def __init__( - self, - post_responses: list[dict] | None = None, - get_responses: list[dict] | None = None, - ) -> None: - self.post_responses = list(post_responses or []) + def __init__(self, get_responses: list[dict] | None = None) -> None: self.get_responses = list(get_responses or []) self.posts: list[dict] = [] self.gets: list[dict] = [] def post(self, url: str, json: dict, headers: dict) -> FakeResponse: + assert isinstance(json.get("type"), str) and json["type"], ( + f"command must carry a non-empty `type` discriminator, got {json!r}" + ) self.posts.append({"url": url, "json": json, "headers": headers}) - if not self.post_responses: - raise AssertionError("unexpected POST — no response staged") - return FakeResponse(self.post_responses.pop(0)) + return FakeResponse({"sequence": len(self.posts)}) # the real server reply def get(self, url: str, headers: dict) -> FakeResponse: self.gets.append({"url": url, "headers": headers}) - if not self.get_responses: - raise AssertionError("unexpected GET — no response staged") - return FakeResponse(self.get_responses.pop(0)) + body = self.get_responses.pop(0) if self.get_responses else {"projects": []} + return FakeResponse(body) + + # Convenience views over recorded POSTs, keyed by command type. + def commands(self, type_: str) -> list[dict]: + return [c["json"] for c in self.posts if c["json"]["type"] == type_] -# Two thread.create / thread.turn.start replies the happy-path dispatch needs. -_CREATE_REPLY = {"threadId": "thread-abc"} -_TURN_REPLY = {"ok": True} +def _ids(): + """Deterministic id factory: id-1, id-2, … so tests can reason about minting.""" + n = {"i": 0} + + def f() -> str: + n["i"] += 1 + return f"id-{n['i']}" + + return f -def _client(http: FakeHttp, *, base_url: str = "http://t3-afk:8080", token: str = "tok-1"): +def _resolver(repo: str) -> t3_client.ProjectRef: + """Predictable repo -> project mapping for assertions.""" + return t3_client.ProjectRef(f"proj-{repo}", f"/data/{repo}", repo) + + +def _client(http: FakeHttp, *, base_url="http://t3-afk:8080", token="tok-1", **kw): return t3_client.T3Client( base_url=base_url, http=http, bearer_provider=lambda: token, + project_resolver=_resolver, + id_factory=kw.pop("id_factory", _ids()), + clock=kw.pop("clock", lambda: "2026-06-15T00:00:00+00:00"), + model=_MODEL, ) -def _dispatch(http: FakeHttp, **kw) -> str: - repo = kw.pop("repo", "infra") - issue = kw.pop("issue", 42) - prompt = kw.pop("prompt", "Do the thing.") +def _dispatch(http: FakeHttp, *, repo="infra", issue=42, prompt="Do the thing.", **kw): return _client(http, **kw).dispatch(repo=repo, issue=issue, prompt=prompt) # --------------------------------------------------------------------------- # -# dispatch — the two-POST shape. +# dispatch — ensure-project, then create, then turn. # --------------------------------------------------------------------------- # -def test_dispatch_issues_exactly_two_posts_to_dispatch_endpoint(): - http = FakeHttp(post_responses=[_CREATE_REPLY, _TURN_REPLY]) +def test_dispatch_ensures_project_then_creates_thread_then_turn_when_project_absent(): + http = FakeHttp(get_responses=[{"projects": []}]) _dispatch(http) - assert len(http.posts) == 2 - assert http.gets == [] + # one snapshot GET (the existence check) + three POSTs in order. + assert len(http.gets) == 1 + types = [c["json"]["type"] for c in http.posts] + assert types == ["project.create", "thread.create", "thread.turn.start"] for call in http.posts: assert call["url"] == "http://t3-afk:8080/api/orchestration/dispatch" -def test_dispatch_first_command_is_thread_create(): - http = FakeHttp(post_responses=[_CREATE_REPLY, _TURN_REPLY]) +def test_dispatch_skips_project_create_when_project_already_exists(): + http = FakeHttp(get_responses=[{"projects": [{"id": "proj-infra"}]}]) + _dispatch(http, repo="infra") + types = [c["json"]["type"] for c in http.posts] + assert types == ["thread.create", "thread.turn.start"] # idempotent: no re-create + + +def test_dispatch_uses_type_discriminator_not_command_string(): + # Regression guard for the original bug: discriminator is `type`, and there is + # no legacy top-level `command` string key on any command. + http = FakeHttp() _dispatch(http) - assert http.posts[0]["json"]["command"] == "thread.create" - - -def test_dispatch_second_command_is_thread_turn_start(): - http = FakeHttp(post_responses=[_CREATE_REPLY, _TURN_REPLY]) - _dispatch(http) - assert http.posts[1]["json"]["command"] == "thread.turn.start" - - -def test_dispatch_returns_thread_id_from_create_response(): - http = FakeHttp(post_responses=[{"threadId": "thread-xyz"}, _TURN_REPLY]) - assert _dispatch(http) == "thread-xyz" - - -def test_dispatch_threads_created_id_into_turn_start(): - http = FakeHttp(post_responses=[{"threadId": "thread-xyz"}, _TURN_REPLY]) - _dispatch(http) - # The second command must target the thread the first call created. - assert http.posts[1]["json"]["threadId"] == "thread-xyz" + for c in http.posts: + assert "type" in c["json"] + assert not isinstance(c["json"].get("command"), str) # --------------------------------------------------------------------------- # -# dispatch — model selection / runtime envelope (the pilot-baked constants). +# dispatch — thread.create real field set. # --------------------------------------------------------------------------- # -def test_dispatch_uses_claude_agent_instance_and_full_access_runtime(): - http = FakeHttp(post_responses=[_CREATE_REPLY, _TURN_REPLY]) - _dispatch(http) - create_body = http.posts[0]["json"] - assert create_body["modelSelection"]["instanceId"] == "claudeAgent" - assert create_body["runtimeMode"] == "full-access" +def test_thread_create_carries_real_required_fields(): + http = FakeHttp() + _dispatch(http, repo="infra") + create = http.commands("thread.create")[0] + assert create["projectId"] == "proj-infra" + assert create["modelSelection"] == {"instanceId": "claudeAgent", "model": _MODEL} + assert create["runtimeMode"] == "full-access" + assert create["interactionMode"] == "default" + # NullOr fields are present (not omitted) — the schema requires the keys. + assert create["branch"] is None + assert create["worktreePath"] is None + # client-minted identity + timestamp. + assert isinstance(create["commandId"], str) and create["commandId"] + assert isinstance(create["threadId"], str) and create["threadId"] + assert create["createdAt"] == "2026-06-15T00:00:00+00:00" -def test_dispatch_create_carries_repo_and_issue(): - http = FakeHttp(post_responses=[_CREATE_REPLY, _TURN_REPLY]) - _dispatch(http, repo="claude-agent-service", issue=7) - create_body = http.posts[0]["json"] - assert create_body["repo"] == "claude-agent-service" - assert create_body["issue"] == 7 +def test_dispatch_returns_client_minted_thread_id_not_a_server_value(): + http = FakeHttp() + returned = _dispatch(http) + create = http.commands("thread.create")[0] + turn = http.commands("thread.turn.start")[0] + # The returned id is the one WE put on thread.create (server only sends {sequence}). + assert returned == create["threadId"] == turn["threadId"] # --------------------------------------------------------------------------- # -# dispatch — the preamble PREPEND (behaviour injection). +# dispatch — thread.turn.start real message shape + preamble. # --------------------------------------------------------------------------- # -def test_dispatch_prepends_issue_implementer_preamble_to_message_text(): - http = FakeHttp(post_responses=[_CREATE_REPLY, _TURN_REPLY]) +def test_turn_message_has_real_shape_and_prepends_preamble(): + http = FakeHttp() _dispatch(http, prompt="Implement issue 42 body here.") - text = http.posts[1]["json"]["message"]["text"] - assert text == ISSUE_IMPLEMENTER_PREAMBLE + "Implement issue 42 body here." + turn = http.commands("thread.turn.start")[0] + msg = turn["message"] + assert msg["role"] == "user" + assert isinstance(msg["messageId"], str) and msg["messageId"] + assert msg["attachments"] == [] + assert msg["text"] == ISSUE_IMPLEMENTER_PREAMBLE + "Implement issue 42 body here." + assert turn["runtimeMode"] == "full-access" + assert turn["interactionMode"] == "default" -def test_dispatch_preamble_comes_strictly_before_the_prompt(): - http = FakeHttp(post_responses=[_CREATE_REPLY, _TURN_REPLY]) - _dispatch(http, prompt="UNIQUE-PROMPT-MARKER") - text = http.posts[1]["json"]["message"]["text"] - assert text.startswith(ISSUE_IMPLEMENTER_PREAMBLE) - assert text.index(ISSUE_IMPLEMENTER_PREAMBLE) < text.index("UNIQUE-PROMPT-MARKER") - # The raw prompt is preserved verbatim after the preamble. - assert text.endswith("UNIQUE-PROMPT-MARKER") - - -def test_dispatch_does_not_prepend_preamble_to_create_command(): - # The preamble belongs only on the turn message, not the thread.create call. - http = FakeHttp(post_responses=[_CREATE_REPLY, _TURN_REPLY]) +def test_preamble_only_on_turn_not_on_create(): + http = FakeHttp() _dispatch(http) - assert "message" not in http.posts[0]["json"] + assert "message" not in http.commands("thread.create")[0] # --------------------------------------------------------------------------- # -# Auth — bearer header, read from the injected provider each call. +# send_turn — follow-up turn on an existing thread (multi-turn), no preamble. # --------------------------------------------------------------------------- # -def test_dispatch_sends_bearer_on_both_posts(): - http = FakeHttp(post_responses=[_CREATE_REPLY, _TURN_REPLY]) +def test_send_turn_posts_single_turn_to_existing_thread_without_preamble(): + http = FakeHttp() + _client(http).send_turn("thread-xyz", "Just this follow-up.") + assert [c["json"]["type"] for c in http.posts] == ["thread.turn.start"] + turn = http.commands("thread.turn.start")[0] + assert turn["threadId"] == "thread-xyz" + assert turn["message"]["text"] == "Just this follow-up." # verbatim, no preamble + assert http.gets == [] # no project work for a follow-up + + +# --------------------------------------------------------------------------- # +# Auth — bearer on every request, re-read per call. +# --------------------------------------------------------------------------- # +def test_every_request_sends_bearer(): + http = FakeHttp() _dispatch(http, token="secret-token") for call in http.posts: assert call["headers"]["Authorization"] == "Bearer secret-token" + for call in http.gets: + assert call["headers"]["Authorization"] == "Bearer secret-token" -def test_bearer_provider_is_called_per_request_so_refresh_is_honoured(): - # A rotating provider proves the token isn't captured once at construction - # (T3's orchestration token expires hourly and must be re-read). - tokens = iter(["tok-A", "tok-B", "tok-C"]) - http = FakeHttp(post_responses=[_CREATE_REPLY, _TURN_REPLY]) +def test_bearer_is_reread_per_request_so_rotation_is_honoured(): + tokens = iter(["tok-A", "tok-B", "tok-C", "tok-D", "tok-E"]) + http = FakeHttp() client = t3_client.T3Client( base_url="http://t3-afk:8080", http=http, bearer_provider=lambda: next(tokens), + project_resolver=_resolver, + id_factory=_ids(), + clock=lambda: "t", ) client.dispatch(repo="infra", issue=1, prompt="x") - assert http.posts[0]["headers"]["Authorization"] == "Bearer tok-A" - assert http.posts[1]["headers"]["Authorization"] == "Bearer tok-B" + # GET(ensure) then POST(project.create) then POST(create) then POST(turn) — + # each pulled a fresh token in call order. + assert http.gets[0]["headers"]["Authorization"] == "Bearer tok-A" + assert http.posts[0]["headers"]["Authorization"] == "Bearer tok-B" + assert http.posts[1]["headers"]["Authorization"] == "Bearer tok-C" + assert http.posts[2]["headers"]["Authorization"] == "Bearer tok-D" # --------------------------------------------------------------------------- # # snapshot — GET + parse. # --------------------------------------------------------------------------- # -def test_snapshot_gets_snapshot_endpoint_and_returns_parsed_body(): - fleet = {"threads": [{"id": "thread-abc", "status": "running"}]} +def test_snapshot_gets_endpoint_and_returns_parsed_body(): + fleet = {"threads": [{"id": "t1", "latestTurn": {"state": "running"}}], "projects": []} http = FakeHttp(get_responses=[fleet]) result = _client(http).snapshot() assert result == fleet - assert len(http.gets) == 1 assert http.gets[0]["url"] == "http://t3-afk:8080/api/orchestration/snapshot" assert http.posts == [] -def test_snapshot_sends_bearer(): - http = FakeHttp(get_responses=[{"threads": []}]) - _client(http, token="snap-token").snapshot() - assert http.gets[0]["headers"]["Authorization"] == "Bearer snap-token" - - # --------------------------------------------------------------------------- # -# base_url handling — a trailing slash must not produce a double slash. +# base_url normalisation + error surfacing. # --------------------------------------------------------------------------- # def test_trailing_slash_in_base_url_is_normalised(): - http = FakeHttp( - post_responses=[_CREATE_REPLY, _TURN_REPLY], - get_responses=[{"threads": []}], - ) + http = FakeHttp() client = _client(http, base_url="http://t3-afk:8080/") client.dispatch(repo="infra", issue=1, prompt="x") - client.snapshot() assert http.posts[0]["url"] == "http://t3-afk:8080/api/orchestration/dispatch" assert http.gets[0]["url"] == "http://t3-afk:8080/api/orchestration/snapshot" -# --------------------------------------------------------------------------- # -# Error surfacing — a non-2xx response must raise, not be swallowed. -# --------------------------------------------------------------------------- # -def test_dispatch_raises_when_a_post_returns_an_error_status(): +def test_dispatch_raises_and_short_circuits_when_a_post_errors(): class ErroringHttp(FakeHttp): def post(self, url: str, json: dict, headers: dict) -> FakeResponse: - self.posts.append({"url": url, "json": json, "headers": headers}) + super().post(url, json, headers) # validates + records return FakeResponse({}, status_code=500) - http = ErroringHttp() + http = ErroringHttp(get_responses=[{"projects": [{"id": "proj-infra"}]}]) with pytest.raises(RuntimeError): - _dispatch(http) - # It failed on the FIRST call — never blindly fired thread.turn.start after - # a failed thread.create. - assert len(http.posts) == 1 + _dispatch(http, repo="infra") + # Project already existed, so the FIRST post is thread.create — and it failed, + # so thread.turn.start never fired. + assert [c["json"]["type"] for c in http.posts] == ["thread.create"] diff --git a/tests/test_afk_t3_live.py b/tests/test_afk_t3_live.py new file mode 100644 index 0000000..c9929fd --- /dev/null +++ b/tests/test_afk_t3_live.py @@ -0,0 +1,92 @@ +"""LIVE smoke test for ``app.afk.t3_client`` against a real T3 instance. + +Skipped by default. The unit tests (``test_afk_t3_client``) pin the wire shape +against a contract-accurate fake; this file proves the *same code* actually talks +to a live T3 — the guard that "green tests" mean "wired to T3", which the earlier +fake-only suite did NOT provide (it was green while the real server 400'd). + +It is opt-in because the orchestration API is in-cluster (ClusterIP + an +Authentik-gated ingress), so it can't run in CI without cluster access. Run it +from inside the cluster, or via a port-forward, with a bearer minted on the pod:: + + # bearer (on the t3-afk pod, as the node user): + # t3 auth session issue --token-only --base-dir /data/t3 --ttl 30m + kubectl -n t3-afk port-forward deploy/t3-afk 3773:3773 & + T3_AFK_BASE_URL=http://127.0.0.1:3773 T3_AFK_TOKEN= \ + python3 -m pytest tests/test_afk_t3_live.py -v + +The read-only snapshot check is always safe. The full dispatch round-trip +(create thread + turn + verify it appears, then delete it) only runs with +``T3_AFK_SMOKE_DISPATCH=1`` since it spends a (tiny) agent turn. +""" +import os +import time + +import pytest + +from app.afk import t3_client + +_BASE_URL = os.environ.get("T3_AFK_BASE_URL") +_TOKEN = os.environ.get("T3_AFK_TOKEN") + +pytestmark = pytest.mark.skipif( + not (_BASE_URL and _TOKEN), + reason="set T3_AFK_BASE_URL + T3_AFK_TOKEN to run the live T3 smoke test", +) + + +def _real_client(): + import httpx # local import so the module imports fine without httpx installed + + return t3_client.T3Client( + base_url=_BASE_URL, + http=httpx.Client(timeout=30.0), + bearer_provider=lambda: _TOKEN, + ) + + +def test_live_snapshot_has_the_real_shape(): + """A real snapshot parses and carries the keys the watcher/adapter depend on: + ``threads`` + ``projects``, and any thread exposes ``latestTurn`` (the + liveness source) — not a top-level ``status``.""" + snap = _real_client().snapshot() + assert isinstance(snap, dict) + assert "threads" in snap and "projects" in snap + for thread in snap["threads"]: + assert "id" in thread + # liveness lives under latestTurn.state (the contract this suite guards) + assert "status" not in thread, "real threads have no top-level status field" + + +@pytest.mark.skipif( + os.environ.get("T3_AFK_SMOKE_DISPATCH") != "1", + reason="set T3_AFK_SMOKE_DISPATCH=1 to run the dispatch round-trip (spends a turn)", +) +def test_live_dispatch_round_trip_then_cleanup(): + """End-to-end against the real server: ``dispatch`` (ensure-project + create + + turn) succeeds and the new thread shows up in the snapshot. Cleans up the + thread it created so the cockpit isn't littered.""" + import httpx + + repo = "afk-smoke/roundtrip" + client = _real_client() + thread_id = client.dispatch(repo, 1, "Reply with just: ok. Do not use any tools.") + assert isinstance(thread_id, str) and thread_id + + # The thread must appear in the fleet read-model (poll briefly — dispatch is + # accepted asynchronously). + found = False + for _ in range(10): + if any(t.get("id") == thread_id for t in client.snapshot().get("threads", [])): + found = True + break + time.sleep(1.0) + assert found, f"dispatched thread {thread_id} never appeared in the snapshot" + + # Cleanup: delete the throwaway thread (raw command — not part of the adapter). + httpx.post( + f"{_BASE_URL.rstrip('/')}/api/orchestration/dispatch", + headers={"Authorization": f"Bearer {_TOKEN}"}, + json={"type": "thread.delete", "commandId": t3_client._uuid(), "threadId": thread_id}, + timeout=30.0, + ).raise_for_status() diff --git a/tests/test_afk_watcher.py b/tests/test_afk_watcher.py index 052a58c..7bf7cbf 100644 --- a/tests/test_afk_watcher.py +++ b/tests/test_afk_watcher.py @@ -62,8 +62,21 @@ def _run( ) +# Map the tests' abstract liveness vocab to T3's REAL ``latestTurn.state`` strings +# so call sites stay readable while the snapshot carries the true shape the +# watcher parses (a finished turn is "completed", a failed one "errored", +# "running" is itself real). Unknown values pass through verbatim. +_REAL_STATE = {"idle": "completed", "error": "errored"} + + def _snapshot(thread_id: str, status: str) -> dict: - return {"threads": [{"id": thread_id, "status": status}]} + """A fleet snapshot with one thread whose latest turn is in ``status`` — real + shape ``threads[].latestTurn.state`` (not a top-level ``status`` field).""" + return { + "threads": [ + {"id": thread_id, "latestTurn": {"state": _REAL_STATE.get(status, status)}} + ] + } def _labels(fake_tracker): @@ -173,7 +186,7 @@ def test_close_success_posts_done_checklist( # --------------------------------------------------------------------------- # # ESCALATE_PREPUSH — agent stalled/errored before any push: hand to a human. # --------------------------------------------------------------------------- # -@pytest.mark.parametrize("thread_state", ["error", "idle"]) +@pytest.mark.parametrize("thread_state", ["errored", "completed"]) def test_escalate_prepush_relabels_and_notifies( fake_t3, fake_tracker, fake_ci, fake_notifier, make_issue, make_config, thread_state ): @@ -292,6 +305,47 @@ def test_unknown_thread_status_waits( assert fake_notifier.sent == [] +# --------------------------------------------------------------------------- # +# Real T3 ``latestTurn.state`` strings map to the right liveness (contract guard +# against the snapshot-shape drift that the previous adapter/fake masked). +# --------------------------------------------------------------------------- # +@pytest.mark.parametrize("state", ["running", "in_progress", "pending", "queued", "pendingInit"]) +def test_real_in_progress_states_keep_waiting( + fake_t3, fake_tracker, fake_ci, fake_notifier, make_issue, make_config, state +): + issue = make_issue(number=7, repo="infra") + fake_t3.set_snapshot({"threads": [{"id": "thread-0", "latestTurn": {"state": state}}]}) + result = _watcher(fake_t3, fake_tracker, fake_ci, fake_notifier).tick( + _run(issue, commit=None), make_config() + ) + assert result.action.value == "wait" # still working -> keep polling + + +def test_real_errored_state_escalates_when_nothing_pushed( + fake_t3, fake_tracker, fake_ci, fake_notifier, make_issue, make_config +): + # The real failure state is "errored" (not "error"); with nothing pushed it + # is a pre-push escalation, not a freeze. + issue = make_issue(number=7, repo="infra") + fake_t3.set_snapshot({"threads": [{"id": "thread-0", "latestTurn": {"state": "errored"}}]}) + result = _watcher(fake_t3, fake_tracker, fake_ci, fake_notifier).tick( + _run(issue, commit=None), make_config() + ) + assert result.action.value == "escalate_prepush" + + +def test_thread_present_but_no_turn_yet_waits( + fake_t3, fake_tracker, fake_ci, fake_notifier, make_issue, make_config +): + # A freshly-created thread has no latestTurn -> no usable status yet -> WAIT. + issue = make_issue(number=7, repo="infra") + fake_t3.set_snapshot({"threads": [{"id": "thread-0"}]}) + result = _watcher(fake_t3, fake_tracker, fake_ci, fake_notifier).tick( + _run(issue, commit=None), make_config() + ) + assert result.action.value == "wait" + + # --------------------------------------------------------------------------- # # Terminal cleanup only happens once / cleanly: a terminal tick posts exactly # one checklist comment (no double-commenting on the way out).