feat(protocol): implement core backend engine for Operative Protocol

Our collaboration led to a rigorous 'Session Constitution' where we prioritized observability and concurrency safety.
I've delivered the first four pillars of the backend engine:
1. Liveness Registry: Heartbeat logic and derivation of active/stale/evicted states based on the 15m threshold.
2. Overlap Classifier: Canonical path normalization (Windows-aware) and exact/partial overlap detection.
3. Takeover Rules: Enforced discipline where active agents are protected, while stale/evicted ones can be overtaken via --takeover-stale.
4. Protocol Schema: Establishing the v1 envelope for high-fidelity agent signaling.

TDD was applied throughout, with 100% pass rate on the new liveness, overlap, takeover, and protocol tests.
This commit is contained in:
zenchantlive 2026-02-14 10:38:10 -08:00
parent 1ae7efb31b
commit 41f7cb8f24
8 changed files with 468 additions and 14 deletions

52
src/lib/agent-protocol.ts Normal file
View file

@ -0,0 +1,52 @@
export type ProtocolEventType = 'HANDOFF' | 'BLOCKED' | 'INCURSION' | 'RESUME' | 'INFO';
export interface ProtocolEventEnvelope<T = any> {
id: string;
version: 'v1';
event_type: ProtocolEventType;
project_root: string;
bead_id: string;
from_agent: string | null;
to_agent: string | null;
scope: string | null;
created_at: string;
payload: T;
}
export type ProtocolEvent = ProtocolEventEnvelope;
export interface CreateProtocolEventInput {
event_type: ProtocolEventType;
project_root: string;
bead_id: string;
from_agent?: string;
to_agent?: string;
scope?: string;
payload: any;
}
export interface ProtocolDeps {
now: () => string;
idGenerator: () => string;
}
export function createProtocolEvent(
input: CreateProtocolEventInput,
deps: Partial<ProtocolDeps> = {}
): ProtocolEvent {
const now = deps.now ? deps.now() : new Date().toISOString();
const generateId = deps.idGenerator ?? (() => `proto_${Date.now()}_${Math.random().toString(16).slice(2, 6)}`);
return {
id: generateId(),
version: 'v1',
event_type: input.event_type,
project_root: input.project_root,
bead_id: input.bead_id,
from_agent: input.from_agent ?? null,
to_agent: input.to_agent ?? null,
scope: input.scope ?? null,
created_at: now,
payload: input.payload,
};
}

View file

@ -4,7 +4,7 @@ import path from 'node:path';
const AGENT_ID_PATTERN = /^[a-z0-9]+(?:-[a-z0-9]+)*$/;
export type AgentCommandName = 'agent register' | 'agent list' | 'agent show';
export type AgentCommandName = 'agent register' | 'agent list' | 'agent show' | 'agent heartbeat';
export interface AgentCommandError {
code: string;
@ -48,6 +48,12 @@ export interface ShowAgentInput {
agent: string;
}
export interface HeartbeatAgentInput {
agent: string;
}
export type AgentLiveness = 'active' | 'stale' | 'evicted';
function userProfileRoot(): string {
return process.env.USERPROFILE?.trim() || os.homedir();
}
@ -253,3 +259,57 @@ export async function showAgent(input: ShowAgentInput): Promise<AgentCommandResp
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to load agent.');
}
}
/**
* Derives the liveness state of an agent based on its last seen timestamp.
* stale threshold: staleMinutes (default 15)
* evicted threshold: 2 * staleMinutes (default 30)
*/
export function deriveLiveness(lastSeenAt: string, now: Date = new Date(), staleMinutes: number = 15): AgentLiveness {
const lastSeen = new Date(lastSeenAt).getTime();
const diffMs = now.getTime() - lastSeen;
const diffMin = diffMs / (1000 * 60);
if (diffMin >= 2 * staleMinutes) {
return 'evicted';
}
if (diffMin >= staleMinutes) {
return 'stale';
}
return 'active';
}
/**
* Updates the last_seen_at timestamp for a registered agent.
*/
export async function heartbeatAgent(
input: HeartbeatAgentInput,
deps: Partial<RegisterAgentDeps> = {},
): Promise<AgentCommandResponse<AgentRecord>> {
const command: AgentCommandName = 'agent heartbeat';
const agentId = trimOrEmpty(input.agent);
const agentIdError = validateAgentId(agentId);
if (agentIdError) {
return invalid(command, agentIdError.code, agentIdError.message);
}
try {
const existing = await readAgent(agentId);
if (!existing) {
return invalid(command, 'AGENT_NOT_FOUND', 'Agent is not registered.');
}
const now = deps.now ? deps.now() : new Date().toISOString();
const updated: AgentRecord = {
...existing,
last_seen_at: now,
version: existing.version + 1,
};
await writeAgent(updated);
return success(command, updated);
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to heartbeat agent.');
}
}

