refactor: extract agent bounded context + fix SSE comments + cleanup unused

- Extract src/lib/agent/ bounded context with types, registry, messaging
- Add comments_count to BeadIssue for SSE comment detection
- Create batch endpoints for mail/reservations APIs
- Add memory validation to session-preflight
- Remove unused empty dirs (mockup, sessions, timeline)
- Move stashes to docs/references, gitignore them
This commit is contained in:
zenchantlive 2026-03-04 22:06:40 -08:00
parent 6f41c4af31
commit 18fbafdce4
34 changed files with 62714 additions and 1970 deletions

View file

@ -1,52 +1,3 @@
export type ProtocolEventType = 'HANDOFF' | 'BLOCKED' | 'INCURSION' | 'RESUME' | 'INFO';
export interface ProtocolEventEnvelope<T = any> {
id: string;
version: 'v1';
event_type: ProtocolEventType;
project_root: string;
bead_id: string;
from_agent: string | null;
to_agent: string | null;
scope: string | null;
created_at: string;
payload: T;
}
export type ProtocolEvent = ProtocolEventEnvelope;
export interface CreateProtocolEventInput {
event_type: ProtocolEventType;
project_root: string;
bead_id: string;
from_agent?: string;
to_agent?: string;
scope?: string;
payload: any;
}
export interface ProtocolDeps {
now: () => string;
idGenerator: () => string;
}
export function createProtocolEvent(
input: CreateProtocolEventInput,
deps: Partial<ProtocolDeps> = {}
): ProtocolEvent {
const now = deps.now ? deps.now() : new Date().toISOString();
const generateId = deps.idGenerator ?? (() => `proto_${Date.now()}_${Math.random().toString(16).slice(2, 6)}`);
return {
id: generateId(),
version: 'v1',
event_type: input.event_type,
project_root: input.project_root,
bead_id: input.bead_id,
from_agent: input.from_agent ?? null,
to_agent: input.to_agent ?? null,
scope: input.scope ?? null,
created_at: now,
payload: input.payload,
};
}
// Re-export from new bounded context
// This file is deprecated - import from ./agent/types instead
export * from './agent/types';

View file

