commit
e28db4b936
10 changed files with 142 additions and 22 deletions
|
|
@ -1,6 +1,7 @@
|
|||
#!/usr/bin/env node
|
||||
|
||||
import fs from 'node:fs/promises';
|
||||
import path from 'node:path';
|
||||
|
||||
function parseArgs(argv) {
|
||||
const output = {};
|
||||
|
|
@ -43,7 +44,8 @@ async function withArtifactExistence(artifacts) {
|
|||
};
|
||||
if (typeof artifact.path === 'string' && artifact.path.trim()) {
|
||||
try {
|
||||
await fs.access(artifact.path);
|
||||
const resolved = path.resolve(artifact.path);
|
||||
await fs.access(resolved);
|
||||
item.exists = true;
|
||||
} catch {
|
||||
item.exists = false;
|
||||
|
|
|
|||
|
|
@ -5,7 +5,17 @@ import { activityEventBus } from '../../../lib/realtime';
|
|||
function isValidProjectRoot(root: string): boolean {
|
||||
try {
|
||||
const resolved = path.resolve(root);
|
||||
return path.isAbsolute(resolved);
|
||||
if (!path.isAbsolute(resolved)) {
|
||||
return false;
|
||||
}
|
||||
// Prevent path traversal by ensuring resolved path stays within the project root
|
||||
const allowedBase = process.cwd();
|
||||
const relative = path.relative(allowedBase, resolved);
|
||||
// If "resolved" is outside "allowedBase", "relative" will start with ".."
|
||||
if (relative.startsWith('..') || path.isAbsolute(relative)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,7 +7,17 @@ import { getAgentMetrics } from '../../../../../lib/agent-sessions';
|
|||
function isValidProjectRoot(root: string): boolean {
|
||||
try {
|
||||
const resolved = path.resolve(root);
|
||||
return path.isAbsolute(resolved);
|
||||
if (!path.isAbsolute(resolved)) {
|
||||
return false;
|
||||
}
|
||||
// Prevent path traversal by ensuring resolved path stays within the project root
|
||||
const allowedBase = process.cwd();
|
||||
const relative = path.relative(allowedBase, resolved);
|
||||
// If "resolved" is outside "allowedBase", "relative" will start with ".."
|
||||
if (relative.startsWith('..') || path.isAbsolute(relative)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,17 @@ import { readIssuesFromDisk } from '../../../../lib/read-issues';
|
|||
function isValidProjectRoot(root: string): boolean {
|
||||
try {
|
||||
const resolved = path.resolve(root);
|
||||
return path.isAbsolute(resolved);
|
||||
if (!path.isAbsolute(resolved)) {
|
||||
return false;
|
||||
}
|
||||
// Prevent path traversal by ensuring resolved path stays within the project root
|
||||
const allowedBase = process.cwd();
|
||||
const relative = path.relative(allowedBase, resolved);
|
||||
// If "resolved" is outside "allowedBase", "relative" will start with ".."
|
||||
if (relative.startsWith('..') || path.isAbsolute(relative)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,11 +5,19 @@ import { activityEventBus } from '../../../lib/realtime';
|
|||
import { buildSessionTaskFeed, getCommunicationSummary } from '../../../lib/agent-sessions';
|
||||
|
||||
function isValidProjectRoot(root: string): boolean {
|
||||
// Basic validation: path should not contain traversal patterns
|
||||
// and should resolve to an absolute path
|
||||
try {
|
||||
const resolved = path.resolve(root);
|
||||
return path.isAbsolute(resolved);
|
||||
if (!path.isAbsolute(resolved)) {
|
||||
return false;
|
||||
}
|
||||
// Prevent path traversal by ensuring resolved path stays within the project root
|
||||
const allowedBase = process.cwd();
|
||||
const relative = path.relative(allowedBase, resolved);
|
||||
// If "resolved" is outside "allowedBase", "relative" will start with ".."
|
||||
if (relative.startsWith('..') || path.isAbsolute(relative)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -65,19 +65,13 @@ export function useBeadsSubscription(
|
|||
}, [projectRoot, onUpdate]);
|
||||
|
||||
useEffect(() => {
|
||||
console.log('[SSE] Connecting to event source for:', projectRoot);
|
||||
const source = new EventSource(`/api/events?projectRoot=${encodeURIComponent(projectRoot)}`);
|
||||
|
||||
source.onopen = () => {
|
||||
console.log('[SSE] Connection opened');
|
||||
};
|
||||
|
||||
source.onerror = (err) => {
|
||||
console.error('[SSE] Connection error:', err);
|
||||
};
|
||||
|
||||
const onIssues = (event: MessageEvent) => {
|
||||
console.log('🚨 SSE RECEIVED:', event.data);
|
||||
const onIssues = () => {
|
||||
onUpdate?.();
|
||||
void refresh({ silent: true });
|
||||
};
|
||||
|
|
@ -85,11 +79,10 @@ export function useBeadsSubscription(
|
|||
source.addEventListener('issues', onIssues as EventListener);
|
||||
|
||||
return () => {
|
||||
console.log('[SSE] Closing connection');
|
||||
source.removeEventListener('issues', onIssues as EventListener);
|
||||
source.close();
|
||||
};
|
||||
}, [projectRoot, refresh]);
|
||||
}, [projectRoot, refresh, onUpdate]);
|
||||
|
||||
return { issues, refresh, updateLocal };
|
||||
}
|
||||
|
|
|
|||
|
|
@ -169,6 +169,49 @@ async function readActiveReservations(): Promise<AgentReservation[]> {
|
|||
}
|
||||
}
|
||||
|
||||
// Simple mutex-based locking using a shared lock file to prevent race conditions
|
||||
const LOCK_FILE_PATH = path.join(reservationsRoot(), '.lock');
|
||||
|
||||
async function lockActiveReservations(): Promise<void> {
|
||||
// Ensure the directory exists
|
||||
await fs.mkdir(path.dirname(LOCK_FILE_PATH), { recursive: true });
|
||||
|
||||
// Use a simple file-based mutex - create file exclusively, fail if exists
|
||||
let attempts = 0;
|
||||
const maxAttempts = 100;
|
||||
|
||||
while (attempts < maxAttempts) {
|
||||
try {
|
||||
await fs.writeFile(LOCK_FILE_PATH, String(process.pid), { flag: 'wx' });
|
||||
return;
|
||||
} catch (error) {
|
||||
if ((error as NodeJS.ErrnoException).code === 'EEXIST') {
|
||||
// Lock file exists, wait and retry
|
||||
await new Promise(resolve => setTimeout(resolve, 50));
|
||||
attempts++;
|
||||
continue;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
throw new Error('Failed to acquire lock after maximum attempts');
|
||||
}
|
||||
|
||||
async function unlockActiveReservations(): Promise<void> {
|
||||
try {
|
||||
const content = await fs.readFile(LOCK_FILE_PATH, 'utf8');
|
||||
// Only release if we own the lock
|
||||
if (content.trim() === String(process.pid)) {
|
||||
await fs.unlink(LOCK_FILE_PATH);
|
||||
}
|
||||
} catch (error) {
|
||||
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
|
||||
throw error;
|
||||
}
|
||||
// Lock file doesn't exist, ignore
|
||||
}
|
||||
}
|
||||
|
||||
async function atomicWriteJson(filePath: string, payload: string): Promise<void> {
|
||||
await fs.mkdir(path.dirname(filePath), { recursive: true });
|
||||
|
||||
|
|
@ -271,6 +314,9 @@ export async function reserveAgentScope(
|
|||
}
|
||||
|
||||
try {
|
||||
// Acquire exclusive lock to prevent race conditions
|
||||
await lockActiveReservations();
|
||||
|
||||
const now = deps.now ? deps.now() : new Date().toISOString();
|
||||
const reservations = await readActiveReservations();
|
||||
const existing = reservations.find((reservation) => reservation.scope === scope);
|
||||
|
|
@ -320,6 +366,8 @@ export async function reserveAgentScope(
|
|||
return success(command, created);
|
||||
} catch (error) {
|
||||
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to reserve scope.');
|
||||
} finally {
|
||||
await unlockActiveReservations();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -341,6 +389,9 @@ export async function releaseAgentReservation(
|
|||
}
|
||||
|
||||
try {
|
||||
// Acquire exclusive lock to prevent race conditions
|
||||
await lockActiveReservations();
|
||||
|
||||
const now = deps.now ? deps.now() : new Date().toISOString();
|
||||
const reservations = await readActiveReservations();
|
||||
const existing = reservations.find((reservation) => reservation.scope === scope);
|
||||
|
|
@ -371,6 +422,8 @@ export async function releaseAgentReservation(
|
|||
return success(command, released);
|
||||
} catch (error) {
|
||||
return invalid(command, 'INTERNAL_ERROR', error instanceof Error ? error.message : 'Failed to release reservation.');
|
||||
} finally {
|
||||
await unlockActiveReservations();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -75,7 +75,14 @@ function asNonEmptyString(value: unknown, field: string): string {
|
|||
if (typeof value !== 'string' || !value.trim()) {
|
||||
throw new MutationValidationError(`"${field}" is required.`);
|
||||
}
|
||||
return value.trim();
|
||||
const trimmed = value.trim();
|
||||
// Remove control characters that could cause issues in command execution
|
||||
// Preserve backslashes for Windows paths and punctuation for user text
|
||||
const sanitized = trimmed.replace(/[\x00-\x1f\x7f]/g, '');
|
||||
if (!sanitized) {
|
||||
throw new MutationValidationError(`"${field}" contains only invalid characters.`);
|
||||
}
|
||||
return sanitized;
|
||||
}
|
||||
|
||||
function asOptionalString(value: unknown): string | undefined {
|
||||
|
|
|
|||
|
|
@ -117,6 +117,9 @@ export class ActivityEventBus {
|
|||
};
|
||||
this.nextEventId += 1;
|
||||
|
||||
// Capture history snapshot BEFORE modification for persistence
|
||||
const historySnapshot = [...this.history];
|
||||
|
||||
// Buffer history
|
||||
this.history.unshift(activity);
|
||||
if (this.history.length > this.MAX_HISTORY) {
|
||||
|
|
@ -124,10 +127,9 @@ export class ActivityEventBus {
|
|||
}
|
||||
|
||||
// Persist async with deduplication - wait for any pending save to complete
|
||||
const currentHistory = [...this.history];
|
||||
const persist = async () => {
|
||||
try {
|
||||
await saveActivityHistory(currentHistory);
|
||||
await saveActivityHistory(historySnapshot);
|
||||
} catch (error) {
|
||||
console.error('[ActivityEventBus] Failed to save history:', error);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,11 @@ function getGlobalAgentMessagesPath(): string {
|
|||
interface WatchRegistration {
|
||||
projectRoot: string;
|
||||
watcher: FSWatcher;
|
||||
handlers?: {
|
||||
onAdd: (changedPath: string) => void;
|
||||
onChange: (changedPath: string) => void;
|
||||
onUnlink: (changedPath: string) => void;
|
||||
};
|
||||
}
|
||||
|
||||
export interface WatchManagerOptions {
|
||||
|
|
@ -119,13 +124,19 @@ export class IssuesWatchManager {
|
|||
this.queueCoalescedEvent(projectRoot, changedPath, kind);
|
||||
};
|
||||
|
||||
watcher.on('add', (changedPath) => onFileEvent('add', changedPath));
|
||||
watcher.on('change', (changedPath) => onFileEvent('change', changedPath));
|
||||
watcher.on('unlink', (changedPath) => onFileEvent('unlink', changedPath));
|
||||
// Store references to event handlers for proper cleanup
|
||||
const onAdd = (changedPath: string) => onFileEvent('add', changedPath);
|
||||
const onChange = (changedPath: string) => onFileEvent('change', changedPath);
|
||||
const onUnlink = (changedPath: string) => onFileEvent('unlink', changedPath);
|
||||
|
||||
watcher.on('add', onAdd);
|
||||
watcher.on('change', onChange);
|
||||
watcher.on('unlink', onUnlink);
|
||||
|
||||
this.registrations.set(projectKey, {
|
||||
projectRoot,
|
||||
watcher,
|
||||
handlers: { onAdd, onChange, onUnlink },
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -137,6 +148,14 @@ export class IssuesWatchManager {
|
|||
}
|
||||
|
||||
this.coalescer.cancel(projectRoot);
|
||||
|
||||
// Explicitly remove event listeners before closing to prevent memory leaks
|
||||
if (registration.handlers) {
|
||||
registration.watcher.removeListener('add', registration.handlers.onAdd);
|
||||
registration.watcher.removeListener('change', registration.handlers.onChange);
|
||||
registration.watcher.removeListener('unlink', registration.handlers.onUnlink);
|
||||
}
|
||||
|
||||
this.registrations.delete(projectKey);
|
||||
await registration.watcher.close();
|
||||
}
|
||||
|
|
@ -145,6 +164,12 @@ export class IssuesWatchManager {
|
|||
const closeOps: Promise<void>[] = [];
|
||||
|
||||
for (const registration of this.registrations.values()) {
|
||||
// Explicitly remove event listeners before closing to prevent memory leaks
|
||||
if (registration.handlers) {
|
||||
registration.watcher.removeListener('add', registration.handlers.onAdd);
|
||||
registration.watcher.removeListener('change', registration.handlers.onChange);
|
||||
registration.watcher.removeListener('unlink', registration.handlers.onUnlink);
|
||||
}
|
||||
closeOps.push(registration.watcher.close());
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue