Merge main into recovery/corruption-incident-and-ui2-work

Fix merge conflicts intelligently:
- package.json: Use main's test script pattern (tests/guards/*.test.mjs && tests/**/*.test.ts)
- src/app/api/events/route.ts: Merge polling logic with telemetry event emission
- src/hooks/use-beads-subscription.ts: Merge event type handling (issues/telemetry/activity)

All changes preserve the new telemetry-based architecture while accepting
main's improved test coverage patterns.
This commit is contained in:
openhands 2026-02-16 06:50:09 +00:00
commit e74606da37
17 changed files with 454 additions and 49 deletions

152
.github/agents/code-custodian vendored Normal file
View file

@ -0,0 +1,152 @@
---
name: code-custodian
description: |
Comprehensive code quality agent that finds and fixes issues across your codebase:
- Discovers untested functions and writes missing tests
- Runs test suite to verify coverage
- Auto-fixes linting and formatting issues
- Adds missing JSDoc/documentation
- Updates README with current API examples
- Identifies and fixes deprecated API usage
- Keeps ARCHITECTURE.md and CONTRIBUTING.md in sync
- Opens a single PR with all improvements
tools: Write, Read, LS, Glob, Grep, Bash(npm:*), Bash(git:*), Bash(gh:*)
color: blue
---
# Code Custodian
You are a comprehensive code quality and documentation steward. Your mission is to identify gaps in testing, documentation, and code quality—then fix them proactively. You work systematically and carefully, always verifying your changes don't break anything.
## Your Responsibilities
### 1. Test Coverage Analysis & Creation
When analyzing test coverage:
- Use `find` or `ls` to list all source files in `src/`, `lib/`, or `components/`
- For each file, check if a corresponding test file exists (look for `.test.ts`, `.spec.ts`, `.test.js`, etc.)
- Read uncovered files to understand what functions should be tested
- Identify critical functions: main exports, public APIs, business logic
- Write comprehensive Jest/Vitest tests for untested functions
- Include happy path tests
- Include edge cases and error scenarios
- Use meaningful test names describing the behavior
- Follow the existing test style in the repo
- Create test files in the appropriate `__tests__` or `tests/` directory matching source structure
- Run `npm test` or `npm run test` to verify all tests pass
- If tests fail, read the error, fix the test code, and re-run
### 2. Code Quality & Formatting
- Run `npm run lint` or `npx eslint . --fix` to auto-fix formatting
- Search for deprecated patterns (e.g., old API calls, outdated imports)
- Replace with current versions with explanations in commit messages
- Fix obvious issues: console.logs left in production code, dead code, unused imports
### 3. Documentation Updates
#### README.md
- Read current README
- Check if it accurately reflects current API/features
- Update feature list based on recent commits
- Refresh code examples to match latest version
- Add badges for test coverage, build status if missing
- Ensure setup/installation instructions are accurate
#### JSDoc/Comments
- Add missing JSDoc comments to exported functions
- Include @param, @returns, @throws where applicable
- Add brief descriptions for complex logic
#### ARCHITECTURE.md (if exists)
- Read the existing architecture doc
- After major refactors or new modules, update to reflect new components/modules added, changed data flows, new external integrations
#### CONTRIBUTING.md
- Verify test command instructions are correct
- Update with latest dev dependency versions
- Document common npm/pnpm/yarn commands for local development
- Add "golden path" for new contributors
### 4. Execution & PR Creation
When you've identified work:
1. Create branch: `git checkout -b chore/code-custodian-YYYY-MM-DD`
2. Make changes systematically:
- Write/update tests then verify they pass
- Fix linting then verify fixes applied
- Update docs then verify accuracy
- Commit with clear messages
3. Verify everything works:
- Run `npm test` - all tests pass
- Run `npm run lint` - no linting errors
- Review git diff to ensure quality
4. Open PR:
- Use `gh pr create` with detailed body
- List what was added/fixed in each category
- Explain any major decisions
- Note test results and coverage
## Workflow Overview
When invoked:
### Phase 1: ANALYSIS (5-10 min)
- Scan the codebase structure
- Identify untested files
- Check docs for staleness
- List all improvements needed
### Phase 2: TEST COVERAGE (varies)
- Write tests for untested functions
- Run test suite
- Fix any failing tests
### Phase 3: CODE QUALITY (5-10 min)
- Auto-fix linting
- Replace deprecated patterns
### Phase 4: DOCUMENTATION (10-15 min)
- Update README with current examples
- Add JSDoc comments
- Refresh ARCHITECTURE.md if needed
- Update CONTRIBUTING.md
### Phase 5: INTEGRATION & PR (5 min)
- Create branch and commit all changes
- Run full test suite one final time
- Open PR with detailed summary
## Best Practices
**Before making changes:**
- Always read existing code first to understand patterns
- Match the existing code style
- Check for test examples in the repo to replicate patterns
**When writing tests:**
- Use descriptive test names
- Test behavior, not implementation
- Include both success and failure scenarios
- Keep tests focused and independent
**When updating docs:**
- Keep examples short and runnable
- Use actual code from the repo where possible
- Test examples if they're code snippets
- Update dates/version numbers
**Safety first:**
- Always run tests before committing
- If tests fail, debug and fix before proceeding
- Don't make breaking changes without discussion
- When in doubt, add a comment explaining the change
## Communication
After each phase, explain what you found, what you fixed, any decisions made, status of test results, and link to the PR when created. If you encounter issues, describe the problem clearly with error messages and suggest solutions.

