docs+skills: add main UI/UX visual-truth PRD and skill links
This commit is contained in:
parent
1c36223e7f
commit
14a50ad4ae
289 changed files with 54463 additions and 0 deletions
450
.agents/skills/rlm-mem/brain/scripts/reason_operation.py
Normal file
450
.agents/skills/rlm-mem/brain/scripts/reason_operation.py
Normal file
|
|
@ -0,0 +1,450 @@
|
|||
"""
|
||||
RLM-MEM - REASON Operation (D3.3)
|
||||
High-level memory analysis and synthesis using RLM.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional, Dict, Any
|
||||
import time
|
||||
|
||||
# Handle both relative and direct imports
|
||||
try:
|
||||
from brain.scripts.memory_store import ChunkStore
|
||||
from brain.scripts.recall_operation import RecallOperation, RecallResult
|
||||
except ImportError:
|
||||
from memory_store import ChunkStore
|
||||
from recall_operation import RecallOperation, RecallResult
|
||||
|
||||
|
||||
@dataclass
|
||||
class ReasonResult:
|
||||
"""Result of a REASON operation."""
|
||||
synthesis: str
|
||||
insights: List[str] = field(default_factory=list)
|
||||
evidence: Dict[str, List[str]] = field(default_factory=dict)
|
||||
contradictions: List[Dict[str, Any]] = field(default_factory=list)
|
||||
confidence: float = 0.0
|
||||
source_chunks: List[str] = field(default_factory=list)
|
||||
iterations_used: int = 0
|
||||
cost_usd: float = 0.0
|
||||
|
||||
|
||||
class ReasonOperation:
|
||||
"""
|
||||
High-level REASON operation for memory analysis and synthesis.
|
||||
|
||||
Uses RLM to:
|
||||
- Analyze patterns across memories
|
||||
- Synthesize insights from multiple sources
|
||||
- Identify contradictions or gaps
|
||||
- Generate conclusions with evidence
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
chunk_store: ChunkStore,
|
||||
llm_client=None,
|
||||
max_iterations: int = 10
|
||||
):
|
||||
"""
|
||||
Initialize REASON operation.
|
||||
|
||||
Args:
|
||||
chunk_store: Storage backend
|
||||
llm_client: LLM for reasoning
|
||||
max_iterations: Maximum analysis iterations
|
||||
"""
|
||||
if chunk_store is None:
|
||||
raise ValueError("chunk_store is required")
|
||||
|
||||
self.chunk_store = chunk_store
|
||||
self.llm_client = llm_client
|
||||
self.max_iterations = max_iterations
|
||||
|
||||
# Initialize recall for gathering evidence
|
||||
self._recall = None
|
||||
if llm_client is not None:
|
||||
self._recall = RecallOperation(
|
||||
chunk_store=chunk_store,
|
||||
llm_client=llm_client,
|
||||
max_iterations=max_iterations
|
||||
)
|
||||
|
||||
def reason(
|
||||
self,
|
||||
query: str,
|
||||
context_chunks: List[str] = None,
|
||||
analysis_type: str = "synthesis"
|
||||
) -> ReasonResult:
|
||||
"""
|
||||
Perform reasoning analysis on memories.
|
||||
"""
|
||||
if not query or not query.strip():
|
||||
return ReasonResult(
|
||||
synthesis="No query provided",
|
||||
confidence=0.0
|
||||
)
|
||||
|
||||
# Gather evidence
|
||||
if context_chunks:
|
||||
evidence = self._gather_evidence(context_chunks)
|
||||
else:
|
||||
evidence = self._search_evidence(query)
|
||||
|
||||
if not evidence:
|
||||
return ReasonResult(
|
||||
synthesis="No relevant evidence found for analysis",
|
||||
confidence=0.0
|
||||
)
|
||||
|
||||
# 1. Always check for contradictions in evidence
|
||||
contradictions = self._detect_contradictions(evidence["chunks"])
|
||||
|
||||
# 2. Perform analysis based on type
|
||||
if analysis_type == "synthesis":
|
||||
result = self._synthesize(query, evidence)
|
||||
elif analysis_type == "comparison":
|
||||
result = self._compare(query, evidence)
|
||||
elif analysis_type == "pattern":
|
||||
result = self._find_patterns(query, evidence)
|
||||
elif analysis_type == "gap":
|
||||
result = self._identify_gaps(query, evidence)
|
||||
else:
|
||||
result = self._synthesize(query, evidence)
|
||||
|
||||
# 3. Ensure contradictions are attached
|
||||
if contradictions and not result.contradictions:
|
||||
result.contradictions = contradictions
|
||||
if "Identified" not in "".join(result.insights):
|
||||
result.insights.append(f"Identified {len(contradictions)} potential conflicts in memory")
|
||||
|
||||
return result
|
||||
|
||||
def _gather_evidence(self, chunk_ids: List[str]) -> Dict[str, Any]:
|
||||
"""Gather evidence from specific chunks."""
|
||||
evidence = {
|
||||
"chunks": [],
|
||||
"tags": set(),
|
||||
"types": set()
|
||||
}
|
||||
|
||||
for chunk_id in chunk_ids:
|
||||
chunk = self.chunk_store.get_chunk(chunk_id)
|
||||
if chunk:
|
||||
evidence["chunks"].append(chunk)
|
||||
evidence["tags"].update(chunk.tags)
|
||||
evidence["types"].add(chunk.type)
|
||||
|
||||
evidence["tags"] = list(evidence["tags"])
|
||||
evidence["types"] = list(evidence["types"])
|
||||
|
||||
return evidence
|
||||
|
||||
def _search_evidence(self, query: str) -> Dict[str, Any]:
|
||||
"""Search for relevant evidence."""
|
||||
# Use recall to find relevant chunks
|
||||
if self._recall is None:
|
||||
# Fallback to basic search
|
||||
chunk_ids = self.chunk_store.list_chunks()
|
||||
return self._gather_evidence(chunk_ids[:10])
|
||||
|
||||
recall_result = self._recall.recall(query, max_results=10)
|
||||
return self._gather_evidence(recall_result.source_chunks)
|
||||
|
||||
def _synthesize(self, query: str, evidence: Dict[str, Any]) -> ReasonResult:
|
||||
"""Synthesize insights from evidence with contradiction surfacing."""
|
||||
chunks = evidence["chunks"]
|
||||
|
||||
# 1. Sort chunks by confidence and recency (if available)
|
||||
def chunk_sort_key(c):
|
||||
conf = getattr(c.metadata, 'confidence', 0.5)
|
||||
# Try to get timestamp for recency boost
|
||||
ts = 0.0
|
||||
try:
|
||||
created = getattr(c.metadata, 'created', "")
|
||||
if created:
|
||||
from datetime import datetime
|
||||
ts = datetime.fromisoformat(created.replace("Z", "+00:00")).timestamp()
|
||||
except Exception:
|
||||
pass
|
||||
return (conf, ts)
|
||||
|
||||
sorted_chunks = sorted(chunks, key=chunk_sort_key, reverse=True)
|
||||
|
||||
# 2. Extract unique contents
|
||||
seen_contents = set()
|
||||
unique_chunks = []
|
||||
for chunk in sorted_chunks:
|
||||
# Simple deduplication based on content normalization
|
||||
norm_content = " ".join(chunk.content.lower().split())
|
||||
if norm_content not in seen_contents:
|
||||
seen_contents.add(norm_content)
|
||||
unique_chunks.append(chunk)
|
||||
|
||||
# 3. Detect contradictions
|
||||
contradictions = self._detect_contradictions(unique_chunks)
|
||||
|
||||
# 4. Build synthesis
|
||||
contents = [c.content for c in unique_chunks]
|
||||
if not contents:
|
||||
return ReasonResult(
|
||||
synthesis="No content to synthesize",
|
||||
confidence=0.0
|
||||
)
|
||||
|
||||
synthesis = self._build_synthesis(query, contents)
|
||||
|
||||
# 5. Extract insights
|
||||
insights = self._extract_insights(contents)
|
||||
if contradictions:
|
||||
insights.append(f"Identified {len(contradictions)} potential conflicts in memory")
|
||||
|
||||
# 6. Calculate aggregate confidence
|
||||
avg_confidence = sum(
|
||||
getattr(c.metadata, 'confidence', 0.7) for c in unique_chunks
|
||||
) / len(unique_chunks) if unique_chunks else 0.0
|
||||
|
||||
return ReasonResult(
|
||||
synthesis=synthesis,
|
||||
insights=insights,
|
||||
evidence={"sources": [c.id for c in unique_chunks]},
|
||||
contradictions=contradictions,
|
||||
confidence=avg_confidence,
|
||||
source_chunks=[c.id for c in unique_chunks],
|
||||
iterations_used=1
|
||||
)
|
||||
|
||||
def _build_synthesis(self, query: str, contents: List[str]) -> str:
|
||||
"""Build structured synthesis text."""
|
||||
if not contents:
|
||||
return "No information available"
|
||||
|
||||
# Improved synthesis: summary header + ranked list
|
||||
synthesis_parts = [f"Synthesized analysis for: \"{query}\"", ""]
|
||||
synthesis_parts.append(f"Based on {len(contents)} unique sources (ranked by relevance):")
|
||||
for i, content in enumerate(contents[:7], 1):
|
||||
# Clean up content for list display
|
||||
clean_content = content.replace("\n", " ").strip()
|
||||
synthesis_parts.append(f" {i}. {clean_content}")
|
||||
|
||||
if len(contents) > 7:
|
||||
synthesis_parts.append(f" ... and {len(contents) - 7} other supporting memories.")
|
||||
|
||||
return "\n".join(synthesis_parts)
|
||||
|
||||
def _detect_contradictions(self, chunks: List[Any]) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Identify potential conflicts across memory chunks using non-LLM heuristics.
|
||||
"""
|
||||
conflicts = []
|
||||
|
||||
# 1. Group by tag/topic
|
||||
topic_groups = {}
|
||||
for chunk in chunks:
|
||||
for tag in chunk.tags:
|
||||
if tag not in topic_groups:
|
||||
topic_groups[tag] = []
|
||||
topic_groups[tag].append(chunk)
|
||||
|
||||
# 2. Check for opposite sentiments/values within the same tag
|
||||
# Heuristic: "prefer X" vs "prefer Y" or "not X" vs "is X"
|
||||
NEGATIONS = {"not", "don't", "dislike", "hate", "avoid", "stop"}
|
||||
|
||||
for tag, group in topic_groups.items():
|
||||
if len(group) < 2:
|
||||
continue
|
||||
|
||||
# Simple pair-wise comparison
|
||||
for i in range(len(group)):
|
||||
for j in range(i + 1, len(group)):
|
||||
c1, c2 = group[i], group[j]
|
||||
|
||||
# Heuristic: If both talk about "prefer" but have different words
|
||||
# e.g. "prefer dark mode" vs "prefer light mode"
|
||||
c1_words = set(c1.content.lower().split())
|
||||
c2_words = set(c2.content.lower().split())
|
||||
|
||||
if ("prefer" in c1_words or "prefers" in c1_words) and ("prefer" in c2_words or "prefers" in c2_words):
|
||||
# Significant difference in specific preference
|
||||
if len(c1_words ^ c2_words) >= 2:
|
||||
conflicts.append({
|
||||
"type": "potential_preference_conflict",
|
||||
"topic": tag,
|
||||
"chunks": [c1.id, c2.id],
|
||||
"reason": f"Divergent preferences detected for topic '{tag}'"
|
||||
})
|
||||
|
||||
# Check for explicit negation
|
||||
# If one has a negation word and the other doesn't for the same tag
|
||||
c1_negated = any(n in c1_words for n in NEGATIONS)
|
||||
c2_negated = any(n in c2_words for n in NEGATIONS)
|
||||
|
||||
if c1_negated != c2_negated:
|
||||
conflicts.append({
|
||||
"type": "negation_conflict",
|
||||
"topic": tag,
|
||||
"chunks": [c1.id, c2.id],
|
||||
"reason": f"Opposing sentiments detected for topic '{tag}'"
|
||||
})
|
||||
|
||||
# Deduplicate conflicts
|
||||
unique_conflicts = []
|
||||
seen_pairs = set()
|
||||
for c in conflicts:
|
||||
pair = tuple(sorted(c["chunks"]))
|
||||
if pair not in seen_pairs:
|
||||
seen_pairs.add(pair)
|
||||
unique_conflicts.append(c)
|
||||
|
||||
return unique_conflicts
|
||||
|
||||
def _extract_insights(self, contents: List[str]) -> List[str]:
|
||||
"""Extract key insights from contents."""
|
||||
insights = []
|
||||
|
||||
# Simple insight extraction - look for patterns
|
||||
for content in contents:
|
||||
if "prefer" in content.lower():
|
||||
insights.append(f"Preference identified: {content[:100]}...")
|
||||
if "like" in content.lower():
|
||||
insights.append(f"Positive sentiment: {content[:100]}...")
|
||||
|
||||
# Remove duplicates while preserving order
|
||||
seen = set()
|
||||
unique_insights = []
|
||||
for insight in insights:
|
||||
if insight not in seen:
|
||||
seen.add(insight)
|
||||
unique_insights.append(insight)
|
||||
|
||||
return unique_insights[:5] # Top 5 insights
|
||||
|
||||
def _compare(self, query: str, evidence: Dict[str, Any]) -> ReasonResult:
|
||||
"""Compare different pieces of evidence."""
|
||||
chunks = evidence["chunks"]
|
||||
|
||||
if len(chunks) < 2:
|
||||
return ReasonResult(
|
||||
synthesis="Need at least 2 items to compare",
|
||||
confidence=0.0
|
||||
)
|
||||
|
||||
# Build comparison
|
||||
comparison_parts = [f"Comparison Analysis: \"{query}\"", ""]
|
||||
for i, chunk in enumerate(chunks, 1):
|
||||
comparison_parts.append(f" Option {i}: {chunk.content}")
|
||||
|
||||
synthesis = "\n".join(comparison_parts)
|
||||
|
||||
return ReasonResult(
|
||||
synthesis=synthesis,
|
||||
insights=[f"Comparing {len(chunks)} distinct sources"],
|
||||
confidence=0.7,
|
||||
source_chunks=[chunk.id for chunk in chunks]
|
||||
)
|
||||
|
||||
def _find_patterns(self, query: str, evidence: Dict[str, Any]) -> ReasonResult:
|
||||
"""Find patterns across evidence."""
|
||||
chunks = evidence["chunks"]
|
||||
tags = evidence.get("tags", [])
|
||||
types = evidence.get("types", [])
|
||||
|
||||
insights = []
|
||||
|
||||
# Pattern: Common tags
|
||||
if tags:
|
||||
insights.append(f"Common themes: {', '.join(tags[:5])}")
|
||||
|
||||
# Pattern: Content types
|
||||
if types:
|
||||
insights.append(f"Source types: {', '.join(types)}")
|
||||
|
||||
# Pattern: Temporal (if timestamps available)
|
||||
if chunks:
|
||||
dates = []
|
||||
for c in chunks:
|
||||
d = getattr(c.metadata, 'created', getattr(c.metadata, 'created_at', None))
|
||||
if d: dates.append(d[:10])
|
||||
if dates:
|
||||
insights.append(f"Evidence spans {len(set(dates))} unique days")
|
||||
|
||||
return ReasonResult(
|
||||
synthesis=f"Found {len(insights)} patterns across {len(chunks)} memories",
|
||||
insights=insights,
|
||||
confidence=0.75,
|
||||
source_chunks=[chunk.id for chunk in chunks]
|
||||
)
|
||||
|
||||
def _identify_gaps(self, query: str, evidence: Dict[str, Any]) -> ReasonResult:
|
||||
"""Identify gaps in knowledge."""
|
||||
chunks = evidence["chunks"]
|
||||
|
||||
gaps = []
|
||||
|
||||
# Check for low confidence items
|
||||
low_confidence = [
|
||||
chunk for chunk in chunks
|
||||
if getattr(chunk.metadata, 'confidence', 0.7) < 0.6
|
||||
]
|
||||
if low_confidence:
|
||||
gaps.append(f"{len(low_confidence)} sources have low confidence scores")
|
||||
|
||||
# Check for missing links
|
||||
unlinked = [
|
||||
chunk for chunk in chunks
|
||||
if not getattr(chunk, 'links', None) or (not chunk.links.context_of and not chunk.links.related_to)
|
||||
]
|
||||
if unlinked:
|
||||
gaps.append(f"{len(unlinked)} items are isolated (no graph links)")
|
||||
|
||||
if not gaps:
|
||||
gaps.append("No significant structural gaps identified in the available evidence")
|
||||
|
||||
return ReasonResult(
|
||||
synthesis=f"Knowledge Gap Analysis: {'; '.join(gaps)}",
|
||||
insights=gaps,
|
||||
confidence=0.6,
|
||||
source_chunks=[chunk.id for chunk in chunks]
|
||||
)
|
||||
|
||||
def analyze_contradictions(
|
||||
self,
|
||||
chunk_ids: List[str]
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Analyze chunks for potential contradictions.
|
||||
|
||||
Args:
|
||||
chunk_ids: Chunks to analyze
|
||||
|
||||
Returns:
|
||||
List of potential contradictions
|
||||
"""
|
||||
contradictions = []
|
||||
|
||||
chunks = []
|
||||
for chunk_id in chunk_ids:
|
||||
chunk = self.chunk_store.get_chunk(chunk_id)
|
||||
if chunk:
|
||||
chunks.append(chunk)
|
||||
|
||||
# Simple contradiction detection
|
||||
# Look for chunks with contradicts links
|
||||
for chunk in chunks:
|
||||
if hasattr(chunk.links, 'contradicts') and chunk.links.contradicts:
|
||||
for target_id in chunk.links.contradicts:
|
||||
contradictions.append({
|
||||
"chunk_a": chunk.id,
|
||||
"chunk_b": target_id,
|
||||
"reasoning": "Explicit contradiction link"
|
||||
})
|
||||
|
||||
return contradictions
|
||||
|
||||
def get_stats(self) -> Dict[str, Any]:
|
||||
"""Get reasoning operation statistics."""
|
||||
return {
|
||||
"total_analyses": 0,
|
||||
"avg_confidence": 0.0,
|
||||
"avg_insights": 0.0
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue