Add realtime watcher+SSE transport with tests and lock-retry read path

This commit is contained in:
zenchantlive 2026-02-11 21:05:27 -08:00
parent cc616c1543
commit 3f2ae384f5
15 changed files with 727 additions and 75 deletions

View file

@ -0,0 +1,85 @@
import { canonicalizeWindowsPath } from '../../../lib/pathing';
import { issuesEventBus, SSE_CONNECTED_FRAME, SSE_HEARTBEAT_FRAME, toSseFrame } from '../../../lib/realtime';
import { getIssuesWatchManager } from '../../../lib/watcher';
const encoder = new TextEncoder();
const HEARTBEAT_MS = 15_000;
export async function GET(request: Request): Promise<Response> {
const url = new URL(request.url);
const projectRoot = canonicalizeWindowsPath(url.searchParams.get('projectRoot') ?? process.cwd());
try {
getIssuesWatchManager().startWatch(projectRoot);
} catch (error) {
return Response.json(
{
ok: false,
error: {
classification: 'unknown',
message: error instanceof Error ? error.message : 'Failed to initialize watcher.',
},
},
{ status: 500 },
);
}
let cleanup = () => {};
const stream = new ReadableStream<Uint8Array>({
start(controller) {
let closed = false;
const write = (payload: string) => {
if (closed) {
return;
}
controller.enqueue(encoder.encode(payload));
};
write(SSE_CONNECTED_FRAME);
const unsubscribe = issuesEventBus.subscribe(
(event) => {
write(toSseFrame(event));
},
{ projectRoot },
);
const heartbeat = setInterval(() => {
write(SSE_HEARTBEAT_FRAME);
}, HEARTBEAT_MS);
const close = () => {
if (closed) {
return;
}
closed = true;
clearInterval(heartbeat);
unsubscribe();
try {
controller.close();
} catch {
// stream already closed
}
};
cleanup = close;
request.signal.addEventListener('abort', close);
},
cancel() {
// Called when client closes EventSource/reader.
// Ensures heartbeat + subscriber cleanup always runs.
cleanup();
return Promise.resolve();
},
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream; charset=utf-8',
'Cache-Control': 'no-cache, no-transform',
Connection: 'keep-alive',
},
});
}

View file

