From 6de4549a961931d509a4edeb2facff70ffd7928c Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sun, 17 May 2026 21:03:17 +0000 Subject: [PATCH] docs/plans: add agent presence implementation plan (2026-05-17) 15-task plan for a shared presence board so Claude Code sessions can see which shared infra resources are being actively mutated by other sessions. Resource-scoped claims on the existing Dolt server, heartbeat-driven TTL, agent-driven via CLAUDE.md rule + Python CLI. --- docs/plans/2026-05-17-agent-presence-plan.md | 1495 ++++++++++++++++++ 1 file changed, 1495 insertions(+) create mode 100644 docs/plans/2026-05-17-agent-presence-plan.md diff --git a/docs/plans/2026-05-17-agent-presence-plan.md b/docs/plans/2026-05-17-agent-presence-plan.md new file mode 100644 index 00000000..11db9759 --- /dev/null +++ b/docs/plans/2026-05-17-agent-presence-plan.md @@ -0,0 +1,1495 @@ +# Agent Presence Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Build a shared presence board so Claude Code agent sessions can see which shared infra resources are being actively mutated by other sessions, preventing redundant investigations and overlapping operations. + +**Architecture:** Single-table store on the existing Dolt server (`10.0.20.200:3306`, `beads` DB, new `presence_claims` table). Python single-file CLI (`scripts/presence`) writes/reads claims. Heartbeat-driven TTL — entries expire 15 min after the last heartbeat, so "left unclosed" is structurally impossible. A consolidated UserPromptSubmit hook injects other sessions' active claims into every turn for ambient awareness. CLAUDE.md rule mandates agents claim before mutating shared state. 
+ +**Tech Stack:** Python 3 stdlib + `pymysql`; Dolt (MySQL-compatible) at `10.0.20.200:3306`; Bash hooks; Terraform Kubernetes provider. + +**Coverage of design decisions (locked in grilling):** +- Pure presence/coordination — not work tracking +- Resource-scoped entries (`type:name`, e.g. `node:k8s-node1`) +- Heartbeat TTL + Stop-hook release +- Agent-driven claim via CLI invoked from agent reasoning per CLAUDE.md rule +- Stored on Dolt `beads` DB, new table +- CLI verbs: `claim`, `heartbeat`, `release`, `list`, `peek` +- UserPromptSubmit hook consolidates beads + presence +- Seed vocab: `node:`, `host:`, `stack:`, `service:`, `db:`, `pvc:`, `infra:` +- Only mutating ops trigger claim +- Co-claim allowed; soft-defer protocol on conflict +- MVP devvm only (no claude-agent-service / Woodpecker) +- Beads coexists with cleaned semantics +- Pure rule + visibility for enforcement (measure first) +- Python single-file CLI at `~/code/scripts/presence` + +--- + +## File Structure + +**New files:** +- `scripts/presence` — Python single-file CLI (~250 lines) +- `scripts/tests/test_presence.py` — pytest unit tests for the CLI +- `scripts/tests/conftest.py` — pytest fixtures (mocked DB) +- `.claude/hooks/presence-session-start.sh` — generates session ID at start +- `.claude/hooks/presence-heartbeat.sh` — throttled heartbeat on PostToolUse +- `.claude/hooks/presence-release.sh` — release on Stop +- `.claude/hooks/agent-state-context.sh` — consolidated beads+presence injector (replaces user-global `beads-task-context.sh`) + +**Modified files:** +- `infra/stacks/beads-server/main.tf` — add `presence_claims` schema init +- `.claude/settings.json` — wire new hooks; swap UserPromptSubmit to consolidated script +- `CLAUDE.md` — add the claim-before-mutate rule, seed vocab, defer protocol + +**Touched-but-untouched (audit only):** +- Stale `in_progress` beads items (close or revert to `open`) + +--- + +## Task 1: Create `presence_claims` table on the Dolt server + +**Files:** +- Modify: 
`infra/stacks/beads-server/main.tf` — extend the existing `kubernetes_config_map.dolt_init` data block + add a `kubernetes_job` for idempotent table creation on already-running Dolt +- Apply via `scripts/tg apply` from `infra/stacks/beads-server/` + +The `dolt_init` ConfigMap only runs on fresh Dolt PVCs. Since Dolt is already running with the existing PV, the new SQL won't fire from there. The Job is the workaround for live updates and stays idempotent forever. + +- [ ] **Step 1: Add the schema SQL into the existing `dolt_init` ConfigMap** + +In `infra/stacks/beads-server/main.tf`, locate `resource "kubernetes_config_map" "dolt_init"` and add a second data entry: + +```hcl +resource "kubernetes_config_map" "dolt_init" { + metadata { + name = "dolt-init" + namespace = kubernetes_namespace.beads.metadata[0].name + } + data = { + "01-create-beads-user.sql" = <<-EOT + CREATE USER IF NOT EXISTS 'beads'@'%' IDENTIFIED BY ''; + GRANT ALL PRIVILEGES ON *.* TO 'beads'@'%' WITH GRANT OPTION; + EOT + "02-create-presence-table.sql" = <<-EOT + CREATE DATABASE IF NOT EXISTS beads; + USE beads; + CREATE TABLE IF NOT EXISTS presence_claims ( + session_id VARCHAR(128) NOT NULL, + resource_label VARCHAR(255) NOT NULL, + purpose TEXT NOT NULL, + claimed_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), + expires_at DATETIME(3) NOT NULL, + host VARCHAR(128) NOT NULL, + user VARCHAR(64) NOT NULL, + agent_name VARCHAR(64) DEFAULT 'claude-code', + PRIMARY KEY (session_id, resource_label), + INDEX idx_resource (resource_label), + INDEX idx_expires (expires_at) + ); + EOT + } +} +``` + +- [ ] **Step 2: Add an idempotent migration Job that creates the table on the running Dolt** + +Append a new resource block in `infra/stacks/beads-server/main.tf`, after the `kubernetes_deployment.dolt` resource: + +```hcl +resource "kubernetes_job" "presence_schema_migrate" { + metadata { + # name includes a hash of the SQL so a real schema change forces a new Job + name = 
"presence-schema-${substr(sha256(kubernetes_config_map.dolt_init.data["02-create-presence-table.sql"]), 0, 8)}" + namespace = kubernetes_namespace.beads.metadata[0].name + } + spec { + backoff_limit = 3 + template { + metadata {} + spec { + restart_policy = "OnFailure" + container { + name = "migrate" + image = "mysql:8.4" + command = ["sh", "-c"] + args = [ + "mysql -h dolt.beads-server.svc.cluster.local -P 3306 -u root < /sql/02-create-presence-table.sql" + ] + volume_mount { + name = "sql" + mount_path = "/sql" + } + } + volume { + name = "sql" + config_map { + name = kubernetes_config_map.dolt_init.metadata[0].name + } + } + } + } + } + wait_for_completion = true + timeouts { + create = "5m" + } + depends_on = [kubernetes_deployment.dolt] +} +``` + +- [ ] **Step 3: Apply the Terraform change** + +Run: +```bash +cd /home/wizard/code/infra/stacks/beads-server +../../scripts/tg apply +``` +Expected: `kubernetes_config_map.dolt_init` updated + `kubernetes_job.presence_schema_migrate` created + Job completes successfully. + +- [ ] **Step 4: Verify the table exists** + +Run: +```bash +mysql -h 10.0.20.200 -u beads -e "USE beads; SHOW TABLES LIKE 'presence_claims'; DESCRIBE presence_claims;" +``` +Expected: one row `presence_claims` from `SHOW TABLES`; DESCRIBE shows the 8 columns with the right types. + +- [ ] **Step 5: Commit** + +```bash +git add infra/stacks/beads-server/main.tf +git commit -m "beads-server: add presence_claims table for agent coordination + +Adds the schema for the new agent presence board. Live Dolt is updated +via a hashed-named one-shot Job; the ConfigMap entry preserves fresh-PVC +init. 
+" +``` + +--- + +## Task 2: Python CLI scaffolding (argparse + DB connection) + +**Files:** +- Create: `scripts/presence` +- Create: `scripts/tests/test_presence.py` +- Create: `scripts/tests/conftest.py` + +- [ ] **Step 1: Write the failing test for `--help`** + +Create `scripts/tests/test_presence.py`: + +```python +import subprocess +from pathlib import Path + +SCRIPT = Path(__file__).parent.parent / "presence" + + +def test_help_lists_subcommands(): + """--help should list all supported subcommands.""" + result = subprocess.run( + [str(SCRIPT), "--help"], capture_output=True, text=True + ) + assert result.returncode == 0 + for verb in ("claim", "heartbeat", "release", "list", "peek"): + assert verb in result.stdout +``` + +- [ ] **Step 2: Run the test, confirm it fails** + +Run: `pytest scripts/tests/test_presence.py::test_help_lists_subcommands -v` +Expected: FAIL — `scripts/presence` doesn't exist yet (FileNotFoundError). + +- [ ] **Step 3: Create the CLI skeleton** + +Create `scripts/presence`: + +```python +#!/usr/bin/env python3 +"""Agent presence board CLI. + +Lets Claude Code agent sessions claim, heartbeat, release, list, and peek at +shared infra resource claims so that two sessions don't unknowingly mutate +the same thing at the same time. 
+ +Reads connection details from env: + PRESENCE_DSN mysql DSN (default: mysql://beads@10.0.20.200:3306/beads) + CLAUDE_SESSION_ID session identity (default: read from session-id file) +""" + +from __future__ import annotations + +import argparse +import getpass +import json +import os +import socket +import sys +import uuid +from pathlib import Path + +SESSION_ID_FILE = Path.home() / ".cache" / "claude-presence" / "current.session" +DEFAULT_DSN = "mysql://beads@10.0.20.200:3306/beads" +DEFAULT_TTL_SECONDS = 15 * 60 + + +def get_session_id() -> str: + """Return the current session ID. + + Resolution order: the CLAUDE_SESSION_ID env var, then the contents of + SESSION_ID_FILE, then a freshly generated one-shot id. + """ + env = os.environ.get("CLAUDE_SESSION_ID") + if env: + return env + # EAFP read: a missing, unreadable, or empty session file must not yield an + # empty id (every such session would share it), so fall through instead. + try: + stored = SESSION_ID_FILE.read_text().strip() + except OSError: + stored = "" + if stored: + return stored + # Fallback: ephemeral one-shot id (won't be cleaned up by Stop hook) + return f"{getpass.getuser()}@{socket.gethostname().split('.')[0]}@{uuid.uuid4().hex[:8]}" + + +def build_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser( + prog="presence", + description="Agent presence board for coordinating shared-infra mutations.", + ) + p.add_argument("--json", action="store_true", help="emit machine-readable output") + sub = p.add_subparsers(dest="verb", required=True) + + c = sub.add_parser("claim", help="claim a resource you're about to mutate") + c.add_argument("label", help="resource label, e.g. 
node:k8s-node1") + c.add_argument("--purpose", required=True, help="what + why") + c.add_argument("--ttl", type=int, default=DEFAULT_TTL_SECONDS, help="seconds") + + sub.add_parser("heartbeat", help="extend TTL on all my active claims") + + r = sub.add_parser("release", help="release one or all of my claims") + r.add_argument("label", nargs="?", help="resource label; omit with --all-mine") + r.add_argument("--all-mine", action="store_true") + + li = sub.add_parser("list", help="show active claims") + g = li.add_mutually_exclusive_group() + g.add_argument("--mine", action="store_true") + g.add_argument("--all", action="store_true", default=True) + + pe = sub.add_parser("peek", help="show all active claims on a resource") + pe.add_argument("label", help="resource label") + + return p + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + # Verbs implemented in later tasks; stub for now so --help works. + print(f"verb={args.verb} not yet implemented", file=sys.stderr) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) +``` + +- [ ] **Step 4: Make it executable** + +Run: `chmod +x /home/wizard/code/scripts/presence` + +- [ ] **Step 5: Re-run the test, confirm it passes** + +Run: `pytest scripts/tests/test_presence.py::test_help_lists_subcommands -v` +Expected: PASS. 
+ +- [ ] **Step 6: Commit** + +```bash +git add scripts/presence scripts/tests/test_presence.py +git commit -m "presence: add CLI scaffolding with argparse subcommands" +``` + +--- + +## Task 3: `claim` verb — write to DB, return conflicts + +**Files:** +- Modify: `scripts/presence` +- Modify: `scripts/tests/test_presence.py` +- Create: `scripts/tests/conftest.py` + +- [ ] **Step 1: Add pymysql + fixture scaffolding in conftest** + +Create `scripts/tests/conftest.py`: + +```python +import os +from unittest.mock import MagicMock + +import pytest + + +@pytest.fixture +def fake_db(monkeypatch): + """Mocks pymysql.connect to return a MagicMock cursor we can inspect.""" + conn = MagicMock(name="conn") + cur = MagicMock(name="cur") + conn.cursor.return_value.__enter__.return_value = cur + cur.fetchall.return_value = [] + + import pymysql + monkeypatch.setattr(pymysql, "connect", MagicMock(return_value=conn)) + monkeypatch.setenv("CLAUDE_SESSION_ID", "wizard@devvm@testtest") + return cur +``` + +- [ ] **Step 2: Write the failing test for `claim` happy path** + +Append to `scripts/tests/test_presence.py`: + +```python +import importlib.machinery +import importlib.util +import sys +from pathlib import Path + + +def _load_module(): + """Import the extensionless scripts/presence file as module "presence".""" + # spec_from_file_location picks a loader from the file suffix and returns + # None when there is none, so the source loader must be passed explicitly. + loader = importlib.machinery.SourceFileLoader("presence", str(SCRIPT)) + spec = importlib.util.spec_from_file_location("presence", SCRIPT, loader=loader) + mod = importlib.util.module_from_spec(spec) + sys.modules["presence"] = mod + spec.loader.exec_module(mod) + return mod + + +def test_claim_inserts_row(fake_db): + presence = _load_module() + rc = presence.main(["claim", "node:k8s-node1", "--purpose", "GPU upgrade"]) + assert rc == 0 + # First call: insert/upsert; second: read existing other-session claims + sql_calls = [c.args[0] for c in fake_db.execute.call_args_list] + assert any("INSERT" in s.upper() or "REPLACE" in s.upper() for s in sql_calls) + assert any("SELECT" in s.upper() for s in sql_calls) + + +def test_claim_reports_other_session_conflict(fake_db, capsys): + presence = _load_module() + # Simulate one OTHER session already holding the label 
+ fake_db.fetchall.return_value = [ + { + "session_id": "emo@laptop@aaaaaaaa", + "purpose": "tcpdump on uplink", + "claimed_at": "2026-05-17 14:10:00.000", + "user": "emo", + "host": "laptop", + } + ] + rc = presence.main(["claim", "node:k8s-node1", "--purpose", "GPU upgrade"]) + out = capsys.readouterr().out + assert rc == 0 + assert "emo@laptop@aaaaaaaa" in out + assert "tcpdump on uplink" in out +``` + +- [ ] **Step 3: Run the tests, confirm they fail** + +Run: `pytest scripts/tests/test_presence.py -v -k claim` +Expected: 2 failures — `claim` verb not implemented (stub prints "not yet implemented"). + +- [ ] **Step 4: Implement `claim` in `scripts/presence`** + +Replace the bottom of `scripts/presence` (the stub `main`) with this. Also add the DB helpers and `_claim` function above `main`: + +```python +import urllib.parse + +try: + import pymysql + import pymysql.cursors +except ImportError: + pymysql = None # graceful: handled in _connect + + +def _connect(): + if pymysql is None: + return None + dsn = os.environ.get("PRESENCE_DSN", DEFAULT_DSN) + u = urllib.parse.urlparse(dsn) + try: + return pymysql.connect( + host=u.hostname, + port=u.port or 3306, + user=u.username or "beads", + password=u.password or "", + database=(u.path.lstrip("/") or "beads"), + cursorclass=pymysql.cursors.DictCursor, + connect_timeout=3, + autocommit=True, + ) + except Exception as e: + print(f"presence: warning: dolt unreachable ({e}); continuing", file=sys.stderr) + return None + + +def _claim(args, session_id: str) -> int: + conn = _connect() + if conn is None: + return 0 # graceful degradation + with conn.cursor() as cur: + cur.execute( + """ + REPLACE INTO presence_claims + (session_id, resource_label, purpose, claimed_at, expires_at, host, user, agent_name) + VALUES + (%s, %s, %s, NOW(3), NOW(3) + INTERVAL %s SECOND, %s, %s, %s) + """, + ( + session_id, + args.label, + args.purpose, + args.ttl, + socket.gethostname().split(".")[0], + getpass.getuser(), + "claude-code", + ), + 
) + cur.execute( + """ + SELECT session_id, purpose, claimed_at, user, host + FROM presence_claims + WHERE resource_label = %s + AND session_id != %s + AND expires_at > NOW(3) + ORDER BY claimed_at + """, + (args.label, session_id), + ) + conflicts = cur.fetchall() + if not conflicts: + print(f"presence: claimed {args.label}") + return 0 + print(f"presence: claimed {args.label} -- ALSO CLAIMED BY:") + for c in conflicts: + print(f" - {c['session_id']} ({c['user']}@{c['host']}): {c['purpose']} since {c['claimed_at']}") + print("presence: per CLAUDE.md rule, default is to DEFER — release your claim and confirm with the user.") + return 0 +``` + +Update `main` to dispatch: + +```python +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + session_id = get_session_id() + if args.verb == "claim": + return _claim(args, session_id) + print(f"verb={args.verb} not yet implemented", file=sys.stderr) + return 0 +``` + +- [ ] **Step 5: Run tests, confirm they pass** + +Run: `pytest scripts/tests/test_presence.py -v -k claim` +Expected: both `test_claim_inserts_row` and `test_claim_reports_other_session_conflict` PASS. 
+ +- [ ] **Step 6: Commit** + +```bash +git add scripts/presence scripts/tests/test_presence.py scripts/tests/conftest.py +git commit -m "presence: implement claim verb (upsert + conflict report)" +``` + +--- + +## Task 4: `peek` and `list` verbs (read paths) + +**Files:** +- Modify: `scripts/presence` +- Modify: `scripts/tests/test_presence.py` + +- [ ] **Step 1: Write the failing tests for `peek` and `list`** + +Append to `scripts/tests/test_presence.py`: + +```python +def test_peek_shows_all_active_claims_for_resource(fake_db, capsys): + presence = _load_module() + fake_db.fetchall.return_value = [ + { + "session_id": "wizard@devvm@bbbbbbbb", + "purpose": "GPU driver upgrade", + "claimed_at": "2026-05-17 14:32:00.000", + "expires_at": "2026-05-17 14:47:00.000", + "user": "wizard", + "host": "devvm", + } + ] + rc = presence.main(["peek", "node:k8s-node1"]) + out = capsys.readouterr().out + assert rc == 0 + assert "wizard@devvm@bbbbbbbb" in out + assert "GPU driver upgrade" in out + + +def test_peek_empty_resource_prints_no_active_claim(fake_db, capsys): + presence = _load_module() + fake_db.fetchall.return_value = [] + rc = presence.main(["peek", "node:k8s-node99"]) + out = capsys.readouterr().out + assert rc == 0 + assert "no active claim" in out.lower() + + +def test_list_all_shows_only_active(fake_db, capsys): + presence = _load_module() + fake_db.fetchall.return_value = [ + { + "session_id": "wizard@devvm@xxxxxxxx", + "resource_label": "stack:gpu-operator", + "purpose": "rebuild driver", + "claimed_at": "2026-05-17 14:00:00.000", + "expires_at": "2026-05-17 14:15:00.000", + "user": "wizard", + "host": "devvm", + } + ] + rc = presence.main(["list", "--all"]) + out = capsys.readouterr().out + assert rc == 0 + assert "stack:gpu-operator" in out + assert "wizard@devvm@xxxxxxxx" in out + + +def test_list_mine_filters_to_current_session(fake_db, monkeypatch): + presence = _load_module() + presence.main(["list", "--mine"]) + sql = 
fake_db.execute.call_args_list[-1].args[0] + assert "session_id" in sql + assert "expires_at" in sql +``` + +- [ ] **Step 2: Run the tests, confirm they fail** + +Run: `pytest scripts/tests/test_presence.py -v -k "peek or list"` +Expected: 4 failures — verbs unimplemented. + +- [ ] **Step 3: Implement `peek` and `list`** + +Add to `scripts/presence`, above `main`: + +```python +def _peek(args, session_id: str) -> int: + conn = _connect() + if conn is None: + return 0 + with conn.cursor() as cur: + cur.execute( + """ + SELECT session_id, purpose, claimed_at, expires_at, user, host + FROM presence_claims + WHERE resource_label = %s + AND expires_at > NOW(3) + ORDER BY claimed_at + """, + (args.label,), + ) + rows = cur.fetchall() + if not rows: + print(f"presence: no active claim on {args.label}") + return 0 + print(f"presence: active claims on {args.label}:") + for r in rows: + marker = " (me)" if r["session_id"] == session_id else "" + print(f" - {r['session_id']}{marker} ({r['user']}@{r['host']}): {r['purpose']} since {r['claimed_at']} (expires {r['expires_at']})") + return 0 + + +def _list(args, session_id: str) -> int: + conn = _connect() + if conn is None: + return 0 + query = """ + SELECT session_id, resource_label, purpose, claimed_at, expires_at, user, host + FROM presence_claims + WHERE expires_at > NOW(3) + """ + params: tuple = () + if args.mine: + query += " AND session_id = %s" + params = (session_id,) + query += " ORDER BY claimed_at" + with conn.cursor() as cur: + cur.execute(query, params) + rows = cur.fetchall() + if not rows: + print("presence: no active claims") + return 0 + for r in rows: + marker = " (me)" if r["session_id"] == session_id else "" + print(f" {r['resource_label']:<32} {r['session_id']}{marker} -- {r['purpose']} ({r['claimed_at']})") + return 0 +``` + +Extend the dispatcher in `main`: + +```python + if args.verb == "claim": + return _claim(args, session_id) + if args.verb == "peek": + return _peek(args, session_id) + if args.verb 
== "list": + return _list(args, session_id) +``` + +- [ ] **Step 4: Run tests, confirm they pass** + +Run: `pytest scripts/tests/test_presence.py -v -k "peek or list"` +Expected: 4 PASSES. + +- [ ] **Step 5: Commit** + +```bash +git add scripts/presence scripts/tests/test_presence.py +git commit -m "presence: implement peek + list verbs" +``` + +--- + +## Task 5: `heartbeat` and `release` verbs + +**Files:** +- Modify: `scripts/presence` +- Modify: `scripts/tests/test_presence.py` + +- [ ] **Step 1: Write the failing tests** + +Append to `scripts/tests/test_presence.py`: + +```python +def test_heartbeat_extends_all_my_claims(fake_db): + presence = _load_module() + rc = presence.main(["heartbeat"]) + assert rc == 0 + sql = fake_db.execute.call_args_list[-1].args[0] + assert "UPDATE" in sql.upper() + assert "expires_at" in sql + assert "session_id" in sql + + +def test_release_single_label(fake_db): + presence = _load_module() + rc = presence.main(["release", "node:k8s-node1"]) + assert rc == 0 + last = fake_db.execute.call_args_list[-1] + assert "DELETE" in last.args[0].upper() + assert "node:k8s-node1" in last.args[1] + + +def test_release_all_mine(fake_db): + presence = _load_module() + rc = presence.main(["release", "--all-mine"]) + assert rc == 0 + last = fake_db.execute.call_args_list[-1] + assert "DELETE" in last.args[0].upper() + assert "wizard@devvm@testtest" in last.args[1] +``` + +- [ ] **Step 2: Run tests, confirm they fail** + +Run: `pytest scripts/tests/test_presence.py -v -k "heartbeat or release"` +Expected: 3 failures. 
+ +- [ ] **Step 3: Implement `heartbeat` and `release`** + +Add to `scripts/presence`: + +```python +def _heartbeat(args, session_id: str) -> int: + conn = _connect() + if conn is None: + return 0 + with conn.cursor() as cur: + cur.execute( + """ + UPDATE presence_claims + SET expires_at = NOW(3) + INTERVAL %s SECOND + WHERE session_id = %s + AND expires_at > NOW(3) + """, + (DEFAULT_TTL_SECONDS, session_id), + ) + return 0 + + +def _release(args, session_id: str) -> int: + conn = _connect() + if conn is None: + return 0 + with conn.cursor() as cur: + if args.all_mine: + cur.execute("DELETE FROM presence_claims WHERE session_id = %s", (session_id,)) + else: + if not args.label: + print("presence: release requires