@ -1,503 +1,10 @@
import { randomUUID } from 'node:crypto';
import path from 'node:path';
import { runBdCommand } from './bridge';
import { activityEventBus } from './realtime';
const AGENT_ID_PATTERN = /^[a-z0-9]+(?:-[a-z0-9]+)*$/;
export type AgentCommandName = 'agent register' | 'agent list' | 'agent show' | 'agent activity-lease' | 'agent state';
export type AgentZfcState = 'idle' | 'spawning' | 'running' | 'working' | 'stuck' | 'done' | 'stopped' | 'dead';
export interface AgentCommandError {
code: string;
message: string;
}
export interface AgentCommandResponse<T> {
ok: boolean;
command: AgentCommandName;
data: T | null;
error: AgentCommandError | null;
}
export interface AgentRecord {
agent_id: string;
display_name: string;
role: string;
status: string;
created_at: string;
last_seen_at: string;
version: number;
rig?: string;
role_type?: string;
swarm_id?: string;
current_task?: string;
}
export interface RegisterAgentInput {
name: string;
display?: string;
role: string;
forceUpdate?: boolean;
rig?: string;
}
export interface RegisterAgentDeps {
now: () => string;
projectRoot: string;
}
export interface ListAgentsInput {
role?: string;
status?: string;
}
export interface ShowAgentInput {
agent: string;
}
export interface ActivityLeaseInput {
agent: string;
}
/**
* Normalizes agent name to bead ID with prefix.
* e.g. "silver-castle" -> "bb-silver-castle"
*/
function toBeadId(name: string): string {
const trimmed = name.trim();
if (trimmed.startsWith('bb-')) return trimmed;
return `bb-${trimmed}`;
}
/**
* Strips prefix from bead ID for display/internal logic.
* e.g. "bb-silver-castle" -> "silver-castle"
*/
function fromBeadId(id: string): string {
if (id.startsWith('bb-')) return id.slice(3);
return id;
}
/**
* Robustly extracts the first JSON block from a potentially noisy string.
* Handles cases where 'bd' outputs warnings or daemon logs before the JSON.
*/
function extractJson(text: string): any {
const start = text.indexOf('{');
const end = text.lastIndexOf('}');
if (start === -1 || end === -1) {
throw new Error('No JSON block found in output');
}
const jsonPart = text.slice(start, end + 1);
return JSON.parse(jsonPart);
}
/**
* Robustly extracts the first JSON array from a potentially noisy string.
*/
function extractJsonArray(text: string): any[] {
const start = text.indexOf('[');
const end = text.lastIndexOf(']');
if (start === -1 || end === -1) {
// Check if it's a single object instead
try {
const single = extractJson(text);
return [single];
} catch {
return [];
}
}
const jsonPart = text.slice(start, end + 1);
return JSON.parse(jsonPart);
}
function trimOrEmpty(value: unknown): string {
return typeof value === 'string' ? value.trim() : '';
}
/**
* Internal helper to fetch and parse agent details robustly.
*/
async function callBdAgentShow(beadId: string, projectRoot: string): Promise<AgentRecord | null> {
const showResult = await runBdCommand({
projectRoot,
args: ['agent', 'show', beadId, '--json'],
});
if (!showResult.success) {
return null;
}
try {
const bdAgent = extractJson(showResult.stdout);
return mapBdAgentToRecord(bdAgent);
} catch {
return null;
}
}
function invalid(command: AgentCommandName, code: string, message: string): AgentCommandResponse<never> {
return {
ok: false,
command,
data: null,
error: { code, message },
};
}
function success<T>(command: AgentCommandName, data: T): AgentCommandResponse<T> {
return {
ok: true,
command,
data,
error: null,
};
}
function validateAgentId(value: string): AgentCommandError | null {
if (!AGENT_ID_PATTERN.test(value) || value.length < 3 || value.length > 48) {
return {
code: 'INVALID_AGENT_ID',
message: 'Agent id must match ^[a-z0-9]+(?:-[a-z0-9]+)*$ and be 3..48 characters.',
};
}
return null;
}
function validateRole(value: string): AgentCommandError | null {
if (!value) {
return {
code: 'INVALID_ROLE',
message: 'Role is required.',
};
}
return null;
}
function mapBdAgentToRecord(bdAgent: any): AgentRecord {
// Extract role from labels if role_type is not set
let role = bdAgent.role_type || 'agent';
let swarmId: string | undefined;
let currentTask: string | undefined;
if (Array.isArray(bdAgent.labels)) {
const roleLabel = bdAgent.labels.find((l: string) => l.startsWith('role:'));
if (roleLabel) {
role = roleLabel.split(':')[1];
}
const swarmLabel = bdAgent.labels.find((l: string) => l.startsWith('swarm:'));
if (swarmLabel) {
swarmId = swarmLabel.split(':')[1];
}
const workingLabel = bdAgent.labels.find((l: string) => l.startsWith('working:'));
if (workingLabel) {
currentTask = workingLabel.split(':')[1];
}
}
let rig = bdAgent.rig;
if (!rig && Array.isArray(bdAgent.labels)) {
const rigLabel = bdAgent.labels.find((l: string) => l.startsWith('rig:'));
if (rigLabel) {
rig = rigLabel.split(':')[1];
}
}
const record: AgentRecord = {
agent_id: fromBeadId(bdAgent.id),
display_name: bdAgent.title?.replace(/^Agent: /, '') || fromBeadId(bdAgent.id),
role,
status: bdAgent.agent_state || 'idle',
created_at: bdAgent.created_at || bdAgent.last_activity || new Date().toISOString(),
last_seen_at: bdAgent.last_activity || new Date().toISOString(),
version: 1,
rig,
role_type: bdAgent.role_type,
swarm_id: swarmId,
current_task: currentTask,
};
return record;
}
export async function registerAgent(
input: RegisterAgentInput,
deps: Partial<RegisterAgentDeps> = {},
): Promise<AgentCommandResponse<AgentRecord>> {
const command: AgentCommandName = 'agent register';
const name = trimOrEmpty(input.name);
const role = trimOrEmpty(input.role);
const display = trimOrEmpty(input.display) || name;
const projectRoot = deps.projectRoot || process.cwd();
const agentIdError = validateAgentId(name);
if (agentIdError) {
return invalid(command, agentIdError.code, agentIdError.message);
}
const roleError = validateRole(role);
if (roleError) {
return invalid(command, roleError.code, roleError.message);
}
try {
const beadId = toBeadId(name);
// 1. Check if agent exists
const showResult = await runBdCommand({
projectRoot,
args: ['agent', 'show', beadId, '--json'],
});
if (showResult.success && !input.forceUpdate) {
return invalid(command, 'DUPLICATE_AGENT_ID', 'Agent is already registered. Use --force-update to change display/role.');
}
// 2. Set state (auto-creates if missing)
const stateResult = await runBdCommand({
projectRoot,
args: ['agent', 'state', beadId, 'idle', '--json'],
});
if (!stateResult.success) {
return invalid(command, 'INTERNAL_ERROR', `Failed to set agent state: ${stateResult.error}`);
}
// 3. Update title, role, and rig via labels
const labels = ['gt:agent'];
if (role) {
labels.push(`role:${role}`);
}
if (input.rig) {
labels.push(`rig:${input.rig}`);
}
const updateArgs = ['update', beadId, '--title', `Agent: ${display}`, '--add-label', labels.join(',')];
const updateResult = await runBdCommand({
projectRoot,
args: [...updateArgs, '--json'],
});
if (!updateResult.success) {
console.error('Update failed:', updateResult.error, updateResult.stdout, updateResult.stderr);
return invalid(command, 'INTERNAL_ERROR', `Failed to update agent details: ${updateResult.error}`);
}
// 4. Force flush to ensure issues.jsonl is updated (critical for tests and sync)
const flushResult = await runBdCommand({
projectRoot,
args: ['admin', 'flush'],
});
if (!flushResult.success) {
console.error('Flush failed:', flushResult.error, flushResult.stdout, flushResult.stderr);
}
// 5. Return the new record
const record = await callBdAgentShow(beadId, projectRoot);
if (!record) {
return invalid(command, 'INTERNAL_ERROR', 'Failed to retrieve final agent state.');
}
return success(command, record);
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to register agent.');
}
}
export async function listAgents(
input: ListAgentsInput,
deps: Partial<RegisterAgentDeps> = {},
): Promise<AgentCommandResponse<AgentRecord[]>> {
const command: AgentCommandName = 'agent list';
const role = trimOrEmpty(input.role);
const status = trimOrEmpty(input.status);
const projectRoot = deps.projectRoot || process.cwd();
try {
const listResult = await runBdCommand({
projectRoot,
args: ['list', '--label', 'gt:agent', '--json'],
});
if (!listResult.success) {
return invalid(command, 'INTERNAL_ERROR', `Failed to list agents from bd: ${listResult.error}`);
}
const rawList = extractJsonArray(listResult.stdout);
if (rawList.length === 0) {
return success(command, []);
}
const agents: AgentRecord[] = [];
for (const item of rawList) {
// Get detailed agent state for each bead found using show
const record = await callBdAgentShow(item.id, projectRoot);
if (record) {
if (role && record.role !== role) continue;
if (status && record.status !== status) continue;
agents.push(record);
}
}
return success(command, agents.sort((a, b) => a.agent_id.localeCompare(b.agent_id)));
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to list agents.');
}
}
export async function showAgent(
input: ShowAgentInput,
deps: Partial<RegisterAgentDeps> = {},
): Promise<AgentCommandResponse<AgentRecord>> {
const command: AgentCommandName = 'agent show';
const name = trimOrEmpty(input.agent);
const projectRoot = deps.projectRoot || process.cwd();
const agentIdError = validateAgentId(name);
if (agentIdError) {
return invalid(command, agentIdError.code, agentIdError.message);
}
try {
const beadId = toBeadId(name);
const record = await callBdAgentShow(beadId, projectRoot);
if (!record) {
return invalid(command, 'AGENT_NOT_FOUND', 'Agent is not registered.');
}
return success(command, record);
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to load agent.');
}
}
/**
* Updates the ZFC state of an agent bead.
*/
export async function setAgentState(
input: { agent: string; state: AgentZfcState },
deps: Partial<RegisterAgentDeps> = {},
): Promise<AgentCommandResponse<AgentRecord>> {
const command: AgentCommandName = 'agent state';
const name = trimOrEmpty(input.agent);
const state = input.state;
const projectRoot = deps.projectRoot || process.cwd();
const agentIdError = validateAgentId(name);
if (agentIdError) {
return invalid(command, agentIdError.code, agentIdError.message);
}
try {
const beadId = toBeadId(name);
const stateResult = await runBdCommand({
projectRoot,
args: ['agent', 'state', beadId, state, '--json'],
});
if (!stateResult.success) {
return invalid(command, 'AGENT_NOT_FOUND', 'Agent is not registered.');
}
const record = await callBdAgentShow(beadId, projectRoot);
if (!record) {
return invalid(command, 'INTERNAL_ERROR', 'Failed to retrieve agent state after update.');
}
return success(command, record);
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to set agent state.');
}
}
export type AgentLiveness = 'active' | 'stale' | 'evicted' | 'idle';
/**
* Derives the liveness state of an agent based on its last seen timestamp.
* active: < 15m
* stale: 15m - 30m
* evicted: 30m - 60m
* idle: >= 60m
*/
export function deriveLiveness(lastSeenAt: string, now: Date = new Date(), staleMinutes: number = 15): AgentLiveness {
const lastSeen = new Date(lastSeenAt).getTime();
const diffMs = now.getTime() - lastSeen;
const diffMin = diffMs / (1000 * 60);
if (diffMin >= 60) {
return 'idle';
}
if (diffMin >= 2 * staleMinutes) {
return 'evicted';
}
if (diffMin >= staleMinutes) {
return 'stale';
}
return 'active';
}
/**
* Extends the activity lease for a registered agent by emitting a native bd wisp.
* This provides silent observability WITHOUT persistent git churn.
*/
export async function extendActivityLease(
input: ActivityLeaseInput,
deps: Partial<RegisterAgentDeps> = {},
): Promise<AgentCommandResponse<AgentRecord | null>> {
const command: AgentCommandName = 'agent activity-lease';
const name = trimOrEmpty(input.agent);
const projectRoot = deps.projectRoot || process.cwd();
const agentIdError = validateAgentId(name);
if (agentIdError) {
return invalid(command, agentIdError.code, agentIdError.message);
}
try {
const beadId = toBeadId(name);
// We create an ephemeral wisp of type 'heartbeat' tied to the agent bead.
// This refreshes the 'last_activity' in the bd system without mutating issues.jsonl.
const wispResult = await runBdCommand({
projectRoot,
args: [
'create',
`pulse:${name}:${Date.now()}`,
'--type', 'event',
'--wisp-type', 'heartbeat',
'--ephemeral',
'--event-actor', beadId,
'--json'
],
});
if (!wispResult.success) {
return invalid(command, 'INTERNAL_ERROR', `Failed to emit heartbeat wisp: ${wispResult.error}`);
}
// Emit heartbeat to activity bus for real-time aggregation
activityEventBus.emit({
id: randomUUID(),
kind: 'heartbeat',
beadId: beadId,
beadTitle: `Agent: ${name}`,
projectId: projectRoot,
projectName: path.basename(projectRoot),
timestamp: new Date().toISOString(),
actor: name,
payload: { message: 'running' }
});
// We return ok: true. The actual lease state will be aggregated from wisps.
return success(command, null);
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to extend activity lease.');
}
}
/**
* @deprecated Import from './agent/registry' or './agent/types' instead
*
* This file is kept for backward compatibility.
* All implementations have been moved to src/lib/agent/
*/
// Re-export everything from the new bounded context
export * from './agent/registry';
export * from './agent/types';

