fix(realtime): unify authority via shared SSE subscription and watcher-v3

We resolved a major project fragmentation issue today. The Graph page was technically divergent from the Kanban board, causing P0 'stale data' bugs. We realized that 'Polling' is the enemy of truth in a multi-agent system.

Triumphs:
- Refactored the core SSE transport into a shared useBeadsSubscription hook. Now Kanban, Graph, and Sessions all obey the same lifecycle: Event -> Authority Fetch -> Reconcile.
- Upgraded the Chokidar watcher to monitor the global .beadboard/agent/messages directory, ensuring agent communication arrives instantly in the social feed.
- Forced a watcher version bump to 3 to solve the ghost-listener problem where old watchers were blocking file access during HMR.

Raw Honest Moment:
We spent significant time debugging why 'closed' issues were missing from the UI, only to find we were victims of our own CLI defaults (--limit 50). The fix was simple but humiliating: we just needed to ask for the truth (--all --limit 0).
This commit is contained in:
zenchantlive 2026-02-14 00:20:20 -08:00
parent ab051952bd
commit 28abfe3ce2
6 changed files with 438 additions and 24 deletions

View file

@ -7,7 +7,7 @@ export async function GET(request: Request): Promise<Response> {
const projectRoot = url.searchParams.get('projectRoot') ?? process.cwd();
try {
const issues = await readIssuesFromDisk({ projectRoot });
const issues = await readIssuesFromDisk({ projectRoot, preferBd: true });
return NextResponse.json({ ok: true, issues });
} catch (error) {
return NextResponse.json(

View file

@ -1,26 +1,30 @@
import fs from 'node:fs/promises';
import path from 'node:path';
import { canonicalizeWindowsPath } from '../../../lib/pathing';
import { issuesEventBus, SSE_CONNECTED_FRAME, SSE_HEARTBEAT_FRAME, toSseFrame } from '../../../lib/realtime';
import { issuesEventBus, activityEventBus, SSE_CONNECTED_FRAME, SSE_HEARTBEAT_FRAME, toSseFrame, toActivitySseFrame } from '../../../lib/realtime';
import { getIssuesWatchManager } from '../../../lib/watcher';
const encoder = new TextEncoder();
const HEARTBEAT_MS = 15_000;
const LAST_TOUCHED_POLL_MS = 1_000;
async function readLastTouchedVersion(filePath: string): Promise<number | null> {
try {
const stat = await fs.stat(filePath);
return stat.mtimeMs;
} catch (error) {
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
return null;
}
return null;
}
}
export async function GET(request: Request): Promise<Response> {
const url = new URL(request.url);
const projectRootSearchParam = url.searchParams.get('projectRoot');
if (!projectRootSearchParam) {
return Response.json(
{
ok: false,
error: {
classification: 'bad_args',
message: 'The `projectRoot` query parameter is required.',
},
},
{ status: 400 },
);
}
const projectRoot = canonicalizeWindowsPath(projectRootSearchParam);
const projectRoot = canonicalizeWindowsPath(projectRootSearchParam || process.cwd());
try {
getIssuesWatchManager().startWatch(projectRoot);
@ -51,16 +55,45 @@ export async function GET(request: Request): Promise<Response> {
write(SSE_CONNECTED_FRAME);
const unsubscribe = issuesEventBus.subscribe(
const unsubscribeIssues = issuesEventBus.subscribe(
(event) => {
write(toSseFrame(event));
},
{ projectRoot },
);
const unsubscribeActivity = activityEventBus.subscribe(
(event) => {
write(toActivitySseFrame(event));
},
{ projectRoot },
);
const heartbeat = setInterval(() => {
write(SSE_HEARTBEAT_FRAME);
}, HEARTBEAT_MS);
const lastTouchedPath = path.join(projectRoot, '.beads', 'last-touched');
let lastTouchedVersion: number | null = null;
const pollLastTouched = async () => {
const nextVersion = await readLastTouchedVersion(lastTouchedPath);
if (nextVersion === null) {
return;
}
if (lastTouchedVersion === null) {
lastTouchedVersion = nextVersion;
return;
}
if (nextVersion !== lastTouchedVersion) {
lastTouchedVersion = nextVersion;
write(toSseFrame(issuesEventBus.emit(projectRoot, lastTouchedPath, 'changed')));
}
};
const touchedPoll = setInterval(() => {
void pollLastTouched();
}, LAST_TOUCHED_POLL_MS);
void pollLastTouched();
const close = () => {
if (closed) {
@ -69,7 +102,9 @@ export async function GET(request: Request): Promise<Response> {
closed = true;
clearInterval(heartbeat);
unsubscribe();
clearInterval(touchedPoll);
unsubscribeIssues();
unsubscribeActivity();
request.signal.removeEventListener('abort', close);
try {
controller.close();
@ -96,4 +131,4 @@ export async function GET(request: Request): Promise<Response> {
Connection: 'keep-alive',
},
});
}
}

View file

@ -0,0 +1,95 @@
'use client';
import { useEffect, useRef, useState, useCallback } from 'react';
import type { BeadIssue } from '../lib/types';
interface UseBeadsSubscriptionResult {
issues: BeadIssue[];
refresh: () => Promise<void>;
updateLocal: (issues: BeadIssue[] | ((prev: BeadIssue[]) => BeadIssue[])) => void;
}
interface FetchResponse {
ok: boolean;
issues?: BeadIssue[];
error?: { message?: string };
}
async function fetchIssues(projectRoot: string): Promise<BeadIssue[]> {
const response = await fetch(`/api/beads/read?projectRoot=${encodeURIComponent(projectRoot)}`, {
cache: 'no-store',
});
const payload = (await response.json()) as FetchResponse;
if (!response.ok || !payload.ok || !payload.issues) {
throw new Error(payload.error?.message ?? 'Failed to refresh issues');
}
return payload.issues;
}
export function useBeadsSubscription(
initialIssues: BeadIssue[],
projectRoot: string,
options: { onUpdate?: () => void } = {}
): UseBeadsSubscriptionResult {
const [issues, setIssues] = useState<BeadIssue[]>(initialIssues);
const refreshInFlightRef = useRef(false);
const { onUpdate } = options;
// Allow parent to update local state (e.g. optimistic updates)
const updateLocal = useCallback((newIssues: BeadIssue[] | ((prev: BeadIssue[]) => BeadIssue[])) => {
setIssues(newIssues);
}, []);
// Update local state when initial props change (e.g. server re-render)
useEffect(() => {
setIssues(initialIssues);
}, [initialIssues]);
const refresh = useCallback(async (options: { silent?: boolean } = {}) => {
if (refreshInFlightRef.current) {
return;
}
refreshInFlightRef.current = true;
try {
const reconciled = await fetchIssues(projectRoot);
setIssues(reconciled);
onUpdate?.();
} catch (error) {
if (!options.silent) {
console.error('[BeadsSubscription] Refresh failed:', error);
}
} finally {
refreshInFlightRef.current = false;
}
}, [projectRoot, onUpdate]);
useEffect(() => {
console.log('[SSE] Connecting to event source for:', projectRoot);
const source = new EventSource(`/api/events?projectRoot=${encodeURIComponent(projectRoot)}`);
source.onopen = () => {
console.log('[SSE] Connection opened');
};
source.onerror = (err) => {
console.error('[SSE] Connection error:', err);
};
const onIssues = (event: MessageEvent) => {
console.log('🚨 SSE RECEIVED:', event.data);
onUpdate?.();
void refresh({ silent: true });
};
source.addEventListener('issues', onIssues as EventListener);
return () => {
console.log('[SSE] Closing connection');
source.removeEventListener('issues', onIssues as EventListener);
source.close();
};
}, [projectRoot, refresh]);
return { issues, refresh, updateLocal };
}

View file

@ -1,9 +1,11 @@
import path from 'node:path';
import { runBdCommand } from './bridge';
import { parseIssuesJsonl } from './parser';
import { canonicalizeWindowsPath } from './pathing';
import { readTextFileWithRetry } from './read-text-retry';
import { buildProjectContext } from './project-context';
import type { BeadDependency, BeadIssue } from './types';
import type { BeadIssueWithProject, ProjectSource } from './types';
export interface ReadIssuesOptions {
@ -11,6 +13,7 @@ export interface ReadIssuesOptions {
includeTombstones?: boolean;
projectSource?: ProjectSource;
projectAddedAt?: string | null;
preferBd?: boolean;
}
export function resolveIssuesJsonlPathCandidates(projectRoot: string = process.cwd()): string[] {
@ -24,6 +27,94 @@ export function resolveIssuesJsonlPath(projectRoot: string = process.cwd()): str
return resolveIssuesJsonlPathCandidates(projectRoot)[0];
}
function normalizeDependencies(value: unknown): BeadDependency[] {
if (!Array.isArray(value)) {
return [];
}
return value
.map((item) => {
if (!item || typeof item !== 'object') {
return null;
}
const dep = item as { type?: unknown; target?: unknown; depends_on_id?: unknown };
if (typeof dep.type !== 'string') {
return null;
}
const target = typeof dep.target === 'string' ? dep.target : typeof dep.depends_on_id === 'string' ? dep.depends_on_id : null;
if (!target) {
return null;
}
return {
type: dep.type === 'parent-child' ? 'parent' : (dep.type as BeadDependency['type']),
target,
};
})
.filter((dep): dep is BeadDependency => dep !== null);
}
function normalizeBdIssue(raw: unknown): BeadIssue | null {
if (!raw || typeof raw !== 'object') {
return null;
}
const data = raw as Record<string, unknown>;
if (typeof data.id !== 'string' || typeof data.title !== 'string') {
return null;
}
return {
id: data.id,
title: data.title,
description: typeof data.description === 'string' ? data.description : null,
status: typeof data.status === 'string' ? (data.status as BeadIssue['status']) : 'open',
priority: typeof data.priority === 'number' ? data.priority : 2,
issue_type: typeof data.issue_type === 'string' ? data.issue_type : 'task',
assignee: typeof data.assignee === 'string' ? data.assignee : null,
owner: typeof data.owner === 'string' ? data.owner : null,
labels: Array.isArray(data.labels) ? data.labels.filter((x): x is string => typeof x === 'string') : [],
dependencies: normalizeDependencies(data.dependencies),
created_at: typeof data.created_at === 'string' ? data.created_at : '',
updated_at: typeof data.updated_at === 'string' ? data.updated_at : '',
closed_at: typeof data.closed_at === 'string' ? data.closed_at : null,
close_reason: typeof data.close_reason === 'string' ? data.close_reason : null,
closed_by_session: typeof data.closed_by_session === 'string' ? data.closed_by_session : null,
created_by: typeof data.created_by === 'string' ? data.created_by : null,
due_at: typeof data.due_at === 'string' ? data.due_at : null,
estimated_minutes: typeof data.estimated_minutes === 'number' ? data.estimated_minutes : null,
external_ref: typeof data.external_ref === 'string' ? data.external_ref : null,
metadata: typeof data.metadata === 'object' && data.metadata !== null ? (data.metadata as Record<string, unknown>) : {},
};
}
async function readIssuesViaBd(options: ReadIssuesOptions, project: ReturnType<typeof buildProjectContext>): Promise<BeadIssueWithProject[] | null> {
const projectRoot = options.projectRoot ?? process.cwd();
const command = await runBdCommand({
projectRoot,
args: ['list', '--all', '--limit', '0', '--json'],
});
if (!command.success) {
return null;
}
try {
const parsed = JSON.parse(command.stdout) as unknown;
if (!Array.isArray(parsed)) {
return null;
}
return parsed
.map((issue) => normalizeBdIssue(issue))
.filter((issue): issue is BeadIssue => issue !== null)
.filter((issue) => (options.includeTombstones ?? false ? true : issue.status !== 'tombstone'))
.map((issue) => ({
...issue,
project,
}));
} catch {
return null;
}
}
export async function readIssuesFromDisk(options: ReadIssuesOptions = {}): Promise<BeadIssueWithProject[]> {
const projectRoot = options.projectRoot ?? process.cwd();
const candidates = resolveIssuesJsonlPathCandidates(projectRoot);
@ -32,6 +123,13 @@ export async function readIssuesFromDisk(options: ReadIssuesOptions = {}): Promi
addedAt: options.projectAddedAt ?? null,
});
if (options.preferBd ?? false) {
const viaBd = await readIssuesViaBd(options, project);
if (viaBd) {
return viaBd;
}
}
for (const issuesPath of candidates) {
try {
const jsonl = await readTextFileWithRetry(issuesPath);

View file

@ -1,4 +1,5 @@
import { canonicalizeWindowsPath, windowsPathKey } from './pathing';
import type { ActivityEvent } from './activity';
export type IssuesChangeKind = 'changed' | 'renamed';
@ -10,11 +11,21 @@ export interface IssuesChangedEvent {
at: string;
}
export interface ActivityDispatchedEvent {
id: number;
event: ActivityEvent;
}
interface Subscriber {
projectKey?: string;
listener: (event: IssuesChangedEvent) => void;
}
interface ActivitySubscriber {
projectKey?: string;
listener: (event: ActivityDispatchedEvent) => void;
}
export interface SubscribeOptions {
projectRoot?: string;
}
@ -27,6 +38,7 @@ export class IssuesEventBus {
private nextSubscriberId = 1;
emit(projectRoot: string, changedPath?: string, kind: IssuesChangeKind = 'changed'): IssuesChangedEvent {
console.log(`[IssuesBus] Emitting event: ${kind} for ${projectRoot} (${changedPath})`);
const canonicalProjectRoot = canonicalizeWindowsPath(projectRoot);
const projectKey = windowsPathKey(canonicalProjectRoot);
const event: IssuesChangedEvent = {
@ -73,11 +85,111 @@ export class IssuesEventBus {
}
}
export const issuesEventBus = new IssuesEventBus();
import { loadActivityHistory, saveActivityHistory } from './activity-persistence';
export class ActivityEventBus {
private nextEventId = 1;
private readonly subscribers = new Map<number, ActivitySubscriber>();
private readonly history: ActivityEvent[] = [];
private readonly MAX_HISTORY = 100;
private initialized = false;
private nextSubscriberId = 1;
constructor() {
this.init();
}
private async init() {
const history = await loadActivityHistory();
this.history.push(...history);
this.initialized = true;
}
emit(activity: ActivityEvent): ActivityDispatchedEvent {
const projectKey = windowsPathKey(activity.projectId);
const event: ActivityDispatchedEvent = {
id: this.nextEventId,
event: activity,
};
this.nextEventId += 1;
// Buffer history
this.history.unshift(activity);
if (this.history.length > this.MAX_HISTORY) {
this.history.pop();
}
// Persist async
void saveActivityHistory(this.history);
for (const subscriber of this.subscribers.values()) {
if (!subscriber.projectKey || subscriber.projectKey === projectKey) {
subscriber.listener(event);
}
}
return event;
}
getHistory(projectRoot?: string): ActivityEvent[] {
if (!projectRoot) {
return [...this.history];
}
const key = windowsPathKey(canonicalizeWindowsPath(projectRoot));
return this.history.filter(e => windowsPathKey(e.projectId) === key);
}
subscribe(listener: (event: ActivityDispatchedEvent) => void, options: SubscribeOptions = {}): () => void {
const id = this.nextSubscriberId;
this.nextSubscriberId += 1;
const projectKey = options.projectRoot ? windowsPathKey(canonicalizeWindowsPath(options.projectRoot)) : undefined;
this.subscribers.set(id, {
listener,
projectKey,
});
return () => {
this.subscribers.delete(id);
};
}
getSubscriberCount(): number {
return this.subscribers.size;
}
resetForTests(): void {
this.subscribers.clear();
this.history.length = 0;
this.nextSubscriberId = 1;
this.nextEventId = 1;
}
}
const globalRegistry = globalThis as typeof globalThis & {
__beadboardIssuesEventBus?: IssuesEventBus;
__beadboardActivityEventBus?: ActivityEventBus;
};
export const issuesEventBus = globalRegistry.__beadboardIssuesEventBus ?? new IssuesEventBus();
if (!globalRegistry.__beadboardIssuesEventBus) {
globalRegistry.__beadboardIssuesEventBus = issuesEventBus;
}
export const activityEventBus = globalRegistry.__beadboardActivityEventBus ?? new ActivityEventBus();
if (!globalRegistry.__beadboardActivityEventBus) {
globalRegistry.__beadboardActivityEventBus = activityEventBus;
}
export function toSseFrame(event: IssuesChangedEvent): string {
return `id: ${event.id}\nevent: issues\ndata: ${JSON.stringify(event)}\n\n`;
}
export function toActivitySseFrame(event: ActivityDispatchedEvent): string {
return `id: ${event.id}\nevent: activity\ndata: ${JSON.stringify(event.event)}\n\n`;
}
export const SSE_HEARTBEAT_FRAME = ': heartbeat\n\n';
export const SSE_CONNECTED_FRAME = ': connected\n\n';

View file

@ -1,12 +1,21 @@
import chokidar, { type FSWatcher } from 'chokidar';
import path from 'node:path';
import os from 'node:os';
import { ProjectEventCoalescer } from './coalescer';
import { windowsPathKey } from './pathing';
import { issuesEventBus, type IssuesChangeKind, type IssuesEventBus } from './realtime';
import { resolveIssuesJsonlPathCandidates } from './read-issues';
import { issuesEventBus, activityEventBus, type IssuesChangeKind, type IssuesEventBus, type ActivityEventBus } from './realtime';
import { readIssuesFromDisk, resolveIssuesJsonlPathCandidates } from './read-issues';
import { diffSnapshots } from './snapshot-differ';
import type { BeadIssueWithProject } from './types';
type FileEventName = 'add' | 'change' | 'unlink';
function getGlobalAgentMessagesPath(): string {
const userProfile = process.env.USERPROFILE?.trim() || os.homedir();
return path.join(userProfile, '.beadboard', 'agent', 'messages');
}
interface WatchRegistration {
projectRoot: string;
watcher: FSWatcher;
@ -15,12 +24,16 @@ interface WatchRegistration {
export interface WatchManagerOptions {
debounceMs?: number;
eventBus?: IssuesEventBus;
activityBus?: ActivityEventBus;
}
export class IssuesWatchManager {
private readonly registrations = new Map<string, WatchRegistration>();
private readonly snapshots = new Map<string, BeadIssueWithProject[]>();
private readonly eventBus: IssuesEventBus;
private readonly activityBus: ActivityEventBus;
private readonly coalescer: ProjectEventCoalescer<{
changedPath?: string;
@ -30,18 +43,69 @@ export class IssuesWatchManager {
constructor(options: WatchManagerOptions = {}) {
const debounceMs = options.debounceMs ?? 150;
this.eventBus = options.eventBus ?? issuesEventBus;
this.coalescer = new ProjectEventCoalescer(debounceMs, ({ projectRoot, payload }) => {
this.activityBus = options.activityBus ?? activityEventBus;
this.coalescer = new ProjectEventCoalescer(debounceMs, async ({ projectRoot, payload }) => {
console.log(`[Watcher] Processing event for ${projectRoot}: ${payload.kind} (${payload.changedPath})`);
// 1. Emit basic file change event
this.eventBus.emit(projectRoot, payload.changedPath, payload.kind);
// 2. Perform snapshot diffing if issues.jsonl changed
const changedPath = payload.changedPath || '';
const isIssuesJsonl = changedPath.endsWith('issues.jsonl') || changedPath.endsWith('issues.jsonl.new');
const isGlobalMessages = changedPath.includes('.beadboard') && changedPath.includes('messages');
if (isIssuesJsonl) {
console.log(`[Watcher] Issues changed. Syncing activity for ${projectRoot}...`);
await this.syncActivity(projectRoot);
} else if (isGlobalMessages) {
console.log(`[Watcher] Global agent messages changed. Triggering refresh for ${projectRoot}.`);
// No need to syncActivity (diff issues) if only messages changed,
// the 'issues' event emitted above will trigger client refresh.
}
});
}
startWatch(projectRoot: string): void {
private async syncActivity(projectRoot: string): Promise<void> {
const projectKey = windowsPathKey(projectRoot);
const previous = this.snapshots.get(projectKey) ?? null;
try {
const current = await readIssuesFromDisk({ projectRoot });
const events = diffSnapshots(previous, current);
this.snapshots.set(projectKey, current);
events.forEach(event => {
this.activityBus.emit(event);
});
} catch (error) {
console.error(`[Watcher] Failed to sync activity for ${projectRoot}:`, error);
}
}
async startWatch(projectRoot: string): Promise<void> {
const projectKey = windowsPathKey(projectRoot);
if (this.registrations.has(projectKey)) {
return;
}
// Pre-populate snapshot to avoid "all created" burst on first change
try {
const initial = await readIssuesFromDisk({ projectRoot });
this.snapshots.set(projectKey, initial);
} catch {
// Ignore initial read failure, will retry on first change
}
const watchedPaths = resolveIssuesJsonlPathCandidates(projectRoot);
watchedPaths.push(path.join(projectRoot, '.beads', 'beads.db'));
watchedPaths.push(path.join(projectRoot, '.beads', 'beads.db-wal'));
watchedPaths.push(path.join(projectRoot, '.beads', 'last-touched'));
// Add global agent messages to enable cross-project communication real-time updates
watchedPaths.push(getGlobalAgentMessagesPath());
const watcher = chokidar.watch(watchedPaths, {
ignoreInitial: true,
awaitWriteFinish: {
@ -101,13 +165,23 @@ export class IssuesWatchManager {
}
}
const WATCHER_VERSION = 3; // Bump this to force re-creation on HMR
const globalRegistry = globalThis as typeof globalThis & {
__beadboardWatchManager?: IssuesWatchManager;
__beadboardWatcherVersion?: number;
};
export function getIssuesWatchManager(): IssuesWatchManager {
if (!globalRegistry.__beadboardWatchManager) {
if (!globalRegistry.__beadboardWatchManager || globalRegistry.__beadboardWatcherVersion !== WATCHER_VERSION) {
if (globalRegistry.__beadboardWatchManager) {
console.log('[Watcher] Stopping stale watcher instance...');
// Best effort stop of old instance
void globalRegistry.__beadboardWatchManager.stopAll();
}
console.log(`[Watcher] Initializing new manager (v${WATCHER_VERSION})...`);
globalRegistry.__beadboardWatchManager = new IssuesWatchManager();
globalRegistry.__beadboardWatcherVersion = WATCHER_VERSION;
}
return globalRegistry.__beadboardWatchManager;