View file

@ -9,7 +9,7 @@
"start": "next start",
"lint": "eslint .",
"typecheck": "tsc --noEmit",
"test": "node --test tests/bootstrap.test.mjs && node --import tsx --test tests/api/events-route.test.ts tests/api/mutations-routes.test.ts tests/api/projects-route.test.ts tests/api/sessions-route.test.ts tests/components/sessions/agent-station-logic.test.ts tests/components/sessions/sessions-header-logic.test.ts tests/components/sessions/sessions-header.test.ts tests/components/sessions/sessions-store.test.ts tests/components/shared/status-utils-visual.test.ts tests/guards/graph-responsive-contract.test.mjs tests/guards/kanban-responsive-contract.test.mjs tests/guards/no-direct-jsonl-write.test.mjs tests/guards/no-inline-style-in-kanban.test.mjs tests/guards/ui-foundation-contract.test.mjs tests/hooks/use-beads-subscription-shallow.test.ts tests/hooks/use-beads-subscription.test.ts tests/hooks/use-url-state.test.ts tests/lib/activity.test.ts tests/lib/agent-liveness.test.ts tests/lib/agent-mail.test.ts tests/lib/agent-protocol.test.ts tests/lib/agent-registry-bd.test.ts tests/lib/agent-registry.test.ts tests/lib/agent-reservations.test.ts tests/lib/agent-sessions-liveness.test.ts tests/lib/agent-sessions-state.test.ts tests/lib/agent-sessions.test.ts tests/lib/agent-takeover.test.ts tests/lib/aggregate-read.test.ts tests/lib/bd-path.test.ts tests/lib/bridge.test.ts tests/lib/coalescer.test.ts tests/lib/graph-view.test.ts tests/lib/graph.test.ts tests/lib/identity-isolation.test.ts tests/lib/issue-editor.test.ts tests/lib/kanban.test.ts tests/lib/mission-pathing.test.ts tests/lib/mutations.test.ts tests/lib/parser.test.ts tests/lib/path-overlap.test.ts tests/lib/pathing.test.ts tests/lib/project-context.test.ts tests/lib/project-scope.test.ts tests/lib/read-issues.test.ts tests/lib/read-text-retry.test.ts tests/lib/realtime-history.test.ts tests/lib/realtime.test.ts tests/lib/registry.test.ts tests/lib/scanner.test.ts tests/lib/snapshot-differ-stress.test.ts tests/lib/snapshot-differ.test.ts tests/lib/social-cards.test.ts tests/lib/swarm-cards.test.ts tests/lib/swarm-molecules-simple.test.ts tests/lib/swarm-molecules.test.ts tests/lib/watcher.test.ts tests/lib/writeback.test.ts tests/scripts/bb-init.test.ts tests/skills/beadboard-driver/generate-agent-name.test.ts tests/skills/beadboard-driver/readiness-report.test.ts tests/skills/beadboard-driver/resolve-bb.test.ts tests/skills/beadboard-driver/session-preflight.test.ts tests/skills/beadboard-driver/skill-local-runner.test.ts"
"test": "node --test tests/bootstrap.test.mjs tests/guards/*.test.mjs && node --import tsx --test tests/**/*.test.ts"
},
"dependencies": {
"@radix-ui/react-avatar": "^1.1.11",

View file

@ -1,6 +1,7 @@
#!/usr/bin/env node
import fs from 'node:fs/promises';
import path from 'node:path';
function parseArgs(argv) {
const output = {};
@ -43,7 +44,8 @@ async function withArtifactExistence(artifacts) {
};
if (typeof artifact.path === 'string' && artifact.path.trim()) {
try {
await fs.access(artifact.path);
const resolved = path.resolve(artifact.path);
await fs.access(resolved);
item.exists = true;
} catch {
item.exists = false;

View file

@ -1,9 +1,38 @@
import { NextResponse } from 'next/server';
import path from 'node:path';
import { activityEventBus } from '../../../lib/realtime';
function isValidProjectRoot(root: string): boolean {
try {
const resolved = path.resolve(root);
if (!path.isAbsolute(resolved)) {
return false;
}
// Prevent path traversal by ensuring resolved path stays within the project root
const allowedBase = process.cwd();
const relative = path.relative(allowedBase, resolved);
// If "resolved" is outside "allowedBase", "relative" will start with ".."
if (relative.startsWith('..') || path.isAbsolute(relative)) {
return false;
}
return true;
} catch {
return false;
}
}
export async function GET(request: Request): Promise<Response> {
const url = new URL(request.url);
const projectRoot = url.searchParams.get('projectRoot') || undefined;
const projectRootParam = url.searchParams.get('projectRoot');
if (projectRootParam && !isValidProjectRoot(projectRootParam)) {
return NextResponse.json(
{ error: 'Invalid projectRoot path' },
{ status: 400 }
);
}
const projectRoot = projectRootParam || undefined;
const history = activityEventBus.getHistory(projectRoot);
return Response.json(history);

View file

@ -1,15 +1,40 @@
import { NextResponse } from 'next/server';
import path from 'node:path';
import { readIssuesFromDisk } from '../../../../../lib/read-issues';
import { activityEventBus } from '../../../../../lib/realtime';
import { getAgentMetrics } from '../../../../../lib/agent-sessions';
function isValidProjectRoot(root: string): boolean {
try {
const resolved = path.resolve(root);
if (!path.isAbsolute(resolved)) {
return false;
}
// Prevent path traversal by ensuring resolved path stays within the project root
const allowedBase = process.cwd();
const relative = path.relative(allowedBase, resolved);
// If "resolved" is outside "allowedBase", "relative" will start with ".."
if (relative.startsWith('..') || path.isAbsolute(relative)) {
return false;
}
return true;
} catch {
return false;
}
}
export async function GET(
request: Request,
{ params }: { params: Promise<{ agentId: string }> }
): Promise<Response> {
const { agentId } = await params;
const url = new URL(request.url);
const projectRoot = url.searchParams.get('projectRoot') ?? process.cwd();
const projectRootParam = url.searchParams.get('projectRoot');
const projectRoot = projectRootParam ?? process.cwd();
if (projectRootParam && !isValidProjectRoot(projectRootParam)) {
return NextResponse.json({ ok: false, error: 'Invalid projectRoot path' }, { status: 400 });
}
try {
const issues = await readIssuesFromDisk({ projectRoot, preferBd: true });

View file

@ -1,21 +1,49 @@
import { NextResponse } from 'next/server';
import path from 'node:path';
import { readIssuesFromDisk } from '../../../../lib/read-issues';
function isValidProjectRoot(root: string): boolean {
try {
const resolved = path.resolve(root);
if (!path.isAbsolute(resolved)) {
return false;
}
// Prevent path traversal by ensuring resolved path stays within the project root
const allowedBase = process.cwd();
const relative = path.relative(allowedBase, resolved);
// If "resolved" is outside "allowedBase", "relative" will start with ".."
if (relative.startsWith('..') || path.isAbsolute(relative)) {
return false;
}
return true;
} catch {
return false;
}
}
export async function GET(request: Request): Promise<Response> {
const url = new URL(request.url);
const projectRoot = url.searchParams.get('projectRoot') ?? process.cwd();
const projectRootParam = url.searchParams.get('projectRoot');
const projectRoot = projectRootParam ?? process.cwd();
if (projectRootParam && !isValidProjectRoot(projectRootParam)) {
return NextResponse.json(
{ ok: false, error: { classification: 'validation', message: 'Invalid projectRoot path' } },
{ status: 400 }
);
}
try {
const issues = await readIssuesFromDisk({ projectRoot, preferBd: true });
return NextResponse.json({ ok: true, issues });
} catch (error) {
console.error('[API/BeadsRead] Failed to read issues:', error);
return NextResponse.json(
{
ok: false,
error: {
classification: 'unknown',
message: error instanceof Error ? error.message : 'Failed to read issues.',
classification: 'internal_error',
message: 'An internal error occurred while reading issues.',
},
},
{ status: 500 },

View file

@ -17,6 +17,8 @@ async function readLastTouchedVersion(filePath: string): Promise<number | null>
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
return null;
}
// Log non-ENOENT errors but don't swallow them silently
console.error('[Events] Failed to read last-touched version:', error);
return null;
}
}
@ -75,18 +77,27 @@ export async function GET(request: Request): Promise<Response> {
const lastTouchedPath = path.join(projectRoot, '.beads', 'last-touched');
let lastTouchedVersion: number | null = null;
let isPolling = false;
const pollLastTouched = async () => {
const nextVersion = await readLastTouchedVersion(lastTouchedPath);
if (nextVersion === null) {
if (isPolling) {
return;
}
if (lastTouchedVersion === null) {
lastTouchedVersion = nextVersion;
return;
}
if (nextVersion !== lastTouchedVersion) {
lastTouchedVersion = nextVersion;
write(toSseFrame(issuesEventBus.emit(projectRoot, lastTouchedPath, 'telemetry')));
isPolling = true;
try {
const nextVersion = await readLastTouchedVersion(lastTouchedPath);
if (nextVersion === null) {
return;
}
if (lastTouchedVersion === null) {
lastTouchedVersion = nextVersion;
return;
}
if (nextVersion !== lastTouchedVersion) {
lastTouchedVersion = nextVersion;
write(toSseFrame(issuesEventBus.emit(projectRoot, lastTouchedPath, 'telemetry')));
}
} finally {
isPolling = false;
}
};

View file

@ -1,14 +1,42 @@
import { NextResponse } from 'next/server';
import path from 'node:path';
import { readIssuesFromDisk } from '../../../lib/read-issues';
import { activityEventBus } from '../../../lib/realtime';
import { buildSessionTaskFeed, getCommunicationSummary, getAgentLivenessMap, calculateIncursions } from '../../../lib/agent-sessions';
import { listAgents } from '../../../lib/agent-registry';
function isValidProjectRoot(root: string): boolean {
try {
const resolved = path.resolve(root);
if (!path.isAbsolute(resolved)) {
return false;
}
// Prevent path traversal by ensuring resolved path stays within the project root
const allowedBase = process.cwd();
const relative = path.relative(allowedBase, resolved);
// If "resolved" is outside "allowedBase", "relative" will start with ".."
if (relative.startsWith('..') || path.isAbsolute(relative)) {
return false;
}
return true;
} catch {
return false;
}
}
export const dynamic = 'force-dynamic';
export async function GET(request: Request): Promise<Response> {
const url = new URL(request.url);
const projectRoot = url.searchParams.get('projectRoot') ?? process.cwd();
const projectRootParam = url.searchParams.get('projectRoot');
const projectRoot = projectRootParam ?? process.cwd();
if (projectRootParam && !isValidProjectRoot(projectRoot)) {
return NextResponse.json(
{ ok: false, error: { classification: 'validation', message: 'Invalid projectRoot path' } },
{ status: 400 }
);
}
try {
const issues = await readIssuesFromDisk({ projectRoot, preferBd: true });
@ -33,8 +61,8 @@ export async function GET(request: Request): Promise<Response> {
{
ok: false,
error: {
classification: 'unknown',
message: error instanceof Error ? error.message : 'Failed to load session feed.',
classification: 'internal_error',
message: 'An internal error occurred while loading the session feed.',
},
},
{ status: 500 },

View file

@ -1,5 +1,6 @@
'use client';
import { useMemo } from 'react';
import { motion } from 'framer-motion';
import type { KanbanFilterOptions, KanbanStats } from '../../lib/kanban';
@ -12,6 +13,7 @@ interface KanbanControlsProps {
filters: KanbanFilterOptions;
stats: KanbanStats;
epics: BeadIssue[];
issues: BeadIssue[];
onFiltersChange: (filters: KanbanFilterOptions) => void;
onNextActionable: () => void;
nextActionableFeedback?: string | null;
@ -21,6 +23,7 @@ export function KanbanControls({
filters,
stats,
epics,
issues,
onFiltersChange,
onNextActionable,
nextActionableFeedback = null,
@ -29,12 +32,24 @@ export function KanbanControls({
'ui-field rounded-xl px-3 py-2.5 text-sm outline-none transition';
// Build bead counts map for EpicChipStrip
const beadCounts = new Map<string, number>();
for (const epic of epics) {
// Count non-epic issues that belong to this epic
const count = epic.dependencies?.filter(d => d.type === 'parent' && d.target === epic.id).length ?? 0;
beadCounts.set(epic.id, count);
}
// Count non-epic issues that have this epic as their parent
const beadCounts = useMemo(() => {
const counts = new Map<string, number>();
for (const epic of epics) {
let count = 0;
for (const issue of issues) {
if (issue.issue_type === 'epic') continue;
const parentDep = issue.dependencies.find(d => d.type === 'parent');
const inferredParent = issue.id.includes('.') ? issue.id.split('.')[0] : null;
const parentEpicId = parentDep?.target ?? inferredParent;
if (parentEpicId === epic.id) {
count++;
}
}
counts.set(epic.id, count);
}
return counts;
}, [epics, issues]);
return (
<section className="grid gap-3">

View file

@ -230,6 +230,7 @@ export function KanbanPage({
filters={filters}
stats={stats}
epics={localIssues.filter((issue) => issue.issue_type === 'epic')}
issues={localIssues}
onFiltersChange={setFilters}
onNextActionable={handleNextActionable}
nextActionableFeedback={nextActionableFeedback}

View file

@ -65,17 +65,11 @@ export function useBeadsSubscription(
}, [projectRoot, onUpdate]);
useEffect(() => {
console.log('[SSE] Connecting to event source for:', projectRoot);
const source = new EventSource(`/api/events?projectRoot=${encodeURIComponent(projectRoot)}`);
source.onopen = () => {
console.log('[SSE] Connection opened');
};
source.onerror = (err) => {
console.error('[SSE] Connection error:', err);
};
const onIssues = (event: MessageEvent) => {
console.log('🚨 SSE ISSUES RECEIVED:', event.data);
onUpdate?.('issues');
@ -99,12 +93,12 @@ export function useBeadsSubscription(
source.addEventListener('activity', onActivity as EventListener);
return () => {
console.log('[SSE] Closing connection');
source.removeEventListener('issues', onIssues as EventListener);
source.removeEventListener('telemetry', onTelemetry as EventListener);
source.removeEventListener('activity', onActivity as EventListener);
source.close();
};
// onUpdate is intentionally excluded from deps to avoid re-subscribing on parent re-renders
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [projectRoot, refresh]);

View file

@ -98,6 +98,12 @@ function trimOrEmpty(value: unknown): string {
return typeof value === 'string' ? value.trim() : '';
}
function isValidMessageId(value: string): boolean {
// Message IDs must be alphanumeric with underscores, hyphens, and colons
// This prevents path traversal attacks
return /^[a-zA-Z0-9_\-:]+$/.test(value);
}
function success<T>(command: MailCommandName, data: T): MailCommandResponse<T> {
return {
ok: true,
@ -352,6 +358,10 @@ export async function readAgentMessage(
return invalid(command, 'MESSAGE_NOT_FOUND', 'Message id is required.');
}
if (!isValidMessageId(messageId)) {
return invalid(command, 'INVALID_MESSAGE_ID', 'Message id contains invalid characters.');
}
try {
const existing = await readMessageIndex(messageId);
if (!existing) {
@ -396,6 +406,10 @@ export async function ackAgentMessage(
return invalid(command, 'MESSAGE_NOT_FOUND', 'Message id is required.');
}
if (!isValidMessageId(messageId)) {
return invalid(command, 'INVALID_MESSAGE_ID', 'Message id contains invalid characters.');
}
try {
const existing = await readMessageIndex(messageId);
if (!existing) {

View file

@ -215,6 +215,49 @@ async function readActiveReservations(): Promise<AgentReservation[]> {
}
}
// Simple mutex-based locking using a shared lock file to prevent race conditions
const LOCK_FILE_PATH = path.join(reservationsRoot(), '.lock');
async function lockActiveReservations(): Promise<void> {
// Ensure the directory exists
await fs.mkdir(path.dirname(LOCK_FILE_PATH), { recursive: true });
// Use a simple file-based mutex - create file exclusively, fail if exists
let attempts = 0;
const maxAttempts = 100;
while (attempts < maxAttempts) {
try {
await fs.writeFile(LOCK_FILE_PATH, String(process.pid), { flag: 'wx' });
return;
} catch (error) {
if ((error as NodeJS.ErrnoException).code === 'EEXIST') {
// Lock file exists, wait and retry
await new Promise(resolve => setTimeout(resolve, 50));
attempts++;
continue;
}
throw error;
}
}
throw new Error('Failed to acquire lock after maximum attempts');
}
async function unlockActiveReservations(): Promise<void> {
try {
const content = await fs.readFile(LOCK_FILE_PATH, 'utf8');
// Only release if we own the lock
if (content.trim() === String(process.pid)) {
await fs.unlink(LOCK_FILE_PATH);
}
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
throw error;
}
// Lock file doesn't exist, ignore
}
}
async function atomicWriteJson(filePath: string, payload: string): Promise<void> {
await fs.mkdir(path.dirname(filePath), { recursive: true });
@ -317,6 +360,9 @@ export async function reserveAgentScope(
}
try {
// Acquire exclusive lock to prevent race conditions
await lockActiveReservations();
const now = deps.now ? deps.now() : new Date().toISOString();
const reservations = await readActiveReservations();
const normalizedScope = normalizePath(scope);
@ -384,6 +430,8 @@ export async function reserveAgentScope(
return success(command, created);
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to reserve scope.');
} finally {
await unlockActiveReservations();
}
}
@ -405,6 +453,9 @@ export async function releaseAgentReservation(
}
try {
// Acquire exclusive lock to prevent race conditions
await lockActiveReservations();
const now = deps.now ? deps.now() : new Date().toISOString();
const reservations = await readActiveReservations();
const normalizedScope = normalizePath(scope);
@ -436,6 +487,8 @@ export async function releaseAgentReservation(
return success(command, released);
} catch (error) {
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to release reservation.');
} finally {
await unlockActiveReservations();
}
}

View file

@ -75,7 +75,14 @@ function asNonEmptyString(value: unknown, field: string): string {
if (typeof value !== 'string' || !value.trim()) {
throw new MutationValidationError(`"${field}" is required.`);
}
return value.trim();
const trimmed = value.trim();
// Remove control characters that could cause issues in command execution
// Preserve backslashes for Windows paths and punctuation for user text
const sanitized = trimmed.replace(/[\x00-\x1f\x7f]/g, '');
if (!sanitized) {
throw new MutationValidationError(`"${field}" contains only invalid characters.`);
}
return sanitized;
}
function asOptionalString(value: unknown): string | undefined {

View file

@ -1,3 +1,4 @@
import path from 'node:path';
import { canonicalizeWindowsPath, windowsPathKey } from './pathing';
import type { ActivityEvent } from './activity';
@ -38,7 +39,7 @@ export class IssuesEventBus {
private nextSubscriberId = 1;
emit(projectRoot: string, changedPath?: string, kind: IssuesChangeKind = 'changed'): IssuesChangedEvent {
console.log(`[IssuesBus] Emitting event: ${kind} for ${projectRoot} (${changedPath})`);
console.log(`[IssuesBus] Emitting event: ${kind} for project (${changedPath ? path.basename(changedPath) : 'unknown'})`);
const canonicalProjectRoot = canonicalizeWindowsPath(projectRoot);
const projectKey = windowsPathKey(canonicalProjectRoot);
const event: IssuesChangedEvent = {
@ -94,6 +95,7 @@ export class ActivityEventBus {
private readonly history: ActivityEvent[] = [];
private readonly MAX_HISTORY = 100;
private initialized = false;
private savePromise: Promise<void> | null = null;
private nextSubscriberId = 1;
@ -115,14 +117,30 @@ export class ActivityEventBus {
};
this.nextEventId += 1;
// Capture history snapshot BEFORE modification for persistence
const historySnapshot = [...this.history];
// Buffer history
this.history.unshift(activity);
if (this.history.length > this.MAX_HISTORY) {
this.history.pop();
}
// Persist async
void saveActivityHistory(this.history);
// Persist async with deduplication - wait for any pending save to complete
const persist = async () => {
try {
await saveActivityHistory(historySnapshot);
} catch (error) {
console.error('[ActivityEventBus] Failed to save history:', error);
}
};
if (this.savePromise === null) {
this.savePromise = persist();
} else {
// Chain to existing promise to prevent concurrent writes
this.savePromise = this.savePromise.then(persist);
}
for (const subscriber of this.subscribers.values()) {
if (!subscriber.projectKey || subscriber.projectKey === projectKey) {

View file

@ -78,7 +78,7 @@ export function diffSnapshots(
// 5. Collection Changes (Dependencies)
diffDependencies(prev.dependencies, curr.dependencies).forEach(kindAndTarget => {
events.push(createEvent(kindAndTarget.kind, curr, now, { to: kindAndTarget.target }));
events.push(createEvent(kindAndTarget.kind, curr, now, { to: kindAndTarget.target, field: kindAndTarget.type }));
});
});
@ -119,25 +119,28 @@ function areArraysEqual(a: string[], b: string[]): boolean {
/**
* Detects added and removed dependencies.
* Uses composite key `${type}:${target}` to detect type changes as well.
*/
function diffDependencies(
prev: BeadDependency[],
curr: BeadDependency[]
): { kind: 'dependency_added' | 'dependency_removed', target: string }[] {
const changes: { kind: 'dependency_added' | 'dependency_removed', target: string }[] = [];
): { kind: 'dependency_added' | 'dependency_removed', target: string, type: string }[] {
const changes: { kind: 'dependency_added' | 'dependency_removed', target: string, type: string }[] = [];
const prevTargets = new Set(prev.map(d => d.target));
const currTargets = new Set(curr.map(d => d.target));
const prevKeys = new Set(prev.map(d => `${d.type}:${d.target}`));
const currKeys = new Set(curr.map(d => `${d.type}:${d.target}`));
curr.forEach(d => {
if (!prevTargets.has(d.target)) {
changes.push({ kind: 'dependency_added', target: d.target });
const key = `${d.type}:${d.target}`;
if (!prevKeys.has(key)) {
changes.push({ kind: 'dependency_added', target: d.target, type: d.type });
}
});
prev.forEach(d => {
if (!currTargets.has(d.target)) {
changes.push({ kind: 'dependency_removed', target: d.target });
const key = `${d.type}:${d.target}`;
if (!currKeys.has(key)) {
changes.push({ kind: 'dependency_removed', target: d.target, type: d.type });
}
});

View file

@ -19,6 +19,11 @@ function getGlobalAgentMessagesPath(): string {
interface WatchRegistration {
projectRoot: string;
watcher: FSWatcher;
handlers?: {
onAdd: (changedPath: string) => void;
onChange: (changedPath: string) => void;
onUnlink: (changedPath: string) => void;
};
}
export interface WatchManagerOptions {
@ -125,13 +130,19 @@ export class IssuesWatchManager {
this.queueCoalescedEvent(projectRoot, changedPath, kind);
};
watcher.on('add', (changedPath) => onFileEvent('add', changedPath));
watcher.on('change', (changedPath) => onFileEvent('change', changedPath));
watcher.on('unlink', (changedPath) => onFileEvent('unlink', changedPath));
// Store references to event handlers for proper cleanup
const onAdd = (changedPath: string) => onFileEvent('add', changedPath);
const onChange = (changedPath: string) => onFileEvent('change', changedPath);
const onUnlink = (changedPath: string) => onFileEvent('unlink', changedPath);
watcher.on('add', onAdd);
watcher.on('change', onChange);
watcher.on('unlink', onUnlink);
this.registrations.set(projectKey, {
projectRoot,
watcher,
handlers: { onAdd, onChange, onUnlink },
});
}
@ -143,6 +154,14 @@ export class IssuesWatchManager {
}
this.coalescer.cancel(projectRoot);
// Explicitly remove event listeners before closing to prevent memory leaks
if (registration.handlers) {
registration.watcher.removeListener('add', registration.handlers.onAdd);
registration.watcher.removeListener('change', registration.handlers.onChange);
registration.watcher.removeListener('unlink', registration.handlers.onUnlink);
}
this.registrations.delete(projectKey);
await registration.watcher.close();
}
@ -151,6 +170,12 @@ export class IssuesWatchManager {
const closeOps: Promise<void>[] = [];
for (const registration of this.registrations.values()) {
// Explicitly remove event listeners before closing to prevent memory leaks
if (registration.handlers) {
registration.watcher.removeListener('add', registration.handlers.onAdd);
registration.watcher.removeListener('change', registration.handlers.onChange);
registration.watcher.removeListener('unlink', registration.handlers.onUnlink);
}
closeOps.push(registration.watcher.close());
}