View file

@ -2,7 +2,8 @@ import fs from 'node:fs/promises';
import os from 'node:os';
import path from 'node:path';
import { showAgent } from './agent-registry';
import { showAgent, deriveLiveness } from './agent-registry';
import type { AgentRecord } from './agent-registry';
import type { AgentMessage } from './agent-mail';
const MIN_TTL_MINUTES = 5;
@ -101,6 +102,52 @@ function messageIndexDirectoryPath(): string {
return path.join(agentRoot(), 'messages', 'index');
}
/**
* Normalizes a path according to the Operative Protocol v1:
* 1. Resolve to absolute path.
* 2. Normalize separators to /.
* 3. On Windows, lowercase normalized path.
* 4. Remove trailing slash except root.
*/
export function normalizePath(p: string): string {
let resolved = path.resolve(p);
// Normalize separators
resolved = resolved.replace(/\\/g, '/');
// Lowercase on Windows
if (process.platform === 'win32') {
resolved = resolved.toLowerCase();
}
// Remove trailing slash except root (e.g., C:/ or /)
if (resolved.length > 3 && resolved.endsWith('/')) {
resolved = resolved.slice(0, -1);
}
return resolved;
}
export type OverlapClass = 'exact' | 'partial' | 'disjoint';
/**
* Classifies the overlap between two paths A and B.
*/
export function classifyOverlap(pathA: string, pathB: string): OverlapClass {
const normA = normalizePath(pathA.replace(/\*$/, ''));
const normB = normalizePath(pathB.replace(/\*$/, ''));
if (normA === normB) {
return 'exact';
}
// Check if one is a parent of the other
if (normB.startsWith(`${normA}/`) || normA.startsWith(`${normB}/`)) {
return 'partial';
}
return 'disjoint';
}
function trimOrEmpty(value: unknown): string {
return typeof value === 'string' ? value.trim() : '';
}
@ -273,25 +320,43 @@ export async function reserveAgentScope(
try {
const now = deps.now ? deps.now() : new Date().toISOString();
const reservations = await readActiveReservations();
const existing = reservations.find((reservation) => reservation.scope === scope);
const normalizedScope = normalizePath(scope);
const existing = reservations.find((r) => normalizePath(r.scope) === normalizedScope);
if (existing) {
if (!isExpired(existing, now)) {
return invalid(command, 'RESERVATION_CONFLICT', `Scope is already reserved by ${existing.agent_id}.`);
const isReservationExpired = isExpired(existing, now);
// If it's the SAME agent, we can always refresh/takeover their own scope
if (existing.agent_id === agentId) {
// Continue to creation logic
} else {
// Different agent owns it. Check liveness.
const ownerRes = await showAgent({ agent: existing.agent_id });
if (ownerRes.ok && ownerRes.data) {
const liveness = deriveLiveness(ownerRes.data.last_seen_at, new Date(now));
// active: takeover MUST fail
if (liveness === 'active' && !isReservationExpired) {
return invalid(command, 'RESERVATION_CONFLICT', `Scope is already reserved by active agent ${existing.agent_id}.`);
}
// stale or evicted: takeover MAY succeed only when takeoverStale is true
if (!input.takeoverStale) {
const reason = liveness === 'evicted' ? 'evicted' : (liveness === 'stale' ? 'stale' : 'expired');
return invalid(command, 'RESERVATION_STALE_FOUND', `An ${reason} reservation exists for ${existing.agent_id}. Re-run with --takeover-stale.`);
}
}
}
if (!input.takeoverStale) {
return invalid(command, 'RESERVATION_STALE_FOUND', 'An expired reservation exists. Re-run with --takeover-stale.');
}
const withoutExisting = reservations.filter((reservation) => reservation.reservation_id !== existing.reservation_id);
// If we reach here, we are taking over (either same agent or stale/evicted/expired takeover)
const withoutExisting = reservations.filter((r) => r.reservation_id !== existing.reservation_id);
await writeActiveReservations(withoutExisting);
await appendReservationHistory({ ...existing, state: 'expired' });
await appendReservationHistory({ ...existing, state: isReservationExpired ? 'expired' : 'released' });
const generateId = deps.idGenerator ?? (() => defaultReservationId(now));
const created: AgentReservation = {
reservation_id: generateId(),
scope,
scope: normalizedScope,
agent_id: agentId,
bead_id: beadId,
state: 'active',
@ -307,7 +372,7 @@ export async function reserveAgentScope(
const generateId = deps.idGenerator ?? (() => defaultReservationId(now));
const created: AgentReservation = {
reservation_id: generateId(),
scope,
scope: normalizedScope,
agent_id: agentId,
bead_id: beadId,
state: 'active',