15
src/lib/agent/index.ts Normal file
View file

@ -0,0 +1,15 @@
/**
* Agent Bounded Context
*
* This module provides the public API for agent coordination:
* - Registry: agent registration, listing, state management
* - Messaging: mail, inbox, handoffs
* - Reservations: scope locking, resource allocation
* - Sessions: lifecycle, liveness, coordination
*
* Current state: Migration in progress.
* Import from individual modules for now.
*/
// Re-export from types (already migrated)
export * from './types';

437
src/lib/agent/messaging.ts Normal file
View file

@ -0,0 +1,437 @@
/**
* Agent Messaging / Mail System
*
* This module handles agent-to-agent coordination messages:
* - sendAgentMessage: Send a message to another agent
* - inboxAgentMessages: Retrieve messages for an agent
* - readAgentMessage: Mark a message as read
* - ackAgentMessage: Acknowledge a message (for HANDOFF/BLOCKED)
*/
import path from 'node:path';
import { runBdCommand } from '../bridge';
import {
type SendAgentMessageInput,
type SendAgentMessageDeps,
type InboxAgentMessagesInput,
type MessageActionInput,
type MessageMutationDeps,
type AgentMessage,
type MailCommandName,
type MailCommandError,
type MailCommandResponse,
type MessageCategory,
type MessageState,
} from './types';
const MESSAGE_ID_PATTERN = /^msg_/;
const AGENT_ID_PATTERN = /^[a-z0-9]+(?:-[a-z0-9]+)*$/;
function invalid<T>(command: MailCommandName, code: string, message: string): MailCommandResponse<T> {
return { ok: false, command, data: null, error: { code, message } };
}
function success<T>(command: MailCommandName, data: T): MailCommandResponse<T> {
return { ok: true, command, data, error: null };
}
function trimOrEmpty(value: unknown): string {
return typeof value === 'string' ? value.trim() : '';
}
function extractJson(text: string): any {
const start = text.indexOf('{');
const end = text.lastIndexOf('}');
if (start === -1 || end === -1) {
throw new Error('No JSON block found in output');
}
const jsonPart = text.slice(start, end + 1);
return JSON.parse(jsonPart);
}
function extractJsonArray(text: string): any[] {
const start = text.indexOf('[');
const end = text.lastIndexOf(']');
if (start === -1 || end === -1) {
try {
const single = extractJson(text);
return [single];
} catch {
return [];
}
}
const jsonPart = text.slice(start, end + 1);
return JSON.parse(jsonPart);
}
function toBeadId(name: string): string {
const trimmed = name.trim();
if (trimmed.startsWith('bb-')) return trimmed;
return `bb-${trimmed}`;
}
async function getProjectRoot(deps: { projectRoot?: string }): Promise<string> {
return deps.projectRoot || process.cwd();
}
async function verifyAgentExists(agent: string, projectRoot: string): Promise<boolean> {
const beadId = toBeadId(agent);
const result = await runBdCommand({
projectRoot,
args: ['show', beadId, '--json'],
});
return result.success;
}
function mapRawToAgentMessage(raw: any): AgentMessage {
return {
message_id: raw.message_id || raw.id || '',
thread_id: raw.thread_id || raw.thread || '',
bead_id: raw.bead_id || raw.bead || '',
from_agent: raw.from_agent || raw.from || '',
to_agent: raw.to_agent || raw.to || '',
category: (raw.category as MessageCategory) || 'INFO',
subject: raw.subject || '',
body: raw.body || '',
state: (raw.state as MessageState) || 'unread',
requires_ack: raw.requires_ack ?? (raw.category === 'HANDOFF' || raw.category === 'BLOCKED'),
created_at: raw.created_at || raw.created_at || '',
read_at: raw.read_at || null,
acked_at: raw.acked_at || null,
};
}
export async function sendAgentMessage(
input: SendAgentMessageInput,
deps?: Partial<SendAgentMessageDeps> & { projectRoot?: string },
): Promise<MailCommandResponse<AgentMessage>> {
const command: MailCommandName = 'agent send';
const projectRoot = await getProjectRoot(deps || {});
const now = deps?.now || (() => new Date().toISOString());
const idGenerator = deps?.idGenerator || (() => `msg_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`);
const from = trimOrEmpty(input.from);
const to = trimOrEmpty(input.to);
const bead = trimOrEmpty(input.bead);
const category = input.category;
const subject = trimOrEmpty(input.subject);
const body = trimOrEmpty(input.body);
const thread = trimOrEmpty(input.thread);
if (!from) {
return invalid(command, 'INVALID_INPUT', 'Sender agent is required.');
}
if (!to) {
return invalid(command, 'INVALID_INPUT', 'Recipient agent is required.');
}
if (!bead) {
return invalid(command, 'INVALID_INPUT', 'Bead ID is required.');
}
if (!category) {
return invalid(command, 'INVALID_INPUT', 'Category is required.');
}
if (!subject) {
return invalid(command, 'INVALID_INPUT', 'Subject is required.');
}
if (!body) {
return invalid(command, 'INVALID_INPUT', 'Body is required.');
}
const fromExists = await verifyAgentExists(from, projectRoot);
if (!fromExists) {
return invalid(command, 'UNKNOWN_SENDER', `Sender agent '${from}' is not registered.`);
}
const toExists = await verifyAgentExists(to, projectRoot);
if (!toExists) {
return invalid(command, 'UNKNOWN_RECIPIENT', `Recipient agent '${to}' is not registered.`);
}
const validCategories = ['HANDOFF', 'BLOCKED', 'DECISION', 'INFO'];
if (!validCategories.includes(category)) {
return invalid(command, 'INVALID_CATEGORY', `Category must be one of: ${validCategories.join(', ')}`);
}
try {
const messageId = idGenerator();
const threadId = thread || `thread_${now().replace(/[-:]/g, '').replace('T', '_').split('.')[0]}`;
const requiresAck = category === 'HANDOFF' || category === 'BLOCKED';
const commentArgs = [
'comment',
bead,
'--author', from,
'--body', JSON.stringify({ message_id: messageId, thread_id: threadId, from_agent: from, to_agent: to, category, subject, body, requires_ack: requiresAck, created_at: now() }),
];
const result = await runBdCommand({
projectRoot,
args: [...commentArgs, '--json'],
});
if (!result.success) {
return invalid(command, 'INTERNAL_ERROR', `Failed to send message: ${result.error}`);
}
const message: AgentMessage = {
message_id: messageId,
thread_id: threadId,
bead_id: bead,
from_agent: from,
to_agent: to,
category,
subject,
body,
state: 'unread',
requires_ack: requiresAck,
created_at: now(),
read_at: null,
acked_at: null,
};
return success(command, message);
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to send message.');
}
}
export async function inboxAgentMessages(
input: InboxAgentMessagesInput,
deps: { projectRoot?: string } = {},
): Promise<MailCommandResponse<AgentMessage[]>> {
const command: MailCommandName = 'agent inbox';
const projectRoot = await getProjectRoot(deps);
const agent = trimOrEmpty(input.agent);
if (!agent) {
return invalid(command, 'INVALID_INPUT', 'Agent name is required.');
}
const agentExists = await verifyAgentExists(agent, projectRoot);
if (!agentExists) {
return invalid(command, 'UNKNOWN_AGENT', `Agent '${agent}' is not registered.`);
}
try {
const listResult = await runBdCommand({
projectRoot,
args: ['list', '--author', toBeadId(agent), '--json'],
});
if (!listResult.success) {
return invalid(command, 'INTERNAL_ERROR', `Failed to list messages: ${listResult.error}`);
}
const rawList = extractJsonArray(listResult.stdout);
const messages: AgentMessage[] = [];
for (const item of rawList) {
try {
const commentBody = JSON.parse(item.body || '{}');
if (commentBody.to_agent === agent || commentBody.from_agent === agent) {
if (input.state && commentBody.state !== input.state) continue;
if (input.bead && commentBody.bead_id !== input.bead) continue;
const msg = mapRawToAgentMessage(commentBody);
if (msg.to_agent === agent) {
messages.push(msg);
}
}
} catch {
// Skip non-message comments
}
}
if (input.limit && messages.length > input.limit) {
messages.length = input.limit;
}
return success(command, messages);
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to retrieve inbox.');
}
}
export async function readAgentMessage(
input: MessageActionInput,
deps?: Partial<MessageMutationDeps> & { projectRoot?: string },
): Promise<MailCommandResponse<AgentMessage>> {
const command: MailCommandName = 'agent read';
const projectRoot = await getProjectRoot(deps || {});
const now = deps?.now || (() => new Date().toISOString());
const agent = trimOrEmpty(input.agent);
const messageId = trimOrEmpty(input.message);
if (!agent) {
return invalid(command, 'INVALID_INPUT', 'Agent name is required.');
}
if (!messageId) {
return invalid(command, 'INVALID_INPUT', 'Message ID is required.');
}
const agentExists = await verifyAgentExists(agent, projectRoot);
if (!agentExists) {
return invalid(command, 'UNKNOWN_AGENT', `Agent '${agent}' is not registered.`);
}
try {
const listResult = await runBdCommand({
projectRoot,
args: ['list', '--author', toBeadId(agent), '--json'],
});
if (!listResult.success) {
return invalid(command, 'INTERNAL_ERROR', `Failed to find message: ${listResult.error}`);
}
const rawList = extractJsonArray(listResult.stdout);
let foundMessage: any = null;
let foundBead = '';
for (const item of rawList) {
try {
const commentBody = JSON.parse(item.body || '{}');
if (commentBody.message_id === messageId && commentBody.to_agent === agent) {
foundMessage = commentBody;
foundBead = item.id;
break;
}
} catch {
// Skip
}
}
if (!foundMessage) {
return invalid(command, 'MESSAGE_NOT_FOUND', `Message '${messageId}' not found for agent '${agent}'.`);
}
if (foundMessage.state === 'read' || foundMessage.state === 'acked') {
return invalid(command, 'ALREADY_READ', 'Message is already read or acknowledged.');
}
const updatedMessage = {
...foundMessage,
state: 'read' as MessageState,
read_at: now(),
};
const commentArgs = [
'comment',
foundBead,
'--author', agent,
'--body', JSON.stringify(updatedMessage),
];
const updateResult = await runBdCommand({
projectRoot,
args: [...commentArgs, '--json'],
});
if (!updateResult.success) {
return invalid(command, 'INTERNAL_ERROR', `Failed to mark message as read: ${updateResult.error}`);
}
const message = mapRawToAgentMessage(updatedMessage);
return success(command, message);
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to read message.');
}
}
export async function ackAgentMessage(
input: MessageActionInput,
deps?: Partial<MessageMutationDeps> & { projectRoot?: string },
): Promise<MailCommandResponse<AgentMessage>> {
const command: MailCommandName = 'agent ack';
const projectRoot = await getProjectRoot(deps || {});
const now = deps?.now || (() => new Date().toISOString());
const agent = trimOrEmpty(input.agent);
const messageId = trimOrEmpty(input.message);
if (!agent) {
return invalid(command, 'INVALID_INPUT', 'Agent name is required.');
}
if (!messageId) {
return invalid(command, 'INVALID_INPUT', 'Message ID is required.');
}
const agentExists = await verifyAgentExists(agent, projectRoot);
if (!agentExists) {
return invalid(command, 'UNKNOWN_AGENT', `Agent '${agent}' is not registered.`);
}
try {
const listResult = await runBdCommand({
projectRoot,
args: ['list', '--author', toBeadId(agent), '--json'],
});
if (!listResult.success) {
return invalid(command, 'INTERNAL_ERROR', `Failed to find message: ${listResult.error}`);
}
const rawList = extractJsonArray(listResult.stdout);
let foundMessage: any = null;
let foundBead = '';
for (const item of rawList) {
try {
const commentBody = JSON.parse(item.body || '{}');
if (commentBody.message_id === messageId && commentBody.to_agent === agent) {
foundMessage = commentBody;
foundBead = item.id;
break;
}
} catch {
// Skip
}
}
if (!foundMessage) {
return invalid(command, 'MESSAGE_NOT_FOUND', `Message '${messageId}' not found for agent '${agent}'.`);
}
if (foundMessage.to_agent !== agent) {
return invalid(command, 'ACK_FORBIDDEN', 'Only the recipient can acknowledge this message.');
}
if (!foundMessage.requires_ack) {
return invalid(command, 'ACK_NOT_REQUIRED', 'This message does not require acknowledgment.');
}
if (foundMessage.state === 'acked') {
return invalid(command, 'ALREADY_ACKED', 'Message is already acknowledged.');
}
const updatedMessage = {
...foundMessage,
state: 'acked' as MessageState,
acked_at: now(),
};
const commentArgs = [
'comment',
foundBead,
'--author', agent,
'--body', JSON.stringify(updatedMessage),
];
const updateResult = await runBdCommand({
projectRoot,
args: [...commentArgs, '--json'],
});
if (!updateResult.success) {
return invalid(command, 'INTERNAL_ERROR', `Failed to acknowledge message: ${updateResult.error}`);
}
const message = mapRawToAgentMessage(updatedMessage);
return success(command, message);
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to acknowledge message.');
}
}