@ -1,7 +1,7 @@
'use client';
import { motion } from 'framer-motion';
import { useEffect, useMemo, useState } from 'react';
import { useCallback, useEffect, useMemo, useRef, useState } from 'react';
import type { KanbanFilterOptions, KanbanStatus } from '../../lib/kanban';
import { buildKanbanColumns, buildKanbanStats, filterKanbanIssues } from '../../lib/kanban';
@ -61,6 +61,7 @@ export function KanbanPage({ issues, projectRoot }: KanbanPageProps) {
const [desktopDetailMinimized, setDesktopDetailMinimized] = useState(false);
const [pendingIssueIds, setPendingIssueIds] = useState<Set<string>>(new Set());
const [mutationError, setMutationError] = useState<string | null>(null);
const refreshInFlightRef = useRef(false);
useEffect(() => {
setLocalIssues(issues);
@ -73,6 +74,38 @@ export function KanbanPage({ issues, projectRoot }: KanbanPageProps) {
const selectedIssue = useMemo(() => filteredIssues.find((issue) => issue.id === selectedIssueId) ?? null, [filteredIssues, selectedIssueId]);
const showDesktopDetail = Boolean(selectedIssue) && !desktopDetailMinimized;
const refreshIssues = useCallback(async (options: { silent?: boolean } = {}) => {
if (refreshInFlightRef.current) {
return;
}
refreshInFlightRef.current = true;
try {
const reconciled = await fetchIssues(projectRoot);
setLocalIssues(reconciled);
} catch (error) {
if (!options.silent) {
throw error;
}
} finally {
refreshInFlightRef.current = false;
}
}, [projectRoot]);
useEffect(() => {
const source = new EventSource(`/api/events?projectRoot=${encodeURIComponent(projectRoot)}`);
const onIssues = () => {
void refreshIssues({ silent: true });
};
source.addEventListener('issues', onIssues as EventListener);
return () => {
source.removeEventListener('issues', onIssues as EventListener);
source.close();
};
}, [projectRoot, refreshIssues]);
const mutateStatus = async (issue: BeadIssue, targetStatus: KanbanStatus) => {
const steps = planStatusTransition(issue, targetStatus);
if (steps.length === 0) {
@ -92,8 +125,7 @@ export function KanbanPage({ issues, projectRoot }: KanbanPageProps) {
});
}
const reconciled = await fetchIssues(projectRoot);
setLocalIssues(reconciled);
await refreshIssues();
} catch (error) {
setLocalIssues(previous);
setMutationError(error instanceof Error ? error.message : 'Mutation failed');

76
src/lib/coalescer.ts Normal file
View file

@ -0,0 +1,76 @@
import { windowsPathKey } from './pathing';
export interface CoalescedEventInput<T> {
projectRoot: string;
payload: T;
}
interface PendingEvent<T> {
timer: NodeJS.Timeout;
projectRoot: string;
payload: T;
}
export class ProjectEventCoalescer<T> {
private readonly pending = new Map<string, PendingEvent<T>>();
private readonly debounceMs: number;
private readonly onFlush: (event: CoalescedEventInput<T>) => void;
constructor(debounceMs: number, onFlush: (event: CoalescedEventInput<T>) => void) {
this.debounceMs = debounceMs;
this.onFlush = onFlush;
}
queue(projectRoot: string, payload: T): void {
const projectKey = windowsPathKey(projectRoot);
const existing = this.pending.get(projectKey);
if (existing) {
clearTimeout(existing.timer);
existing.projectRoot = projectRoot;
existing.payload = payload;
existing.timer = setTimeout(() => this.flush(projectKey), this.debounceMs);
return;
}
this.pending.set(projectKey, {
projectRoot,
payload,
timer: setTimeout(() => this.flush(projectKey), this.debounceMs),
});
}
cancel(projectRoot: string): void {
const projectKey = windowsPathKey(projectRoot);
const pending = this.pending.get(projectKey);
if (!pending) {
return;
}
clearTimeout(pending.timer);
this.pending.delete(projectKey);
}
cancelAll(): void {
for (const pending of this.pending.values()) {
clearTimeout(pending.timer);
}
this.pending.clear();
}
pendingCount(): number {
return this.pending.size;
}
private flush(projectKey: string): void {
const pending = this.pending.get(projectKey);
if (!pending) {
return;
}
this.pending.delete(projectKey);
this.onFlush({
projectRoot: pending.projectRoot,
payload: pending.payload,
});
}
}

View file

@ -1,8 +1,8 @@
import fs from 'node:fs/promises';
import path from 'node:path';
import { parseIssuesJsonl } from './parser';
import { canonicalizeWindowsPath } from './pathing';
import { readTextFileWithRetry } from './read-text-retry';
import type { BeadIssue } from './types';
export interface ReadIssuesOptions {
@ -26,7 +26,7 @@ export async function readIssuesFromDisk(options: ReadIssuesOptions = {}): Promi
for (const issuesPath of candidates) {
try {
const jsonl = await fs.readFile(issuesPath, 'utf8');
const jsonl = await readTextFileWithRetry(issuesPath);
return parseIssuesJsonl(jsonl, {
includeTombstones: options.includeTombstones ?? false,
});

View file

@ -0,0 +1,41 @@
import fs from 'node:fs/promises';
const DEFAULT_RETRY_CODES = new Set(['EBUSY', 'EPERM']);
export interface ReadTextRetryOptions {
retries?: number;
delayMs?: number;
retryCodes?: Set<string>;
}
function sleep(delayMs: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, delayMs));
}
function shouldRetry(error: unknown, retryCodes: Set<string>): boolean {
const code = (error as NodeJS.ErrnoException | undefined)?.code;
return typeof code === 'string' && retryCodes.has(code);
}
export async function readTextFileWithRetry(
filePath: string,
options: ReadTextRetryOptions = {},
): Promise<string> {
const retries = options.retries ?? 2;
const delayMs = options.delayMs ?? 40;
const retryCodes = options.retryCodes ?? DEFAULT_RETRY_CODES;
let attempt = 0;
while (true) {
try {
return await fs.readFile(filePath, 'utf8');
} catch (error) {
if (attempt >= retries || !shouldRetry(error, retryCodes)) {
throw error;
}
attempt += 1;
await sleep(delayMs);
}
}
}

82
src/lib/realtime.ts Normal file
View file

@ -0,0 +1,82 @@
import { canonicalizeWindowsPath, windowsPathKey } from './pathing';
export type IssuesChangeKind = 'changed' | 'renamed';
export interface IssuesChangedEvent {
id: number;
projectRoot: string;
changedPath?: string;
kind: IssuesChangeKind;
at: string;
}
interface Subscriber {
projectKey?: string;
listener: (event: IssuesChangedEvent) => void;
}
export interface SubscribeOptions {
projectRoot?: string;
}
export class IssuesEventBus {
private nextEventId = 1;
private readonly subscribers = new Map<number, Subscriber>();
private nextSubscriberId = 1;
emit(projectRoot: string, changedPath?: string, kind: IssuesChangeKind = 'changed'): IssuesChangedEvent {
const canonicalProjectRoot = canonicalizeWindowsPath(projectRoot);
const projectKey = windowsPathKey(canonicalProjectRoot);
const event: IssuesChangedEvent = {
id: this.nextEventId,
projectRoot: canonicalProjectRoot,
changedPath: changedPath ? canonicalizeWindowsPath(changedPath) : undefined,
kind,
at: new Date().toISOString(),
};
this.nextEventId += 1;
for (const subscriber of this.subscribers.values()) {
if (!subscriber.projectKey || subscriber.projectKey === projectKey) {
subscriber.listener(event);
}
}
return event;
}
subscribe(listener: (event: IssuesChangedEvent) => void, options: SubscribeOptions = {}): () => void {
const id = this.nextSubscriberId;
this.nextSubscriberId += 1;
this.subscribers.set(id, {
listener,
projectKey: options.projectRoot ? windowsPathKey(options.projectRoot) : undefined,
});
return () => {
this.subscribers.delete(id);
};
}
getSubscriberCount(): number {
return this.subscribers.size;
}
resetForTests(): void {
this.subscribers.clear();
this.nextSubscriberId = 1;
this.nextEventId = 1;
}
}
export const issuesEventBus = new IssuesEventBus();
export function toSseFrame(event: IssuesChangedEvent): string {
return `id: ${event.id}\nevent: issues\ndata: ${JSON.stringify(event)}\n\n`;
}
export const SSE_HEARTBEAT_FRAME = ': heartbeat\n\n';
export const SSE_CONNECTED_FRAME = ': connected\n\n';

114
src/lib/watcher.ts Normal file
View file

@ -0,0 +1,114 @@
import chokidar, { type FSWatcher } from 'chokidar';
import { ProjectEventCoalescer } from './coalescer';
import { windowsPathKey } from './pathing';
import { issuesEventBus, type IssuesChangeKind, type IssuesEventBus } from './realtime';
import { resolveIssuesJsonlPathCandidates } from './read-issues';
type FileEventName = 'add' | 'change' | 'unlink';
interface WatchRegistration {
projectRoot: string;
watcher: FSWatcher;
}
export interface WatchManagerOptions {
debounceMs?: number;
eventBus?: IssuesEventBus;
}
export class IssuesWatchManager {
private readonly registrations = new Map<string, WatchRegistration>();
private readonly eventBus: IssuesEventBus;
private readonly coalescer: ProjectEventCoalescer<{
changedPath?: string;
kind: IssuesChangeKind;
}>;
constructor(options: WatchManagerOptions = {}) {
const debounceMs = options.debounceMs ?? 150;
this.eventBus = options.eventBus ?? issuesEventBus;
this.coalescer = new ProjectEventCoalescer(debounceMs, ({ projectRoot, payload }) => {
this.eventBus.emit(projectRoot, payload.changedPath, payload.kind);
});
}
startWatch(projectRoot: string): void {
const projectKey = windowsPathKey(projectRoot);
if (this.registrations.has(projectKey)) {
return;
}
const watchedPaths = resolveIssuesJsonlPathCandidates(projectRoot);
const watcher = chokidar.watch(watchedPaths, {
ignoreInitial: true,
awaitWriteFinish: {
stabilityThreshold: 80,
pollInterval: 15,
},
});
const onFileEvent = (eventName: FileEventName, changedPath: string) => {
const kind: IssuesChangeKind = eventName === 'unlink' ? 'renamed' : 'changed';
this.queueCoalescedEvent(projectRoot, changedPath, kind);
};
watcher.on('add', (changedPath) => onFileEvent('add', changedPath));
watcher.on('change', (changedPath) => onFileEvent('change', changedPath));
watcher.on('unlink', (changedPath) => onFileEvent('unlink', changedPath));
this.registrations.set(projectKey, {
projectRoot,
watcher,
});
}
async stopWatch(projectRoot: string): Promise<void> {
const projectKey = windowsPathKey(projectRoot);
const registration = this.registrations.get(projectKey);
if (!registration) {
return;
}
this.coalescer.cancel(projectRoot);
this.registrations.delete(projectKey);
await registration.watcher.close();
}
async stopAll(): Promise<void> {
const closeOps: Promise<void>[] = [];
for (const registration of this.registrations.values()) {
closeOps.push(registration.watcher.close());
}
this.coalescer.cancelAll();
this.registrations.clear();
await Promise.all(closeOps);
}
getWatchedProjectCount(): number {
return this.registrations.size;
}
private queueCoalescedEvent(projectRoot: string, changedPath: string, kind: IssuesChangeKind): void {
this.coalescer.queue(projectRoot, {
changedPath,
kind,
});
}
}
const globalRegistry = globalThis as typeof globalThis & {
__beadboardWatchManager?: IssuesWatchManager;
};
export function getIssuesWatchManager(): IssuesWatchManager {
if (!globalRegistry.__beadboardWatchManager) {
globalRegistry.__beadboardWatchManager = new IssuesWatchManager();
}
return globalRegistry.__beadboardWatchManager;
}