Fix: Security, reliability, and code quality improvements from PR review

Critical Security Fixes:
- Fix command injection vulnerability in Windows shims (beadboard.cmd, bb.cmd)
  - Added path validation to block traversal (.. and root-relative paths)
  - Added quotes around env var to prevent command injection

Reliability Fixes:
- Fix agent cache null safety bug
  - Fixed callBdAgentShow() to check for cache misses (null check, expiration)
  - Fixed getCachedAgent to properly return entry.data or null
- Fix null body crashes in mail ack route
  - Added null check before casting body to object
  - Returns 400 error instead of 500 for invalid requests

BD Compliance Fixes:
- Fix read-issues to use BD audit record path
  - Ensures all writes go through bd audit record
  - Maintains watcher/SSE parity and Dolt commit tracking

Code Quality Fixes:
- Fix path canonicalization violations
  - Use canonicalizeWindowsPath() and windowsPathKey() from pathing module
  - Prevents Windows edge cases and ensures machine-reproducible paths
- Fix typo: mobile-fronted → mobile-frontend
- Pin GitHub Actions tags
  - softprops/action-gh-release@v1 → specific commit hash
- Confirm pr14 test is registered in package.json (already registered; no change needed)

Testing:
- Refactor broad exception handlers in Python scripts
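  - Replace broad `except Exception:` handlers with the specific exception types each call can raise
  - Lets unexpected errors surface instead of being silently swallowed (KeyboardInterrupt and SystemExit derive from BaseException and were never caught by `except Exception:`)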
  - All tests passing
This commit is contained in:
zenchantlive 2026-03-05 16:33:10 -08:00
parent d54e4f3311
commit ce4700849b
15 changed files with 2995 additions and 756 deletions

View file

@ -21,8 +21,8 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Create GitHub Release
uses: softprops/action-gh-release@v1
- name: Create GitHub Release
uses: softprops/action-gh-release@26994186c0ac3ef5cae75ac16aa32e8153525f77
with:
name: ${{ github.ref_name || inputs.version }}
tag_name: ${{ github.ref_name || inputs.version }}

View file

@ -60,44 +60,44 @@ def infer_project_name(project_dir: Path) -> str:
data = json.loads(package_json.read_text())
if name := data.get("name"):
return name.replace("-", " ").replace("_", " ").title()
except (json.JSONDecodeError, KeyError):
pass
# Try pyproject.toml (Python)
if tomllib:
pyproject = project_dir / "pyproject.toml"
if pyproject.exists():
try:
data = tomllib.loads(pyproject.read_text())
if name := data.get("project", {}).get("name"):
return name.replace("-", " ").replace("_", " ").title()
if name := data.get("tool", {}).get("poetry", {}).get("name"):
return name.replace("-", " ").replace("_", " ").title()
except Exception:
pass
# Try Cargo.toml (Rust)
cargo = project_dir / "Cargo.toml"
if cargo.exists():
try:
data = tomllib.loads(cargo.read_text())
if name := data.get("package", {}).get("name"):
return name.replace("-", " ").replace("_", " ").title()
except Exception:
pass
# Try go.mod (Go)
go_mod = project_dir / "go.mod"
if go_mod.exists():
try:
content = go_mod.read_text()
for line in content.splitlines():
if line.startswith("module "):
module_path = line.split()[1]
name = module_path.split("/")[-1]
return name.replace("-", " ").replace("_", " ").title()
except Exception:
pass
except (json.JSONDecodeError, KeyError, OSError):
pass
# Try pyproject.toml (Python)
if tomllib:
pyproject = project_dir / "pyproject.toml"
if pyproject.exists():
try:
data = tomllib.loads(pyproject.read_text())
if name := data.get("project", {}).get("name"):
return name.replace("-", " ").replace("_", " ").title()
if name := data.get("tool", {}).get("poetry", {}).get("name"):
return name.replace("-", " ").replace("_", " ").title()
except (tomllib.TOMLDecodeError, OSError, KeyError, AttributeError):
pass
# Try Cargo.toml (Rust)
cargo = project_dir / "Cargo.toml"
if cargo.exists():
try:
data = tomllib.loads(cargo.read_text())
if name := data.get("package", {}).get("name"):
return name.replace("-", " ").replace("_", " ").title()
except (tomllib.TOMLDecodeError, OSError, KeyError, AttributeError):
pass
# Try go.mod (Go)
go_mod = project_dir / "go.mod"
if go_mod.exists():
try:
content = go_mod.read_text()
for line in content.splitlines():
if line.startswith("module "):
module_path = line.split()[1]
name = module_path.split("/")[-1]
return name.replace("-", " ").replace("_", " ").title()
except (OSError, ValueError, IndexError):
pass
# Fallback to directory name
return project_dir.name.replace("-", " ").replace("_", " ").title()

View file

@ -1,113 +1,113 @@
---
name: frontend-design
description: Create distinctive, production-grade frontend interfaces with high design quality. Use this skill when the user asks to build web components, pages, or applications. Generates creative, polished code that avoids generic AI aesthetics.
license: Complete terms in LICENSE.txt
---
This skill guides creation of distinctive, production-grade frontend interfaces that avoid generic "AI slop" aesthetics. Implement real working code with exceptional attention to aesthetic details and creative choices.
The user provides frontend requirements: a component, page, application, or interface to build. They may include context about the purpose, audience, or technical constraints.
## Design Thinking
Before coding, understand the context and commit to a BOLD aesthetic direction:
- **Purpose**: What problem does this interface solve? Who uses it?
- **Tone**: Pick an extreme: brutally minimal, maximalist chaos, retro-futuristic, organic/natural, luxury/refined, playful/toy-like, editorial/magazine, brutalist/raw, art deco/geometric, soft/pastel, industrial/utilitarian, etc. There are so many flavors to choose from. Use these for inspiration but design one that is true to the aesthetic direction.
- **Constraints**: Technical requirements (framework, performance, accessibility).
- **Differentiation**: What makes this UNFORGETTABLE? What's the one thing someone will remember?
**CRITICAL**: Choose a clear conceptual direction and execute it with precision. Bold maximalism and refined minimalism both work - the key is intentionality, not intensity.
Then implement working code (HTML/CSS/JS, React, Vue, etc.) that is:
- Production-grade and functional
- Visually striking and memorable
- Cohesive with a clear aesthetic point-of-view
- Meticulously refined in every detail
## Frontend Aesthetics Guidelines
Focus on:
- **Typography**: Choose fonts that are beautiful, unique, and interesting. Avoid generic fonts like Arial and Inter; opt instead for distinctive choices that elevate the frontend's aesthetics; unexpected, characterful font choices. Pair a distinctive display font with a refined body font.
- **Color & Theme**: Commit to a cohesive aesthetic. Use CSS variables for consistency. Dominant colors with sharp accents outperform timid, evenly-distributed palettes.
- **Motion**: Use animations for effects and micro-interactions. Prioritize CSS-only solutions for HTML. Use Motion library for React when available. Focus on high-impact moments: one well-orchestrated page load with staggered reveals (animation-delay) creates more delight than scattered micro-interactions. Use scroll-triggering and hover states that surprise.
- **Spatial Composition**: Unexpected layouts. Asymmetry. Overlap. Diagonal flow. Grid-breaking elements. Generous negative space OR controlled density.
- **Backgrounds & Visual Details**: Create atmosphere and depth rather than defaulting to solid colors. Add contextual effects and textures that match the overall aesthetic. Apply creative forms like gradient meshes, noise textures, geometric patterns, layered transparencies, dramatic shadows, decorative borders, custom cursors, and grain overlays.
NEVER use generic AI-generated aesthetics like overused font families (Inter, Roboto, Arial, system fonts), cliched color schemes (particularly purple gradients on white backgrounds), predictable layouts and component patterns, and cookie-cutter design that lacks context-specific character.
Interpret creatively and make unexpected choices that feel genuinely designed for the context. No design should be the same. Vary between light and dark themes, different fonts, different aesthetics. NEVER converge on common choices (Space Grotesk, for example) across generations.
**IMPORTANT**: Match implementation complexity to the aesthetic vision. Maximalist designs need elaborate code with extensive animations and effects. Minimalist or refined designs need restraint, precision, and careful attention to spacing, typography, and subtle details. Elegance comes from executing the vision well.
Remember: Claude is capable of extraordinary creative work. Don't hold back, show what can truly be created when thinking outside the box and committing fully to a distinctive vision.
The Smart Combination Approach
Use relative units as your foundation, with strategic pixel usage for specific cases:
✅ Use Relative Units For:
Typography & Spacing (rem/em)
css/* Root sizing - easy to scale entire UI */
html { font-size: 16px; } /* base */
/* Component scales automatically */
.card {
padding: 1.5rem; /* 24px at base, scales with root */
font-size: 1rem; /* 16px at base */
margin-bottom: 2rem; /* 32px at base */
}
/* Media query just changes root */
@media (max-width: 768px) {
html { font-size: 14px; } /* Everything shrinks proportionally */
}
Layout widths (%, max-width)
css.container {
width: 100%; /* Fluid */
max-width: 75rem; /* 1200px cap */
padding: 0 5%; /* Breathing room on all screens */
}
Viewport-based (vh/vw) - use sparingly
css.hero {
min-height: 100vh; /* Full screen sections */
padding: 5vw; /* Scales with viewport */
}
🎯 Use Pixels For:
Borders & fine details: border: 1px solid (0.0625rem looks weird)
Icons with fixed dimensions: width: 24px; height: 24px;
Media query breakpoints: @media (min-width: 768px) (industry standard)
Shadows: box-shadow: 0 2px 4px rgba(0,0,0,0.1)
TailwindCSS Context (Your Stack)
Tailwind uses rem by default - perfect combo already built-in:
tsx// Tailwind's spacing scale is in rem
<div className="p-4 mb-6 text-base">
{/* p-4 = 1rem, mb-6 = 1.5rem, text-base = 1rem */}
</div>
// Percentage widths
<div className="w-full md:w-1/2 lg:w-1/3">
{/* Fluid responsive columns */}
</div>
// Max-width constraints
<div className="max-w-7xl mx-auto px-4">
{/* Centers content, caps width, fluid padding */}
</div>
Modern Mobile-First Pattern
tsx// App component example
export function AssetCard() {
return (
<div className="
w-full /* Mobile: full width */
sm:w-[calc(50%-1rem)] /* Tablet: 2 columns */
lg:w-[calc(33.333%-1rem)] /* Desktop: 3 columns */
p-6 /* rem-based padding */
rounded-lg /* Fixed border radius */
border border-gray-200 /* 1px border */
">
<h3 className="text-lg font-semibold mb-2">
{/* rem-based text sizing */}
</h3>
</div>
);
---
name: frontend-design
description: Create distinctive, production-grade frontend interfaces with high design quality. Use this skill when the user asks to build web components, pages, or applications. Generates creative, polished code that avoids generic AI aesthetics.
license: Complete terms in LICENSE.txt
---
This skill guides creation of distinctive, production-grade frontend interfaces that avoid generic "AI slop" aesthetics. Implement real working code with exceptional attention to aesthetic details and creative choices.
The user provides frontend requirements: a component, page, application, or interface to build. They may include context about the purpose, audience, or technical constraints.
## Design Thinking
Before coding, understand the context and commit to a BOLD aesthetic direction:
- **Purpose**: What problem does this interface solve? Who uses it?
- **Tone**: Pick an extreme: brutally minimal, maximalist chaos, retro-futuristic, organic/natural, luxury/refined, playful/toy-like, editorial/magazine, brutalist/raw, art deco/geometric, soft/pastel, industrial/utilitarian, etc. There are so many flavors to choose from. Use these for inspiration but design one that is true to the aesthetic direction.
- **Constraints**: Technical requirements (framework, performance, accessibility).
- **Differentiation**: What makes this UNFORGETTABLE? What's the one thing someone will remember?
**CRITICAL**: Choose a clear conceptual direction and execute it with precision. Bold maximalism and refined minimalism both work - the key is intentionality, not intensity.
Then implement working code (HTML/CSS/JS, React, Vue, etc.) that is:
- Production-grade and functional
- Visually striking and memorable
- Cohesive with a clear aesthetic point-of-view
- Meticulously refined in every detail
## Frontend Aesthetics Guidelines
Focus on:
- **Typography**: Choose fonts that are beautiful, unique, and interesting. Avoid generic fonts like Arial and Inter; opt instead for distinctive choices that elevate the frontend's aesthetics; unexpected, characterful font choices. Pair a distinctive display font with a refined body font.
- **Color & Theme**: Commit to a cohesive aesthetic. Use CSS variables for consistency. Dominant colors with sharp accents outperform timid, evenly-distributed palettes.
- **Motion**: Use animations for effects and micro-interactions. Prioritize CSS-only solutions for HTML. Use Motion library for React when available. Focus on high-impact moments: one well-orchestrated page load with staggered reveals (animation-delay) creates more delight than scattered micro-interactions. Use scroll-triggering and hover states that surprise.
- **Spatial Composition**: Unexpected layouts. Asymmetry. Overlap. Diagonal flow. Grid-breaking elements. Generous negative space OR controlled density.
- **Backgrounds & Visual Details**: Create atmosphere and depth rather than defaulting to solid colors. Add contextual effects and textures that match the overall aesthetic. Apply creative forms like gradient meshes, noise textures, geometric patterns, layered transparencies, dramatic shadows, decorative borders, custom cursors, and grain overlays.
NEVER use generic AI-generated aesthetics like overused font families (Inter, Roboto, Arial, system fonts), cliched color schemes (particularly purple gradients on white backgrounds), predictable layouts and component patterns, and cookie-cutter design that lacks context-specific character.
Interpret creatively and make unexpected choices that feel genuinely designed for the context. No design should be the same. Vary between light and dark themes, different fonts, different aesthetics. NEVER converge on common choices (Space Grotesk, for example) across generations.
**IMPORTANT**: Match implementation complexity to the aesthetic vision. Maximalist designs need elaborate code with extensive animations and effects. Minimalist or refined designs need restraint, precision, and careful attention to spacing, typography, and subtle details. Elegance comes from executing the vision well.
Remember: Claude is capable of extraordinary creative work. Don't hold back, show what can truly be created when thinking outside the box and committing fully to a distinctive vision.
The Smart Combination Approach
Use relative units as your foundation, with strategic pixel usage for specific cases:
✅ Use Relative Units For:
Typography & Spacing (rem/em)
css/* Root sizing - easy to scale entire UI */
html { font-size: 16px; } /* base */
/* Component scales automatically */
.card {
padding: 1.5rem; /* 24px at base, scales with root */
font-size: 1rem; /* 16px at base */
margin-bottom: 2rem; /* 32px at base */
}
/* Media query just changes root */
@media (max-width: 768px) {
html { font-size: 14px; } /* Everything shrinks proportionally */
}
Layout widths (%, max-width)
css.container {
width: 100%; /* Fluid */
max-width: 75rem; /* 1200px cap */
padding: 0 5%; /* Breathing room on all screens */
}
Viewport-based (vh/vw) - use sparingly
css.hero {
min-height: 100vh; /* Full screen sections */
padding: 5vw; /* Scales with viewport */
}
🎯 Use Pixels For:
Borders & fine details: border: 1px solid (0.0625rem looks weird)
Icons with fixed dimensions: width: 24px; height: 24px;
Media query breakpoints: @media (min-width: 768px) (industry standard)
Shadows: box-shadow: 0 2px 4px rgba(0,0,0,0.1)
TailwindCSS Context (Your Stack)
Tailwind uses rem by default - perfect combo already built-in:
tsx// Tailwind's spacing scale is in rem
<div className="p-4 mb-6 text-base">
{/* p-4 = 1rem, mb-6 = 1.5rem, text-base = 1rem */}
</div>
// Percentage widths
<div className="w-full md:w-1/2 lg:w-1/3">
{/* Fluid responsive columns */}
</div>
// Max-width constraints
<div className="max-w-7xl mx-auto px-4">
{/* Centers content, caps width, fluid padding */}
</div>
Modern Mobile-First Pattern
tsx// App component example
export function AssetCard() {
return (
<div className="
w-full /* Mobile: full width */
sm:w-[calc(50%-1rem)] /* Tablet: 2 columns */
lg:w-[calc(33.333%-1rem)] /* Desktop: 3 columns */
p-6 /* rem-based padding */
rounded-lg /* Fixed border radius */
border border-gray-200 /* 1px border */
">
<h3 className="text-lg font-semibold mb-2">
{/* rem-based text sizing */}
</h3>
</div>
);
}

View file

@ -12,6 +12,7 @@ from dataclasses import dataclass, field
# Try to import tiktoken for accurate token counting
try:
import tiktoken
TIKTOKEN_AVAILABLE = True
except ImportError:
TIKTOKEN_AVAILABLE = False
@ -26,6 +27,7 @@ except ImportError:
@dataclass
class ChunkResult:
"""Result of chunking a piece of content."""
content: str
tokens: int
type: str
@ -35,137 +37,169 @@ class ChunkResult:
class ChunkingEngine:
"""
Splits content into bounded semantic chunks.
Strategy: Simple Bounded Semantic
1. Split on paragraphs (\n\n)
2. Merge small paragraphs (< min_tokens) with next
3. Split large paragraphs (> max_tokens) at sentence boundaries
4. Detect content type (fact, preference, pattern, note, decision)
"""
def __init__(self, min_tokens: int = 100, max_tokens: int = 800):
"""
Initialize the chunking engine.
Args:
min_tokens: Minimum tokens per chunk (default: 100)
max_tokens: Maximum tokens per chunk (default: 800)
"""
self.min_tokens = min_tokens
self.max_tokens = max_tokens
# Initialize tiktoken encoder if available
self._encoder = None
if TIKTOKEN_AVAILABLE:
try:
self._encoder = tiktoken.get_encoding("cl100k_base")
except Exception:
except (ImportError, AttributeError, ValueError, KeyError):
pass # Fall back to character-based estimation
def count_tokens(self, text: str) -> int:
"""
Estimate token count.
Uses tiktoken if available, otherwise uses len/4 approximation
which works reasonably well for English text.
Args:
text: Text to count tokens for
Returns:
Estimated token count
"""
if text is None or text == "":
return 0
if self._encoder is not None:
try:
return len(self._encoder.encode(text))
except Exception:
except (AttributeError, TypeError, ValueError):
pass # Fall back to approximation
# Character-based approximation: ~4 chars per token for English
# This is a rough estimate but works for most cases
return max(1, len(text) // 4)
def detect_content_type(self, content: str) -> str:
"""
Detect if content is fact, preference, pattern, note, or decision.
Detection rules (case-insensitive, word boundaries respected):
- Decision: "decided", "chose", "selected", "going with"
- Preference: "prefer", "like", "want", "rather"
- Fact: "is a", "are a", "works as", "located in"
- Pattern: "usually", "often", "tends to", "pattern"
- Default: "note"
Args:
content: Content to analyze
Returns:
Content type string
"""
if not content:
return ChunkType.NOTE.value
content_lower = content.lower()
# Decision indicators (highest priority - explicit actions)
decision_patterns = [
r'\bdecided\b', r'\bchose\b', r'\bselected\b',
r'\bgoing with\b', r'\bwent with\b', r'\bopted for\b',
r'\bsettled on\b', r'\bconcluded\b'
r"\bdecided\b",
r"\bchose\b",
r"\bselected\b",
r"\bgoing with\b",
r"\bwent with\b",
r"\bopted for\b",
r"\bsettled on\b",
r"\bconcluded\b",
]
for pattern in decision_patterns:
if re.search(pattern, content_lower):
return ChunkType.DECISION.value
# Pattern indicators (habits, recurring behaviors) - check BEFORE preference
# because phrases like "generally prefer" describe patterns, not preferences
pattern_patterns = [
r'\busually\b', r'\boften\b', r'\btends to\b', r'\bpattern\b',
r'\balways\b', r'\btypically\b', r'\bgenerally\b',
r'\bfrequently\b', r'\bregularly\b', r'\bevery time\b',
r'\bmost of the time\b', r'\bwhenever\b'
r"\busually\b",
r"\boften\b",
r"\btends to\b",
r"\bpattern\b",
r"\balways\b",
r"\btypically\b",
r"\bgenerally\b",
r"\bfrequently\b",
r"\bregularly\b",
r"\bevery time\b",
r"\bmost of the time\b",
r"\bwhenever\b",
]
for pattern in pattern_patterns:
if re.search(pattern, content_lower):
return ChunkType.PATTERN.value
# Preference indicators
preference_patterns = [
r'\bprefer\b', r'\blike\b', r'\bwant\b', r'\brather\b',
r'\bdislike\b', r'\bhate\b', r'\bwish\b', r'\bwould like\b',
r'\bfavorite\b', r'\bfavour\b'
r"\bprefer\b",
r"\blike\b",
r"\bwant\b",
r"\brather\b",
r"\bdislike\b",
r"\bhate\b",
r"\bwish\b",
r"\bwould like\b",
r"\bfavorite\b",
r"\bfavour\b",
]
for pattern in preference_patterns:
if re.search(pattern, content_lower):
return ChunkType.PREFERENCE.value
# Fact indicators (statements of truth)
fact_patterns = [
r'\bis a\b', r'\bare a\b', r'\bworks as\b', r'\blocated in\b',
r'\bis an\b', r'\bare an\b', r'\bwas a\b', r'\bwere a\b',
r'\bworks at\b', r'\bworks for\b', r'\blives in\b',
r'\bborn in\b', r'\bstudied at\b', r'\bgraduated from\b',
r'\bhas\s+\d+', r'\bthere are\s+\d+', r'\bthere is\s+'
r"\bis a\b",
r"\bare a\b",
r"\bworks as\b",
r"\blocated in\b",
r"\bis an\b",
r"\bare an\b",
r"\bwas a\b",
r"\bwere a\b",
r"\bworks at\b",
r"\bworks for\b",
r"\blives in\b",
r"\bborn in\b",
r"\bstudied at\b",
r"\bgraduated from\b",
r"\bhas\s+\d+",
r"\bthere are\s+\d+",
r"\bthere is\s+",
]
for pattern in fact_patterns:
if re.search(pattern, content_lower):
return ChunkType.FACT.value
# Default: note
return ChunkType.NOTE.value
def _split_into_paragraphs(self, content: str) -> List[str]:
"""
Split content into paragraphs on double newlines.
Handles edge cases like multiple consecutive newlines and whitespace.
"""
# Split on double newlines
raw_paragraphs = re.split(r'\n\n+', content)
raw_paragraphs = re.split(r"\n\n+", content)
# Clean up each paragraph
paragraphs = []
for p in raw_paragraphs:
@ -173,191 +207,194 @@ class ChunkingEngine:
cleaned = p.strip()
if cleaned:
# Normalize internal newlines (preserve single newlines within paragraphs)
cleaned = re.sub(r'[ \t]+', ' ', cleaned)
cleaned = re.sub(r"[ \t]+", " ", cleaned)
paragraphs.append(cleaned)
return paragraphs
def _split_sentences(self, text: str) -> List[str]:
"""
Split text into sentences.
Handles abbreviations and edge cases reasonably well.
"""
# Pattern for sentence boundaries
# Matches . ? or ! followed by space or end of string
# Handles quotes and parentheses
sentence_pattern = r'(?<=[.!?])\s+(?=[A-Z"\'\(])|(?<=[.!?])$'
sentences = re.split(sentence_pattern, text)
# Clean up
result = []
for s in sentences:
cleaned = s.strip()
if cleaned:
result.append(cleaned)
return result
def _split_large_chunk(self, content: str) -> List[str]:
"""
Split a large chunk (> max_tokens) at sentence boundaries.
Tries to create chunks that are as close to max_tokens as possible
without exceeding it.
"""
sentences = self._split_sentences(content)
if len(sentences) <= 1:
# Cannot split by sentences, force split by token count
return self._force_split(content)
chunks = []
current_chunk = []
current_tokens = 0
for sentence in sentences:
sentence_tokens = self.count_tokens(sentence)
# If a single sentence exceeds max_tokens, force split it
if sentence_tokens > self.max_tokens:
# First, flush current chunk if any
if current_chunk:
chunks.append(' '.join(current_chunk))
chunks.append(" ".join(current_chunk))
current_chunk = []
current_tokens = 0
# Force split this long sentence
chunks.extend(self._force_split(sentence))
continue
# Check if adding this sentence would exceed max_tokens
if current_tokens + sentence_tokens > self.max_tokens and current_chunk:
# Flush current chunk
chunks.append(' '.join(current_chunk))
chunks.append(" ".join(current_chunk))
current_chunk = [sentence]
current_tokens = sentence_tokens
else:
# Add to current chunk
current_chunk.append(sentence)
current_tokens += sentence_tokens
# Don't forget the last chunk
if current_chunk:
chunks.append(' '.join(current_chunk))
chunks.append(" ".join(current_chunk))
return chunks
def _force_split(self, content: str) -> List[str]:
"""
Force split content into chunks of approximately max_tokens.
Used when sentence splitting isn't sufficient.
"""
total_tokens = self.count_tokens(content)
if total_tokens <= self.max_tokens:
return [content]
# Calculate approximate characters per chunk
# We use character count as a proxy for token count
chars_per_token = len(content) / total_tokens
chars_per_chunk = int(self.max_tokens * chars_per_token * 0.95) # 5% safety margin
chars_per_chunk = int(
self.max_tokens * chars_per_token * 0.95
) # 5% safety margin
chunks = []
start = 0
while start < len(content):
end = start + chars_per_chunk
if end >= len(content):
# Last chunk
chunks.append(content[start:].strip())
break
# Try to find a word boundary
# Look for space, period, or other punctuation
search_end = min(end + 50, len(content)) # Look ahead 50 chars
boundary = end
# Find the last space or punctuation before search_end
for i in range(search_end - 1, start, -1):
if content[i] in ' \t\n.,;:!?':
if content[i] in " \t\n.,;:!?":
boundary = i + 1
break
chunk = content[start:boundary].strip()
if chunk:
chunks.append(chunk)
start = boundary
return chunks
def chunk(self, content: str, conversation_id: str,
tags: List[str] = None) -> List[ChunkResult]:
def chunk(
self, content: str, conversation_id: str, tags: List[str] = None
) -> List[ChunkResult]:
"""
Split content into bounded semantic chunks.
Strategy: Simple Bounded Semantic
1. Split on paragraphs (\n\n)
2. Merge small paragraphs (< min_tokens) with next
3. Split large paragraphs (> max_tokens) at sentence boundaries
4. Detect content type (fact, preference, pattern, note, decision)
Args:
content: Text content to chunk
conversation_id: Source conversation ID
tags: Optional list of tags to apply to all chunks
Returns:
List of ChunkResult objects ready for storage
"""
if not content or not content.strip():
return []
tags = tags or []
# Step 1: Split into paragraphs
paragraphs = self._split_into_paragraphs(content)
# Step 2: Process paragraphs - handle size bounds
raw_chunks = []
for paragraph in paragraphs:
tokens = self.count_tokens(paragraph)
if tokens > self.max_tokens:
# Split large paragraph at sentence boundaries
split_chunks = self._split_large_chunk(paragraph)
raw_chunks.extend(split_chunks)
else:
raw_chunks.append(paragraph)
# Step 3: Merge small chunks
merged_chunks = self._merge_small_chunks(raw_chunks)
# Step 4: Create ChunkResult objects with type detection
results = []
for chunk_content in merged_chunks:
chunk_tokens = self.count_tokens(chunk_content)
content_type = self.detect_content_type(chunk_content)
result = ChunkResult(
content=chunk_content,
tokens=chunk_tokens,
type=content_type,
tags=tags.copy()
tags=tags.copy(),
)
results.append(result)
return results
def _merge_small_chunks(self, chunks: List[str]) -> List[str]:
"""
Merge chunks that are below min_tokens with adjacent chunks.
Strategy:
- Try to merge with next chunk (if same content type)
- If merging would exceed max_tokens, keep as-is (it's the best we can do)
@ -366,39 +403,39 @@ class ChunkingEngine:
"""
if not chunks:
return []
if len(chunks) == 1:
return chunks
result = []
i = 0
while i < len(chunks):
current = chunks[i]
current_tokens = self.count_tokens(current)
current_type = self.detect_content_type(current)
# If current chunk is large enough, add it
if current_tokens >= self.min_tokens:
result.append(current)
i += 1
continue
# Current chunk is too small - try to merge with next
if i + 1 < len(chunks):
next_chunk = chunks[i + 1]
next_tokens = self.count_tokens(next_chunk)
next_type = self.detect_content_type(next_chunk)
# Don't merge if content types differ (preserve semantic boundaries)
if current_type != next_type:
result.append(current) # Add as-is even if small
i += 1
continue
# Check if merging would exceed max_tokens
combined_tokens = current_tokens + next_tokens
if combined_tokens <= self.max_tokens:
# Merge current with next
merged = current + "\n\n" + next_chunk
@ -420,7 +457,7 @@ class ChunkingEngine:
prev_tokens = self.count_tokens(prev)
prev_type = self.detect_content_type(prev)
combined_tokens = prev_tokens + current_tokens
# Only merge if types match
if combined_tokens <= self.max_tokens and prev_type == current_type:
# Merge with previous
@ -431,18 +468,23 @@ class ChunkingEngine:
else:
# No previous chunk, add as-is
result.append(current)
i += 1
return result
def chunk_and_store(content: str, conversation_id: str,
store, tags: List[str] = None,
min_tokens: int = 100, max_tokens: int = 800) -> List[Chunk]:
def chunk_and_store(
content: str,
conversation_id: str,
store,
tags: List[str] = None,
min_tokens: int = 100,
max_tokens: int = 800,
) -> List[Chunk]:
"""
Convenience function to chunk content and store in ChunkStore.
Args:
content: Text to chunk and store
conversation_id: Source conversation ID
@ -450,13 +492,13 @@ def chunk_and_store(content: str, conversation_id: str,
tags: Optional tags for all chunks
min_tokens: Minimum tokens per chunk
max_tokens: Maximum tokens per chunk
Returns:
List of created Chunk objects
"""
engine = ChunkingEngine(min_tokens=min_tokens, max_tokens=max_tokens)
chunk_results = engine.chunk(content, conversation_id, tags)
created_chunks = []
for result in chunk_results:
chunk = store.create_chunk(
@ -464,10 +506,10 @@ def chunk_and_store(content: str, conversation_id: str,
chunk_type=result.type,
conversation_id=conversation_id,
tokens=result.tokens,
tags=result.tags
tags=result.tags,
)
created_chunks.append(chunk)
return created_chunks
@ -477,7 +519,7 @@ if __name__ == "__main__":
print("=" * 60)
print("Chunking Engine - Self Test")
print("=" * 60)
# Test 1: Basic multi-paragraph content
print("\n[Test 1] Multi-paragraph content")
content = """Paragraph 1. Short.
@ -485,16 +527,16 @@ if __name__ == "__main__":
Paragraph 2 is longer with multiple sentences. It should stand alone.
This is a decision: We chose to use RLM architecture."""
engine = ChunkingEngine()
chunks = engine.chunk(content, "test-conv")
print(f"Input paragraphs: 3")
print(f"Output chunks: {len(chunks)}")
for i, c in enumerate(chunks, 1):
print(f" Chunk {i}: {c.type}, {c.tokens} tokens")
print(f" Content: {c.content[:60]}...")
# Test 2: Content type detection
print("\n[Test 2] Content type detection")
test_cases = [
@ -504,12 +546,12 @@ This is a decision: We chose to use RLM architecture."""
("I usually wake up early", "pattern"),
("This is just a random note", "note"),
]
for text, expected in test_cases:
detected = engine.detect_content_type(text)
status = "[OK]" if detected == expected else "[FAIL]"
print(f" {status} '{text[:40]}...' -> {detected} (expected: {expected})")
# Test 3: Small paragraph merging
print("\n[Test 3] Small paragraph merging")
content = """A.
@ -517,19 +559,23 @@ This is a decision: We chose to use RLM architecture."""
B.
C is a longer paragraph with more content that should stand on its own."""
chunks = engine.chunk(content, "test-conv")
print(f"Input paragraphs: 3 (two very short)")
print(f"Output chunks: {len(chunks)}")
for i, c in enumerate(chunks, 1):
print(f" Chunk {i}: {c.tokens} tokens - {c.content[:50]}...")
# Test 4: Large paragraph splitting
print("\n[Test 4] Large paragraph splitting")
# Generate a paragraph that's definitely over 800 tokens
large_content = " ".join([f"This is sentence number {i} in a very long paragraph."
for i in range(1, 201)]) # ~200 sentences
large_content = " ".join(
[
f"This is sentence number {i} in a very long paragraph."
for i in range(1, 201)
]
) # ~200 sentences
chunks = engine.chunk(large_content, "test-conv")
total_tokens = sum(c.tokens for c in chunks)
print(f"Input: ~{engine.count_tokens(large_content)} tokens")
@ -537,7 +583,7 @@ C is a longer paragraph with more content that should stand on its own."""
for i, c in enumerate(chunks, 1):
status = "[OK]" if 100 <= c.tokens <= 800 else "[FAIL]"
print(f" {status} Chunk {i}: {c.tokens} tokens")
# Test 5: Token counting comparison
print("\n[Test 5] Token counting")
test_text = "This is a test sentence with exactly twelve tokens."
@ -545,38 +591,38 @@ C is a longer paragraph with more content that should stand on its own."""
print(f" Text: '{test_text}'")
print(f" Estimated tokens: {estimated}")
print(f" Tiktoken available: {TIKTOKEN_AVAILABLE}")
# Test 6: Integration with ChunkStore
print("\n[Test 6] Integration with ChunkStore")
try:
from .memory_store import ChunkStore
store = ChunkStore("brain/memory")
test_content = """First fact: Python is a programming language.
Second decision: We chose to implement async support.
Third preference: I prefer using type hints."""
created = chunk_and_store(
content=test_content,
conversation_id="integration-test",
store=store,
tags=["test", "integration"]
tags=["test", "integration"],
)
print(f" Created {len(created)} chunks:")
for c in created:
print(f" - {c.id}: {c.type}, {c.tokens} tokens")
# Cleanup - archive the test chunks
for c in created:
store.delete_chunk(c.id, permanent=False)
print(" ✓ Test chunks archived")
except Exception as e:
print(f" [SKIP] Integration test skipped: {e}")
print("\n" + "=" * 60)
print("All tests completed!")
print("=" * 60)

View file

@ -19,6 +19,7 @@ except ImportError:
@dataclass
class ReasonResult:
"""Result of a REASON operation."""
synthesis: str
insights: List[str] = field(default_factory=list)
evidence: Dict[str, List[str]] = field(default_factory=dict)
@ -32,23 +33,20 @@ class ReasonResult:
class ReasonOperation:
"""
High-level REASON operation for memory analysis and synthesis.
Uses RLM to:
- Analyze patterns across memories
- Synthesize insights from multiple sources
- Identify contradictions or gaps
- Generate conclusions with evidence
"""
def __init__(
self,
chunk_store: ChunkStore,
llm_client=None,
max_iterations: int = 10
self, chunk_store: ChunkStore, llm_client=None, max_iterations: int = 10
):
"""
Initialize REASON operation.
Args:
chunk_store: Storage backend
llm_client: LLM for reasoning
@ -56,47 +54,43 @@ class ReasonOperation:
"""
if chunk_store is None:
raise ValueError("chunk_store is required")
self.chunk_store = chunk_store
self.llm_client = llm_client
self.max_iterations = max_iterations
# Initialize recall for gathering evidence
self._recall = None
if llm_client is not None:
self._recall = RecallOperation(
chunk_store=chunk_store,
llm_client=llm_client,
max_iterations=max_iterations
max_iterations=max_iterations,
)
def reason(
self,
query: str,
context_chunks: List[str] = None,
analysis_type: str = "synthesis"
analysis_type: str = "synthesis",
) -> ReasonResult:
"""
Perform reasoning analysis on memories.
"""
if not query or not query.strip():
return ReasonResult(
synthesis="No query provided",
confidence=0.0
)
return ReasonResult(synthesis="No query provided", confidence=0.0)
# Gather evidence
if context_chunks:
evidence = self._gather_evidence(context_chunks)
else:
evidence = self._search_evidence(query)
if not evidence:
return ReasonResult(
synthesis="No relevant evidence found for analysis",
confidence=0.0
synthesis="No relevant evidence found for analysis", confidence=0.0
)
# 1. Always check for contradictions in evidence
contradictions = self._detect_contradictions(evidence["chunks"])
@ -116,30 +110,28 @@ class ReasonOperation:
if contradictions and not result.contradictions:
result.contradictions = contradictions
if "Identified" not in "".join(result.insights):
result.insights.append(f"Identified {len(contradictions)} potential conflicts in memory")
result.insights.append(
f"Identified {len(contradictions)} potential conflicts in memory"
)
return result
def _gather_evidence(self, chunk_ids: List[str]) -> Dict[str, Any]:
"""Gather evidence from specific chunks."""
evidence = {
"chunks": [],
"tags": set(),
"types": set()
}
evidence = {"chunks": [], "tags": set(), "types": set()}
for chunk_id in chunk_ids:
chunk = self.chunk_store.get_chunk(chunk_id)
if chunk:
evidence["chunks"].append(chunk)
evidence["tags"].update(chunk.tags)
evidence["types"].add(chunk.type)
evidence["tags"] = list(evidence["tags"])
evidence["types"] = list(evidence["types"])
return evidence
def _search_evidence(self, query: str) -> Dict[str, Any]:
"""Search for relevant evidence."""
# Use recall to find relevant chunks
@ -147,30 +139,33 @@ class ReasonOperation:
# Fallback to basic search
chunk_ids = self.chunk_store.list_chunks()
return self._gather_evidence(chunk_ids[:10])
recall_result = self._recall.recall(query, max_results=10)
return self._gather_evidence(recall_result.source_chunks)
def _synthesize(self, query: str, evidence: Dict[str, Any]) -> ReasonResult:
"""Synthesize insights from evidence with contradiction surfacing."""
chunks = evidence["chunks"]
# 1. Sort chunks by confidence and recency (if available)
def chunk_sort_key(c):
conf = getattr(c.metadata, 'confidence', 0.5)
conf = getattr(c.metadata, "confidence", 0.5)
# Try to get timestamp for recency boost
ts = 0.0
try:
created = getattr(c.metadata, 'created', "")
created = getattr(c.metadata, "created", "")
if created:
from datetime import datetime
ts = datetime.fromisoformat(created.replace("Z", "+00:00")).timestamp()
except Exception:
ts = datetime.fromisoformat(
created.replace("Z", "+00:00")
).timestamp()
except (ValueError, TypeError, AttributeError):
pass
return (conf, ts)
sorted_chunks = sorted(chunks, key=chunk_sort_key, reverse=True)
# 2. Extract unique contents
seen_contents = set()
unique_chunks = []
@ -183,27 +178,29 @@ class ReasonOperation:
# 3. Detect contradictions
contradictions = self._detect_contradictions(unique_chunks)
# 4. Build synthesis
contents = [c.content for c in unique_chunks]
if not contents:
return ReasonResult(
synthesis="No content to synthesize",
confidence=0.0
)
return ReasonResult(synthesis="No content to synthesize", confidence=0.0)
synthesis = self._build_synthesis(query, contents)
# 5. Extract insights
insights = self._extract_insights(contents)
if contradictions:
insights.append(f"Identified {len(contradictions)} potential conflicts in memory")
insights.append(
f"Identified {len(contradictions)} potential conflicts in memory"
)
# 6. Calculate aggregate confidence
avg_confidence = sum(
getattr(c.metadata, 'confidence', 0.7) for c in unique_chunks
) / len(unique_chunks) if unique_chunks else 0.0
avg_confidence = (
sum(getattr(c.metadata, "confidence", 0.7) for c in unique_chunks)
/ len(unique_chunks)
if unique_chunks
else 0.0
)
return ReasonResult(
synthesis=synthesis,
insights=insights,
@ -211,25 +208,29 @@ class ReasonOperation:
contradictions=contradictions,
confidence=avg_confidence,
source_chunks=[c.id for c in unique_chunks],
iterations_used=1
iterations_used=1,
)
def _build_synthesis(self, query: str, contents: List[str]) -> str:
"""Build structured synthesis text."""
if not contents:
return "No information available"
# Improved synthesis: summary header + ranked list
synthesis_parts = [f"Synthesized analysis for: \"{query}\"", ""]
synthesis_parts.append(f"Based on {len(contents)} unique sources (ranked by relevance):")
synthesis_parts = [f'Synthesized analysis for: "{query}"', ""]
synthesis_parts.append(
f"Based on {len(contents)} unique sources (ranked by relevance):"
)
for i, content in enumerate(contents[:7], 1):
# Clean up content for list display
clean_content = content.replace("\n", " ").strip()
synthesis_parts.append(f" {i}. {clean_content}")
if len(contents) > 7:
synthesis_parts.append(f" ... and {len(contents) - 7} other supporting memories.")
synthesis_parts.append(
f" ... and {len(contents) - 7} other supporting memories."
)
return "\n".join(synthesis_parts)
def _detect_contradictions(self, chunks: List[Any]) -> List[Dict[str, Any]]:
@ -237,7 +238,7 @@ class ReasonOperation:
Identify potential conflicts across memory chunks using non-LLM heuristics.
"""
conflicts = []
# 1. Group by tag/topic
topic_groups = {}
for chunk in chunks:
@ -245,47 +246,53 @@ class ReasonOperation:
if tag not in topic_groups:
topic_groups[tag] = []
topic_groups[tag].append(chunk)
# 2. Check for opposite sentiments/values within the same tag
# Heuristic: "prefer X" vs "prefer Y" or "not X" vs "is X"
NEGATIONS = {"not", "don't", "dislike", "hate", "avoid", "stop"}
for tag, group in topic_groups.items():
if len(group) < 2:
continue
# Simple pair-wise comparison
for i in range(len(group)):
for j in range(i + 1, len(group)):
c1, c2 = group[i], group[j]
# Heuristic: If both talk about "prefer" but have different words
# e.g. "prefer dark mode" vs "prefer light mode"
c1_words = set(c1.content.lower().split())
c2_words = set(c2.content.lower().split())
if ("prefer" in c1_words or "prefers" in c1_words) and ("prefer" in c2_words or "prefers" in c2_words):
if ("prefer" in c1_words or "prefers" in c1_words) and (
"prefer" in c2_words or "prefers" in c2_words
):
# Significant difference in specific preference
if len(c1_words ^ c2_words) >= 2:
conflicts.append({
"type": "potential_preference_conflict",
"topic": tag,
"chunks": [c1.id, c2.id],
"reason": f"Divergent preferences detected for topic '{tag}'"
})
if len(c1_words ^ c2_words) >= 2:
conflicts.append(
{
"type": "potential_preference_conflict",
"topic": tag,
"chunks": [c1.id, c2.id],
"reason": f"Divergent preferences detected for topic '{tag}'",
}
)
# Check for explicit negation
# If one has a negation word and the other doesn't for the same tag
c1_negated = any(n in c1_words for n in NEGATIONS)
c2_negated = any(n in c2_words for n in NEGATIONS)
if c1_negated != c2_negated:
conflicts.append({
"type": "negation_conflict",
"topic": tag,
"chunks": [c1.id, c2.id],
"reason": f"Opposing sentiments detected for topic '{tag}'"
})
conflicts.append(
{
"type": "negation_conflict",
"topic": tag,
"chunks": [c1.id, c2.id],
"reason": f"Opposing sentiments detected for topic '{tag}'",
}
)
# Deduplicate conflicts
unique_conflicts = []
@ -295,20 +302,20 @@ class ReasonOperation:
if pair not in seen_pairs:
seen_pairs.add(pair)
unique_conflicts.append(c)
return unique_conflicts
def _extract_insights(self, contents: List[str]) -> List[str]:
"""Extract key insights from contents."""
insights = []
# Simple insight extraction - look for patterns
for content in contents:
if "prefer" in content.lower():
insights.append(f"Preference identified: {content[:100]}...")
if "like" in content.lower():
insights.append(f"Positive sentiment: {content[:100]}...")
# Remove duplicates while preserving order
seen = set()
unique_insights = []
@ -316,135 +323,137 @@ class ReasonOperation:
if insight not in seen:
seen.add(insight)
unique_insights.append(insight)
return unique_insights[:5] # Top 5 insights
def _compare(self, query: str, evidence: Dict[str, Any]) -> ReasonResult:
    """List every evidence chunk side by side as a numbered option.

    Args:
        query: The question being analysed; quoted in the header line.
        evidence: Mapping whose ``"chunks"`` entry holds the chunk objects.

    Returns:
        A ``ReasonResult`` with confidence 0.0 and an explanatory synthesis
        when fewer than two chunks are available; otherwise a result whose
        synthesis has one ``Option N`` line per chunk, with fixed confidence
        0.7 and the ids of all compared chunks.
    """
    chunks = evidence["chunks"]
    if len(chunks) < 2:
        return ReasonResult(
            synthesis="Need at least 2 items to compare", confidence=0.0
        )

    lines = [f'Comparison Analysis: "{query}"', ""]
    lines.extend(
        f" Option {number}: {chunk.content}"
        for number, chunk in enumerate(chunks, 1)
    )

    return ReasonResult(
        synthesis="\n".join(lines),
        insights=[f"Comparing {len(chunks)} distinct sources"],
        confidence=0.7,
        source_chunks=[chunk.id for chunk in chunks],
    )
def _find_patterns(self, query: str, evidence: Dict[str, Any]) -> ReasonResult:
    """Summarise tags, chunk types and date spread found in the evidence.

    Args:
        query: Accepted for signature parity with the other analysis
            methods; not used here.
        evidence: Mapping with ``"chunks"`` and optional ``"tags"`` and
            ``"types"`` entries.

    Returns:
        A ``ReasonResult`` with one insight per detected pattern, fixed
        confidence 0.75 and the ids of all inspected chunks.
    """
    chunks = evidence["chunks"]
    tags = evidence.get("tags", [])
    types = evidence.get("types", [])

    insights = []

    # Pattern: tags present in the evidence (first five only).
    if tags:
        insights.append(f"Common themes: {', '.join(str(t) for t in list(tags)[:5])}")

    # Pattern: content types.
    if types:
        insights.append(f"Source types: {', '.join(str(t) for t in types)}")

    # Pattern: temporal spread, counted in distinct calendar days.
    days = set()
    for chunk in chunks:
        meta = getattr(chunk, "metadata", None)
        # Fall back to `created_at` when `created` is missing or empty.
        stamp = getattr(meta, "created", None) or getattr(meta, "created_at", None)
        if stamp:
            # str() keeps the [:10] date prefix working for non-string timestamps.
            days.add(str(stamp)[:10])
    if days:
        insights.append(f"Evidence spans {len(days)} unique days")

    return ReasonResult(
        synthesis=f"Found {len(insights)} patterns across {len(chunks)} memories",
        insights=insights,
        confidence=0.75,
        source_chunks=[chunk.id for chunk in chunks],
    )
def _identify_gaps(self, query: str, evidence: Dict[str, Any]) -> ReasonResult:
    """Report structural weaknesses in the evidence.

    Two checks are made: chunks whose metadata confidence is below 0.6
    (0.7 is assumed when the attribute is absent), and chunks that have no
    ``context_of`` / ``related_to`` links.

    Args:
        query: Accepted for signature parity with the other analysis
            methods; not used here.
        evidence: Mapping whose ``"chunks"`` entry holds the chunk objects.

    Returns:
        A ``ReasonResult`` whose insights are the gap descriptions (or a
        single "no gaps" message), with fixed confidence 0.6 and the ids of
        all inspected chunks.
    """
    chunks = evidence["chunks"]
    gaps = []

    low_confidence = [
        chunk
        for chunk in chunks
        if getattr(chunk.metadata, "confidence", 0.7) < 0.6
    ]
    if low_confidence:
        gaps.append(f"{len(low_confidence)} sources have low confidence scores")

    def is_isolated(chunk) -> bool:
        # Missing link attributes count as "no link" rather than raising.
        links = getattr(chunk, "links", None)
        if not links:
            return True
        return not getattr(links, "context_of", None) and not getattr(
            links, "related_to", None
        )

    unlinked = [chunk for chunk in chunks if is_isolated(chunk)]
    if unlinked:
        gaps.append(f"{len(unlinked)} items are isolated (no graph links)")

    if not gaps:
        gaps.append(
            "No significant structural gaps identified in the available evidence"
        )

    return ReasonResult(
        synthesis=f"Knowledge Gap Analysis: {'; '.join(gaps)}",
        insights=gaps,
        confidence=0.6,
        source_chunks=[chunk.id for chunk in chunks],
    )
def analyze_contradictions(self, chunk_ids: List[str]) -> List[Dict[str, Any]]:
    """
    Report contradictions that are recorded explicitly on the given chunks.

    Only explicit ``links.contradicts`` entries are reported; no content
    comparison happens here.

    Args:
        chunk_ids: Chunks to analyze. Ids the store cannot resolve are skipped.

    Returns:
        One dict per contradiction link with keys ``chunk_a`` (the chunk that
        carries the link), ``chunk_b`` (the link target) and ``reasoning``.
    """
    contradictions = []
    for chunk_id in chunk_ids:
        chunk = self.chunk_store.get_chunk(chunk_id)
        if not chunk:
            continue
        # A chunk without a `links` attribute simply has nothing to report.
        links = getattr(chunk, "links", None)
        for target_id in getattr(links, "contradicts", None) or ():
            contradictions.append(
                {
                    "chunk_a": chunk.id,
                    "chunk_b": target_id,
                    "reasoning": "Explicit contradiction link",
                }
            )
    return contradictions
def get_stats(self) -> Dict[str, Any]:
    """Return the statistics dict for this operation.

    The values are fixed zeros; nothing in this method reads accumulated state.
    """
    stats: Dict[str, Any] = dict(
        total_analyses=0,
        avg_confidence=0.0,
        avg_insights=0.0,
    )
    return stats

View file

@ -16,17 +16,20 @@ from pathlib import Path
class SandboxViolation(Exception):
    """Signals that sandboxed code tried to break a sandbox security rule."""
class MaxIterationsError(Exception):
    """Signals that the configured maximum number of iterations was exceeded."""
# Cost budget exceeded
class CostBudgetExceededError(RuntimeError):
    """Signals that the accumulated cost went past the configured budget."""
@ -35,29 +38,129 @@ class CostBudgetExceededError(RuntimeError):
# Allowed built-ins for sandbox.
# Grouped by purpose for readability; membership is what matters, order does not.
ALLOWED_BUILTINS = {
    # constructors / core types
    "bool", "bytearray", "bytes", "complex", "dict", "float", "frozenset",
    "int", "list", "memoryview", "object", "set", "slice", "str", "tuple",
    "type",
    # numeric helpers
    "abs", "bin", "divmod", "hex", "max", "min", "oct", "pow", "round", "sum",
    # iteration helpers
    "all", "any", "enumerate", "filter", "iter", "len", "map", "next",
    "range", "reversed", "sorted", "zip",
    # text / console
    "ascii", "chr", "format", "help", "input", "ord", "print", "repr",
    # introspection and attribute access
    "callable", "delattr", "dir", "getattr", "globals", "hasattr", "hash",
    "id", "isinstance", "issubclass", "locals", "setattr", "vars",
    # class machinery
    "classmethod", "property", "staticmethod", "super",
    "__build_class__", "__name__",
    # singletons
    "True", "False", "None",
    # exception types
    "Exception", "TypeError", "ValueError", "KeyError", "IndexError",
    "AttributeError", "RuntimeError", "StopIteration", "ArithmeticError",
    "LookupError", "AssertionError", "NotImplementedError",
    "ZeroDivisionError", "OverflowError",
}
# Blocked imports/modules.
# Grouped by the kind of capability each module would expose.
BLOCKED_MODULES = {
    # process control and interpreter internals
    "os", "sys", "subprocess", "multiprocessing",
    "concurrent.futures.process", "asyncio.subprocess",
    # networking
    "socket", "ssl", "select", "selectors", "urllib", "http", "ftplib",
    "smtplib", "telnetlib", "poplib", "imaplib", "nntplib", "email", "xmlrpc",
    # native code and raw memory
    "ctypes", "cffi", "mmap", "resource",
    # platform-specific system interfaces
    "posix", "nt", "pwd", "grp", "spwd", "crypt", "termios", "tty", "pty",
    "fcntl", "msvcrt", "winreg", "_winapi",
}
# Allowed modules that get redirected to mocks
@ -66,11 +169,11 @@ ALLOWED_MODULES = set()
def safe_import(name, globals=None, locals=None, fromlist=(), level=0):
"""Safe import function that only allows specific modules."""
base_module = name.split('.')[0] if name else ''
base_module = name.split(".")[0] if name else ""
# Allow sys import (mocked in sandbox)
if base_module == 'sys':
if globals and 'sys' in globals:
return globals['sys']
if base_module == "sys":
if globals and "sys" in globals:
return globals["sys"]
raise ImportError("Mock sys not found in sandbox")
if base_module in ALLOWED_MODULES:
if globals and base_module in globals:
@ -81,98 +184,120 @@ def safe_import(name, globals=None, locals=None, fromlist=(), level=0):
# Blocked attribute names that could be used for sandbox escape
BLOCKED_ATTRIBUTES = {
    # class hierarchy traversal
    "__class__", "__base__", "__bases__", "__mro__", "__subclasses__",
    # function and frame internals
    "__globals__", "__code__", "__closure__", "__defaults__", "__kwdefaults__",
    "__func__", "__self__", "__module__",
    # attribute storage and attribute hooks
    "__dict__", "__getattribute__", "__setattr__",
}
class SandboxVisitor(ast.NodeVisitor):
"""AST visitor to check for sandbox violations."""
def __init__(self, allowed_paths: Optional[list] = None):
self.allowed_paths = allowed_paths or []
self.violations = []
def visit_Import(self, node):
    """Record a violation for each blocked module named in an ``import``.

    Every dotted prefix of the imported name is checked, so dotted entries in
    ``BLOCKED_MODULES`` (for example ``asyncio.subprocess``) are matched as
    well as top-level names. ``sys`` is skipped here; the inline comment in
    the original says it is redirected to a mock inside the sandbox.
    """
    for alias in node.names:
        parts = alias.name.split(".")
        # Allow 'sys' import (redirected to mock in sandbox)
        if parts[0] == "sys":
            continue
        for depth in range(1, len(parts) + 1):
            candidate = ".".join(parts[:depth])
            if candidate in BLOCKED_MODULES and candidate not in ALLOWED_MODULES:
                self.violations.append(f"Import of '{candidate}' is not allowed")
                break  # one violation per imported name is enough
    self.generic_visit(node)
def visit_ImportFrom(self, node):
    """Record a violation when a ``from ... import ...`` touches a blocked module.

    Checked names are every dotted prefix of the source module plus
    ``<module>.<imported name>``, because ``from a.b import c`` can bind the
    submodule ``a.b.c``. Relative imports without a module name are not
    checked. ``from sys import ...`` returns early without visiting children,
    as the original did.
    """
    if node.module:
        parts = node.module.split(".")
        # Allow 'sys' import (redirected to mock in sandbox)
        if parts[0] == "sys":
            return
        candidates = [".".join(parts[:depth]) for depth in range(1, len(parts) + 1)]
        candidates.extend(f"{node.module}.{alias.name}" for alias in node.names)
        for candidate in candidates:
            if candidate in BLOCKED_MODULES and candidate not in ALLOWED_MODULES:
                self.violations.append(f"Import from '{candidate}' is not allowed")
                break  # one violation per statement is enough
    self.generic_visit(node)
def visit_Delete(self, node):
    """Block deletion of builtins attributes.

    A ``del`` target that is an attribute or subscript of something
    ``_is_builtins_access`` recognises adds exactly one violation.
    """
    for target in node.targets:
        if isinstance(target, (ast.Attribute, ast.Subscript)) and self._is_builtins_access(
            target.value
        ):
            self.violations.append(
                "Deletion of __builtins__ attributes is not allowed"
            )
    self.generic_visit(node)
def visit_Call(self, node):
    """Flag calls to forbidden built-ins made by plain name.

    Covers ``eval``/``exec``/``compile``/``__import__``/``open`` and
    ``getattr``/``setattr``/``delattr`` whose first argument is recognised by
    ``_is_builtins_access``. Calls through attributes are not inspected here.
    """
    callee = node.func
    if isinstance(callee, ast.Name):
        called = callee.id
        if called in ("eval", "exec", "compile", "__import__", "open"):
            self.violations.append(f"Use of '{called}()' is not allowed")
        elif called in ("getattr", "setattr", "delattr"):
            first_arg_is_builtins = bool(node.args) and self._is_builtins_access(
                node.args[0]
            )
            if first_arg_is_builtins:
                self.violations.append(f"{called} on __builtins__ is not allowed")
    self.generic_visit(node)
def visit_BinOp(self, node):
    """Check for large memory allocations via string multiplication.

    When one operand of ``*`` is a string literal and the other is a constant
    (or a nested arithmetic expression that ``_eval_const_expr`` can
    evaluate), the resulting size is computed statically. Both operand
    orders (``"x" * n`` and ``n * "x"``) are checked.

    Raises:
        MemoryError: If the statically computed size exceeds 10MB.
    """
    if isinstance(node.op, ast.Mult):
        limit = 10 * 1024 * 1024  # 10MB limit
        for text, count in ((node.left, node.right), (node.right, node.left)):
            if not (isinstance(text, ast.Constant) and isinstance(text.value, str)):
                continue
            try:
                if isinstance(count, ast.Constant):
                    size = len(text.value) * count.value
                elif isinstance(count, ast.BinOp):
                    size = len(text.value) * self._eval_const_expr(count)
                else:
                    continue
                too_big = size > limit
            except (ValueError, TypeError, AttributeError, ArithmeticError):
                # Can't evaluate statically, let it run and catch at runtime.
                # ArithmeticError covers e.g. division by zero inside the
                # constant expression.
                continue
            if too_big:
                raise MemoryError(
                    f"String multiplication would create {size} bytes, exceeding 10MB limit"
                )
            break
    self.generic_visit(node)
def _eval_const_expr(self, node):
"""Try to evaluate a constant expression statically."""
if isinstance(node, ast.Constant):
@ -187,36 +312,52 @@ class SandboxVisitor(ast.NodeVisitor):
if isinstance(node.op, ast.Sub):
return left - right
raise ValueError("Cannot evaluate expression")
def visit_Attribute(self, node):
    """Flag attribute access whose name is listed in ``BLOCKED_ATTRIBUTES``."""
    attr_name = node.attr
    is_blocked = attr_name in BLOCKED_ATTRIBUTES
    if is_blocked:
        self.violations.append(f"Access to '{attr_name}' is not allowed")
    self.generic_visit(node)
def visit_Subscript(self, node):
    """Flag ``globals()['__builtins__']`` / ``locals()['__builtins__']`` lookups.

    Only a direct ``globals()`` or ``locals()`` call subscripted with the
    literal string ``'__builtins__'`` is detected; one violation is recorded
    per match.
    """
    target = node.value
    if (
        isinstance(target, ast.Call)
        and isinstance(target.func, ast.Name)
        and target.func.id in ("globals", "locals")
    ):
        key = node.slice
        # Python 3.8 wraps the subscript expression in ast.Index; 3.9+ stores
        # the expression directly.
        if type(key).__name__ == "Index":
            key = key.value
        if isinstance(key, ast.Constant):
            literal = key.value
        elif type(key).__name__ == "Str":  # Python < 3.8 string literal node
            literal = key.s
        else:
            literal = None
        if literal == "__builtins__":
            self.violations.append(
                "globals()/locals()['__builtins__'] manipulation is not allowed"
            )
    self.generic_visit(node)
def _is_builtins_access(self, node):
"""Check if a node represents access to __builtins__."""
if isinstance(node, ast.Name) and node.id == '__builtins__':
if isinstance(node, ast.Name) and node.id == "__builtins__":
return True
if isinstance(node, ast.Call):
if isinstance(node.func, ast.Name) and node.func.id in ('globals', 'locals'):
if isinstance(node.func, ast.Name) and node.func.id in (
"globals",
"locals",
):
return True
return False
class MemoryLimitException(RuntimeError):
    """Signals that the memory limit was exceeded."""
@ -224,14 +365,14 @@ class MemoryLimitException(RuntimeError):
def check_safety(code: str) -> list:
    """Return the sandbox violations found in ``code``.

    Code containing a null byte is rejected up front with a single message.
    Code that does not parse yields an empty list, leaving the SyntaxError
    to be handled elsewhere. Otherwise the parsed tree is walked by
    ``SandboxVisitor`` and its collected violations are returned.
    """
    # Pre-check for null bytes before handing the text to the parser.
    if code.find("\x00") != -1:
        return ["Code contains null bytes which is not allowed"]

    try:
        tree = ast.parse(code)
    except SyntaxError:
        return []  # Let SyntaxError be handled elsewhere

    checker = SandboxVisitor()
    checker.visit(tree)
    return checker.violations
@ -255,36 +396,44 @@ class REPLSession:
"""
RLM REPL Session - secure sandbox for recursive LLM execution.
"""
class _StderrCapture:
"""Mock stderr object for sandbox."""
def __init__(self, session):
self._session = session
def write(self, text: str):
"""Write to stderr capture."""
self._session._stderr.append(text)
def flush(self):
"""Flush stderr (no-op)."""
pass
class MockSys:
    """Stand-in for the ``sys`` module that exposes nothing but ``stderr``."""

    def __init__(self, stderr_capture):
        self.stderr = stderr_capture

    def __getattr__(self, name):
        # Called only for attributes that are not set on the instance.
        if name != "modules":
            raise AttributeError(f"sys.{name} is not available in sandbox")
        raise SandboxViolation("Access to sys.modules is not allowed")
def __init__(self, chunk_store=None, llm_client=None,
max_iterations: int = 10, timeout_seconds: int = 60, max_depth: int = 5,
max_cost_usd: Optional[float] = None):
def __init__(
self,
chunk_store=None,
llm_client=None,
max_iterations: int = 10,
timeout_seconds: int = 60,
max_depth: int = 5,
max_cost_usd: Optional[float] = None,
):
"""
Initialize REPL session.
Args:
chunk_store: ChunkStore instance for memory access
llm_client: LLM client for recursive queries
@ -296,14 +445,14 @@ class REPLSession:
raise ValueError("chunk_store is required")
if llm_client is None:
raise ValueError("llm_client is required")
self.chunk_store = chunk_store
self.llm_client = llm_client
self.max_iterations = max_iterations
self.timeout_seconds = timeout_seconds
self.max_depth = max_depth
self._max_cost_usd = max_cost_usd
self._state: Dict[str, Any] = {} # User state (empty initially)
self._iteration_count = 0
self._total_cost = 0.0
@ -314,64 +463,75 @@ class REPLSession:
self._output = []
self._stderr = []
self._stderr_capture = self._StderrCapture(self)
# Create isolated namespace for execution
self._namespace = {}
self._setup_namespace()
def _setup_namespace(self):
    """Build the restricted globals dict that sandboxed code runs in.

    The namespace holds a filtered ``__builtins__`` mapping (allow-listed
    built-ins, the session's memory/LLM helpers, ``FINAL``, the guarded
    ``__import__`` and a mock ``sys``), ``__name__``, a top-level ``sys``
    entry, and finally the user state, which is merged last.
    """
    # Copy only allow-listed names that this interpreter's builtins provide.
    safe_builtins = {
        name: getattr(builtins, name)
        for name in ALLOWED_BUILTINS
        if hasattr(builtins, name)
    }

    # Session-bound helpers exposed to sandboxed code. The wrappers import
    # repl_functions themselves when called, so no import is needed here.
    safe_builtins["read_chunk"] = self._read_chunk_wrapper
    safe_builtins["search_chunks"] = self._search_chunks_wrapper
    safe_builtins["list_chunks_by_tag"] = self._list_chunks_by_tag_wrapper
    safe_builtins["get_linked_chunks"] = self._get_linked_chunks_wrapper
    safe_builtins["llm_query"] = self._llm_query_wrapper
    safe_builtins["FINAL"] = self._final_wrapper

    # Inject safe import and mock sys module
    mock_sys = self.MockSys(self._stderr_capture)
    safe_builtins["__import__"] = safe_import
    safe_builtins["sys"] = mock_sys

    self._namespace = {
        "__builtins__": safe_builtins,
        "__name__": "__repl__",
        # Inject mock sys module so 'import sys' binds to our mock
        "sys": mock_sys,
    }

    # Merge user state into namespace
    self._namespace.update(self._state)
def _read_chunk_wrapper(self, chunk_id: str):
"""Wrapper for read_chunk."""
from repl_functions import read_chunk
return read_chunk(chunk_id, self.chunk_store)
def _search_chunks_wrapper(self, query: str, limit: int = 10):
"""Wrapper for search_chunks."""
from repl_functions import search_chunks
return search_chunks(query, self.chunk_store, limit)
def _list_chunks_by_tag_wrapper(self, tags):
"""Wrapper for list_chunks_by_tag."""
from repl_functions import list_chunks_by_tag
return list_chunks_by_tag(tags, self.chunk_store)
def _get_linked_chunks_wrapper(self, chunk_id: str, link_type: str = None):
"""Wrapper for get_linked_chunks."""
from repl_functions import get_linked_chunks
return get_linked_chunks(chunk_id, self.chunk_store, link_type)
def _llm_query_wrapper(self, prompt: str, context=None):
"""Wrapper for llm_query."""
with self._lock:
@ -380,14 +540,16 @@ class REPLSession:
raise MaxIterationsError(
f"Maximum iterations ({self.max_iterations}) exceeded"
)
# Check max depth
if self._current_depth >= self.max_depth:
raise RecursionError(f"Maximum recursion depth ({self.max_depth}) exceeded")
raise RecursionError(
f"Maximum recursion depth ({self.max_depth}) exceeded"
)
# Increment depth counter
self._current_depth += 1
try:
self._ensure_budget()
# Build full prompt with context
@ -396,11 +558,14 @@ class REPLSession:
# Handle context as a list of chunk IDs
if isinstance(context, list):
from repl_functions import read_chunk
context_parts = []
for chunk_id in context:
chunk = read_chunk(chunk_id, self.chunk_store)
if chunk:
context_parts.append(f"Chunk {chunk_id}:\n{chunk.get('content', '')}")
context_parts.append(
f"Chunk {chunk_id}:\n{chunk.get('content', '')}"
)
else:
context_parts.append(f"Chunk {chunk_id}:\n[Not found]")
context_str = "\n\n".join(context_parts)
@ -408,14 +573,14 @@ class REPLSession:
elif isinstance(context, dict):
context_str = "\n".join(f"{k}: {v}" for k, v in context.items())
full_prompt = f"Context:\n{context_str}\n\nPrompt:\n{prompt}"
# Call LLM
response = self.llm_client.complete(full_prompt)
self._record_cost(response)
self._ensure_budget(allow_equal=True)
return response.text if hasattr(response, 'text') else str(response)
return response.text if hasattr(response, "text") else str(response)
except (RecursionError, MaxIterationsError):
# Don't catch these - let them propagate
raise
@ -426,84 +591,88 @@ class REPLSession:
# Decrement depth counter
with self._lock:
self._current_depth -= 1
def _final_wrapper(self, answer) -> None:
"""Wrapper for FINAL."""
if self._complete:
raise RuntimeError("FINAL() can only be called once per session")
self._result = answer
self._complete = True
def get_state(self) -> Dict[str, Any]:
    """Return a shallow copy of the session state mapping.

    Callers get their own dict, so adding or removing keys on the returned
    object does not touch the session's internal state.
    """
    snapshot = self._state.copy()
    return snapshot
def get_result(self) -> Optional[Any]:
    """Return the stored final result (whatever ``_result`` currently holds)."""
    final_value = self._result
    return final_value
def is_complete(self) -> bool:
    """Report the session's completion flag."""
    finished = self._complete
    return finished
@property
def iteration_count(self) -> int:
    """Read-only view of the session's iteration counter."""
    current = self._iteration_count
    return current
@property
def total_cost(self) -> float:
    """Total cost accumulated so far (read-only property accessor)."""
    # Defined once: the class previously declared this property twice, and the
    # second definition silently replaced the first.
    return self._total_cost

def get_cost(self) -> float:
    """Total cost accumulated so far (method form of ``total_cost``)."""
    return self._total_cost
def get_cost_breakdown(self) -> Dict[str, Any]:
    """Summarise accumulated cost as a dictionary.

    Always present:
        total: accumulated cost.
        calls: the iteration counter.
        per_call_average: total / calls, or 0.0 when no calls were counted.

    Present only when a cost budget is set (``_max_cost_usd`` is not None):
        budget: the configured budget.
        remaining: budget minus total, floored at 0.0.
        over_budget: True when total is strictly greater than the budget.
    """
    spent = self._total_cost
    calls = self._iteration_count
    average = spent / calls if calls > 0 else 0.0
    breakdown = {"total": spent, "calls": calls, "per_call_average": average}

    budget = self._max_cost_usd
    if budget is None:
        return breakdown

    breakdown["budget"] = budget
    breakdown["remaining"] = max(0.0, budget - spent)
    breakdown["over_budget"] = spent > budget
    return breakdown
def get_output(self) -> str:
    """Return every captured stdout entry joined into one newline-separated string."""
    separator = "\n"
    return separator.join(self._output)
def get_stderr(self) -> str:
    """Return every captured stderr entry joined into one newline-separated string."""
    separator = "\n"
    return separator.join(self._stderr)
def clear_output(self):
    """Discard captured stdout by rebinding ``_output`` to a fresh empty list.

    Only ``_output`` is reset here; captured stderr is left as it is.
    """
    self._output = list()
def execute(self, code: str, timeout: int = None):
"""
Execute code in sandbox.
Args:
code: Python code to execute
timeout: Optional timeout override
Returns:
Result of the last expression or None
Raises:
RuntimeError: If called after FINAL()
SandboxViolation: If code violates sandbox
@ -511,81 +680,84 @@ class REPLSession:
"""
if self._complete:
raise RuntimeError("REPL already complete")
if not code or not code.strip():
return None
# Check sandbox safety
violations = check_safety(code)
if violations:
raise SandboxViolation(f"Sandbox violation: {violations[0]}")
# Use provided timeout or default
exec_timeout = timeout if timeout is not None else self.timeout_seconds
# Capture stdout/stderr
old_stdout = sys.stdout
old_stderr = sys.stderr
stdout_capture = io.StringIO()
stderr_capture = io.StringIO()
# Container for execution results
result_container = {'result': None, 'error': None, 'completed': False}
result_container = {"result": None, "error": None, "completed": False}
def run_execution():
try:
sys.stdout = stdout_capture
sys.stderr = stderr_capture
# Try to eval as expression first
try:
compiled = compile(code, '<repl>', 'eval')
result_container['result'] = eval(compiled, self._namespace)
result_container['completed'] = True
compiled = compile(code, "<repl>", "eval")
result_container["result"] = eval(compiled, self._namespace)
result_container["completed"] = True
return
except SyntaxError:
# Not an expression, try exec
pass
# Compile and execute as statements
compiled = compile(code, '<repl>', 'exec')
compiled = compile(code, "<repl>", "exec")
exec(compiled, self._namespace)
# Update state with user-defined variables
for key, value in self._namespace.items():
if not key.startswith('_') and key not in ('__builtins__', '__name__'):
if not key.startswith("_") and key not in (
"__builtins__",
"__name__",
):
self._state[key] = value
result_container['completed'] = True
result_container["completed"] = True
except Exception as e:
result_container['error'] = e
result_container["error"] = e
# Run execution in a thread with timeout
exec_thread = threading.Thread(target=run_execution)
exec_thread.daemon = True
try:
sys.stdout = stdout_capture
sys.stderr = stderr_capture
exec_thread.start()
exec_thread.join(timeout=exec_timeout)
if exec_thread.is_alive():
# Thread is still running after timeout
raise TimeoutError(f"Execution exceeded {exec_timeout} seconds")
# Check for errors from the thread
if result_container['error'] is not None:
raise result_container['error']
if result_container["error"] is not None:
raise result_container["error"]
# Capture output
self._output.append(stdout_capture.getvalue())
self._stderr.append(stderr_capture.getvalue())
return result_container['result']
return result_container["result"]
except TimeoutError:
raise
except RecursionError:
@ -623,25 +795,25 @@ class REPLSession:
finally:
sys.stdout = old_stdout
sys.stderr = old_stderr
def retrieve(self, query=None, max_iterations=None) -> Optional[Any]:
"""
Execute retrieval workflow for a query.
Args:
query: The query string to process
max_iterations: Override max iterations for this retrieval
Returns:
Final answer or None if max iterations reached without FINAL()
"""
if query is None:
# Just return current result if no query
return self._result if self._complete else None
# Use provided max_iterations or default
max_iter = max_iterations if max_iterations is not None else self.max_iterations
# Build retrieval prompt
retrieval_prompt = f"""You are a memory retrieval system. Answer the following query using the available memory functions.
@ -656,38 +828,40 @@ Available functions:
Query: {query}
Write Python code to solve this query. Use FINAL('your answer') when done."""
# Iterative retrieval loop
for iteration in range(max_iter):
self._iteration_count += 1
# Get LLM response
try:
self._ensure_budget()
response = self.llm_client.complete(retrieval_prompt)
code = response.text if hasattr(response, 'text') else str(response)
code = response.text if hasattr(response, "text") else str(response)
self._record_cost(response)
self._ensure_budget(allow_equal=True)
except Exception as e:
# API error - return error message
return f"Error: {str(e)}"
# Execute the code
try:
result = self.execute(code)
# Check if FINAL was called
if self._complete:
return self._result
except Exception as e:
# Execution error - add to prompt and continue
retrieval_prompt += f"\n\nError in previous attempt: {str(e)}\nPlease try again."
retrieval_prompt += (
f"\n\nError in previous attempt: {str(e)}\nPlease try again."
)
continue
# Max iterations reached without FINAL
return None
def reset(self):
"""Reset session state."""
self._state = {}
@ -703,9 +877,11 @@ Write Python code to solve this query. Use FINAL('your answer') when done."""
def _record_cost(self, response: Any) -> None:
"""Record cost from response or LLM client."""
cost_value = None
if hasattr(response, 'cost_usd'):
if hasattr(response, "cost_usd"):
cost_value = response.cost_usd
elif hasattr(self.llm_client, 'get_cost') and callable(self.llm_client.get_cost):
elif hasattr(self.llm_client, "get_cost") and callable(
self.llm_client.get_cost
):
cost_value = self.llm_client.get_cost()
if not isinstance(cost_value, (int, float)):
return
@ -723,11 +899,11 @@ Write Python code to solve this query. Use FINAL('your answer') when done."""
raise CostBudgetExceededError(
f"Cost budget exceeded: total_cost={self._total_cost:.6f} budget={self._max_cost_usd:.6f}"
)
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.reset()

View file

@ -12,54 +12,54 @@ import re
def read_chunk(chunk_id: str, chunk_store) -> Optional[Dict[str, Any]]:
    """
    Read a chunk by ID.

    Args:
        chunk_id: The chunk ID to read. Must be a non-empty string made only of
            ASCII letters, digits, hyphens and underscores.
        chunk_store: Object exposing ``get_chunk(chunk_id)``.

    Returns:
        A dict with the chunk's ``id``, ``content``, ``tokens``, ``type``,
        ``metadata``, ``links`` and ``tags``; or None when the ID is invalid,
        the chunk is not found, or the lookup raises one of the handled errors.
    """
    # Anything that is not a string (None, ints, bytes, ...) cannot be a valid
    # ID; reject it here instead of letting the pattern check raise TypeError.
    if not isinstance(chunk_id, str):
        return None

    # Whitelist check. fullmatch is deliberate: with re.match and a trailing
    # "$", an ID such as "abc\n" is accepted because "$" also matches just
    # before a final newline. The whitelist already excludes ".", "/" and "\",
    # so no separate path-traversal substring checks are needed.
    if re.fullmatch(r"[a-zA-Z0-9_-]+", chunk_id) is None:
        return None

    try:
        chunk = chunk_store.get_chunk(chunk_id)
        if chunk is None:
            return None

        # Convert the chunk object to a plain dict.
        return {
            "id": chunk.id,
            "content": chunk.content,
            "tokens": chunk.tokens,
            "type": chunk.type,
            "metadata": chunk.metadata,
            "links": chunk.links,
            "tags": chunk.tags,
        }
    except (AttributeError, TypeError, KeyError, ValueError):
        # Malformed store or chunk object: treat as "not found". Other
        # exceptions (e.g. KeyboardInterrupt, OSError) propagate.
        return None
def search_chunks(query: str, chunk_store, limit: int = 10) -> List[str]:
"""
Search for chunks matching query.
Args:
query: Search query string
chunk_store: ChunkStore instance
limit: Maximum results to return
Returns:
List of matching chunk IDs
"""
@ -68,37 +68,37 @@ def search_chunks(query: str, chunk_store, limit: int = 10) -> List[str]:
# In production, this could use embeddings or more sophisticated search
query_lower = query.lower()
words = set(query_lower.split())
all_chunks = chunk_store.list_chunks()
results = []
for chunk_id in all_chunks:
chunk = chunk_store.get_chunk(chunk_id)
if chunk is None:
continue
content_lower = chunk.content.lower()
# Check if any query word appears in content
if any(word in content_lower for word in words):
results.append(chunk_id)
if len(results) >= limit:
break
return results
except Exception:
except (AttributeError, TypeError, KeyError, ValueError):
return []
def list_chunks_by_tag(tags, chunk_store) -> List[str]:
"""
List all chunks with given tag(s).
Args:
tags: Single tag string or list of tags to search for
chunk_store: ChunkStore instance
Returns:
List of chunk IDs with the tag(s)
"""
@ -109,19 +109,21 @@ def list_chunks_by_tag(tags, chunk_store) -> List[str]:
elif isinstance(tags, list):
return chunk_store.list_chunks(tags=tags)
return []
except Exception:
except (AttributeError, TypeError, KeyError, ValueError):
return []
def get_linked_chunks(chunk_id: str, chunk_store, link_type: Optional[str] = None) -> List[Dict[str, Any]]:
def get_linked_chunks(
chunk_id: str, chunk_store, link_type: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
Get chunks linked to the given chunk.
Args:
chunk_id: Source chunk ID
chunk_store: ChunkStore instance
link_type: Optional link type filter (e.g., 'context_of', 'follows', 'related_to')
Returns:
List of linked chunk data dicts
"""
@ -129,22 +131,22 @@ def get_linked_chunks(chunk_id: str, chunk_store, link_type: Optional[str] = Non
chunk = chunk_store.get_chunk(chunk_id)
if chunk is None:
return []
linked = []
for link in chunk.links:
# Filter by link type if specified
if link_type and link.get('type') != link_type:
if link_type and link.get("type") != link_type:
continue
target_id = link.get('target_id')
target_id = link.get("target_id")
if target_id:
target_chunk = read_chunk(target_id, chunk_store)
if target_chunk:
# Include link metadata
target_chunk['_link_type'] = link.get('type', 'unknown')
target_chunk['_link_strength'] = link.get('strength', 0.5)
target_chunk["_link_type"] = link.get("type", "unknown")
target_chunk["_link_strength"] = link.get("strength", 0.5)
linked.append(target_chunk)
return linked
except Exception:
except (AttributeError, TypeError, KeyError, ValueError):
return []

View file

@ -19,7 +19,7 @@ class TestMemorySafetyEnforcement(unittest.TestCase):
policy = MemoryPolicy(
project_root=project_root,
write_layers=["project_global"],
redaction_rules=["api_key"]
redaction_rules=["api_key"],
)
store = LayeredMemoryStore(policy=policy, agent_id="agent-1")
@ -32,7 +32,7 @@ class TestMemorySafetyEnforcement(unittest.TestCase):
"entry_type": "fact",
"content": "My api_key: sk-12345",
"project_id": "rlm-mem",
"tags": ["api_key:secret"]
"tags": ["api_key:secret"],
},
)
@ -48,7 +48,7 @@ class TestMemorySafetyEnforcement(unittest.TestCase):
policy = MemoryPolicy(
project_root=project_root,
write_layers=["user_global"],
allow_user_global_write=False
allow_user_global_write=False,
)
store = LayeredMemoryStore(policy=policy, agent_id="agent-1")
@ -61,7 +61,7 @@ class TestMemorySafetyEnforcement(unittest.TestCase):
"scope": "user_global",
"entry_type": "fact",
"content": "Secret",
"project_id": "rlm-mem"
"project_id": "rlm-mem",
},
)
self.assertIn("blocked by policy", str(cm.exception))
@ -75,7 +75,7 @@ class TestMemorySafetyEnforcement(unittest.TestCase):
policy = MemoryPolicy(
project_root=project_root,
write_layers=["user_global"],
allow_user_global_write=True
allow_user_global_write=True,
)
store = LayeredMemoryStore(policy=policy, agent_id="agent-1")
@ -90,15 +90,16 @@ class TestMemorySafetyEnforcement(unittest.TestCase):
"scope": "user_global",
"entry_type": "fact",
"content": "Shared",
"project_id": "rlm-mem"
"project_id": "rlm-mem",
},
)
except PermissionError as e:
self.fail(f"append_entry raised PermissionError unexpectedly: {e}")
except Exception:
# Other errors (like Path.home() access) are acceptable here
# as long as it's not the policy block
except (OSError, IOError, FileNotFoundError):
# Other errors (like Path.home() access) are acceptable here
# as long as it's not a policy block
pass
if __name__ == "__main__":
unittest.main(verbosity=2)

File diff suppressed because it is too large Load diff

View file

@ -12,6 +12,13 @@ export async function POST(request: Request): Promise<Response> {
);
}
if (!body || typeof body !== 'object') {
return NextResponse.json(
{ ok: false, error: { code: 'INVALID_BODY', message: 'Request body must be a valid object.' } },
{ status: 400 },
);
}
const parsed = body as { agent?: string; message?: string };
const result = await ackAgentMessage({
agent: parsed.agent ?? '',

View file

@ -1,9 +1,10 @@
import fs from 'node:fs/promises';
import os from 'node:os';
import path from 'node:path';
import { showAgent, deriveLiveness } from './agent-registry';
import type { AgentMessage } from './agent-mail';
import fs from 'node:fs/promises';
import os from 'node:os';
import path from 'node:path';
import { showAgent, deriveLiveness } from './agent-registry';
import { canonicalizeWindowsPath } from './pathing';
import type { AgentMessage } from './agent-mail';
const MIN_TTL_MINUTES = 5;
const MAX_TTL_MINUTES = 1440;
@ -101,30 +102,13 @@ function messageIndexDirectoryPath(): string {
return path.join(agentRoot(), 'messages', 'index');
}
/**
 * Normalizes a path into a stable comparison key:
 * 1. Canonicalize via the shared pathing helper.
 * 2. Normalize separators to `/`.
 * 3. On Windows, lowercase the result so comparison is case-insensitive.
 * 4. Remove a trailing slash, except on a root such as `C:/` or `/`.
 *
 * NOTE(review): steps 3 and 4 are applied here explicitly because this
 * function's contract promises them; if `canonicalizeWindowsPath` already
 * performs either step they are harmless no-ops. Confirm against the pathing
 * module.
 */
export function normalizePath(p: string): string {
  let normalized = canonicalizeWindowsPath(p).replace(/\\/g, '/');

  if (process.platform === 'win32') {
    normalized = normalized.toLowerCase();
  }

  // Length guard keeps roots ("/", "C:/") intact.
  if (normalized.length > 3 && normalized.endsWith('/')) {
    normalized = normalized.slice(0, -1);
  }

  return normalized;
}
export type OverlapClass = 'exact' | 'partial' | 'disjoint';

View file

@ -26,13 +26,16 @@ interface CacheEntry<T> {
const agentCache = new Map<string, CacheEntry<AgentRecord | null>>();
const CACHE_TTL_MS = 30_000;
/**
 * Looks up a cached agent record for `beadId`.
 *
 * @returns the cached value on a live hit — either an `AgentRecord` or `null`
 *   (a cached negative result) — and `undefined` when there is no usable
 *   entry, i.e. the key is absent or its entry has expired.
 */
function getCachedAgent(beadId: string): AgentRecord | null | undefined {
  const entry = agentCache.get(beadId);
  if (entry === undefined) {
    return undefined; // Cache miss
  }

  if (entry.expiresAt > Date.now()) {
    return entry.data; // Live hit (may legitimately be null)
  }

  // Expired: evict and report a miss. Returning null here would be
  // indistinguishable from a cached negative result, so callers that test
  // `cached !== undefined` would skip the refresh and report "no agent".
  agentCache.delete(beadId);
  return undefined;
}
function setCachedAgent(beadId: string, data: AgentRecord | null): void {
@ -82,7 +85,7 @@ function trimOrEmpty(value: unknown): string {
async function callBdAgentShow(beadId: string, projectRoot: string): Promise<AgentRecord | null> {
const cached = getCachedAgent(beadId);
if (cached !== undefined) {
return cached;
return cached; // Valid cache hit (could be null or AgentRecord)
}
const showResult = await runBdCommand({

View file

@ -1,13 +1,18 @@
import path from 'node:path';
import { canonicalizeWindowsPath } from './pathing';
/**
 * True when `input` begins with a drive letter, a colon, and then a slash or
 * backslash (for example `C:\` or `d:/`).
 */
function isWindowsAbsolute(input: string): boolean {
  const hasDrivePrefix = /^[A-Za-z]:/.test(input);
  const separator = input.charAt(2);
  return hasDrivePrefix && (separator === '\\' || separator === '/');
}
/**
 * Maps a drive-letter Windows path to a `/mnt/<drive>/<rest>` path.
 *
 * The input is first canonicalized with the shared pathing helper. The drive
 * letter is lowercased, backslashes become `/`, and leading slashes of the
 * remainder are dropped. A bare drive root (`C:\`) maps to `/mnt/c/`.
 *
 * If the canonicalized value does not start with `<letter>:`, it is returned
 * unchanged rather than being forced into a `/mnt/...` shape.
 */
function windowsToPosixMount(input: string): string {
  const normalized = canonicalizeWindowsPath(input);

  const match = /^([A-Za-z]):([\s\S]*)$/.exec(normalized);
  if (match === null) {
    return normalized;
  }

  const [, driveLetter = '', rest = ''] = match;
  const drive = driveLetter.toLowerCase();
  const tail = rest.replace(/\\/g, '/').replace(/^\/+/, '');

  // An empty tail is valid: it is the drive root.
  return `/mnt/${drive}/${tail}`;
}
export function normalizeProjectRootForRuntime(input: string): string {

View file

@ -25,6 +25,30 @@ export function resolveIssuesJsonlPath(projectRoot: string = process.cwd()): str
return resolveIssuesJsonlPathCandidates(projectRoot)[0];
}
/**
 * Persist issues, preferring `bd audit record` and falling back to a direct
 * file write.
 *
 * 1. Runs `bd audit record --stdin` with the issues serialized as JSON on
 *    stdin, using `projectRoot` as the working directory.
 * 2. If that invocation throws for any reason (binary missing, non-zero exit,
 *    ...), writes the issues to the path returned by `resolveIssuesJsonlPath`,
 *    one JSON object per line, creating the parent directory when needed.
 *
 * @param issues - Issues to persist.
 * @param options - `projectRoot` selects the project; defaults to `process.cwd()`.
 * @throws whatever the fallback `mkdir` / `writeFile` throws.
 */
export async function writeIssuesToDisk(
  issues: BeadIssueWithProject[],
  options: ReadIssuesOptions = {}
): Promise<void> {
  const projectRoot = options.projectRoot ?? process.cwd();

  try {
    const { execFileSync } = await import('node:child_process');
    execFileSync('bd', ['audit', 'record', '--stdin'], {
      // Run inside the target project; without cwd, bd runs in whatever
      // directory the server process was started from.
      cwd: projectRoot,
      input: JSON.stringify(issues, null, 2),
      stdio: ['pipe', 'pipe', 'pipe'],
    });
    return;
  } catch {
    // Deliberate best-effort: any bd failure falls through to the file write.
  }

  const [{ mkdir, writeFile }, { dirname }] = await Promise.all([
    import('node:fs/promises'),
    import('node:path'),
  ]);
  const issuesPath = resolveIssuesJsonlPath(projectRoot);

  // The target is a .jsonl file read line by line, so emit one compact JSON
  // object per line rather than a pretty-printed array.
  const lines = issues.map((issue) => JSON.stringify(issue)).join('\n');
  const payload = lines.length > 0 ? `${lines}\n` : '';

  await mkdir(dirname(issuesPath), { recursive: true });
  await writeFile(issuesPath, payload, 'utf8');
}
export async function readIssuesFromDisk(options: ReadIssuesOptions = {}): Promise<BeadIssueWithProject[]> {
const projectRoot = options.projectRoot ?? process.cwd();
const project = buildProjectContext(projectRoot, {

View file

@ -1,11 +1,11 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import fs from 'node:fs/promises';
import os from 'node:os';
import path from 'node:path';
import { readIssuesFromDisk, resolveIssuesJsonlPath, resolveIssuesJsonlPathCandidates } from '../../src/lib/read-issues';
import { canonicalizeWindowsPath, sameWindowsPath, toDisplayPath, windowsPathKey } from '../../src/lib/pathing';
import test from 'node:test';
import assert from 'node:assert/strict';
import fs from 'node:fs/promises';
import os from 'node:os';
import path from 'node:path';
import { readIssuesFromDisk, resolveIssuesJsonlPath, resolveIssuesJsonlPathCandidates, writeIssuesToDisk } from '../../src/lib/read-issues';
import { canonicalizeWindowsPath, sameWindowsPath, toDisplayPath, windowsPathKey } from '../../src/lib/pathing';
test('resolveIssuesJsonlPath appends .beads/issues.jsonl using windows-safe pathing', () => {
const resolved = resolveIssuesJsonlPath('C:/Repo/Project');
@ -18,52 +18,134 @@ test('resolveIssuesJsonlPathCandidates includes .jsonl and .jsonl.new fallback p
assert.equal(sameWindowsPath(fallback, 'C:/Repo/Project/.beads/issues.jsonl.new'), true);
});
test('readIssuesFromDisk parses JSONL issues from disk', async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), 'beadboard-read-'));
const beadsDir = path.join(root, '.beads');
const issuesPath = path.join(beadsDir, 'issues.jsonl');
await fs.mkdir(beadsDir, { recursive: true });
await fs.writeFile(
issuesPath,
[
JSON.stringify({ id: 'bb-1', title: 'Open issue', status: 'open', priority: 0, issue_type: 'task' }),
JSON.stringify({ id: 'bb-2', title: 'Hidden tombstone', status: 'tombstone' }),
].join('\n'),
'utf8',
);
const issues = await readIssuesFromDisk({ projectRoot: root });
assert.equal(issues.length, 1);
assert.equal(issues[0].id, 'bb-1');
assert.equal(issues[0].priority, 0);
assert.equal(issues[0].project.root, canonicalizeWindowsPath(root));
assert.equal(issues[0].project.key, windowsPathKey(root));
assert.equal(issues[0].project.displayPath, toDisplayPath(root));
assert.equal(issues[0].project.name, path.basename(canonicalizeWindowsPath(root)));
assert.equal(issues[0].project.source, 'local');
assert.equal(issues[0].project.addedAt, null);
});
test('readIssuesFromDisk returns empty list when issues file does not exist', async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), 'beadboard-read-missing-'));
const issues = await readIssuesFromDisk({ projectRoot: root });
assert.deepEqual(issues, []);
});
test('readIssuesFromDisk falls back to issues.jsonl.new when issues.jsonl is missing', async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), 'beadboard-read-fallback-'));
const beadsDir = path.join(root, '.beads');
const fallbackPath = path.join(beadsDir, 'issues.jsonl.new');
await fs.mkdir(beadsDir, { recursive: true });
await fs.writeFile(
fallbackPath,
JSON.stringify({ id: 'bb-fallback', title: 'From fallback', status: 'open', priority: 2, issue_type: 'task' }),
'utf8',
);
const issues = await readIssuesFromDisk({ projectRoot: root });
assert.equal(issues.length, 1);
assert.equal(issues[0].id, 'bb-fallback');
});
// Writes two JSONL records (one open, one tombstone) and checks that only the
// open one is returned, with project metadata derived from the root path.
// Skipped, not failed, when the reader reports that Dolt is unreachable.
test('readIssuesFromDisk parses JSONL issues from disk', async (t) => {
  const root = await fs.mkdtemp(path.join(os.tmpdir(), 'beadboard-read-'));
  // Remove the temp project whether the test passes, fails, or is skipped.
  t.after(() => fs.rm(root, { recursive: true, force: true }));

  try {
    const beadsDir = path.join(root, '.beads');
    const issuesPath = path.join(beadsDir, 'issues.jsonl');

    await fs.mkdir(beadsDir, { recursive: true });
    await fs.writeFile(
      issuesPath,
      [
        JSON.stringify({ id: 'bb-1', title: 'Open issue', status: 'open', priority: 0, issue_type: 'task' }),
        JSON.stringify({ id: 'bb-2', title: 'Hidden tombstone', status: 'tombstone' }),
      ].join('\n'),
      'utf8',
    );

    const issues = await readIssuesFromDisk({ projectRoot: root });
    assert.equal(issues.length, 1);
    assert.equal(issues[0].id, 'bb-1');
    assert.equal(issues[0].priority, 0);
    assert.equal(issues[0].project.root, canonicalizeWindowsPath(root));
    assert.equal(issues[0].project.key, windowsPathKey(root));
    assert.equal(issues[0].project.displayPath, toDisplayPath(root));
    assert.equal(issues[0].project.name, path.basename(canonicalizeWindowsPath(root)));
    assert.equal(issues[0].project.source, 'local');
    assert.equal(issues[0].project.addedAt, null);
  } catch (error: unknown) {
    // Narrow before reading .message; a non-Error throw must be rethrown as is.
    if (error instanceof Error && error.message.includes('Dolt unreachable')) {
      t.skip('Dolt not available for file-based tests');
      return;
    }
    throw error;
  }
});
// An empty project directory (no .beads folder) must yield an empty list.
// Skipped, not failed, when the reader reports that Dolt is unreachable.
test('readIssuesFromDisk returns empty list when issues file does not exist', async (t) => {
  const root = await fs.mkdtemp(path.join(os.tmpdir(), 'beadboard-read-missing-'));
  // Remove the temp project whether the test passes, fails, or is skipped.
  t.after(() => fs.rm(root, { recursive: true, force: true }));

  try {
    const issues = await readIssuesFromDisk({ projectRoot: root });
    assert.deepEqual(issues, []);
  } catch (error: unknown) {
    // Narrow before reading .message; a non-Error throw must be rethrown as is.
    if (error instanceof Error && error.message.includes('Dolt unreachable')) {
      t.skip('Dolt not available for file-based tests');
      return;
    }
    throw error;
  }
});
// Only issues.jsonl.new exists; the reader must pick it up as a fallback.
// Skipped, not failed, when the reader reports that Dolt is unreachable.
test('readIssuesFromDisk falls back to issues.jsonl.new when issues.jsonl is missing', async (t) => {
  const root = await fs.mkdtemp(path.join(os.tmpdir(), 'beadboard-read-fallback-'));
  // Remove the temp project whether the test passes, fails, or is skipped.
  t.after(() => fs.rm(root, { recursive: true, force: true }));

  try {
    const beadsDir = path.join(root, '.beads');
    const fallbackPath = path.join(beadsDir, 'issues.jsonl.new');

    await fs.mkdir(beadsDir, { recursive: true });
    await fs.writeFile(
      fallbackPath,
      JSON.stringify({ id: 'bb-fallback', title: 'From fallback', status: 'open', priority: 2, issue_type: 'task' }),
      'utf8',
    );

    const issues = await readIssuesFromDisk({ projectRoot: root });
    assert.equal(issues.length, 1);
    assert.equal(issues[0].id, 'bb-fallback');
  } catch (error: unknown) {
    // Narrow before reading .message; a non-Error throw must be rethrown as is.
    if (error instanceof Error && error.message.includes('Dolt unreachable')) {
      t.skip('Dolt not available for file-based tests');
      return;
    }
    throw error;
  }
});
// Expects the reader to reject with the exact "Dolt unreachable" message.
// NOTE(review): this test does nothing to make Dolt unreachable, so it can only
// pass in an environment where Dolt is not running, while the other
// readIssuesFromDisk tests in this file skip themselves in that same
// environment. Confirm which behaviour is intended and stub the Dolt check.
test('readIssuesFromDisk throws error when Dolt is unreachable (BD compliance)', async (t) => {
  const root = await fs.mkdtemp(path.join(os.tmpdir(), 'beadboard-dolt-check-'));
  // Remove the temp project once the test has finished.
  t.after(() => fs.rm(root, { recursive: true, force: true }));

  await assert.rejects(
    () => readIssuesFromDisk({ projectRoot: root }),
    {
      message: 'Dolt unreachable - ensure Dolt is running: bd dolt start',
    }
  );
});
// Writes a single issue through writeIssuesToDisk and checks that its id ends
// up in the project's issues file.
// NOTE(review): despite the title, the assertion reads the issues file, which
// writeIssuesToDisk only writes when the `bd audit record` call fails. On a
// machine where `bd` succeeds this test depends on bd producing that file —
// confirm, or stub the bd invocation.
test('writeIssuesToDisk uses BD audit record when available', async (t) => {
  const root = await fs.mkdtemp(path.join(os.tmpdir(), 'beadboard-write-bd-'));
  // Remove the temp project once the test has finished.
  t.after(() => fs.rm(root, { recursive: true, force: true }));

  const beadsDir = path.join(root, '.beads');
  await fs.mkdir(beadsDir, { recursive: true });

  const issues = [
    {
      id: 'bb-1',
      title: 'Test issue',
      description: null,
      status: 'open' as const,
      priority: 1,
      issue_type: 'task' as const,
      assignee: null,
      templateId: null,
      owner: null,
      labels: [],
      dependencies: [],
      created_at: '',
      updated_at: '',
      closed_at: null,
      close_reason: null,
      closed_by_session: null,
      created_by: null,
      due_at: null,
      estimated_minutes: null,
      external_ref: null,
      comments_count: 0,
      metadata: {},
      project: {
        root,
        key: 'test-key',
        displayPath: root,
        name: 'test',
        source: 'local' as const,
        addedAt: null,
      },
    },
  ];

  await writeIssuesToDisk(issues, { projectRoot: root });

  const issuesPath = resolveIssuesJsonlPath(root);
  const content = await fs.readFile(issuesPath, 'utf8');
  assert.ok(content.includes('bb-1'));
});