442
src/lib/agent/registry.ts Normal file
View file

@ -0,0 +1,442 @@
import { randomUUID } from 'node:crypto';
import path from 'node:path';
import { runBdCommand } from '../bridge';
import { activityEventBus } from '../realtime';
import type {
AgentCommandName,
AgentZfcState,
AgentCommandError,
AgentCommandResponse,
AgentRecord,
RegisterAgentInput,
RegisterAgentDeps,
ListAgentsInput,
ShowAgentInput,
ActivityLeaseInput,
AgentLiveness,
} from './types';
const AGENT_ID_PATTERN = /^[a-z0-9]+(?:-[a-z0-9]+)*$/;
interface CacheEntry<T> {
data: T;
expiresAt: number;
}
const agentCache = new Map<string, CacheEntry<AgentRecord | null>>();
const CACHE_TTL_MS = 30_000;
function getCachedAgent(beadId: string): AgentRecord | null {
const entry = agentCache.get(beadId);
if (entry && entry.expiresAt > Date.now()) {
return entry.data;
}
agentCache.delete(beadId);
return null;
}
function setCachedAgent(beadId: string, data: AgentRecord | null): void {
agentCache.set(beadId, { data, expiresAt: Date.now() + CACHE_TTL_MS });
}
function toBeadId(name: string): string {
const trimmed = name.trim();
if (trimmed.startsWith('bb-')) return trimmed;
return `bb-${trimmed}`;
}
function fromBeadId(id: string): string {
if (id.startsWith('bb-')) return id.slice(3);
return id;
}
function extractJson(text: string): any {
const start = text.indexOf('{');
const end = text.lastIndexOf('}');
if (start === -1 || end === -1) {
throw new Error('No JSON block found in output');
}
const jsonPart = text.slice(start, end + 1);
return JSON.parse(jsonPart);
}
function extractJsonArray(text: string): any[] {
const start = text.indexOf('[');
const end = text.lastIndexOf(']');
if (start === -1 || end === -1) {
try {
const single = extractJson(text);
return [single];
} catch {
return [];
}
}
const jsonPart = text.slice(start, end + 1);
return JSON.parse(jsonPart);
}
function trimOrEmpty(value: unknown): string {
return typeof value === 'string' ? value.trim() : '';
}
async function callBdAgentShow(beadId: string, projectRoot: string): Promise<AgentRecord | null> {
const cached = getCachedAgent(beadId);
if (cached !== undefined) {
return cached;
}
const showResult = await runBdCommand({
projectRoot,
args: ['agent', 'show', beadId, '--json'],
});
let record: AgentRecord | null = null;
if (showResult.success) {
try {
const bdAgent = extractJson(showResult.stdout);
record = mapBdAgentToRecord(bdAgent);
} catch {
record = null;
}
}
setCachedAgent(beadId, record);
return record;
}
function invalid(command: AgentCommandName, code: string, message: string): AgentCommandResponse<never> {
return {
ok: false,
command,
data: null,
error: { code, message },
};
}
function success<T>(command: AgentCommandName, data: T): AgentCommandResponse<T> {
return {
ok: true,
command,
data,
error: null,
};
}
function validateAgentId(value: string): AgentCommandError | null {
if (!AGENT_ID_PATTERN.test(value) || value.length < 3 || value.length > 48) {
return {
code: 'INVALID_AGENT_ID',
message: 'Agent id must match ^[a-z0-9]+(?:-[a-z0-9]+)*$ and be 3..48 characters.',
};
}
return null;
}
function validateRole(value: string): AgentCommandError | null {
if (!value) {
return {
code: 'INVALID_ROLE',
message: 'Role is required.',
};
}
return null;
}
function mapBdAgentToRecord(bdAgent: any): AgentRecord {
let role = bdAgent.role_type || 'agent';
let swarmId: string | undefined;
let currentTask: string | undefined;
if (Array.isArray(bdAgent.labels)) {
const roleLabel = bdAgent.labels.find((l: string) => l.startsWith('role:'));
if (roleLabel) {
role = roleLabel.split(':')[1];
}
const swarmLabel = bdAgent.labels.find((l: string) => l.startsWith('swarm:'));
if (swarmLabel) {
swarmId = swarmLabel.split(':')[1];
}
const workingLabel = bdAgent.labels.find((l: string) => l.startsWith('working:'));
if (workingLabel) {
currentTask = workingLabel.split(':')[1];
}
}
let rig = bdAgent.rig;
if (!rig && Array.isArray(bdAgent.labels)) {
const rigLabel = bdAgent.labels.find((l: string) => l.startsWith('rig:'));
if (rigLabel) {
rig = rigLabel.split(':')[1];
}
}
const record: AgentRecord = {
agent_id: fromBeadId(bdAgent.id),
display_name: bdAgent.title?.replace(/^Agent: /, '') || fromBeadId(bdAgent.id),
role,
status: bdAgent.agent_state || 'idle',
created_at: bdAgent.created_at || bdAgent.last_activity || new Date().toISOString(),
last_seen_at: bdAgent.last_activity || new Date().toISOString(),
version: 1,
rig,
role_type: bdAgent.role_type,
swarm_id: swarmId,
current_task: currentTask,
};
return record;
}
export async function registerAgent(
input: RegisterAgentInput,
deps: Partial<RegisterAgentDeps> = {},
): Promise<AgentCommandResponse<AgentRecord>> {
const command: AgentCommandName = 'agent register';
const name = trimOrEmpty(input.name);
const role = trimOrEmpty(input.role);
const display = trimOrEmpty(input.display) || name;
const projectRoot = deps.projectRoot || process.cwd();
const agentIdError = validateAgentId(name);
if (agentIdError) {
return invalid(command, agentIdError.code, agentIdError.message);
}
const roleError = validateRole(role);
if (roleError) {
return invalid(command, roleError.code, roleError.message);
}
try {
const beadId = toBeadId(name);
const showResult = await runBdCommand({
projectRoot,
args: ['agent', 'show', beadId, '--json'],
});
if (showResult.success && !input.forceUpdate) {
return invalid(command, 'DUPLICATE_AGENT_ID', 'Agent is already registered. Use --force-update to change display/role.');
}
const stateResult = await runBdCommand({
projectRoot,
args: ['agent', 'state', beadId, 'idle', '--json'],
});
if (!stateResult.success) {
return invalid(command, 'INTERNAL_ERROR', `Failed to set agent state: ${stateResult.error}`);
}
const labels = ['gt:agent'];
if (role) {
labels.push(`role:${role}`);
}
if (input.rig) {
labels.push(`rig:${input.rig}`);
}
const updateArgs = ['update', beadId, '--title', `Agent: ${display}`, '--add-label', labels.join(',')];
const updateResult = await runBdCommand({
projectRoot,
args: [...updateArgs, '--json'],
});
if (!updateResult.success) {
console.error('Update failed:', updateResult.error, updateResult.stdout, updateResult.stderr);
return invalid(command, 'INTERNAL_ERROR', `Failed to update agent details: ${updateResult.error}`);
}
const flushResult = await runBdCommand({
projectRoot,
args: ['admin', 'flush'],
});
if (!flushResult.success) {
console.error('Flush failed:', flushResult.error, flushResult.stdout, flushResult.stderr);
}
const record = await callBdAgentShow(beadId, projectRoot);
if (!record) {
return invalid(command, 'INTERNAL_ERROR', 'Failed to retrieve final agent state.');
}
return success(command, record);
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to register agent.');
}
}
export async function listAgents(
input: ListAgentsInput,
deps: Partial<RegisterAgentDeps> = {},
): Promise<AgentCommandResponse<AgentRecord[]>> {
const command: AgentCommandName = 'agent list';
const role = trimOrEmpty(input.role);
const status = trimOrEmpty(input.status);
const projectRoot = deps.projectRoot || process.cwd();
try {
const listResult = await runBdCommand({
projectRoot,
args: ['list', '--label', 'gt:agent', '--json'],
});
if (!listResult.success) {
return invalid(command, 'INTERNAL_ERROR', `Failed to list agents from bd: ${listResult.error}`);
}
const rawList = extractJsonArray(listResult.stdout);
if (rawList.length === 0) {
return success(command, []);
}
const agents: AgentRecord[] = [];
for (const item of rawList) {
const record = await callBdAgentShow(item.id, projectRoot);
if (record) {
if (role && record.role !== role) continue;
if (status && record.status !== status) continue;
agents.push(record);
}
}
return success(command, agents.sort((a, b) => a.agent_id.localeCompare(b.agent_id)));
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to list agents.');
}
}
export async function showAgent(
input: ShowAgentInput,
deps: Partial<RegisterAgentDeps> = {},
): Promise<AgentCommandResponse<AgentRecord>> {
const command: AgentCommandName = 'agent show';
const name = trimOrEmpty(input.agent);
const projectRoot = deps.projectRoot || process.cwd();
const agentIdError = validateAgentId(name);
if (agentIdError) {
return invalid(command, agentIdError.code, agentIdError.message);
}
try {
const beadId = toBeadId(name);
const record = await callBdAgentShow(beadId, projectRoot);
if (!record) {
return invalid(command, 'AGENT_NOT_FOUND', 'Agent is not registered.');
}
return success(command, record);
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to load agent.');
}
}
export async function setAgentState(
input: { agent: string; state: AgentZfcState },
deps: Partial<RegisterAgentDeps> = {},
): Promise<AgentCommandResponse<AgentRecord>> {
const command: AgentCommandName = 'agent state';
const name = trimOrEmpty(input.agent);
const state = input.state;
const projectRoot = deps.projectRoot || process.cwd();
const agentIdError = validateAgentId(name);
if (agentIdError) {
return invalid(command, agentIdError.code, agentIdError.message);
}
try {
const beadId = toBeadId(name);
const stateResult = await runBdCommand({
projectRoot,
args: ['agent', 'state', beadId, state, '--json'],
});
if (!stateResult.success) {
return invalid(command, 'AGENT_NOT_FOUND', 'Agent is not registered.');
}
const record = await callBdAgentShow(beadId, projectRoot);
if (!record) {
return invalid(command, 'INTERNAL_ERROR', 'Failed to retrieve agent state after update.');
}
return success(command, record);
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to set agent state.');
}
}
export function deriveLiveness(lastSeenAt: string, now: Date = new Date(), staleMinutes: number = 15): AgentLiveness {
const lastSeen = new Date(lastSeenAt).getTime();
const diffMs = now.getTime() - lastSeen;
const diffMin = diffMs / (1000 * 60);
if (diffMin >= 60) {
return 'idle';
}
if (diffMin >= 2 * staleMinutes) {
return 'evicted';
}
if (diffMin >= staleMinutes) {
return 'stale';
}
return 'active';
}
export async function extendActivityLease(
input: ActivityLeaseInput,
deps: Partial<RegisterAgentDeps> = {},
): Promise<AgentCommandResponse<AgentRecord | null>> {
const command: AgentCommandName = 'agent activity-lease';
const name = trimOrEmpty(input.agent);
const projectRoot = deps.projectRoot || process.cwd();
const agentIdError = validateAgentId(name);
if (agentIdError) {
return invalid(command, agentIdError.code, agentIdError.message);
}
try {
const beadId = toBeadId(name);
const wispResult = await runBdCommand({
projectRoot,
args: [
'create',
`pulse:${name}:${Date.now()}`,
'--type', 'event',
'--wisp-type', 'heartbeat',
'--ephemeral',
'--event-actor', beadId,
'--json'
],
});
if (!wispResult.success) {
return invalid(command, 'INTERNAL_ERROR', `Failed to emit heartbeat wisp: ${wispResult.error}`);
}
activityEventBus.emit({
id: randomUUID(),
kind: 'heartbeat',
beadId: beadId,
beadTitle: `Agent: ${name}`,
projectId: projectRoot,
projectName: path.basename(projectRoot),
timestamp: new Date().toISOString(),
actor: name,
payload: { message: 'running' }
});
return success(command, null);
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to extend activity lease.');
}
}

