# Architecture

**Analysis Date:** 2025-02-23

## Pattern Overview

**Overall:** Event-driven microservices architecture using Redis Streams for inter-service communication.

**Key Characteristics:**

- 8 independent Python services communicating asynchronously via Redis Streams
- Layered abstraction for extensibility: broker abstraction (`BaseBroker`), strategy abstraction (`BaseStrategy`), source abstraction for news
- Each service consumes from one or more Redis Streams, processes data, persists to PostgreSQL, and publishes downstream results
- Shared libraries provide domain models, database layer, telemetry, and abstractions
- React/TypeScript frontend connected via WebSocket and REST API

## Layers

**Service Layer (Event Processors):**

- Purpose: Independent microservices that transform data across the trading pipeline
- Location: `services/*/main.py` (entry points)
- Contains: RSS/Reddit fetchers, sentiment analysis, signal generation, trade execution, learning engine, market data subscriptions, API gateway
- Depends on: `shared/` libraries, external APIs (Alpaca, Reddit, Ollama)
- Used by: Each other (via Redis Streams), dashboard (via API gateway)

**Infrastructure/Abstraction Layer:**

- Purpose: Pluggable interfaces and database access
- Location: `shared/broker/`, `shared/strategies/`, `shared/db.py`
- Contains: `BaseBroker` (Alpaca implementation), `BaseStrategy` (Momentum/MeanReversion/NewsDriven), database engine setup
- Depends on: SQLAlchemy, alpaca-py, external ML models
- Used by: Trade executor, signal generator, backtester

**Data Layer:**

- Purpose: Persistent storage of trades, signals, articles, market data, user credentials
- Location: `shared/models/` (SQLAlchemy ORM models)
- Contains: 14 tables including `trades`, `signals`, `articles`, `article_sentiments`, `market_data`, `positions`, `users`, `strategies`, strategy weights history
- Depends on: PostgreSQL + TimescaleDB extension
- Used by: All services for read/write persistence

**Messaging Layer:**

- Purpose: Asynchronous event streaming between services
- Location: `shared/redis_streams.py`
- Contains: `StreamPublisher` (XADD), `StreamConsumer` (XREADGROUP with consumer groups)
- Depends on: Redis Streams
- Used by: All services for pub/sub communication

**Frontend Layer:**

- Purpose: User-facing dashboard for monitoring and control
- Location: `dashboard/src/`
- Contains: React pages, components, API client, WebAuthn auth
- Depends on: TanStack Query, TradingView charts, Recharts, React Router
- Used by: End users for real-time monitoring

**Utility Layer:**

- Purpose: Cross-cutting concerns
- Location: `shared/config.py`, `shared/telemetry.py`, `shared/schemas/`
- Contains: Configuration management (Pydantic `BaseSettings`), OpenTelemetry/Prometheus metrics setup, message schemas (Pydantic v2)
- Depends on: pydantic-settings, opentelemetry-sdk, prometheus-client
- Used by: All services

## Data Flow

**News Ingestion Pipeline:**

1. `services/news_fetcher/main.py` polls RSS feeds and Reddit on independent schedules
2. Uses `RSSSource` and `RedditSource` to fetch articles
3. Deduplicates via Redis SET (`news:seen_hashes`)
4. Publishes `RawArticle` messages to `news:raw` Redis Stream
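The dedup check in step 3 can be sketched as follows. The helper names are hypothetical; in the running service the seen-set lives in Redis (`news:seen_hashes`, where `SADD` returning 1 signals a new member), while here a plain Python `set` stands in so the logic is visible on its own:

```python
import hashlib

def article_hash(url: str, title: str) -> str:
    """Stable fingerprint for an article; the service stores these
    in the Redis SET news:seen_hashes."""
    return hashlib.sha256(f"{url}|{title}".encode()).hexdigest()

def is_new(seen: set[str], url: str, title: str) -> bool:
    """Mimics SADD semantics: an unseen member is accepted exactly once.
    In production this is roughly `redis.sadd("news:seen_hashes", h) == 1`."""
    h = article_hash(url, title)
    if h in seen:
        return False
    seen.add(h)
    return True
```

Only articles for which `is_new(...)` returns `True` proceed to the `news:raw` stream.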
**Sentiment Analysis:**

1. `services/sentiment_analyzer/main.py` consumes from `news:raw`
2. Runs articles through `FinBERTAnalyzer` for sentiment scores
3. Falls back to `OllamaAnalyzer` if confidence < threshold (configured via `SentimentAnalyzerConfig.finbert_confidence_threshold`)
4. Extracts ticker mentions via regex in `ticker_extractor.py`
5. Persists `Article` + `ArticleSentiment` to database
6. Publishes `ScoredArticle` messages (one per ticker per article) to `news:scored` Redis Stream
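The FinBERT-to-Ollama fallback in steps 2-3 reduces to a confidence gate. This is a minimal sketch: the function and parameter names are illustrative, the default threshold is a placeholder, and the real value comes from `SentimentAnalyzerConfig.finbert_confidence_threshold`:

```python
from typing import Callable

def score_with_fallback(
    text: str,
    finbert: Callable[[str], tuple[float, float]],  # -> (score, confidence)
    ollama: Callable[[str], float],                 # -> score
    threshold: float = 0.7,                         # placeholder value
) -> tuple[float, str]:
    """Try the fast local model first; use the LLM only when the
    primary model's confidence falls below the configured threshold."""
    score, confidence = finbert(text)
    if confidence >= threshold:
        return score, "finbert"
    return ollama(text), "ollama"
```

Passing the analyzers as callables keeps the chain testable and mirrors the layered-abstraction style described above.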
**Market Data Collection:**

1. `services/market_data/main.py` fetches OHLCV bars from Alpaca (historical + live)
2. Publishes bars to `market:bars` Redis Stream as ticker-specific snapshots
3. Consumed by signal generator for technical indicators
**Signal Generation:**

1. `services/signal_generator/main.py` consumes from both `news:scored` and `market:bars`
2. Maintains in-memory buffers: per-ticker sentiment context (deque of last 50 scores) and market data (SMA, RSI)
3. Runs `WeightedEnsemble` combining three strategies:
   - `MomentumStrategy`: SMA cross-over based
   - `MeanReversionStrategy`: RSI-based
   - `NewsDrivenStrategy`: Sentiment-aware
4. Weights loaded from Redis cache (default equal weights: 0.333 each)
5. Only emits `TradeSignal` when combined strength > threshold (default 0.3)
6. Publishes to `signals:generated` Redis Stream, persists `Signal` model to database
**Trade Execution:**

1. `services/trade_executor/main.py` consumes from `signals:generated`
2. `RiskManager` performs pre-trade checks:
   - Market hours validation (9:30 AM - 4:00 PM ET)
   - Max daily loss limit
   - Per-ticker position limits
   - Cooldown period after exits
3. Calculates position size based on account equity and risk parameters
4. Submits order via `AlpacaBroker` (abstraction over alpaca-py)
5. Records `Trade` model with order details
6. Publishes `TradeExecution` to `trades:executed` Redis Stream
**Learning & Weight Adjustment:**

1. `services/learning_engine/main.py` consumes from `trades:executed`
2. Identifies position closes by tracking opening trades in Redis hash (`positions:history`)
3. `TradeEvaluator` calculates realized P&L
4. `WeightAdjuster` uses multi-armed bandit approach:
   - Attributes P&L credit to contributing strategies
   - Adjusts weights (with guardrails: min trades threshold, max shift limit, weight floor)
   - Persists adjustments to `LearningAdjustment` model for auditability
5. Saves updated weights to Redis cache
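A guardrailed update of the kind described might look like the following. The specific guardrail values and parameter names are placeholders, not values from the repo:

```python
def adjust_weights(
    weights: dict[str, float],
    rewards: dict[str, float],     # realized P&L credit per strategy
    trade_counts: dict[str, int],
    min_trades: int = 10,          # guardrail: require evidence before moving
    max_shift: float = 0.05,       # guardrail: cap per-update movement
    floor: float = 0.05,           # guardrail: never silence a strategy entirely
) -> dict[str, float]:
    """One guardrailed bandit-style update: nudge each weight toward its
    reward, clamp the step, apply the floor, then renormalize to sum to 1."""
    updated = {}
    for name, w in weights.items():
        if trade_counts.get(name, 0) < min_trades:
            updated[name] = w  # not enough data yet: hold the weight
            continue
        step = max(-max_shift, min(max_shift, rewards.get(name, 0.0)))
        updated[name] = max(floor, w + step)
    total = sum(updated.values())
    return {name: w / total for name, w in updated.items()}
```

The renormalization keeps the ensemble's weights a valid convex combination after every adjustment, which is what lets them serve directly as multipliers in the signal generator.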
**Portfolio Sync (Background Task):**

1. `services/api_gateway/tasks/portfolio_sync.py` runs as a background coroutine
2. Periodically queries Alpaca for current positions, cash, equity
3. Stores `PortfolioSnapshot` in database for historical tracking
4. Dashboard polls portfolio endpoint for real-time data
**API Gateway & Dashboard:**

1. `services/api_gateway/main.py` FastAPI app with:
   - REST endpoints (`/api/signals`, `/api/trades`, `/api/portfolio`, `/api/news`, `/api/strategies`, `/api/backtest`, `/api/controls`)
   - WebSocket endpoint for real-time updates
   - WebAuthn/JWT authentication
2. Dashboard (`dashboard/src/App.tsx`) is served to authenticated users
3. Real-time market data, trades, signals streamed via WebSocket
**State Management:**

- **Redis Streams**: Messages flow unidirectionally; consumer groups ensure at-least-once processing
- **Redis Cache**: Strategy weights, position history, deduplication hashes
- **PostgreSQL**: Long-term storage of trades, signals, articles, market snapshots, users
- **In-Memory**: Services maintain per-ticker buffers (sentiment scores, market data) for the current processing cycle; cleared/rebuilt as data arrives
- **JWT Sessions**: Issued at login, renewed via refresh token endpoint
## Key Abstractions

**BaseBroker:**

- Purpose: Swap brokerage providers without changing services
- Examples: `shared/broker/base.py` (ABC), `shared/broker/alpaca_broker.py` (implementation)
- Pattern: Abstract methods for `submit_order()`, `get_account()`, `get_positions()`, `cancel_order()`
- Trade executor only knows the `BaseBroker` interface; `AlpacaBroker` is a concrete implementation
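A minimal sketch of the interface and of the swap it enables. Return types are illustrative (the source names only the methods), methods are shown synchronous for brevity, and `PaperBroker` is a hypothetical second implementation, not part of the repo:

```python
from abc import ABC, abstractmethod

class BaseBroker(ABC):
    """What the trade executor codes against; AlpacaBroker is one implementation."""

    @abstractmethod
    def submit_order(self, ticker: str, qty: float, side: str) -> str: ...

    @abstractmethod
    def get_account(self) -> dict: ...

    @abstractmethod
    def get_positions(self) -> list[dict]: ...

    @abstractmethod
    def cancel_order(self, order_id: str) -> None: ...

class PaperBroker(BaseBroker):
    """Hypothetical in-memory broker: same interface, no external calls."""
    def __init__(self):
        self.orders: list[tuple[str, float, str]] = []

    def submit_order(self, ticker, qty, side):
        self.orders.append((ticker, qty, side))
        return f"paper-{len(self.orders)}"

    def get_account(self):
        return {"equity": 0.0}

    def get_positions(self):
        return []

    def cancel_order(self, order_id):
        pass
```

Because the executor holds only a `BaseBroker` reference, swapping Alpaca for another provider is a constructor change, not a service rewrite.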
**BaseStrategy:**

- Purpose: Pluggable trading strategy implementations
- Examples: `shared/strategies/base.py` (ABC), `momentum.py`, `mean_reversion.py`, `news_driven.py`
- Pattern: Each strategy implements `async evaluate(ticker, market, sentiment)` returning `TradeSignal | None`
- Signal generator instantiates all three at startup, runs via `WeightedEnsemble.evaluate()`
**StreamPublisher / StreamConsumer:**

- Purpose: Simplified Redis Streams wrapper with JSON serialization
- Examples: `shared/redis_streams.py`
- Pattern: `Publisher.publish(dict)` → `XADD`, `Consumer.consume()` → async iterator yielding `(msg_id, data)`
- Consumer groups deliver each message to a single consumer per group (at-least-once)
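The publisher half of the pattern can be sketched as follows. This is not the code from `shared/redis_streams.py`: the client is duck-typed (anything exposing `xadd`), the single-JSON-field layout is an assumption, and `FakeRedis` is a test double so the shape is visible without a live Redis. The consumer mirrors this with `XREADGROUP` plus `XACK`:

```python
import json
from typing import Any

class StreamPublisher:
    """Wraps XADD: serializes a dict into a single JSON field."""
    def __init__(self, redis: Any, stream: str):
        self.redis = redis
        self.stream = stream

    def publish(self, message: dict) -> str:
        # XADD returns the generated message ID (e.g. "1690000000000-0")
        return self.redis.xadd(self.stream, {"data": json.dumps(message)})

class FakeRedis:
    """In-memory stand-in exposing just the xadd call the publisher needs."""
    def __init__(self):
        self.entries: list[tuple[str, dict]] = []

    def xadd(self, stream: str, fields: dict) -> str:
        self.entries.append((stream, fields))
        return f"{len(self.entries)}-0"
```

Keeping serialization inside the wrapper means every service publishes and consumes the same JSON envelope without repeating the encoding logic.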
**Pydantic Schemas:**

- Purpose: Message validation and serialization
- Examples: `shared/schemas/news.py`, `shared/schemas/trading.py`, `shared/schemas/learning.py`
- Pattern: Each stream has a schema (e.g., `RawArticle`, `ScoredArticle`, `TradeSignal`, `TradeExecution`)
- Services validate on consume, serialize with `model_dump(mode="json")` before publish
## Entry Points

**News Fetcher:**

- Location: `services/news_fetcher/main.py`
- Triggers: Starts as standalone service, polling on timers
- Responsibilities: Fetch RSS/Reddit articles, deduplicate, publish to `news:raw`

**Sentiment Analyzer:**

- Location: `services/sentiment_analyzer/main.py`
- Triggers: Consumes messages from `news:raw` stream
- Responsibilities: Score sentiment (FinBERT + Ollama fallback), extract tickers, persist articles, publish to `news:scored`

**Market Data Service:**

- Location: `services/market_data/main.py`
- Triggers: Starts as standalone service, fetches on startup + live streaming
- Responsibilities: Fetch OHLCV bars from Alpaca, publish to `market:bars`

**Signal Generator:**

- Location: `services/signal_generator/main.py`
- Triggers: Consumes from both `news:scored` and `market:bars` streams
- Responsibilities: Combine sentiment + technical data, run strategy ensemble, publish to `signals:generated`

**Trade Executor:**

- Location: `services/trade_executor/main.py`
- Triggers: Consumes from `signals:generated` stream
- Responsibilities: Risk check, position sizing, order submission, trade recording, publish to `trades:executed`

**Learning Engine:**

- Location: `services/learning_engine/main.py`
- Triggers: Consumes from `trades:executed` stream
- Responsibilities: Evaluate closed positions, adjust strategy weights, audit adjustments

**API Gateway:**

- Location: `services/api_gateway/main.py`
- Triggers: HTTP server listening on configurable port (default 8000)
- Responsibilities: REST/WebSocket API, user authentication (WebAuthn), portfolio sync task, database queries

**Backtester:**

- Location: `backtester/engine.py`
- Triggers: Called from API endpoint `/api/backtest` (POST)
- Responsibilities: Historical replay with `SimulatedBroker`, metrics calculation, equity curve generation
## Error Handling

**Strategy:** Graceful degradation and event loop resilience.

**Patterns:**

- **Service-level**: Catch broad `Exception` in polling loops (news fetcher), log and continue; metrics counters increment for monitoring
- **Fallback chains**: Sentiment analyzer tries FinBERT first, falls back to Ollama on low confidence
- **Risk checks**: Trade executor rejects signals that fail risk checks (logs reason, continues)
- **Database conflicts**: An `IntegrityError` on article insert (duplicate) is swallowed rather than re-raised
- **Stream consumption**: Consumer groups handle delivery guarantees; manual `XACK` after processing ensures fault tolerance
- **Shutdown**: `asyncio.TaskGroup` and signal handlers ensure graceful termination; DB/Redis connections are cleaned up in the FastAPI lifespan
## Cross-Cutting Concerns

**Logging:**

- Each service uses the Python `logging` module with its service name
- All exceptions logged via `logger.exception()` for stack traces
- Key events logged at INFO level (article counts, trades executed, weight updates)

**Validation:**

- Pydantic v2 schemas validate all messages on consume
- Database models use SQLAlchemy constraints (NOT NULL, UNIQUE, FOREIGN KEY)
- Config classes extend `BaseSettings` with environment variable prefixes (`TRADING_*`)

**Authentication:**

- WebAuthn (passkey) via the `webauthn` PyPI package (`services/api_gateway/auth/`)
- JWT tokens issued at login, verified on every protected endpoint via the `get_current_user` dependency
- Credentials stored as `UserCredential` model (credential ID, public key, counter)

**Telemetry:**

- OpenTelemetry meters instantiated per service via `setup_telemetry(service_name, metrics_port)`
- Counters: `articles_fetched`, `finbert_count`, `ollama_count`, `rejections`, `trades_executed`
- Histograms: `fill_latency`, sentiment analysis duration, strategy evaluation time
- Each service exposes a Prometheus-compatible `/metrics` endpoint (default port 9090)
- No Prometheus/Grafana instance is deployed by this repo; scraping relies on existing infrastructure
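The `TRADING_*` convention can be illustrated with a pydantic-settings class. This is a config sketch only: the class and field names are hypothetical, not taken from `shared/config.py`.

```python
from pydantic_settings import BaseSettings, SettingsConfigDict

class TradeExecutorConfig(BaseSettings):
    """Reads TRADING_MAX_DAILY_LOSS, TRADING_COOLDOWN_SECONDS, etc.
    from the environment; unset variables fall back to the defaults."""
    model_config = SettingsConfigDict(env_prefix="TRADING_")

    max_daily_loss: float = 1000.0
    cooldown_seconds: int = 300
```

Each service instantiates its own settings class at startup, so a deployment can be reconfigured entirely through environment variables.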