# Architecture
**Analysis Date:** 2025-02-23
## Pattern Overview
**Overall:** Event-driven microservices architecture using Redis Streams for inter-service communication.
**Key Characteristics:**
- 8 independent Python services communicating asynchronously via Redis Streams
- Layered abstraction for extensibility: broker abstraction (`BaseBroker`), strategy abstraction (`BaseStrategy`), source abstraction for news
- Each service consumes from one or more Redis Streams, processes data, persists to PostgreSQL, and publishes downstream results
- Shared libraries provide domain models, database layer, telemetry, and abstractions
- React TypeScript frontend connected via WebSocket and REST API
## Layers
**Service Layer (Event Processors):**
- Purpose: Independent microservices that transform data across the trading pipeline
- Location: `services/*/main.py` (entry points)
- Contains: RSS/Reddit fetchers, sentiment analysis, signal generation, trade execution, learning engine, market data subscriptions, API gateway
- Depends on: `shared/` libraries, external APIs (Alpaca, Reddit, Ollama)
- Used by: Each other (via Redis Streams), dashboard (via API gateway)
**Infrastructure/Abstraction Layer:**
- Purpose: Pluggable interfaces and database access
- Location: `shared/broker/`, `shared/strategies/`, `shared/db.py`
- Contains: `BaseBroker` (Alpaca implementation), `BaseStrategy` (Momentum/MeanReversion/NewsDriven), database engine setup
- Depends on: SQLAlchemy, alpaca-py, external ML models
- Used by: Trade executor, signal generator, backtester
**Data Layer:**
- Purpose: Persistent storage of trades, signals, articles, market data, user credentials
- Location: `shared/models/` (SQLAlchemy ORM models)
- Contains: 14 tables including `trades`, `signals`, `articles`, `article_sentiments`, `market_data`, `positions`, `users`, `strategies`, and strategy weight history
- Depends on: PostgreSQL + TimescaleDB extension
- Used by: All services for read/write persistence
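As a reference point, here is a minimal SQLAlchemy 2.0 ORM model in the shape of the `trades` table; the column set is an illustration only, not the repo's actual schema in `shared/models/`.

```python
# Hypothetical Trade model sketch; columns are illustrative, not the actual schema.
from datetime import datetime

from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class Trade(Base):
    __tablename__ = "trades"

    id: Mapped[int] = mapped_column(primary_key=True)
    ticker: Mapped[str] = mapped_column(index=True)
    side: Mapped[str]                      # "buy" | "sell"
    qty: Mapped[float]
    filled_price: Mapped[float | None]     # NULL until the order fills
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
```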
**Messaging Layer:**
- Purpose: Asynchronous event streaming between services
- Location: `shared/redis_streams.py`
- Contains: `StreamPublisher` (XADD), `StreamConsumer` (XREADGROUP with consumer groups)
- Depends on: Redis Streams
- Used by: All services for pub/sub communication
**Frontend Layer:**
- Purpose: User-facing dashboard for monitoring and control
- Location: `dashboard/src/`
- Contains: React pages, components, API client, WebAuthn auth
- Depends on: TanStack Query, TradingView charts, Recharts, React Router
- Used by: End users for real-time monitoring
**Utility Layer:**
- Purpose: Cross-cutting concerns
- Location: `shared/config.py`, `shared/telemetry.py`, `shared/schemas/`
- Contains: Configuration management (Pydantic BaseSettings), OpenTelemetry/Prometheus metrics setup, message schemas (Pydantic v2)
- Depends on: pydantic-settings, opentelemetry-sdk, prometheus-client
- Used by: All services
## Data Flow
**News Ingestion Pipeline:**
1. `services/news_fetcher/main.py` polls RSS feeds and Reddit on independent schedules
2. Uses `RSSSource` and `RedditSource` to fetch articles
3. Deduplicates via Redis SET (`news:seen_hashes`)
4. Publishes `RawArticle` messages to `news:raw` Redis Stream
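A minimal sketch of the dedup-and-publish step, assuming `redis.asyncio` and a URL-hash dedup key; the helper name and payload shape are illustrative, not the fetcher's actual API.

```python
import hashlib
import json

import redis.asyncio as redis


async def publish_if_new(r: redis.Redis, article: dict) -> bool:
    """Publish to news:raw only when the article's URL hash has not been seen."""
    digest = hashlib.sha256(article["url"].encode()).hexdigest()
    # SADD returns 1 only when the member is new, so it doubles as the dedup check.
    if await r.sadd("news:seen_hashes", digest) == 0:
        return False  # duplicate, skip
    await r.xadd("news:raw", {"data": json.dumps(article)})
    return True
```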
**Sentiment Analysis:**
1. `services/sentiment_analyzer/main.py` consumes from `news:raw`
2. Runs articles through `FinBERTAnalyzer` for sentiment scores
3. Falls back to `OllamaAnalyzer` if confidence < threshold (configured via `SentimentAnalyzerConfig.finbert_confidence_threshold`)
4. Extracts ticker mentions via regex in `ticker_extractor.py`
5. Persists `Article` + `ArticleSentiment` to database
6. Publishes `ScoredArticle` messages (one per ticker per article) to `news:scored` Redis Stream
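The fallback chain in step 3, sketched with hypothetical analyzer objects; the real `FinBERTAnalyzer`/`OllamaAnalyzer` interfaces and the configured threshold value may differ.

```python
from dataclasses import dataclass


@dataclass
class SentimentResult:
    label: str        # "positive" | "negative" | "neutral"
    score: float      # signed sentiment score
    confidence: float
    model: str        # which analyzer produced the result


async def analyze(text: str, finbert, ollama, threshold: float = 0.7) -> SentimentResult:
    # threshold stands in for finbert_confidence_threshold; 0.7 is a placeholder.
    result = await finbert.analyze(text)
    if result.confidence >= threshold:
        return result
    # Low-confidence FinBERT output: defer to the slower LLM-based analyzer.
    return await ollama.analyze(text)
```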
**Market Data Collection:**
1. `services/market_data/main.py` fetches OHLCV bars from Alpaca (historical + live)
2. Publishes bars to `market:bars` Redis Stream as ticker-specific snapshots
3. Consumed by signal generator for technical indicators
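A hedged sketch of the historical-bars fetch, assuming alpaca-py's historical data client; the service's actual request parameters and per-ticker snapshot format are not shown here.

```python
from datetime import datetime, timedelta

from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame


def fetch_recent_bars(api_key: str, secret_key: str, tickers: list[str]):
    client = StockHistoricalDataClient(api_key, secret_key)
    request = StockBarsRequest(
        symbol_or_symbols=tickers,
        timeframe=TimeFrame.Minute,
        start=datetime.utcnow() - timedelta(days=1),
    )
    # Returns a BarSet keyed by ticker; each bar carries OHLCV fields.
    return client.get_stock_bars(request)
```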
**Signal Generation:**
1. `services/signal_generator/main.py` consumes from both `news:scored` and `market:bars`
2. Maintains in-memory buffers: per-ticker sentiment context (deque of last 50 scores) and market data (SMA, RSI)
3. Runs `WeightedEnsemble` combining three strategies:
- `MomentumStrategy`: SMA cross-over based
- `MeanReversionStrategy`: RSI-based
- `NewsDrivenStrategy`: Sentiment-aware
4. Weights loaded from Redis cache (default equal weights: 0.333 each)
5. Only emits `TradeSignal` when combined strength > threshold (default 0.3)
6. Publishes to `signals:generated` Redis Stream, persists `Signal` model to database
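A minimal sketch of the ensemble decision in steps 3-5, assuming each strategy returns a signal with a signed `strength` (or `None` to abstain); the names are illustrative rather than the `WeightedEnsemble` implementation.

```python
async def evaluate_ensemble(strategies, weights, ticker, market, sentiment,
                            threshold: float = 0.3):
    combined = 0.0
    contributors = []
    for strategy in strategies:
        signal = await strategy.evaluate(ticker, market, sentiment)
        if signal is None:
            continue
        combined += weights[strategy.name] * signal.strength
        contributors.append(strategy.name)
    if abs(combined) < threshold:
        return None  # combined strength too weak to emit a TradeSignal
    return {
        "ticker": ticker,
        "side": "buy" if combined > 0 else "sell",
        "strength": combined,
        "strategies": contributors,
    }
```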
**Trade Execution:**
1. `services/trade_executor/main.py` consumes from `signals:generated`
2. `RiskManager` performs pre-trade checks:
- Market hours validation (9:30 AM - 4:00 PM ET)
- Max daily loss limit
- Per-ticker position limits
- Cooldown period after exits
3. Calculates position size based on account equity and risk parameters
4. Submits order via `AlpacaBroker` (abstraction over alpaca-py)
5. Records `Trade` model with order details
6. Publishes `TradeExecution` to `trades:executed` Redis Stream
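An illustrative version of the pre-trade gate and equity-based sizing in steps 2-3; the limits, field names, and 2% risk fraction are assumptions, not the `RiskManager` API, and the cooldown check is omitted.

```python
from datetime import datetime, time


def passes_risk_checks(now_et: datetime, daily_pnl: float, open_positions: int,
                       max_daily_loss: float, max_positions_per_ticker: int) -> bool:
    in_market_hours = time(9, 30) <= now_et.time() <= time(16, 0)
    within_loss_limit = daily_pnl > -max_daily_loss
    within_position_limit = open_positions < max_positions_per_ticker
    return in_market_hours and within_loss_limit and within_position_limit


def position_size(equity: float, price: float, risk_fraction: float = 0.02) -> int:
    """Cap a single position at risk_fraction of account equity (whole shares)."""
    return max(int(equity * risk_fraction / price), 0)
```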
**Learning & Weight Adjustment:**
1. `services/learning_engine/main.py` consumes from `trades:executed`
2. Identifies position closes by tracking opening trades in Redis hash (`positions:history`)
3. `TradeEvaluator` calculates realized P&L
4. `WeightAdjuster` uses a multi-armed bandit approach:
- Attributes P&L credit to contributing strategies
- Adjusts weights (with guardrails: min trades threshold, max shift limit, weight floor)
- Persists adjustments to `LearningAdjustment` model for auditability
5. Saves updated weights to Redis cache
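An illustrative weight update showing the guardrails named above (max shift per adjustment, weight floor, renormalization); the actual bandit credit-assignment and the min-trades threshold are not reproduced here.

```python
def adjust_weights(weights: dict[str, float], strategy: str, pnl: float,
                   max_shift: float = 0.05, floor: float = 0.1) -> dict[str, float]:
    # Nudge the contributing strategy in proportion to realized P&L,
    # capped at max_shift per adjustment and never below the floor.
    delta = max(min(pnl * 0.01, max_shift), -max_shift)
    updated = dict(weights)
    updated[strategy] = max(updated[strategy] + delta, floor)
    total = sum(updated.values())
    return {name: weight / total for name, weight in updated.items()}
```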
**Portfolio Sync (Background Task):**
1. `services/api_gateway/tasks/portfolio_sync.py` runs as background coroutine
2. Periodically queries Alpaca for current positions, cash, equity
3. Stores `PortfolioSnapshot` in database for historical tracking
4. Dashboard polls portfolio endpoint for real-time data
**API Gateway & Dashboard:**
1. `services/api_gateway/main.py` FastAPI app with:
- REST endpoints (`/api/signals`, `/api/trades`, `/api/portfolio`, `/api/news`, `/api/strategies`, `/api/backtest`, `/api/controls`)
- WebSocket endpoint for real-time updates
- WebAuthn/JWT authentication
2. Dashboard (`dashboard/src/App.tsx`) consumed by authenticated users
3. Real-time market data, trades, signals streamed via WebSocket
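A skeletal FastAPI shape matching the description above (REST routes guarded by a JWT dependency plus a WebSocket endpoint); the routes shown are only a subset and the dependency body is stubbed.

```python
from fastapi import Depends, FastAPI, WebSocket

app = FastAPI()


async def get_current_user():
    ...  # JWT verification dependency (see Authentication below)


@app.get("/api/signals")
async def list_signals(user=Depends(get_current_user)):
    return []  # would query the signals table, newest first


@app.websocket("/ws")
async def stream_updates(ws: WebSocket):
    await ws.accept()
    # push real-time market data, trades, and signals to the dashboard
```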
**State Management:**
- **Redis Streams**: Messages flow unidirectionally; consumer groups ensure at-least-once processing
- **Redis Cache**: Strategy weights, position history, deduplication hashes
- **PostgreSQL**: Long-term storage of trades, signals, articles, market snapshots, users
- **In-Memory**: Services maintain per-ticker buffers (sentiment scores, market data) for current processing cycle; cleared/rebuilt as data arrives
- **JWT Sessions**: Issued at login, renewed via refresh token endpoint
## Key Abstractions
**BaseBroker:**
- Purpose: Swap brokerage providers without changing services
- Examples: `shared/broker/base.py` (ABC), `shared/broker/alpaca_broker.py` (implementation)
- Pattern: Abstract methods for `submit_order()`, `get_account()`, `get_positions()`, `cancel_order()`
- Trade executor only knows `BaseBroker` interface; `AlpacaBroker` is a concrete implementation
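The contract implied by the method list above, sketched as an ABC; parameter and return types are assumptions since `shared/broker/base.py` is not reproduced here.

```python
from abc import ABC, abstractmethod
from typing import Any


class BaseBroker(ABC):
    @abstractmethod
    async def submit_order(self, ticker: str, qty: float, side: str) -> Any: ...

    @abstractmethod
    async def get_account(self) -> Any: ...

    @abstractmethod
    async def get_positions(self) -> list[Any]: ...

    @abstractmethod
    async def cancel_order(self, order_id: str) -> None: ...
```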
**BaseStrategy:**
- Purpose: Pluggable trading strategy implementations
- Examples: `shared/strategies/base.py` (ABC), `momentum.py`, `mean_reversion.py`, `news_driven.py`
- Pattern: Each strategy implements `async evaluate(ticker, market, sentiment)` returning `TradeSignal | None`
- Signal generator instantiates all three at startup, runs via `WeightedEnsemble.evaluate()`
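A corresponding sketch of the strategy contract; the exact parameter types of `evaluate()` are assumptions.

```python
from abc import ABC, abstractmethod


class BaseStrategy(ABC):
    name: str

    @abstractmethod
    async def evaluate(self, ticker: str, market: dict, sentiment: dict):
        """Return a TradeSignal when the strategy sees an opportunity, else None."""
```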
**StreamPublisher / StreamConsumer:**
- Purpose: Simplified Redis Streams wrapper with JSON serialization
- Examples: `shared/redis_streams.py`
- Pattern: `Publisher.publish(dict)` → `XADD`, `Consumer.consume()` → async iterator yielding `(msg_id, data)`
- Consumer groups ensure each message is processed once per group
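A minimal sketch of that publisher/consumer pair on top of `redis.asyncio`; the real wrapper in `shared/redis_streams.py` may handle group creation, batching, and error paths differently.

```python
import json
from typing import AsyncIterator

import redis.asyncio as redis


class StreamPublisher:
    def __init__(self, r: redis.Redis, stream: str):
        self.r, self.stream = r, stream

    async def publish(self, data: dict) -> str:
        return await self.r.xadd(self.stream, {"data": json.dumps(data)})


class StreamConsumer:
    def __init__(self, r: redis.Redis, stream: str, group: str, consumer: str):
        self.r, self.stream, self.group, self.consumer = r, stream, group, consumer

    async def consume(self) -> AsyncIterator[tuple[str, dict]]:
        # Assumes the consumer group already exists (XGROUP CREATE at startup)
        # and the client was created with decode_responses=True.
        while True:
            batches = await self.r.xreadgroup(
                self.group, self.consumer, {self.stream: ">"}, count=10, block=5000
            )
            for _stream, messages in batches or []:
                for msg_id, fields in messages:
                    yield msg_id, json.loads(fields["data"])
                    await self.r.xack(self.stream, self.group, msg_id)
```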
**Pydantic Schemas:**
- Purpose: Message validation and serialization
- Examples: `shared/schemas/news.py`, `shared/schemas/trading.py`, `shared/schemas/learning.py`
- Pattern: Each stream has a schema (e.g., `RawArticle`, `ScoredArticle`, `TradeSignal`, `TradeExecution`)
- Services validate on consume, serialize with `model_dump(mode="json")` before publish
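An illustrative Pydantic v2 schema for the `signals:generated` stream; field names beyond those mentioned in this document are assumptions.

```python
from datetime import datetime, timezone

from pydantic import BaseModel


class TradeSignal(BaseModel):
    ticker: str
    side: str              # "buy" | "sell"
    strength: float
    strategies: list[str]  # contributing strategy names
    generated_at: datetime


# Publish side: serialize to JSON-safe primitives before XADD.
payload = TradeSignal(
    ticker="AAPL", side="buy", strength=0.62,
    strategies=["momentum", "news_driven"],
    generated_at=datetime.now(timezone.utc),
).model_dump(mode="json")

# Consume side: validate the raw dict back into the schema.
signal = TradeSignal.model_validate(payload)
```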
## Entry Points
**News Fetcher:**
- Location: `services/news_fetcher/main.py`
- Triggers: Starts as standalone service, polling on timers
- Responsibilities: Fetch RSS/Reddit articles, deduplicate, publish to `news:raw`
**Sentiment Analyzer:**
- Location: `services/sentiment_analyzer/main.py`
- Triggers: Consumes messages from `news:raw` stream
- Responsibilities: Score sentiment (FinBERT + Ollama fallback), extract tickers, persist articles, publish to `news:scored`
**Market Data Service:**
- Location: `services/market_data/main.py`
- Triggers: Starts as standalone service; fetches historical bars on startup, then streams live bars
- Responsibilities: Fetch OHLCV bars from Alpaca, publish to `market:bars`
**Signal Generator:**
- Location: `services/signal_generator/main.py`
- Triggers: Consumes from both `news:scored` and `market:bars` streams
- Responsibilities: Combine sentiment + technical data, run strategy ensemble, publish to `signals:generated`
**Trade Executor:**
- Location: `services/trade_executor/main.py`
- Triggers: Consumes from `signals:generated` stream
- Responsibilities: Risk check, position sizing, order submission, trade recording, publish to `trades:executed`
**Learning Engine:**
- Location: `services/learning_engine/main.py`
- Triggers: Consumes from `trades:executed` stream
- Responsibilities: Evaluate closed positions, adjust strategy weights, audit adjustments
**API Gateway:**
- Location: `services/api_gateway/main.py`
- Triggers: HTTP server listening on configurable port (default 8000)
- Responsibilities: REST/WebSocket API, user authentication (WebAuthn), portfolio sync task, query database
**Backtester:**
- Location: `backtester/engine.py`
- Triggers: Called from API endpoint `/api/backtest` (POST)
- Responsibilities: Historical replay with `SimulatedBroker`, metrics calculation, equity curve generation
## Error Handling
**Strategy:** Graceful degradation and event loop resilience.
**Patterns:**
- **Service-level**: Catch broad `Exception` in polling loops (news fetcher), log and continue; metrics counters increment for monitoring
- **Fallback chains**: Sentiment analyzer tries FinBERT first, falls back to Ollama on low confidence
- **Risk checks**: Trade executor rejects signals that fail risk checks (logs reason, continues)
- **Database conflicts**: An `IntegrityError` on article insert (duplicate) is caught and skipped without re-raising
- **Stream consumption**: Consumer groups provide at-least-once delivery; `XACK` is issued manually only after successful processing, so unacknowledged messages remain pending for redelivery
- **Shutdown**: `asyncio.TaskGroup` and signal handlers ensure graceful termination; cleanup of DB/Redis connections in FastAPI lifespan
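A sketch of the service-level pattern in the first bullet, with hypothetical names: catch broadly, log with a stack trace, bump a counter, and keep the loop alive.

```python
import asyncio
import logging

logger = logging.getLogger("news_fetcher")


async def poll_forever(fetch, interval_seconds: float, errors_counter) -> None:
    while True:
        try:
            await fetch()
        except Exception:
            # One bad cycle must not kill the service; surface failures via metrics.
            logger.exception("fetch cycle failed")
            errors_counter.add(1)
        await asyncio.sleep(interval_seconds)
```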
## Cross-Cutting Concerns
**Logging:**
- Each service uses Python `logging` module with service name
- All exceptions logged via `logger.exception()` for stack traces
- Key events logged at INFO level (article counts, trades executed, weight updates)
**Validation:**
- Pydantic v2 schemas validate all messages on consume
- Database models use SQLAlchemy constraints (NOT NULL, UNIQUE, FOREIGN KEY)
- Config classes extend `BaseSettings` with environment variable prefixes (`TRADING_*`)
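An example of the `BaseSettings` pattern with the `TRADING_` prefix; the field names are illustrative, not the repo's actual settings classes.

```python
from pydantic_settings import BaseSettings, SettingsConfigDict


class SignalGeneratorConfig(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="TRADING_")

    redis_url: str = "redis://localhost:6379/0"
    signal_threshold: float = 0.3


# TRADING_SIGNAL_THRESHOLD=0.5 in the environment would override the default.
config = SignalGeneratorConfig()
```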
**Authentication:**
- WebAuthn (passkey) via `webauthn` PyPI package (`services/api_gateway/auth/`)
- JWT tokens issued at login, verified on every protected endpoint via `get_current_user` dependency
- Credentials stored as `UserCredential` model (credential ID, public key, counter)
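A hedged sketch of a `get_current_user`-style dependency using PyJWT; the repo's actual token handling, secret management, and claims may differ.

```python
import jwt  # PyJWT
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

bearer = HTTPBearer()


async def get_current_user(
    creds: HTTPAuthorizationCredentials = Depends(bearer),
) -> str:
    try:
        claims = jwt.decode(creds.credentials, "change-me", algorithms=["HS256"])
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid or expired token")
    return claims["sub"]  # user identity carried in the subject claim
```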
**Telemetry:**
- OpenTelemetry meters instantiated per service via `setup_telemetry(service_name, metrics_port)`
- Counters: `articles_fetched`, `finbert_count`, `ollama_count`, `rejections`, `trades_executed`
- Histograms: `fill_latency`, sentiment analysis duration, strategy evaluation time
- Each service exposes a Prometheus-compatible `/metrics` endpoint (default port 9090) for scraping
- Prometheus/Grafana are not deployed by this project; metrics collection relies on existing external infrastructure
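A sketch of what a `setup_telemetry(service_name, metrics_port)` helper could look like with the OpenTelemetry SDK and Prometheus exporter; the real helper's wiring and instrument names may differ.

```python
from opentelemetry import metrics
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import MeterProvider
from prometheus_client import start_http_server


def setup_telemetry(service_name: str, metrics_port: int):
    start_http_server(metrics_port)  # exposes /metrics for scraping
    metrics.set_meter_provider(MeterProvider(metric_readers=[PrometheusMetricReader()]))
    meter = metrics.get_meter(service_name)
    # Example instruments; each service creates its own counters/histograms.
    articles_fetched = meter.create_counter("articles_fetched")
    fill_latency = meter.create_histogram("fill_latency")
    return articles_fetched, fill_latency
```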