177
src/lib/agent/types.ts Normal file
View file

@ -0,0 +1,177 @@
export type ProtocolEventType = 'HANDOFF' | 'BLOCKED' | 'INCURSION' | 'RESUME' | 'INFO';
export interface ProtocolEventEnvelope<T = any> {
id: string;
version: 'v1';
event_type: ProtocolEventType;
project_root: string;
bead_id: string;
from_agent: string | null;
to_agent: string | null;
scope: string | null;
created_at: string;
payload: T;
}
export type ProtocolEvent = ProtocolEventEnvelope;
export interface CreateProtocolEventInput {
event_type: ProtocolEventType;
project_root: string;
bead_id: string;
from_agent?: string;
to_agent?: string;
scope?: string;
payload: any;
}
export interface ProtocolDeps {
now: () => string;
idGenerator: () => string;
}
export function createProtocolEvent(
input: CreateProtocolEventInput,
deps: Partial<ProtocolDeps> = {}
): ProtocolEvent {
const now = deps.now ? deps.now() : new Date().toISOString();
const generateId = deps.idGenerator ?? (() => `proto_${Date.now()}_${Math.random().toString(16).slice(2, 6)}`);
return {
id: generateId(),
version: 'v1',
event_type: input.event_type,
project_root: input.project_root,
bead_id: input.bead_id,
from_agent: input.from_agent ?? null,
to_agent: input.to_agent ?? null,
scope: input.scope ?? null,
created_at: now,
payload: input.payload,
};
}
export type AgentCommandName = 'agent register' | 'agent list' | 'agent show' | 'agent activity-lease' | 'agent state';
export type AgentZfcState = 'idle' | 'spawning' | 'running' | 'working' | 'stuck' | 'done' | 'stopped' | 'dead';
export interface AgentCommandError {
code: string;
message: string;
}
export interface AgentCommandResponse<T> {
ok: boolean;
command: AgentCommandName;
data: T | null;
error: AgentCommandError | null;
}
export interface AgentRecord {
agent_id: string;
display_name: string;
role: string;
status: string;
created_at: string;
last_seen_at: string;
version: number;
rig?: string;
role_type?: string;
swarm_id?: string;
current_task?: string;
}
export interface RegisterAgentInput {
name: string;
display?: string;
role: string;
forceUpdate?: boolean;
rig?: string;
}
export interface RegisterAgentDeps {
now: () => string;
projectRoot: string;
}
export interface ListAgentsInput {
role?: string;
status?: string;
}
export interface ShowAgentInput {
agent: string;
}
export interface ActivityLeaseInput {
agent: string;
}
export type AgentLiveness = 'active' | 'stale' | 'evicted' | 'idle';
// Mail/Messaging types
export const MESSAGE_CATEGORIES = ['HANDOFF', 'BLOCKED', 'DECISION', 'INFO'] as const;
export const MESSAGE_STATES = ['unread', 'read', 'acked'] as const;
export type MessageCategory = typeof MESSAGE_CATEGORIES[number];
export type MessageState = typeof MESSAGE_STATES[number];
export type MailCommandName = 'agent send' | 'agent inbox' | 'agent read' | 'agent ack';
export interface MailCommandError {
code: string;
message: string;
}
export interface MailCommandResponse<T> {
ok: boolean;
command: MailCommandName;
data: T | null;
error: MailCommandError | null;
}
export interface AgentMessage {
message_id: string;
thread_id: string;
bead_id: string;
from_agent: string;
to_agent: string;
category: MessageCategory;
subject: string;
body: string;
state: MessageState;
requires_ack: boolean;
created_at: string;
read_at: string | null;
acked_at: string | null;
}
export interface SendAgentMessageInput {
from: string;
to: string;
bead: string;
category: MessageCategory;
subject: string;
body: string;
thread?: string;
}
export interface SendAgentMessageDeps {
now: () => string;
idGenerator: () => string;
}
export interface InboxAgentMessagesInput {
agent: string;
state?: MessageState;
bead?: string;
limit?: number;
}
export interface MessageActionInput {
agent: string;
message: string;
}
export interface MessageMutationDeps {
now: () => string;
}

