# Architecture

**Analysis Date:** 2025-02-23

## Pattern Overview

**Overall:** Event-driven microservices architecture using Redis Streams for inter-service communication.

**Key Characteristics:**
- 8 independent Python services communicating asynchronously via Redis Streams
- Layered abstractions for extensibility: broker abstraction (`BaseBroker`), strategy abstraction (`BaseStrategy`), source abstraction for news
- Each service consumes from one or more Redis Streams, processes data, persists to PostgreSQL, and publishes downstream results
- Shared libraries provide domain models, database layer, telemetry, and abstractions
- React TypeScript frontend connected via WebSocket and REST API

## Layers

**Service Layer (Event Processors):**
- Purpose: Independent microservices that transform data across the trading pipeline
- Location: `services/*/main.py` (entry points)
- Contains: RSS/Reddit fetchers, sentiment analysis, signal generation, trade execution, learning engine, market data subscriptions, API gateway
- Depends on: `shared/` libraries, external APIs (Alpaca, Reddit, Ollama)
- Used by: Other services (via Redis Streams), dashboard (via API gateway)

**Infrastructure/Abstraction Layer:**
- Purpose: Pluggable interfaces and database access
- Location: `shared/broker/`, `shared/strategies/`, `shared/db.py`
- Contains: `BaseBroker` (Alpaca implementation), `BaseStrategy` (Momentum/MeanReversion/NewsDriven), database engine setup
- Depends on: SQLAlchemy, alpaca-py, external ML models
- Used by: Trade executor, signal generator, backtester

**Data Layer:**
- Purpose: Persistent storage of trades, signals, articles, market data, user credentials
- Location: `shared/models/` (SQLAlchemy ORM models)
- Contains: 14 tables including `trades`, `signals`, `articles`, `article_sentiments`, `market_data`, `positions`, `users`, `strategies`, strategy weights history
- Depends on: PostgreSQL + TimescaleDB extension
- Used by: All services for read/write persistence

**Messaging Layer:**
- Purpose: Asynchronous event streaming between services
- Location: `shared/redis_streams.py`
- Contains: `StreamPublisher` (XADD), `StreamConsumer` (XREADGROUP with consumer groups)
- Depends on: Redis Streams
- Used by: All services for pub/sub communication

**Frontend Layer:**
- Purpose: User-facing dashboard for monitoring and control
- Location: `dashboard/src/`
- Contains: React pages, components, API client, WebAuthn auth
- Depends on: TanStack Query, TradingView charts, Recharts, React Router
- Used by: End users for real-time monitoring

**Utility Layer:**
- Purpose: Cross-cutting concerns
- Location: `shared/config.py`, `shared/telemetry.py`, `shared/schemas/`
- Contains: Configuration management (Pydantic BaseSettings), OpenTelemetry/Prometheus metrics setup, message schemas (Pydantic v2)
- Depends on: pydantic-settings, opentelemetry-sdk, prometheus-client
- Used by: All services

## Data Flow

**News Ingestion Pipeline:**
1. `services/news_fetcher/main.py` polls RSS feeds and Reddit on independent schedules
2. Uses `RSSSource` and `RedditSource` to fetch articles
3. Deduplicates via Redis SET (`news:seen_hashes`)
4. Publishes `RawArticle` messages to `news:raw` Redis Stream

**Sentiment Analysis:**
1. `services/sentiment_analyzer/main.py` consumes from `news:raw`
2. Runs articles through `FinBERTAnalyzer` for sentiment scores
3. Falls back to `OllamaAnalyzer` if confidence < threshold (configured via `SentimentAnalyzerConfig.finbert_confidence_threshold`)
4. Extracts ticker mentions via regex in `ticker_extractor.py`
5. Persists `Article` + `ArticleSentiment` to database
6. Publishes `ScoredArticle` messages (one per ticker per article) to `news:scored` Redis Stream

**Market Data Collection:**
1. `services/market_data/main.py` fetches OHLCV bars from Alpaca (historical + live)
2. Publishes bars to `market:bars` Redis Stream as ticker-specific snapshots
3. Bars are consumed by the signal generator to compute technical indicators

**Signal Generation:**
1. `services/signal_generator/main.py` consumes from both `news:scored` and `market:bars`
2. Maintains in-memory buffers: per-ticker sentiment context (deque of last 50 scores) and market data (SMA, RSI)
3. Runs `WeightedEnsemble` combining three strategies:
   - `MomentumStrategy`: SMA cross-over based
   - `MeanReversionStrategy`: RSI-based
   - `NewsDrivenStrategy`: Sentiment-aware
4. Weights loaded from Redis cache (default equal weights: 0.333 each)
5. Only emits `TradeSignal` when combined strength > threshold (default 0.3)
6. Publishes to `signals:generated` Redis Stream, persists `Signal` model to database

**Trade Execution:**
1. `services/trade_executor/main.py` consumes from `signals:generated`
2. `RiskManager` performs pre-trade checks:
   - Market hours validation (9:30 AM - 4:00 PM ET)
   - Max daily loss limit
   - Per-ticker position limits
   - Cooldown period after exits
3. Calculates position size based on account equity and risk parameters
4. Submits order via `AlpacaBroker` (abstraction over alpaca-py)
5. Records `Trade` model with order details
6. Publishes `TradeExecution` to `trades:executed` Redis Stream

**Learning & Weight Adjustment:**
1. `services/learning_engine/main.py` consumes from `trades:executed`
2. Identifies position closes by tracking opening trades in Redis hash (`positions:history`)
3. `TradeEvaluator` calculates realized P&L
4. `WeightAdjuster` uses multi-armed bandit approach:
   - Attributes P&L credit to contributing strategies
   - Adjusts weights (with guardrails: min trades threshold, max shift limit, weight floor)
   - Persists adjustments to `LearningAdjustment` model for auditability
5. Saves updated weights to Redis cache

**Portfolio Sync (Background Task):**
1. `services/api_gateway/tasks/portfolio_sync.py` runs as background coroutine
2. Periodically queries Alpaca for current positions, cash, equity
3. Stores `PortfolioSnapshot` in database for historical tracking
4. Dashboard polls portfolio endpoint for real-time data

**API Gateway & Dashboard:**
1. `services/api_gateway/main.py` is a FastAPI app with:
   - REST endpoints (`/api/signals`, `/api/trades`, `/api/portfolio`, `/api/news`, `/api/strategies`, `/api/backtest`, `/api/controls`)
   - WebSocket endpoint for real-time updates
   - WebAuthn/JWT authentication
2. Dashboard (`dashboard/src/App.tsx`) is used by authenticated users
3. Real-time market data, trades, signals streamed via WebSocket

**State Management:**
- **Redis Streams**: Messages flow unidirectionally; consumer groups ensure at-least-once processing
- **Redis Cache**: Strategy weights, position history, deduplication hashes
- **PostgreSQL**: Long-term storage of trades, signals, articles, market snapshots, users
- **In-Memory**: Services maintain per-ticker buffers (sentiment scores, market data) for current processing cycle; cleared/rebuilt as data arrives
- **JWT Sessions**: Issued at login, renewed via refresh token endpoint

## Key Abstractions

**BaseBroker:**
- Purpose: Swap brokerage providers without changing services
- Examples: `shared/broker/base.py` (ABC), `shared/broker/alpaca_broker.py` (implementation)
- Pattern: Abstract methods for `submit_order()`, `get_account()`, `get_positions()`, `cancel_order()`
- Trade executor only knows the `BaseBroker` interface; `AlpacaBroker` is a concrete implementation

**BaseStrategy:**
- Purpose: Pluggable trading strategy implementations
- Examples: `shared/strategies/base.py` (ABC), `momentum.py`, `mean_reversion.py`, `news_driven.py`
- Pattern: Each strategy implements `async evaluate(ticker, market, sentiment)` returning `TradeSignal | None`
- Signal generator instantiates all three at startup, runs via `WeightedEnsemble.evaluate()`

**StreamPublisher / StreamConsumer:**
- Purpose: Simplified Redis Streams wrapper with JSON serialization
- Examples: `shared/redis_streams.py`
- Pattern: `Publisher.publish(dict)` → `XADD`, `Consumer.consume()` → async iterator yielding `(msg_id, data)`
- Consumer groups deliver each message to a single consumer within each group

**Pydantic Schemas:**
- Purpose: Message validation and serialization
- Examples: `shared/schemas/news.py`, `shared/schemas/trading.py`, `shared/schemas/learning.py`
- Pattern: Each stream has a schema (e.g., `RawArticle`, `ScoredArticle`, `TradeSignal`, `TradeExecution`)
- Services validate on consume, serialize with `model_dump(mode="json")` before publish

## Entry Points

**News Fetcher:**
- Location: `services/news_fetcher/main.py`
- Triggers: Starts as standalone service, polling on timers
- Responsibilities: Fetch RSS/Reddit articles, deduplicate, publish to `news:raw`

**Sentiment Analyzer:**
- Location: `services/sentiment_analyzer/main.py`
- Triggers: Consumes messages from `news:raw` stream
- Responsibilities: Score sentiment (FinBERT + Ollama fallback), extract tickers, persist articles, publish to `news:scored`

**Market Data Service:**
- Location: `services/market_data/main.py`
- Triggers: Starts as standalone service, fetches on startup + live streaming
- Responsibilities: Fetch OHLCV bars from Alpaca, publish to `market:bars`

**Signal Generator:**
- Location: `services/signal_generator/main.py`
- Triggers: Consumes from both `news:scored` and `market:bars` streams
- Responsibilities: Combine sentiment + technical data, run strategy ensemble, publish to `signals:generated`

**Trade Executor:**
- Location: `services/trade_executor/main.py`
- Triggers: Consumes from `signals:generated` stream
- Responsibilities: Risk check, position sizing, order submission, trade recording, publish to `trades:executed`

**Learning Engine:**
- Location: `services/learning_engine/main.py`
- Triggers: Consumes from `trades:executed` stream
- Responsibilities: Evaluate closed positions, adjust strategy weights, audit adjustments

**API Gateway:**
- Location: `services/api_gateway/main.py`
- Triggers: HTTP server listening on configurable port (default 8000)
- Responsibilities: REST/WebSocket API, user authentication (WebAuthn), portfolio sync task, query database

**Backtester:**
- Location: `backtester/engine.py`
- Triggers: Called from API endpoint `/api/backtest` (POST)
- Responsibilities: Historical replay with `SimulatedBroker`, metrics calculation, equity curve generation

## Error Handling

**Strategy:** Graceful degradation and event loop resilience.

**Patterns:**
- **Service-level**: Catch broad `Exception` in polling loops (news fetcher), log and continue; metrics counters increment for monitoring
- **Fallback chains**: Sentiment analyzer tries FinBERT first, falls back to Ollama on low confidence
- **Risk checks**: Trade executor rejects signals that fail risk checks (logs reason, continues)
- **Database conflicts**: An `IntegrityError` on article insert (duplicate) is caught and processing continues without re-raising
- **Stream consumption**: Consumer groups handle delivery guarantees; messages are acknowledged manually with `XACK` only after processing, so a message that fails mid-processing stays pending instead of being lost
- **Shutdown**: `asyncio.TaskGroup` and signal handlers ensure graceful termination; cleanup of DB/Redis connections in FastAPI lifespan

## Cross-Cutting Concerns

**Logging:**
- Each service uses Python `logging` module with service name
- All exceptions logged via `logger.exception()` for stack traces
- Key events logged at INFO level (article counts, trades executed, weight updates)

**Validation:**
- Pydantic v2 schemas validate all messages on consume
- Database models use SQLAlchemy constraints (NOT NULL, UNIQUE, FOREIGN KEY)
- Config classes extend `BaseSettings` with environment variable prefixes (`TRADING_*`)

**Authentication:**
- WebAuthn (passkey) via `webauthn` PyPI package (`services/api_gateway/auth/`)
- JWT tokens issued at login, verified on every protected endpoint via `get_current_user` dependency
- Credentials stored as `UserCredential` model (credential ID, public key, counter)

**Telemetry:**
- OpenTelemetry meters instantiated per service via `setup_telemetry(service_name, metrics_port)`
- Counters: `articles_fetched`, `finbert_count`, `ollama_count`, `rejections`, `trades_executed`
- Histograms: `fill_latency`, sentiment analysis duration, strategy evaluation time
- Each service exposes a `/metrics` endpoint (default port 9090) for Prometheus to scrape
- No Prometheus or Grafana instance is deployed by this project; metrics collection relies on existing infrastructure
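
**Example: Ensemble Combination (Sketch):**

The combination step described under Signal Generation (weighted strategies, equal default weights, emit only above a 0.3 threshold) can be illustrated with a minimal sketch. This is not the project's `WeightedEnsemble`; the names `combine_scores` and `CombinedSignal` are hypothetical, as is the assumption that each strategy yields a signed score in [-1, 1] and that a strategy returning `None` contributes nothing to the total.

```python
from dataclasses import dataclass
from typing import Optional

# Equal default weights, mirroring the documented 0.333 each.
DEFAULT_WEIGHTS = {"momentum": 1 / 3, "mean_reversion": 1 / 3, "news_driven": 1 / 3}


@dataclass(frozen=True)
class CombinedSignal:
    """Hypothetical stand-in for the ensemble's output."""
    side: str        # "buy" or "sell"
    strength: float  # magnitude of the weighted score


def combine_scores(
    scores: dict,
    weights: Optional[dict] = None,
    threshold: float = 0.3,
) -> Optional[CombinedSignal]:
    """Weight per-strategy scores and emit a signal only above the threshold.

    `scores` maps strategy name to a signed score (assumed range [-1, 1]),
    or to None when that strategy abstains. Abstaining strategies add zero.
    """
    weights = DEFAULT_WEIGHTS if weights is None else weights
    weighted = sum(
        weights.get(name, 0.0) * score
        for name, score in scores.items()
        if score is not None
    )
    strength = abs(weighted)
    if strength <= threshold:
        return None  # combined strength too weak: no signal is emitted
    return CombinedSignal("buy" if weighted > 0 else "sell", strength)


if __name__ == "__main__":
    # All three strategies agree on a buy: the signal clears the threshold.
    print(combine_scores({"momentum": 0.9, "mean_reversion": 0.6, "news_driven": 0.3}))
    # Only one strategy votes: the weighted score stays below 0.3.
    print(combine_scores({"momentum": 0.5, "mean_reversion": None, "news_driven": None}))
```

Under these assumptions, a single strategy at equal weights cannot clear the default threshold by itself, because its maximum contribution is about 0.333 only at a full-strength score. Whether the real ensemble renormalizes weights when a strategy abstains is not stated in this document.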