Architecture
Analysis Date: 2025-02-23
Pattern Overview
Overall: Event-driven microservices architecture using Redis Streams for inter-service communication.
Key Characteristics:
- 8 independent Python services communicating asynchronously via Redis Streams
- Layered abstractions for extensibility: broker abstraction (`BaseBroker`), strategy abstraction (`BaseStrategy`), and a source abstraction for news
- Each service consumes from one or more Redis Streams, processes data, persists to PostgreSQL, and publishes downstream results
- Shared libraries provide domain models, database layer, telemetry, and abstractions
- React TypeScript frontend connected via WebSocket and REST API
Layers
Service Layer (Event Processors):
- Purpose: Independent microservices that transform data across the trading pipeline
- Location: `services/*/main.py` (entry points)
- Contains: RSS/Reddit fetchers, sentiment analysis, signal generation, trade execution, learning engine, market data subscriptions, API gateway
- Depends on: `shared/` libraries, external APIs (Alpaca, Reddit, Ollama)
- Used by: Each other (via Redis Streams), dashboard (via API gateway)
Infrastructure/Abstraction Layer:
- Purpose: Pluggable interfaces and database access
- Location: `shared/broker/`, `shared/strategies/`, `shared/db.py`
- Contains: `BaseBroker` (Alpaca implementation), `BaseStrategy` (Momentum/MeanReversion/NewsDriven), database engine setup
- Depends on: SQLAlchemy, alpaca-py, external ML models
- Used by: Trade executor, signal generator, backtester
Data Layer:
- Purpose: Persistent storage of trades, signals, articles, market data, user credentials
- Location: `shared/models/` (SQLAlchemy ORM models)
- Contains: 14 tables including `trades`, `signals`, `articles`, `article_sentiments`, `market_data`, `positions`, `users`, `strategies`, and strategy weights history
- Depends on: PostgreSQL + TimescaleDB extension
- Used by: All services for read/write persistence
Messaging Layer:
- Purpose: Asynchronous event streaming between services
- Location: `shared/redis_streams.py`
- Contains: `StreamPublisher` (XADD), `StreamConsumer` (XREADGROUP with consumer groups)
- Depends on: Redis Streams
- Used by: All services for pub/sub communication
Frontend Layer:
- Purpose: User-facing dashboard for monitoring and control
- Location: `dashboard/src/`
- Contains: React pages, components, API client, WebAuthn auth
- Depends on: TanStack Query, TradingView charts, Recharts, React Router
- Used by: End users for real-time monitoring
Utility Layer:
- Purpose: Cross-cutting concerns
- Location: `shared/config.py`, `shared/telemetry.py`, `shared/schemas/`
- Contains: Configuration management (Pydantic BaseSettings), OpenTelemetry/Prometheus metrics setup, message schemas (Pydantic v2)
- Depends on: pydantic-settings, opentelemetry-sdk, prometheus-client
- Used by: All services
Data Flow
News Ingestion Pipeline:
- `services/news_fetcher/main.py` polls RSS feeds and Reddit on independent schedules
- Uses `RSSSource` and `RedditSource` to fetch articles
- Deduplicates via a Redis SET (`news:seen_hashes`)
- Publishes `RawArticle` messages to the `news:raw` Redis Stream
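The dedup step above can be sketched with a content hash. This is a minimal illustration, not the project's code: the real service stores hashes in the Redis SET `news:seen_hashes`, which this sketch replaces with an in-memory Python set, and the hash inputs (title + URL) are an assumption.

```python
import hashlib


def article_hash(title: str, url: str) -> str:
    """Stable fingerprint for an article, suitable as a set member."""
    return hashlib.sha256(f"{title}|{url}".encode("utf-8")).hexdigest()


def is_new(seen: set[str], title: str, url: str) -> bool:
    """Return True (and record the hash) only the first time an article appears.
    A Redis-backed version would use SADD, which atomically returns 1 for a
    new member and 0 for a duplicate."""
    h = article_hash(title, url)
    if h in seen:
        return False
    seen.add(h)
    return True
```

With Redis, the membership test and insert collapse into a single `SADD` call, which avoids a check-then-act race between fetcher instances.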
Sentiment Analysis:
- `services/sentiment_analyzer/main.py` consumes from `news:raw`
- Runs articles through `FinBERTAnalyzer` for sentiment scores
- Falls back to `OllamaAnalyzer` if confidence < threshold (configured via `SentimentAnalyzerConfig.finbert_confidence_threshold`)
- Extracts ticker mentions via regex in `ticker_extractor.py`
- Persists `Article` + `ArticleSentiment` to the database
- Publishes `ScoredArticle` messages (one per ticker per article) to the `news:scored` Redis Stream
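The confidence-gated fallback can be sketched as below. The analyzer stubs and the `(score, confidence)` return shape are assumptions for illustration; the actual `FinBERTAnalyzer`/`OllamaAnalyzer` interfaces live in the service.

```python
from typing import Callable

# Stand-in signature: each analyzer returns (sentiment_score, confidence).
AnalyzeFn = Callable[[str], tuple[float, float]]


def score_with_fallback(
    text: str,
    finbert: AnalyzeFn,
    ollama: AnalyzeFn,
    confidence_threshold: float = 0.7,  # illustrative default
) -> tuple[float, str]:
    """Try the fast local model first; escalate to the LLM only when
    FinBERT's confidence falls below the configured threshold."""
    score, confidence = finbert(text)
    if confidence >= confidence_threshold:
        return score, "finbert"
    score, _ = ollama(text)
    return score, "ollama"
```

Tracking which backend produced each score (the second tuple element) is what feeds the `finbert_count`/`ollama_count` telemetry counters mentioned later.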
Market Data Collection:
- `services/market_data/main.py` fetches OHLCV bars from Alpaca (historical + live)
- Publishes bars to the `market:bars` Redis Stream as ticker-specific snapshots
- Consumed by the signal generator for technical indicators
Signal Generation:
- `services/signal_generator/main.py` consumes from both `news:scored` and `market:bars`
- Maintains in-memory buffers: per-ticker sentiment context (deque of the last 50 scores) and market data (SMA, RSI)
- Runs a `WeightedEnsemble` combining three strategies:
  - `MomentumStrategy`: SMA crossover based
  - `MeanReversionStrategy`: RSI-based
  - `NewsDrivenStrategy`: sentiment-aware
- Weights loaded from the Redis cache (default equal weights: 0.333 each)
- Only emits a `TradeSignal` when combined strength > threshold (default 0.3)
- Publishes to the `signals:generated` Redis Stream and persists a `Signal` model to the database
Trade Execution:
- `services/trade_executor/main.py` consumes from `signals:generated`
- `RiskManager` performs pre-trade checks:
  - Market hours validation (9:30 AM - 4:00 PM ET)
  - Max daily loss limit
  - Per-ticker position limits
  - Cooldown period after exits
- Calculates position size based on account equity and risk parameters
- Submits the order via `AlpacaBroker` (abstraction over alpaca-py)
- Records a `Trade` model with order details
- Publishes a `TradeExecution` to the `trades:executed` Redis Stream
Learning & Weight Adjustment:
- `services/learning_engine/main.py` consumes from `trades:executed`
- Identifies position closes by tracking opening trades in a Redis hash (`positions:history`)
- `TradeEvaluator` calculates realized P&L
- `WeightAdjuster` uses a multi-armed bandit approach:
  - Attributes P&L credit to contributing strategies
  - Adjusts weights (with guardrails: min trades threshold, max shift limit, weight floor)
  - Persists adjustments to the `LearningAdjustment` model for auditability
  - Saves updated weights to the Redis cache
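A bandit-style update with the three guardrails might look like the sketch below. The guardrail names come from the text; every numeric default and the single-strategy credit model are assumptions (the real `WeightAdjuster` attributes credit across multiple contributing strategies).

```python
def adjust_weights(
    weights: dict[str, float],
    credited_strategy: str,
    pnl: float,
    trade_count: int,
    min_trades: int = 10,       # guardrail: minimum evidence before adjusting
    max_shift: float = 0.05,    # guardrail: cap on per-update movement
    weight_floor: float = 0.1,  # guardrail: no strategy is starved entirely
    learning_rate: float = 0.01,
) -> dict[str, float]:
    """Shift weight toward (or away from) the credited strategy in
    proportion to realized P&L, then renormalize so weights sum to 1."""
    if trade_count < min_trades:
        return dict(weights)  # not enough trades yet: leave weights untouched
    shift = max(-max_shift, min(max_shift, learning_rate * pnl))
    updated = {
        name: max(weight_floor, w + (shift if name == credited_strategy else 0.0))
        for name, w in weights.items()
    }
    total = sum(updated.values())
    return {name: w / total for name, w in updated.items()}
```

The floor is applied before renormalization here, so post-normalization weights can dip slightly below it; a production implementation would pick one convention and document it.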
Portfolio Sync (Background Task):
- `services/api_gateway/tasks/portfolio_sync.py` runs as a background coroutine
- Periodically queries Alpaca for current positions, cash, and equity
- Stores a `PortfolioSnapshot` in the database for historical tracking
- Dashboard polls the portfolio endpoint for real-time data
API Gateway & Dashboard:
- `services/api_gateway/main.py` is a FastAPI app with:
  - REST endpoints (`/api/signals`, `/api/trades`, `/api/portfolio`, `/api/news`, `/api/strategies`, `/api/backtest`, `/api/controls`)
  - WebSocket endpoint for real-time updates
  - WebAuthn/JWT authentication
- Dashboard (`dashboard/src/App.tsx`) consumed by authenticated users
- Real-time market data, trades, and signals streamed via WebSocket
State Management:
- Redis Streams: Messages flow unidirectionally; consumer groups ensure at-least-once processing
- Redis Cache: Strategy weights, position history, deduplication hashes
- PostgreSQL: Long-term storage of trades, signals, articles, market snapshots, users
- In-Memory: Services maintain per-ticker buffers (sentiment scores, market data) for current processing cycle; cleared/rebuilt as data arrives
- JWT Sessions: Issued at login, renewed via refresh token endpoint
Key Abstractions
BaseBroker:
- Purpose: Swap brokerage providers without changing services
- Examples: `shared/broker/base.py` (ABC), `shared/broker/alpaca_broker.py` (implementation)
- Pattern: Abstract methods for `submit_order()`, `get_account()`, `get_positions()`, `cancel_order()`
- The trade executor only knows the `BaseBroker` interface; `AlpacaBroker` is a concrete implementation
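The shape of the abstraction might look like this sketch (method signatures and the `PaperBroker` stand-in are assumptions; the real ABC in `shared/broker/base.py` also defines `get_account()` and `cancel_order()`):

```python
from abc import ABC, abstractmethod
from typing import Any


class BaseBroker(ABC):
    """Interface the trade executor codes against; swapping brokers means
    providing another subclass, not touching the executor."""

    @abstractmethod
    def submit_order(self, ticker: str, qty: float, side: str) -> str:
        """Place an order and return a broker-assigned order ID."""

    @abstractmethod
    def get_positions(self) -> list[dict[str, Any]]:
        """Return currently open positions."""


class PaperBroker(BaseBroker):
    """Illustrative in-memory implementation (stand-in for AlpacaBroker)."""

    def __init__(self) -> None:
        self.orders: list[tuple[str, float, str]] = []

    def submit_order(self, ticker: str, qty: float, side: str) -> str:
        self.orders.append((ticker, qty, side))
        return f"order-{len(self.orders)}"

    def get_positions(self) -> list[dict[str, Any]]:
        return [{"ticker": t, "qty": q} for t, q, side in self.orders if side == "buy"]
```

A broker like this is also what the backtester's `SimulatedBroker` provides: same interface, no live API calls.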
BaseStrategy:
- Purpose: Pluggable trading strategy implementations
- Examples: `shared/strategies/base.py` (ABC), `momentum.py`, `mean_reversion.py`, `news_driven.py`
- Pattern: Each strategy implements `async evaluate(ticker, market, sentiment)` returning `TradeSignal | None`
- The signal generator instantiates all three at startup and runs them via `WeightedEnsemble.evaluate()`
StreamPublisher / StreamConsumer:
- Purpose: Simplified Redis Streams wrapper with JSON serialization
- Examples: `shared/redis_streams.py`
- Pattern: `Publisher.publish(dict)` → XADD; `Consumer.consume()` → async iterator yielding `(msg_id, data)`
- Consumer groups ensure each message is processed once per group
Pydantic Schemas:
- Purpose: Message validation and serialization
- Examples: `shared/schemas/news.py`, `shared/schemas/trading.py`, `shared/schemas/learning.py`
- Pattern: Each stream has a schema (e.g., `RawArticle`, `ScoredArticle`, `TradeSignal`, `TradeExecution`)
- Services validate on consume and serialize with `model_dump(mode="json")` before publish
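The serialize/validate round trip looks roughly like this (requires Pydantic v2; the fields on this `ScoredArticle` are illustrative, not the project's actual schema):

```python
from datetime import datetime, timezone

from pydantic import BaseModel


class ScoredArticle(BaseModel):
    """Simplified stand-in for the shared schema."""
    ticker: str
    sentiment: float
    published_at: datetime


# Publisher side: mode="json" turns datetimes into ISO strings, so the
# payload is plain JSON-safe data suitable for XADD.
msg = ScoredArticle(
    ticker="AAPL",
    sentiment=0.42,
    published_at=datetime(2025, 2, 23, tzinfo=timezone.utc),
)
payload = msg.model_dump(mode="json")

# Consumer side: validation rejects malformed payloads and rebuilds the model,
# so downstream code never sees a half-formed message.
roundtrip = ScoredArticle.model_validate(payload)
```

Validating on consume (not just on publish) is what protects each service from schema drift in its upstream neighbors.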
Entry Points
News Fetcher:
- Location: `services/news_fetcher/main.py`
- Triggers: Starts as a standalone service, polling on timers
- Responsibilities: Fetch RSS/Reddit articles, deduplicate, publish to `news:raw`
Sentiment Analyzer:
- Location: `services/sentiment_analyzer/main.py`
- Triggers: Consumes messages from the `news:raw` stream
- Responsibilities: Score sentiment (FinBERT + Ollama fallback), extract tickers, persist articles, publish to `news:scored`
Market Data Service:
- Location: `services/market_data/main.py`
- Triggers: Starts as a standalone service, fetches on startup + live streaming
- Responsibilities: Fetch OHLCV bars from Alpaca, publish to `market:bars`
Signal Generator:
- Location: `services/signal_generator/main.py`
- Triggers: Consumes from both the `news:scored` and `market:bars` streams
- Responsibilities: Combine sentiment + technical data, run the strategy ensemble, publish to `signals:generated`
Trade Executor:
- Location: `services/trade_executor/main.py`
- Triggers: Consumes from the `signals:generated` stream
- Responsibilities: Risk checks, position sizing, order submission, trade recording, publish to `trades:executed`
Learning Engine:
- Location: `services/learning_engine/main.py`
- Triggers: Consumes from the `trades:executed` stream
- Responsibilities: Evaluate closed positions, adjust strategy weights, audit adjustments
API Gateway:
- Location: `services/api_gateway/main.py`
- Triggers: HTTP server listening on a configurable port (default 8000)
- Responsibilities: REST/WebSocket API, user authentication (WebAuthn), portfolio sync task, database queries
Backtester:
- Location: `backtester/engine.py`
- Triggers: Called from the `/api/backtest` API endpoint (POST)
- Responsibilities: Historical replay with `SimulatedBroker`, metrics calculation, equity curve generation
Error Handling
Strategy: Graceful degradation and event loop resilience.
Patterns:
- Service-level: Catch broad `Exception` in polling loops (news fetcher), log, and continue; metrics counters increment for monitoring
- Fallback chains: The sentiment analyzer tries FinBERT first and falls back to Ollama on low confidence
- Risk checks: The trade executor rejects signals that fail risk checks (logs the reason, continues)
- Database retries: On `IntegrityError` during article insert (duplicate), continue without re-raising
- Stream consumption: Consumer groups handle delivery guarantees; manual `XACK` after processing ensures fault tolerance
- Shutdown: `asyncio.TaskGroup` and signal handlers ensure graceful termination; DB/Redis connections are cleaned up in the FastAPI lifespan
Cross-Cutting Concerns
Logging:
- Each service uses the Python `logging` module with its service name
- All exceptions are logged via `logger.exception()` for stack traces
- Key events are logged at INFO level (article counts, trades executed, weight updates)
Validation:
- Pydantic v2 schemas validate all messages on consume
- Database models use SQLAlchemy constraints (NOT NULL, UNIQUE, FOREIGN KEY)
- Config classes extend `BaseSettings` with environment variable prefixes (`TRADING_*`)
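A config class following that convention might look like the fragment below (requires the `pydantic-settings` package; the field names and defaults other than `finbert_confidence_threshold` are illustrative):

```python
from pydantic_settings import BaseSettings, SettingsConfigDict


class SentimentAnalyzerConfig(BaseSettings):
    """Reads TRADING_-prefixed environment variables, e.g.
    TRADING_FINBERT_CONFIDENCE_THRESHOLD=0.8 overrides the default below."""

    model_config = SettingsConfigDict(env_prefix="TRADING_")

    finbert_confidence_threshold: float = 0.7  # default is an assumption
    redis_url: str = "redis://localhost:6379"  # illustrative field
```

Because values are typed fields, a malformed environment variable fails fast at startup instead of surfacing as a runtime error mid-pipeline.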
Authentication:
- WebAuthn (passkey) via the `webauthn` PyPI package (`services/api_gateway/auth/`)
- JWT tokens issued at login, verified on every protected endpoint via the `get_current_user` dependency
- Credentials stored as a `UserCredential` model (credential ID, public key, counter)
Telemetry:
- OpenTelemetry meters instantiated per service via `setup_telemetry(service_name, metrics_port)`
- Counters: `articles_fetched`, `finbert_count`, `ollama_count`, `rejections`, `trades_executed`
- Histograms: `fill_latency`, sentiment analysis duration, strategy evaluation time
- Each service exposes a `/metrics` endpoint (default port 9090) for Prometheus to scrape
- No dedicated Prometheus/Grafana deployment ships with the project; metrics collection relies on existing infrastructure