View file

@ -24,6 +24,7 @@ interface IssueRow extends RowDataPacket {
closed_at: Date | string | null;
due_at: Date | string | null;
labels_concat: string | null;
comments_count: number | null;
}
interface DepRow extends RowDataPacket {
@ -60,6 +61,7 @@ function normalizeRow(row: IssueRow, deps: BeadDependency[]): BeadIssue {
due_at: toIsoString(row.due_at),
estimated_minutes: typeof row.estimated_minutes === 'number' ? row.estimated_minutes : null,
external_ref: row.external_ref ?? null,
comments_count: (row.comments_count ?? 0) as number,
metadata: row.metadata ?? {},
};
}
@ -86,7 +88,9 @@ export async function readIssuesViaDolt(
try {
// Query 1: All issues with comma-separated labels (GROUP_CONCAT avoids N+1)
const [issueRows] = await pool.execute<IssueRow[]>(
`SELECT i.*, GROUP_CONCAT(l.label SEPARATOR ',') AS labels_concat
`SELECT i.*,
GROUP_CONCAT(l.label SEPARATOR ',') AS labels_concat,
(SELECT COUNT(*) FROM comments c WHERE c.issue_id = i.id) AS comments_count
FROM issues i
LEFT JOIN labels l ON l.issue_id = i.id
GROUP BY i.id`

View file

@ -67,22 +67,31 @@ export function diffSnapshots(
events.push(createEvent('estimate_changed', curr, now, { field: 'estimated_minutes', from: prev.estimated_minutes, to: curr.estimated_minutes }));
}
// 4. Collection Changes (Labels)
if (!areArraysEqual(prev.labels, curr.labels)) {
events.push(createEvent('labels_changed', curr, now, {
field: 'labels',
from: prev.labels.join(','),
to: curr.labels.join(',')
}));
}
// 5. Collection Changes (Dependencies)
// 4. Collection Changes (Labels)
if (!areArraysEqual(prev.labels, curr.labels)) {
events.push(createEvent('labels_changed', curr, now, {
field: 'labels',
from: prev.labels.join(','),
to: curr.labels.join(',')
}));
}
// 5. Collection Changes (Comments) - detect comment count changes
if (prev.comments_count !== curr.comments_count) {
events.push(createEvent('comment_added', curr, now, {
field: 'comments_count',
from: String(prev.comments_count),
to: String(curr.comments_count)
}));
}
// 6. Collection Changes (Dependencies)
diffDependencies(prev.dependencies, curr.dependencies).forEach(kindAndTarget => {
events.push(createEvent(kindAndTarget.kind, curr, now, { to: kindAndTarget.target, field: kindAndTarget.type }));
});
});
// 6. Detect Deleted Issues
// 7. Detect Deleted Issues
if (previous) {
const currMap = new Set(current.map(c => c.id));
previous.forEach(prev => {

View file

@ -52,8 +52,9 @@ export interface BeadIssue {
created_by: string | null;
due_at: string | null;
estimated_minutes: number | null;
external_ref: string | null;
metadata: Record<string, unknown>;
external_ref: string | null;
comments_count?: number;
metadata: Record<string, unknown>;
}
export interface ParseableBeadIssue extends Partial<BeadIssue> {