claude-agent-service/app/afk/tracker.py

244 lines
10 KiB
Python
Raw Permalink Normal View History

afk: add the autonomous issue-implementer loop (SHIPS DISABLED) Adds app/afk/ — the "away-from-keyboard" control plane that watches the issue tracker for ready-for-agent issues, dispatches each to a fresh full-access T3 thread (with the issue-implementer preamble prepended, because T3 does not honour ~/.claude/CLAUDE.md), and drives the resulting run through its lifecycle: tests-red -> green -> pushed -> CI -> deployed, escalating or fix-forwarding via a small pure state machine. The loop is split into pure cores (no I/O, exhaustively unit-tested) and thin injected adapters (the only edges that ever touch T3, the tracker, CI, or Slack — faked in every test, so nothing here talks to a real server, GitHub/Forgejo, or the cluster): pure: types, dispatch_policy, run_state_machine, phase_checklist, config, issue_implementer_prompt adapters: t3_client (two-POST dispatch + snapshot), tracker, ci_watcher, notifier loops: poller — CronJob tick #1: list_ready -> select_dispatchable -> dispatch + stamp the in-progress lock (label only AFTER a successful dispatch, so a failed dispatch never leaves a phantom lock). Per-repo lock derived from the ready set, since the CronJob is stateless between ticks. watcher — CronJob tick #2: assemble RunState from snapshot + CI -> next_action -> act (close on success; relabel ready-for-human + ring the doorbell on the two escalations; dispatch a corrective turn on fix-forward; refresh the progress checklist). SHIPS DISABLED, on purpose: Config defaults to kill_switch=True AND an empty allowlist, so a freshly-loaded config dispatches nothing and does zero I/O. The package is not imported by the running service and has no auto-enable path. Arming it is a deliberate, later, manual step requiring BOTH gates (clear the kill switch AND enrol the exact repos) so one fat-fingered env var can't arm every repo. Test-first throughout: 412 tests pass (poller + watcher add integration tests wiring the real pure cores to in-memory fakes). mypy clean. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-15 21:15:11 +00:00
"""Issue-tracker adapter — the loop's read/write port onto GitHub issues.
``Tracker`` is the only place the AFK loop touches the issue tracker. It wraps an
injected ``GitHubClient`` (the port) so the policy/state-machine code and the
tests never depend on a real ``gh`` or the network: production injects
``GhCliClient`` (shells out to ``gh`` with no-shell argv); tests inject a fake.
The split is deliberate. The ``GitHubClient`` port speaks only in *primitives*
(list raw issues for a label, fetch a single issue's label events, and the four
mutations). All the loop-specific *decisions* live on ``Tracker``:
* ``labeled_by_trusted`` decided **fail-closed** from the actor who made the
most-recent application of the ready label. On private repos only
collaborators can label, so the label *is* the authorization (design doc,
"Trigger & dispatch predicate"); an unattributable label is never trusted.
* ``blocked_by`` the issue numbers in the body's "Blocked by #N" clauses
(the per-issue dependency the design doc gates dispatch on).
* ``priority`` read off a ``priority:<n>`` label, lowest wins (lower runs
first, matching ``Issue.priority`` semantics in ``types``).
Keeping the decisions here, not in the client, is what lets the whole read path
be tested against a thin fake. Mutations (``add_label`` / ``remove_label`` /
``comment`` / ``close``) are pass-throughs the loop drives during a run.
"""
import json
import re
from collections.abc import Callable
from subprocess import PIPE, run
from typing import Protocol, runtime_checkable
from .types import Issue
# Trusted author associations: GitHub tags each issue event actor with their
# association to the repo. Only these may arm an issue for the AFK loop — the
# trust gate from the design doc. Overridable per Tracker for a tighter policy.
DEFAULT_TRUSTED_ASSOCIATIONS: frozenset[str] = frozenset({"OWNER", "MEMBER", "COLLABORATOR"})
# Default gating label; mirrors Config.ready_label so a Tracker built without an
# explicit override matches the production default.
DEFAULT_READY_LABEL = "ready-for-agent"
# "Blocked by #3, #4 and #10" → [3, 4, 10]. We match a "blocked by" lead-in
# (case-insensitive) and then harvest every "#<n>" in the clause that follows,
# up to the next line break — so a bare "#7 for context" elsewhere is ignored.
_BLOCKED_BY_CLAUSE = re.compile(r"blocked\s+by\b([^\n\r]*)", re.IGNORECASE)
_ISSUE_REF = re.compile(r"#(\d+)")
# "priority:2" → 2. Anything non-numeric (e.g. "priority:high") is not a numeric
# priority and is skipped.
_PRIORITY_LABEL = re.compile(r"^priority:(\d+)$")
@runtime_checkable
class GitHubClient(Protocol):
"""The primitive surface ``Tracker`` depends on — one issue tracker, faked
in tests. Implementations must not embed loop policy; they only fetch raw
data and perform the four mutations.
``list_issues`` returns the ``gh issue list --json number,labels,body`` shape
(``labels`` is a list of ``{"name": ...}``; ``body`` may be ``None``).
``label_events`` returns the ``labeled`` timeline events for one issue, each
with ``label.name``, ``actor.login`` and ``author_association``.
"""
def list_issues(self, repo: str, label: str) -> list[dict]: ...
def label_events(self, repo: str, number: int) -> list[dict]: ...
def add_label(self, repo: str, number: int, label: str) -> None: ...
def remove_label(self, repo: str, number: int, label: str) -> None: ...
def comment(self, repo: str, number: int, body: str) -> None: ...
def close(self, repo: str, number: int) -> None: ...
class Tracker:
"""Adapter that turns raw issue-tracker data into ``Issue`` records and
relays mutations, over an injected :class:`GitHubClient`."""
def __init__(
self,
client: GitHubClient,
ready_label: str = DEFAULT_READY_LABEL,
trusted_associations: frozenset[str] = DEFAULT_TRUSTED_ASSOCIATIONS,
) -> None:
self.client = client
self.ready_label = ready_label
self.trusted_associations = trusted_associations
# ----------------------------------------------------------------- reads #
def list_ready(self, repos: list[str]) -> list[Issue]:
"""Every ready-labeled open issue across ``repos``, as ``Issue`` records.
Ordering follows the client's per-repo order; dispatch ordering by
priority is the dispatch policy's job, not the tracker's.
"""
issues: list[Issue] = []
for repo in repos:
for raw in self.client.list_issues(repo, self.ready_label):
issues.append(self._to_issue(repo, raw))
return issues
def _to_issue(self, repo: str, raw: dict) -> Issue:
number = int(raw["number"])
labels = [lbl["name"] for lbl in raw.get("labels", [])]
return Issue(
number=number,
repo=repo,
labels=labels,
blocked_by=_parse_blocked_by(raw.get("body")),
labeled_by_trusted=self._is_labeled_by_trusted(repo, number),
priority=_parse_priority(labels),
)
def _is_labeled_by_trusted(self, repo: str, number: int) -> bool:
"""True iff the MOST RECENT application of the ready label was made by a
trusted actor. Fail-closed: no attributable application not trusted."""
last_association: str | None = None
for event in self.client.label_events(repo, number):
if event.get("event") != "labeled":
continue
if (event.get("label") or {}).get("name") != self.ready_label:
continue
last_association = event.get("author_association")
return last_association in self.trusted_associations
# ------------------------------------------------------------- mutations #
def add_label(self, repo: str, issue: int, label: str) -> None:
self.client.add_label(repo, issue, label)
def remove_label(self, repo: str, issue: int, label: str) -> None:
self.client.remove_label(repo, issue, label)
def comment(self, repo: str, issue: int, body: str) -> None:
self.client.comment(repo, issue, body)
def close(self, repo: str, issue: int) -> None:
self.client.close(repo, issue)
# --------------------------------------------------------------------------- #
# Parsing helpers — pure functions, no I/O.
# --------------------------------------------------------------------------- #
def _parse_blocked_by(body: str | None) -> list[int]:
"""Issue numbers referenced in the body's "Blocked by #N" clauses.
Order-preserving and de-duplicated; bare "#N" mentions outside a "blocked by"
clause are ignored. A missing/empty body yields ``[]``.
"""
if not body:
return []
seen: dict[int, None] = {} # insertion-ordered set
for clause in _BLOCKED_BY_CLAUSE.findall(body):
for ref in _ISSUE_REF.findall(clause):
seen.setdefault(int(ref), None)
return list(seen)
def _parse_priority(labels: list[str]) -> int:
"""Numeric priority from a ``priority:<n>`` label, lowest wins; 0 if none."""
priorities = [
int(match.group(1))
for label in labels
if (match := _PRIORITY_LABEL.match(label))
]
return min(priorities) if priorities else 0
# --------------------------------------------------------------------------- #
# Concrete client — shells out to `gh`. Injected `run` keeps it testable.
# --------------------------------------------------------------------------- #
def _default_run(argv: list[str]) -> str:
"""Run ``argv`` with no shell and return stdout (text). Raises on non-zero.
List argv (never a shell string), matching the no-injection-surface pattern
the breakglass/main subprocess helpers use the repo/label/body values are
never interpreted by a shell.
"""
proc = run(argv, stdout=PIPE, stderr=PIPE, text=True, check=False)
if proc.returncode != 0:
raise RuntimeError(f"{argv[0]} failed ({proc.returncode}): {proc.stderr[:200]}")
return proc.stdout
class GhCliClient:
""":class:`GitHubClient` backed by the ``gh`` CLI.
``repo_owner`` is the GitHub owner/org the sub-project repos live under, so a
bare repo name (``"infra"``) becomes the ``--repo owner/infra`` slug ``gh``
wants. ``run`` is the subprocess runner (defaults to the real no-shell one);
tests inject a fake to capture argv without spawning ``gh``.
"""
def __init__(self, repo_owner: str, run: Callable[[list[str]], str] = _default_run) -> None:
self.repo_owner = repo_owner
self._run = run
def _slug(self, repo: str) -> str:
return f"{self.repo_owner}/{repo}"
def list_issues(self, repo: str, label: str) -> list[dict]:
out = self._run([
"gh", "issue", "list", "--repo", self._slug(repo),
"--label", label, "--state", "open",
"--json", "number,labels,body", "--limit", "100",
])
return _loads_list(out)
def label_events(self, repo: str, number: int) -> list[dict]:
out = self._run([
"gh", "api",
f"repos/{self._slug(repo)}/issues/{number}/timeline",
"--paginate",
"-H", "Accept: application/vnd.github+json",
])
events = _loads_list(out)
return [e for e in events if e.get("event") == "labeled"]
def add_label(self, repo: str, number: int, label: str) -> None:
self._run([
"gh", "issue", "edit", str(number), "--repo", self._slug(repo),
"--add-label", label,
])
def remove_label(self, repo: str, number: int, label: str) -> None:
self._run([
"gh", "issue", "edit", str(number), "--repo", self._slug(repo),
"--remove-label", label,
])
def comment(self, repo: str, number: int, body: str) -> None:
self._run([
"gh", "issue", "comment", str(number), "--repo", self._slug(repo),
"--body", body,
])
def close(self, repo: str, number: int) -> None:
self._run(["gh", "issue", "close", str(number), "--repo", self._slug(repo)])
def _loads_list(out: str) -> list[dict]:
"""Parse ``gh`` JSON stdout into a list of dicts. Empty stdout → ``[]``."""
text = out.strip()
if not text:
return []
return json.loads(text)