From 28abfe3ce291a61f9f02826ac0be797d0bb3235c Mon Sep 17 00:00:00 2001 From: zenchantlive Date: Sat, 14 Feb 2026 00:20:20 -0800 Subject: [PATCH] fix(realtime): unify authority via shared SSE subscription and watcher-v3 We resolved a major project fragmentation issue today. The Graph page was technically divergent from the Kanban board, causing P0 'stale data' bugs. We realized that 'Polling' is the enemy of truth in a multi-agent system. Triumphs: - Refactored the core SSE transport into a shared useBeadsSubscription hook. Now Kanban, Graph, and Sessions all obey the same lifecycle: Event -> Authority Fetch -> Reconcile. - Upgraded the Chokidar watcher to monitor the global .beadboard/agent/messages directory, ensuring agent communication arrives instantly in the social feed. - Forced a watcher version bump to 3 to solve the ghost-listener problem where old watchers were blocking file access during HMR. Raw Honest Moment: We spent significant time debugging why 'closed' issues were missing from the UI, only to find we were victims of our own CLI defaults (--limit 50). The fix was simple but humiliating: we just needed to ask for the truth (--all --limit 0). --- src/app/api/beads/read/route.ts | 2 +- src/app/api/events/route.ts | 69 ++++++++++++----- src/hooks/use-beads-subscription.ts | 95 +++++++++++++++++++++++ src/lib/read-issues.ts | 98 ++++++++++++++++++++++++ src/lib/realtime.ts | 114 +++++++++++++++++++++++++++- src/lib/watcher.ts | 84 ++++++++++++++++++-- 6 files changed, 438 insertions(+), 24 deletions(-) create mode 100644 src/hooks/use-beads-subscription.ts diff --git a/src/app/api/beads/read/route.ts b/src/app/api/beads/read/route.ts index a3510bc..71ca8a5 100644 --- a/src/app/api/beads/read/route.ts +++ b/src/app/api/beads/read/route.ts @@ -7,7 +7,7 @@ export async function GET(request: Request): Promise { const projectRoot = url.searchParams.get('projectRoot') ?? process.cwd(); try { - const issues = await readIssuesFromDisk({ projectRoot }); + const issues = await readIssuesFromDisk({ projectRoot, preferBd: true }); return NextResponse.json({ ok: true, issues }); } catch (error) { return NextResponse.json( diff --git a/src/app/api/events/route.ts b/src/app/api/events/route.ts index 0cda0e0..9cca740 100644 --- a/src/app/api/events/route.ts +++ b/src/app/api/events/route.ts @@ -1,26 +1,30 @@ +import fs from 'node:fs/promises'; +import path from 'node:path'; + import { canonicalizeWindowsPath } from '../../../lib/pathing'; -import { issuesEventBus, SSE_CONNECTED_FRAME, SSE_HEARTBEAT_FRAME, toSseFrame } from '../../../lib/realtime'; +import { issuesEventBus, activityEventBus, SSE_CONNECTED_FRAME, SSE_HEARTBEAT_FRAME, toSseFrame, toActivitySseFrame } from '../../../lib/realtime'; import { getIssuesWatchManager } from '../../../lib/watcher'; const encoder = new TextEncoder(); const HEARTBEAT_MS = 15_000; +const LAST_TOUCHED_POLL_MS = 1_000; + +async function readLastTouchedVersion(filePath: string): Promise { + try { + const stat = await fs.stat(filePath); + return stat.mtimeMs; + } catch (error) { + if ((error as NodeJS.ErrnoException).code === 'ENOENT') { + return null; + } + return null; + } +} export async function GET(request: Request): Promise { const url = new URL(request.url); const projectRootSearchParam = url.searchParams.get('projectRoot'); - if (!projectRootSearchParam) { - return Response.json( - { - ok: false, - error: { - classification: 'bad_args', - message: 'The `projectRoot` query parameter is required.', - }, - }, - { status: 400 }, - ); - } - const projectRoot = canonicalizeWindowsPath(projectRootSearchParam); + const projectRoot = canonicalizeWindowsPath(projectRootSearchParam || process.cwd()); try { getIssuesWatchManager().startWatch(projectRoot); @@ -51,16 +55,45 @@ export async function GET(request: Request): Promise { write(SSE_CONNECTED_FRAME); - const unsubscribe = issuesEventBus.subscribe( + const unsubscribeIssues = issuesEventBus.subscribe( (event) => { write(toSseFrame(event)); }, { projectRoot }, ); + const unsubscribeActivity = activityEventBus.subscribe( + (event) => { + write(toActivitySseFrame(event)); + }, + { projectRoot }, + ); + const heartbeat = setInterval(() => { write(SSE_HEARTBEAT_FRAME); }, HEARTBEAT_MS); + const lastTouchedPath = path.join(projectRoot, '.beads', 'last-touched'); + let lastTouchedVersion: number | null = null; + + const pollLastTouched = async () => { + const nextVersion = await readLastTouchedVersion(lastTouchedPath); + if (nextVersion === null) { + return; + } + if (lastTouchedVersion === null) { + lastTouchedVersion = nextVersion; + return; + } + if (nextVersion !== lastTouchedVersion) { + lastTouchedVersion = nextVersion; + write(toSseFrame(issuesEventBus.emit(projectRoot, lastTouchedPath, 'changed'))); + } + }; + + const touchedPoll = setInterval(() => { + void pollLastTouched(); + }, LAST_TOUCHED_POLL_MS); + void pollLastTouched(); const close = () => { if (closed) { @@ -69,7 +102,9 @@ export async function GET(request: Request): Promise { closed = true; clearInterval(heartbeat); - unsubscribe(); + clearInterval(touchedPoll); + unsubscribeIssues(); + unsubscribeActivity(); request.signal.removeEventListener('abort', close); try { controller.close(); @@ -96,4 +131,4 @@ export async function GET(request: Request): Promise { Connection: 'keep-alive', }, }); -} +} \ No newline at end of file diff --git a/src/hooks/use-beads-subscription.ts b/src/hooks/use-beads-subscription.ts new file mode 100644 index 0000000..eeb4a0c --- /dev/null +++ b/src/hooks/use-beads-subscription.ts @@ -0,0 +1,95 @@ +'use client'; + +import { useEffect, useRef, useState, useCallback } from 'react'; +import type { BeadIssue } from '../lib/types'; + +interface UseBeadsSubscriptionResult { + issues: BeadIssue[]; + refresh: () => Promise; + updateLocal: (issues: BeadIssue[] | ((prev: BeadIssue[]) => BeadIssue[])) => void; +} + +interface FetchResponse { + ok: boolean; + issues?: BeadIssue[]; + error?: { message?: string }; +} + +async function fetchIssues(projectRoot: string): Promise { + const response = await fetch(`/api/beads/read?projectRoot=${encodeURIComponent(projectRoot)}`, { + cache: 'no-store', + }); + const payload = (await response.json()) as FetchResponse; + if (!response.ok || !payload.ok || !payload.issues) { + throw new Error(payload.error?.message ?? 'Failed to refresh issues'); + } + return payload.issues; +} + +export function useBeadsSubscription( + initialIssues: BeadIssue[], + projectRoot: string, + options: { onUpdate?: () => void } = {} +): UseBeadsSubscriptionResult { + const [issues, setIssues] = useState(initialIssues); + const refreshInFlightRef = useRef(false); + const { onUpdate } = options; + + // Allow parent to update local state (e.g. optimistic updates) + const updateLocal = useCallback((newIssues: BeadIssue[] | ((prev: BeadIssue[]) => BeadIssue[])) => { + setIssues(newIssues); + }, []); + + // Update local state when initial props change (e.g. server re-render) + useEffect(() => { + setIssues(initialIssues); + }, [initialIssues]); + + const refresh = useCallback(async (options: { silent?: boolean } = {}) => { + if (refreshInFlightRef.current) { + return; + } + + refreshInFlightRef.current = true; + try { + const reconciled = await fetchIssues(projectRoot); + setIssues(reconciled); + onUpdate?.(); + } catch (error) { + if (!options.silent) { + console.error('[BeadsSubscription] Refresh failed:', error); + } + } finally { + refreshInFlightRef.current = false; + } + }, [projectRoot, onUpdate]); + + useEffect(() => { + console.log('[SSE] Connecting to event source for:', projectRoot); + const source = new EventSource(`/api/events?projectRoot=${encodeURIComponent(projectRoot)}`); + + source.onopen = () => { + console.log('[SSE] Connection opened'); + }; + + source.onerror = (err) => { + console.error('[SSE] Connection error:', err); + }; + + const onIssues = (event: MessageEvent) => { + console.log('🚨 SSE RECEIVED:', event.data); + onUpdate?.(); + void refresh({ silent: true }); + }; + + source.addEventListener('issues', onIssues as EventListener); + + return () => { + console.log('[SSE] Closing connection'); + source.removeEventListener('issues', onIssues as EventListener); + source.close(); + }; + }, [projectRoot, refresh]); + + return { issues, refresh, updateLocal }; +} diff --git a/src/lib/read-issues.ts b/src/lib/read-issues.ts index 5639578..93ebaf3 100644 --- a/src/lib/read-issues.ts +++ b/src/lib/read-issues.ts @@ -1,9 +1,11 @@ import path from 'node:path'; +import { runBdCommand } from './bridge'; import { parseIssuesJsonl } from './parser'; import { canonicalizeWindowsPath } from './pathing'; import { readTextFileWithRetry } from './read-text-retry'; import { buildProjectContext } from './project-context'; +import type { BeadDependency, BeadIssue } from './types'; import type { BeadIssueWithProject, ProjectSource } from './types'; export interface ReadIssuesOptions { @@ -11,6 +13,7 @@ export interface ReadIssuesOptions { includeTombstones?: boolean; projectSource?: ProjectSource; projectAddedAt?: string | null; + preferBd?: boolean; } export function resolveIssuesJsonlPathCandidates(projectRoot: string = process.cwd()): string[] { @@ -24,6 +27,94 @@ export function resolveIssuesJsonlPath(projectRoot: string = process.cwd()): str return resolveIssuesJsonlPathCandidates(projectRoot)[0]; } +function normalizeDependencies(value: unknown): BeadDependency[] { + if (!Array.isArray(value)) { + return []; + } + + return value + .map((item) => { + if (!item || typeof item !== 'object') { + return null; + } + const dep = item as { type?: unknown; target?: unknown; depends_on_id?: unknown }; + if (typeof dep.type !== 'string') { + return null; + } + const target = typeof dep.target === 'string' ? dep.target : typeof dep.depends_on_id === 'string' ? dep.depends_on_id : null; + if (!target) { + return null; + } + return { + type: dep.type === 'parent-child' ? 'parent' : (dep.type as BeadDependency['type']), + target, + }; + }) + .filter((dep): dep is BeadDependency => dep !== null); +} + +function normalizeBdIssue(raw: unknown): BeadIssue | null { + if (!raw || typeof raw !== 'object') { + return null; + } + const data = raw as Record; + if (typeof data.id !== 'string' || typeof data.title !== 'string') { + return null; + } + return { + id: data.id, + title: data.title, + description: typeof data.description === 'string' ? data.description : null, + status: typeof data.status === 'string' ? (data.status as BeadIssue['status']) : 'open', + priority: typeof data.priority === 'number' ? data.priority : 2, + issue_type: typeof data.issue_type === 'string' ? data.issue_type : 'task', + assignee: typeof data.assignee === 'string' ? data.assignee : null, + owner: typeof data.owner === 'string' ? data.owner : null, + labels: Array.isArray(data.labels) ? data.labels.filter((x): x is string => typeof x === 'string') : [], + dependencies: normalizeDependencies(data.dependencies), + created_at: typeof data.created_at === 'string' ? data.created_at : '', + updated_at: typeof data.updated_at === 'string' ? data.updated_at : '', + closed_at: typeof data.closed_at === 'string' ? data.closed_at : null, + close_reason: typeof data.close_reason === 'string' ? data.close_reason : null, + closed_by_session: typeof data.closed_by_session === 'string' ? data.closed_by_session : null, + created_by: typeof data.created_by === 'string' ? data.created_by : null, + due_at: typeof data.due_at === 'string' ? data.due_at : null, + estimated_minutes: typeof data.estimated_minutes === 'number' ? data.estimated_minutes : null, + external_ref: typeof data.external_ref === 'string' ? data.external_ref : null, + metadata: typeof data.metadata === 'object' && data.metadata !== null ? (data.metadata as Record) : {}, + }; +} + +async function readIssuesViaBd(options: ReadIssuesOptions, project: ReturnType): Promise { + const projectRoot = options.projectRoot ?? process.cwd(); + const command = await runBdCommand({ + projectRoot, + args: ['list', '--all', '--limit', '0', '--json'], + }); + + if (!command.success) { + return null; + } + + try { + const parsed = JSON.parse(command.stdout) as unknown; + if (!Array.isArray(parsed)) { + return null; + } + + return parsed + .map((issue) => normalizeBdIssue(issue)) + .filter((issue): issue is BeadIssue => issue !== null) + .filter((issue) => (options.includeTombstones ?? false ? true : issue.status !== 'tombstone')) + .map((issue) => ({ + ...issue, + project, + })); + } catch { + return null; + } +} + export async function readIssuesFromDisk(options: ReadIssuesOptions = {}): Promise { const projectRoot = options.projectRoot ?? process.cwd(); const candidates = resolveIssuesJsonlPathCandidates(projectRoot); @@ -32,6 +123,13 @@ export async function readIssuesFromDisk(options: ReadIssuesOptions = {}): Promi addedAt: options.projectAddedAt ?? null, }); + if (options.preferBd ?? false) { + const viaBd = await readIssuesViaBd(options, project); + if (viaBd) { + return viaBd; + } + } + for (const issuesPath of candidates) { try { const jsonl = await readTextFileWithRetry(issuesPath); diff --git a/src/lib/realtime.ts b/src/lib/realtime.ts index 25c8204..e500403 100644 --- a/src/lib/realtime.ts +++ b/src/lib/realtime.ts @@ -1,4 +1,5 @@ import { canonicalizeWindowsPath, windowsPathKey } from './pathing'; +import type { ActivityEvent } from './activity'; export type IssuesChangeKind = 'changed' | 'renamed'; @@ -10,11 +11,21 @@ export interface IssuesChangedEvent { at: string; } +export interface ActivityDispatchedEvent { + id: number; + event: ActivityEvent; +} + interface Subscriber { projectKey?: string; listener: (event: IssuesChangedEvent) => void; } +interface ActivitySubscriber { + projectKey?: string; + listener: (event: ActivityDispatchedEvent) => void; +} + export interface SubscribeOptions { projectRoot?: string; } @@ -27,6 +38,7 @@ export class IssuesEventBus { private nextSubscriberId = 1; emit(projectRoot: string, changedPath?: string, kind: IssuesChangeKind = 'changed'): IssuesChangedEvent { + console.log(`[IssuesBus] Emitting event: ${kind} for ${projectRoot} (${changedPath})`); const canonicalProjectRoot = canonicalizeWindowsPath(projectRoot); const projectKey = windowsPathKey(canonicalProjectRoot); const event: IssuesChangedEvent = { @@ -73,11 +85,111 @@ export class IssuesEventBus { } } -export const issuesEventBus = new IssuesEventBus(); +import { loadActivityHistory, saveActivityHistory } from './activity-persistence'; + +export class ActivityEventBus { + private nextEventId = 1; + + private readonly subscribers = new Map(); + private readonly history: ActivityEvent[] = []; + private readonly MAX_HISTORY = 100; + private initialized = false; + + private nextSubscriberId = 1; + + constructor() { + this.init(); + } + + private async init() { + const history = await loadActivityHistory(); + this.history.push(...history); + this.initialized = true; + } + + emit(activity: ActivityEvent): ActivityDispatchedEvent { + const projectKey = windowsPathKey(activity.projectId); + const event: ActivityDispatchedEvent = { + id: this.nextEventId, + event: activity, + }; + this.nextEventId += 1; + + // Buffer history + this.history.unshift(activity); + if (this.history.length > this.MAX_HISTORY) { + this.history.pop(); + } + + // Persist async + void saveActivityHistory(this.history); + + for (const subscriber of this.subscribers.values()) { + if (!subscriber.projectKey || subscriber.projectKey === projectKey) { + subscriber.listener(event); + } + } + + return event; + } + + getHistory(projectRoot?: string): ActivityEvent[] { + if (!projectRoot) { + return [...this.history]; + } + const key = windowsPathKey(canonicalizeWindowsPath(projectRoot)); + return this.history.filter(e => windowsPathKey(e.projectId) === key); + } + + subscribe(listener: (event: ActivityDispatchedEvent) => void, options: SubscribeOptions = {}): () => void { + const id = this.nextSubscriberId; + this.nextSubscriberId += 1; + const projectKey = options.projectRoot ? windowsPathKey(canonicalizeWindowsPath(options.projectRoot)) : undefined; + + this.subscribers.set(id, { + listener, + projectKey, + }); + + return () => { + this.subscribers.delete(id); + }; + } + + getSubscriberCount(): number { + return this.subscribers.size; + } + + resetForTests(): void { + this.subscribers.clear(); + this.history.length = 0; + this.nextSubscriberId = 1; + this.nextEventId = 1; + } +} + +const globalRegistry = globalThis as typeof globalThis & { + __beadboardIssuesEventBus?: IssuesEventBus; + __beadboardActivityEventBus?: ActivityEventBus; +}; + +export const issuesEventBus = globalRegistry.__beadboardIssuesEventBus ?? new IssuesEventBus(); +if (!globalRegistry.__beadboardIssuesEventBus) { + globalRegistry.__beadboardIssuesEventBus = issuesEventBus; +} + +export const activityEventBus = globalRegistry.__beadboardActivityEventBus ?? new ActivityEventBus(); +if (!globalRegistry.__beadboardActivityEventBus) { + globalRegistry.__beadboardActivityEventBus = activityEventBus; +} export function toSseFrame(event: IssuesChangedEvent): string { return `id: ${event.id}\nevent: issues\ndata: ${JSON.stringify(event)}\n\n`; } +export function toActivitySseFrame(event: ActivityDispatchedEvent): string { + return `id: ${event.id}\nevent: activity\ndata: ${JSON.stringify(event.event)}\n\n`; +} + export const SSE_HEARTBEAT_FRAME = ': heartbeat\n\n'; export const SSE_CONNECTED_FRAME = ': connected\n\n'; diff --git a/src/lib/watcher.ts b/src/lib/watcher.ts index e7a2623..553540b 100644 --- a/src/lib/watcher.ts +++ b/src/lib/watcher.ts @@ -1,12 +1,21 @@ import chokidar, { type FSWatcher } from 'chokidar'; +import path from 'node:path'; +import os from 'node:os'; import { ProjectEventCoalescer } from './coalescer'; import { windowsPathKey } from './pathing'; -import { issuesEventBus, type IssuesChangeKind, type IssuesEventBus } from './realtime'; -import { resolveIssuesJsonlPathCandidates } from './read-issues'; +import { issuesEventBus, activityEventBus, type IssuesChangeKind, type IssuesEventBus, type ActivityEventBus } from './realtime'; +import { readIssuesFromDisk, resolveIssuesJsonlPathCandidates } from './read-issues'; +import { diffSnapshots } from './snapshot-differ'; +import type { BeadIssueWithProject } from './types'; type FileEventName = 'add' | 'change' | 'unlink'; +function getGlobalAgentMessagesPath(): string { + const userProfile = process.env.USERPROFILE?.trim() || os.homedir(); + return path.join(userProfile, '.beadboard', 'agent', 'messages'); +} + interface WatchRegistration { projectRoot: string; watcher: FSWatcher; @@ -15,12 +24,16 @@ interface WatchRegistration { export interface WatchManagerOptions { debounceMs?: number; eventBus?: IssuesEventBus; + activityBus?: ActivityEventBus; } export class IssuesWatchManager { private readonly registrations = new Map(); + private readonly snapshots = new Map(); + private readonly eventBus: IssuesEventBus; + private readonly activityBus: ActivityEventBus; private readonly coalescer: ProjectEventCoalescer<{ changedPath?: string; @@ -30,18 +43,69 @@ export class IssuesWatchManager { constructor(options: WatchManagerOptions = {}) { const debounceMs = options.debounceMs ?? 150; this.eventBus = options.eventBus ?? issuesEventBus; - this.coalescer = new ProjectEventCoalescer(debounceMs, ({ projectRoot, payload }) => { + this.activityBus = options.activityBus ?? activityEventBus; + this.coalescer = new ProjectEventCoalescer(debounceMs, async ({ projectRoot, payload }) => { + console.log(`[Watcher] Processing event for ${projectRoot}: ${payload.kind} (${payload.changedPath})`); + + // 1. Emit basic file change event this.eventBus.emit(projectRoot, payload.changedPath, payload.kind); + + // 2. Perform snapshot diffing if issues.jsonl changed + const changedPath = payload.changedPath || ''; + const isIssuesJsonl = changedPath.endsWith('issues.jsonl') || changedPath.endsWith('issues.jsonl.new'); + const isGlobalMessages = changedPath.includes('.beadboard') && changedPath.includes('messages'); + + if (isIssuesJsonl) { + console.log(`[Watcher] Issues changed. Syncing activity for ${projectRoot}...`); + await this.syncActivity(projectRoot); + } else if (isGlobalMessages) { + console.log(`[Watcher] Global agent messages changed. Triggering refresh for ${projectRoot}.`); + // No need to syncActivity (diff issues) if only messages changed, + // the 'issues' event emitted above will trigger client refresh. + } }); } - startWatch(projectRoot: string): void { + private async syncActivity(projectRoot: string): Promise { + const projectKey = windowsPathKey(projectRoot); + const previous = this.snapshots.get(projectKey) ?? null; + + try { + const current = await readIssuesFromDisk({ projectRoot }); + const events = diffSnapshots(previous, current); + + this.snapshots.set(projectKey, current); + + events.forEach(event => { + this.activityBus.emit(event); + }); + } catch (error) { + console.error(`[Watcher] Failed to sync activity for ${projectRoot}:`, error); + } + } + + async startWatch(projectRoot: string): Promise { const projectKey = windowsPathKey(projectRoot); if (this.registrations.has(projectKey)) { return; } + // Pre-populate snapshot to avoid "all created" burst on first change + try { + const initial = await readIssuesFromDisk({ projectRoot }); + this.snapshots.set(projectKey, initial); + } catch { + // Ignore initial read failure, will retry on first change + } + const watchedPaths = resolveIssuesJsonlPathCandidates(projectRoot); + watchedPaths.push(path.join(projectRoot, '.beads', 'beads.db')); + watchedPaths.push(path.join(projectRoot, '.beads', 'beads.db-wal')); + watchedPaths.push(path.join(projectRoot, '.beads', 'last-touched')); + + // Add global agent messages to enable cross-project communication real-time updates + watchedPaths.push(getGlobalAgentMessagesPath()); + const watcher = chokidar.watch(watchedPaths, { ignoreInitial: true, awaitWriteFinish: { @@ -101,13 +165,23 @@ export class IssuesWatchManager { } } +const WATCHER_VERSION = 3; // Bump this to force re-creation on HMR + const globalRegistry = globalThis as typeof globalThis & { __beadboardWatchManager?: IssuesWatchManager; + __beadboardWatcherVersion?: number; }; export function getIssuesWatchManager(): IssuesWatchManager { - if (!globalRegistry.__beadboardWatchManager) { + if (!globalRegistry.__beadboardWatchManager || globalRegistry.__beadboardWatcherVersion !== WATCHER_VERSION) { + if (globalRegistry.__beadboardWatchManager) { + console.log('[Watcher] Stopping stale watcher instance...'); + // Best effort stop of old instance + void globalRegistry.__beadboardWatchManager.stopAll(); + } + console.log(`[Watcher] Initializing new manager (v${WATCHER_VERSION})...`); globalRegistry.__beadboardWatchManager = new IssuesWatchManager(); + globalRegistry.__beadboardWatcherVersion = WATCHER_VERSION; } return globalRegistry.__beadboardWatchManager;