
Architecture

Analysis Date: 2025-02-23

Pattern Overview

Overall: Event-driven microservices architecture using Redis Streams for inter-service communication.

Key Characteristics:

  • 8 independent Python services communicating asynchronously via Redis Streams
  • Layered abstraction for extensibility: broker abstraction (BaseBroker), strategy abstraction (BaseStrategy), source abstraction for news
  • Each service consumes from one or more Redis Streams, processes data, persists to PostgreSQL, and publishes downstream results
  • Shared libraries provide domain models, database layer, telemetry, and abstractions
  • React TypeScript frontend connected via WebSocket and REST API

Layers

Service Layer (Event Processors):

  • Purpose: Independent microservices that transform data across the trading pipeline
  • Location: services/*/main.py (entry points)
  • Contains: RSS/Reddit fetchers, sentiment analysis, signal generation, trade execution, learning engine, market data subscriptions, API gateway
  • Depends on: shared/ libraries, external APIs (Alpaca, Reddit, Ollama)
  • Used by: Each other (via Redis Streams), dashboard (via API gateway)

Infrastructure/Abstraction Layer:

  • Purpose: Pluggable interfaces and database access
  • Location: shared/broker/, shared/strategies/, shared/db.py
  • Contains: BaseBroker (Alpaca implementation), BaseStrategy (Momentum/MeanReversion/NewsDriven), database engine setup
  • Depends on: SQLAlchemy, alpaca-py, external ML models
  • Used by: Trade executor, signal generator, backtester

Data Layer:

  • Purpose: Persistent storage of trades, signals, articles, market data, user credentials
  • Location: shared/models/ (SQLAlchemy ORM models)
  • Contains: 14 tables, including trades, signals, articles, article_sentiments, market_data, positions, users, strategies, and strategy weight history
  • Depends on: PostgreSQL + TimescaleDB extension
  • Used by: All services for read/write persistence
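
A minimal sketch of what one of these ORM models might look like, assuming SQLAlchemy 2.0 declarative style (the column names here are illustrative, not copied from shared/models/):

```python
# Hypothetical sketch in the style of shared/models/; actual columns differ.
from datetime import datetime

from sqlalchemy import DateTime, Float, Integer, String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class Trade(Base):
    __tablename__ = "trades"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    ticker: Mapped[str] = mapped_column(String(10), nullable=False, index=True)
    side: Mapped[str] = mapped_column(String(4), nullable=False)  # "buy" / "sell"
    qty: Mapped[float] = mapped_column(Float, nullable=False)
    fill_price: Mapped[float | None] = mapped_column(Float)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
```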

Messaging Layer:

  • Purpose: Asynchronous event streaming between services
  • Location: shared/redis_streams.py
  • Contains: StreamPublisher (XADD), StreamConsumer (XREADGROUP with consumer groups)
  • Depends on: Redis Streams
  • Used by: All services for pub/sub communication
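
A hedged sketch of the publisher/consumer shape described above, assuming redis-py's asyncio client; the real shared/redis_streams.py may differ in detail:

```python
# Minimal StreamPublisher/StreamConsumer sketch over Redis Streams.
import json

import redis.asyncio as redis


class StreamPublisher:
    def __init__(self, client: redis.Redis, stream: str):
        self.client, self.stream = client, stream

    async def publish(self, data: dict):
        # XADD a single JSON-encoded field; returns the new message id
        return await self.client.xadd(self.stream, {"data": json.dumps(data)})


class StreamConsumer:
    def __init__(self, client: redis.Redis, stream: str, group: str, name: str):
        self.client, self.stream, self.group, self.name = client, stream, group, name

    async def consume(self):
        # Create the consumer group once; ignore "group already exists"
        try:
            await self.client.xgroup_create(self.stream, self.group, id="0", mkstream=True)
        except redis.ResponseError:
            pass
        while True:
            entries = await self.client.xreadgroup(
                self.group, self.name, {self.stream: ">"}, count=10, block=5000
            )
            for _stream, messages in entries or []:
                for msg_id, fields in messages:
                    yield msg_id, json.loads(fields[b"data"])
```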

Frontend Layer:

  • Purpose: User-facing dashboard for monitoring and control
  • Location: dashboard/src/
  • Contains: React pages, components, API client, WebAuthn auth
  • Depends on: TanStack Query, TradingView charts, Recharts, React Router
  • Used by: End users for real-time monitoring

Utility Layer:

  • Purpose: Cross-cutting concerns
  • Location: shared/config.py, shared/telemetry.py, shared/schemas/
  • Contains: Configuration management (Pydantic BaseSettings), OpenTelemetry/Prometheus metrics setup, message schemas (Pydantic v2)
  • Depends on: pydantic-settings, opentelemetry-sdk, prometheus-client
  • Used by: All services
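
For example, a config class in this style might look like the following sketch (the TRADING_ prefix and threshold field come from this document; the default value shown is an assumption):

```python
# Sketch in the style of shared/config.py, assuming pydantic-settings.
from pydantic_settings import BaseSettings, SettingsConfigDict


class SentimentAnalyzerConfig(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="TRADING_")

    redis_url: str = "redis://localhost:6379/0"
    finbert_confidence_threshold: float = 0.7  # assumed default

    # e.g. TRADING_FINBERT_CONFIDENCE_THRESHOLD=0.8 overrides the default
```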

Data Flow

News Ingestion Pipeline:

  1. services/news_fetcher/main.py polls RSS feeds and Reddit on independent schedules
  2. Uses RSSSource and RedditSource to fetch articles
  3. Deduplicates via Redis SET (news:seen_hashes)
  4. Publishes RawArticle messages to news:raw Redis Stream
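
The dedup step might look roughly like this sketch; the news:seen_hashes key is from the repo, while hashing the article URL with SHA-256 is an assumption:

```python
# SADD-based dedup: only unseen article hashes pass through.
import hashlib

import redis.asyncio as redis


async def is_new_article(client: redis.Redis, url: str) -> bool:
    digest = hashlib.sha256(url.encode()).hexdigest()
    # SADD returns 1 if the member was added (unseen), 0 if already present
    added = await client.sadd("news:seen_hashes", digest)
    return added == 1
```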

Sentiment Analysis:

  1. services/sentiment_analyzer/main.py consumes from news:raw
  2. Runs articles through FinBERTAnalyzer for sentiment scores
  3. Falls back to OllamaAnalyzer if confidence < threshold (configured via SentimentAnalyzerConfig.finbert_confidence_threshold)
  4. Extracts ticker mentions via regex in ticker_extractor.py
  5. Persists Article + ArticleSentiment to database
  6. Publishes ScoredArticle messages (one per ticker per article) to news:scored Redis Stream
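
The confidence-gated fallback could be shaped like this sketch; the analyzer objects and their return types are assumptions, not the repo's actual signatures:

```python
# FinBERT first; defer to Ollama only when FinBERT is unsure.
async def score_article(text: str, finbert, ollama, threshold: float):
    label, confidence = await finbert.analyze(text)
    if confidence >= threshold:
        return label, confidence, "finbert"
    # Low-confidence FinBERT result: fall back to the local LLM
    label, confidence = await ollama.analyze(text)
    return label, confidence, "ollama"
```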

Market Data Collection:

  1. services/market_data/main.py fetches OHLCV bars from Alpaca (historical + live)
  2. Publishes bars to market:bars Redis Stream as ticker-specific snapshots
  3. Consumed by signal generator for technical indicators
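
A hedged example of a historical bar fetch with alpaca-py (the service's actual request parameters and live-streaming setup will differ):

```python
# Fetch one day of minute bars for a single symbol via alpaca-py.
from datetime import datetime, timedelta, timezone

from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame

client = StockHistoricalDataClient("API_KEY", "SECRET_KEY")
request = StockBarsRequest(
    symbol_or_symbols=["AAPL"],
    timeframe=TimeFrame.Minute,
    start=datetime.now(timezone.utc) - timedelta(days=1),
)
bars = client.get_stock_bars(request)  # result is keyed by symbol
for bar in bars["AAPL"]:
    print(bar.timestamp, bar.open, bar.high, bar.low, bar.close, bar.volume)
```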

Signal Generation:

  1. services/signal_generator/main.py consumes from both news:scored and market:bars
  2. Maintains in-memory buffers: per-ticker sentiment context (deque of last 50 scores) and market data (SMA, RSI)
  3. Runs WeightedEnsemble combining three strategies:
    • MomentumStrategy: SMA cross-over based
    • MeanReversionStrategy: RSI-based
    • NewsDrivenStrategy: Sentiment-aware
  4. Weights loaded from Redis cache (default equal weights: 0.333 each)
  5. Only emits TradeSignal when combined strength > threshold (default 0.3)
  6. Publishes to signals:generated Redis Stream, persists Signal model to database
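
The weighted combination might reduce to something like this sketch; the real WeightedEnsemble in shared/strategies/ may attribute and threshold differently:

```python
# Weighted sum of per-strategy signal strengths, gated by a threshold.
async def evaluate_ensemble(strategies: dict, weights: dict, ticker: str,
                            market, sentiment, threshold: float = 0.3):
    combined = 0.0
    contributors = []
    for name, strategy in strategies.items():
        signal = await strategy.evaluate(ticker, market, sentiment)
        if signal is None:
            continue
        # Signed strength: positive = buy, negative = sell (assumption)
        combined += weights[name] * signal.strength
        contributors.append(name)
    if abs(combined) <= threshold:
        return None  # below the emit threshold (default 0.3)
    return {"ticker": ticker, "strength": combined, "strategies": contributors}
```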

Trade Execution:

  1. services/trade_executor/main.py consumes from signals:generated
  2. RiskManager performs pre-trade checks:
    • Market hours validation (9:30 AM - 4:00 PM ET)
    • Max daily loss limit
    • Per-ticker position limits
    • Cooldown period after exits
  3. Calculates position size based on account equity and risk parameters
  4. Submits order via AlpacaBroker (abstraction over alpaca-py)
  5. Records Trade model with order details
  6. Publishes TradeExecution to trades:executed Redis Stream
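
Steps 2-3 might look roughly like this sketch; RiskManager's actual rules, parameter names, and sizing formula live in services/trade_executor/:

```python
# Illustrative market-hours check and equity-fraction position sizing.
from datetime import datetime, time
from zoneinfo import ZoneInfo

MARKET_OPEN, MARKET_CLOSE = time(9, 30), time(16, 0)


def market_is_open(now: datetime | None = None) -> bool:
    # Weekday regular session only; holidays are not handled in this sketch
    now = now or datetime.now(ZoneInfo("America/New_York"))
    return now.weekday() < 5 and MARKET_OPEN <= now.time() <= MARKET_CLOSE


def position_size(equity: float, price: float, risk_fraction: float = 0.02) -> int:
    # Risk a fixed fraction of account equity per trade (assumed scheme)
    budget = equity * risk_fraction
    return max(int(budget // price), 0)
```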

Learning & Weight Adjustment:

  1. services/learning_engine/main.py consumes from trades:executed
  2. Identifies position closes by tracking opening trades in Redis hash (positions:history)
  3. TradeEvaluator calculates realized P&L
  4. WeightAdjuster uses multi-armed bandit approach:
    • Attributes P&L credit to contributing strategies
    • Adjusts weights (with guardrails: min trades threshold, max shift limit, weight floor)
    • Persists adjustments to LearningAdjustment model for auditability
  5. Saves updated weights to Redis cache
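
A sketch of a guardrailed weight update in the spirit of WeightAdjuster; the constants and credit-attribution scheme here are assumptions:

```python
# Clamp each strategy's weight shift, respect a floor, then renormalize.
MIN_TRADES, MAX_SHIFT, WEIGHT_FLOOR = 10, 0.05, 0.05


def adjust_weights(weights: dict[str, float], pnl_by_strategy: dict[str, float],
                   trade_counts: dict[str, int]) -> dict[str, float]:
    updated = dict(weights)
    for name, pnl in pnl_by_strategy.items():
        if trade_counts.get(name, 0) < MIN_TRADES:
            continue  # not enough evidence to move this arm yet
        shift = max(-MAX_SHIFT, min(MAX_SHIFT, pnl * 0.01))  # clamped step
        updated[name] = max(WEIGHT_FLOOR, updated[name] + shift)
    total = sum(updated.values())
    return {name: w / total for name, w in updated.items()}  # renormalize
```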

Portfolio Sync (Background Task):

  1. services/api_gateway/tasks/portfolio_sync.py runs as background coroutine
  2. Periodically queries Alpaca for current positions, cash, equity
  3. Stores PortfolioSnapshot in database for historical tracking
  4. Dashboard polls portfolio endpoint for real-time data
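
The sync task might be a simple periodic coroutine along these lines (the broker wiring, persistence callback, and interval are assumptions):

```python
# Periodic portfolio snapshot loop; failures are logged, never fatal.
import asyncio
import logging

logger = logging.getLogger("portfolio_sync")


async def portfolio_sync(broker, save_snapshot, interval_s: int = 60):
    while True:
        try:
            account = await broker.get_account()
            positions = await broker.get_positions()
            await save_snapshot(account, positions)  # persist PortfolioSnapshot
        except Exception:
            logger.exception("portfolio sync failed; retrying next cycle")
        await asyncio.sleep(interval_s)
```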

API Gateway & Dashboard:

  1. services/api_gateway/main.py FastAPI app with:
    • REST endpoints (/api/signals, /api/trades, /api/portfolio, /api/news, /api/strategies, /api/backtest, /api/controls)
    • WebSocket endpoint for real-time updates
    • WebAuthn/JWT authentication
  2. Dashboard (dashboard/src/App.tsx) served to authenticated users
  3. Real-time market data, trades, signals streamed via WebSocket

State Management:

  • Redis Streams: Messages flow unidirectionally; consumer groups ensure at-least-once processing
  • Redis Cache: Strategy weights, position history, deduplication hashes
  • PostgreSQL: Long-term storage of trades, signals, articles, market snapshots, users
  • In-Memory: Services maintain per-ticker buffers (sentiment scores, market data) for current processing cycle; cleared/rebuilt as data arrives
  • JWT Sessions: Issued at login, renewed via refresh token endpoint

Key Abstractions

BaseBroker:

  • Purpose: Swap brokerage providers without changing services
  • Examples: shared/broker/base.py (ABC), shared/broker/alpaca_broker.py (implementation)
  • Pattern: Abstract methods for submit_order(), get_account(), get_positions(), cancel_order()
  • Trade executor only knows BaseBroker interface; AlpacaBroker is a concrete implementation
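
The contract implied by those method names, sketched as an ABC (exact signatures in shared/broker/base.py may differ):

```python
# Abstract broker interface; AlpacaBroker would subclass and implement it.
from abc import ABC, abstractmethod


class BaseBroker(ABC):
    @abstractmethod
    async def submit_order(self, ticker: str, qty: float, side: str): ...

    @abstractmethod
    async def get_account(self): ...

    @abstractmethod
    async def get_positions(self): ...

    @abstractmethod
    async def cancel_order(self, order_id: str): ...
```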

BaseStrategy:

  • Purpose: Pluggable trading strategy implementations
  • Examples: shared/strategies/base.py (ABC), momentum.py, mean_reversion.py, news_driven.py
  • Pattern: Each strategy implements async evaluate(ticker, market, sentiment) returning TradeSignal | None
  • Signal generator instantiates all three at startup, runs via WeightedEnsemble.evaluate()
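
The strategy contract, sketched with a simplified stand-in for the shared TradeSignal schema:

```python
# Pluggable strategy interface; each concrete strategy implements evaluate().
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class TradeSignal:  # simplified stand-in for the shared schema
    ticker: str
    action: str      # "buy" / "sell"
    strength: float  # signal confidence


class BaseStrategy(ABC):
    @abstractmethod
    async def evaluate(self, ticker: str, market, sentiment) -> TradeSignal | None:
        """Return a signal when the strategy fires, else None."""
```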

StreamPublisher / StreamConsumer:

  • Purpose: Simplified Redis Streams wrapper with JSON serialization
  • Examples: shared/redis_streams.py
  • Pattern: Publisher.publish(dict) → XADD; Consumer.consume() → async iterator yielding (msg_id, data)
  • Consumer groups deliver each message to exactly one consumer per group (at-least-once processing)

Pydantic Schemas:

  • Purpose: Message validation and serialization
  • Examples: shared/schemas/news.py, shared/schemas/trading.py, shared/schemas/learning.py
  • Pattern: Each stream has a schema (e.g., RawArticle, ScoredArticle, TradeSignal, TradeExecution)
  • Services validate on consume, serialize with model_dump(mode="json") before publish
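
A hedged example of that round-trip (RawArticle's fields here are illustrative):

```python
# Pydantic v2 schema: serialize before XADD, validate on consume.
from datetime import datetime

from pydantic import BaseModel


class RawArticle(BaseModel):
    url: str
    title: str
    source: str
    published_at: datetime


payload = RawArticle(
    url="https://example.com/a", title="Example", source="rss",
    published_at=datetime(2025, 2, 23),
).model_dump(mode="json")   # JSON-safe dict, ready to publish

article = RawArticle.model_validate(payload)  # validate on consume
```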

Entry Points

News Fetcher:

  • Location: services/news_fetcher/main.py
  • Triggers: Starts as a standalone service, polling RSS and Reddit on independent timers
  • Responsibilities: Fetch RSS/Reddit articles, deduplicate, publish to news:raw

Sentiment Analyzer:

  • Location: services/sentiment_analyzer/main.py
  • Triggers: Consumes messages from news:raw stream
  • Responsibilities: Score sentiment (FinBERT + Ollama fallback), extract tickers, persist articles, publish to news:scored

Market Data Service:

  • Location: services/market_data/main.py
  • Triggers: Starts as a standalone service; backfills historical bars on startup, then streams live data
  • Responsibilities: Fetch OHLCV bars from Alpaca, publish to market:bars

Signal Generator:

  • Location: services/signal_generator/main.py
  • Triggers: Consumes from both news:scored and market:bars streams
  • Responsibilities: Combine sentiment + technical data, run strategy ensemble, publish to signals:generated

Trade Executor:

  • Location: services/trade_executor/main.py
  • Triggers: Consumes from signals:generated stream
  • Responsibilities: Risk check, position sizing, order submission, trade recording, publish to trades:executed

Learning Engine:

  • Location: services/learning_engine/main.py
  • Triggers: Consumes from trades:executed stream
  • Responsibilities: Evaluate closed positions, adjust strategy weights, audit adjustments

API Gateway:

  • Location: services/api_gateway/main.py
  • Triggers: HTTP server listening on configurable port (default 8000)
  • Responsibilities: REST/WebSocket API, user authentication (WebAuthn), portfolio sync task, query database

Backtester:

  • Location: backtester/engine.py
  • Triggers: Called from API endpoint /api/backtest (POST)
  • Responsibilities: Historical replay with SimulatedBroker, metrics calculation, equity curve generation

Error Handling

Strategy: Graceful degradation and event loop resilience.

Patterns:

  • Service-level: Catch broad Exception in polling loops (news fetcher), log, and continue; error counters are incremented for monitoring
  • Fallback chains: Sentiment analyzer tries FinBERT first, falls back to Ollama on low confidence
  • Risk checks: Trade executor rejects signals that fail risk checks (logs reason, continues)
  • Database retries: If IntegrityError on article insert (duplicate), continue without re-raising
  • Stream consumption: Consumer groups handle delivery guarantees; messages are acknowledged (XACK) only after successful processing, so failed messages remain pending for redelivery (sketched after this list)
  • Shutdown: asyncio.TaskGroup and signal handlers ensure graceful termination; cleanup of DB/Redis connections in FastAPI lifespan
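
The ack-after-success pattern from the bullets above, sketched around a StreamConsumer-style wrapper from shared/redis_streams.py:

```python
# Log-and-continue processing loop; XACK only after the handler succeeds.
import logging

logger = logging.getLogger("consumer_loop")


async def run(consumer, client, stream: str, group: str, handler):
    async for msg_id, data in consumer.consume():
        try:
            await handler(data)
        except Exception:
            # Unacked messages stay pending in the group for redelivery
            logger.exception("failed to process %s", msg_id)
            continue
        await client.xack(stream, group, msg_id)
```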

Cross-Cutting Concerns

Logging:

  • Each service uses Python logging module with service name
  • All exceptions logged via logger.exception() for stack traces
  • Key events logged at INFO level (article counts, trades executed, weight updates)

Validation:

  • Pydantic v2 schemas validate all messages on consume
  • Database models use SQLAlchemy constraints (NOT NULL, UNIQUE, FOREIGN KEY)
  • Config classes extend BaseSettings with environment variable prefixes (TRADING_*)

Authentication:

  • WebAuthn (passkey) via webauthn PyPI package (services/api_gateway/auth/)
  • JWT tokens issued at login, verified on every protected endpoint via get_current_user dependency
  • Credentials stored as UserCredential model (credential ID, public key, counter)
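
A sketch of a JWT-verifying dependency in this style, assuming PyJWT and FastAPI's HTTPBearer; the gateway's real get_current_user may differ:

```python
# Verify the bearer token on every protected endpoint via Depends().
import jwt
from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

bearer = HTTPBearer()
SECRET = "change-me"  # loaded from config in practice


def get_current_user(creds: HTTPAuthorizationCredentials = Depends(bearer)) -> str:
    try:
        claims = jwt.decode(creds.credentials, SECRET, algorithms=["HS256"])
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="invalid token")
    return claims["sub"]  # user id from the token subject
```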

Telemetry:

  • OpenTelemetry meters instantiated per service via setup_telemetry(service_name, metrics_port)
  • Counters: articles_fetched, finbert_count, ollama_count, rejections, trades_executed
  • Histograms: fill_latency, sentiment analysis duration, strategy evaluation time
  • Each service exposes a /metrics endpoint (default port 9090) for Prometheus to scrape
  • Prometheus and Grafana are not deployed by this repo; metrics collection relies on existing monitoring infrastructure
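
A sketch of what setup_telemetry might wire together, using the OpenTelemetry SDK's Prometheus exporter (exact wiring in shared/telemetry.py may differ):

```python
# Per-service meter backed by a Prometheus /metrics endpoint.
from opentelemetry import metrics
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import MeterProvider
from prometheus_client import start_http_server


def setup_telemetry(service_name: str, metrics_port: int = 9090):
    start_http_server(metrics_port)  # serves /metrics for scraping
    reader = PrometheusMetricReader()
    metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))
    return metrics.get_meter(service_name)


meter = setup_telemetry("trade_executor")
trades_executed = meter.create_counter("trades_executed")
trades_executed.add(1, {"ticker": "AAPL"})
```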