8 tasks covering: MarketSnapshot schema changes, indicator computations, fundamental data providers, DB caching, 6 new strategies, signal generator wiring, and deployment steps.
74 KiB
Extended Strategies + Fundamental Data — Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Add 6 new trading strategies (Value, MACD Crossover, Bollinger Breakout, VWAP, Liquidity, MA Stack) with a fundamental data pipeline (3 providers with rotation + DB caching) and 7 new technical indicators.
Architecture: New shared/fundamentals/ module provides fundamental data via a rotating provider abstraction (Alpha Vantage → FMP → Yahoo Finance). Technical indicators (MACD, Bollinger, VWAP, ATR, EMA-9, EMA-21, SMA-200) are computed in MarketDataManager from existing OHLCV bars. Strategies implement the existing BaseStrategy.evaluate() interface and plug into the WeightedEnsemble.
Tech Stack: Python 3.12, Pydantic v2, SQLAlchemy 2.0 async, httpx (REST calls), yfinance, pytest
Design doc: docs/plans/2026-02-23-strategies-fundamentals-design.md
Task 1: Add new technical indicator fields to MarketSnapshot
Files:
- Modify:
shared/schemas/trading.py:127-140 - Test:
tests/test_schemas.py(existing — new fields are optional, won't break)
Step 1: Add indicator fields to MarketSnapshot
In shared/schemas/trading.py, add after line 137 (rsi: float | None = None):
# Technical indicators — computed by MarketDataManager
ema_9: float | None = None
ema_21: float | None = None
sma_200: float | None = None
macd: float | None = None
macd_signal: float | None = None
macd_histogram: float | None = None
bollinger_upper: float | None = None
bollinger_mid: float | None = None
bollinger_lower: float | None = None
vwap: float | None = None
atr: float | None = None
Step 2: Add FundamentalsSnapshot schema
Add after SentimentContext in the same file:
class FundamentalsSnapshot(BaseModel):
"""Fundamental financial data for a single ticker — cached daily."""
ticker: str
eps_ttm: float | None = None
pe_ratio: float | None = None
peg_ratio: float | None = None
revenue_growth_yoy: float | None = None
profit_margin: float | None = None
debt_to_equity: float | None = None
market_cap: float | None = None
fetched_at: datetime
model_config = {"from_attributes": True}
Also add a fundamentals field to MarketSnapshot:
fundamentals: FundamentalsSnapshot | None = None
Step 3: Run existing tests to confirm nothing breaks
Run: python -m pytest tests/test_schemas.py -v
Expected: All 49 tests PASS (new fields are optional with defaults)
Step 4: Commit
git add shared/schemas/trading.py
git commit -m "feat: add technical indicator and fundamentals fields to MarketSnapshot"
Task 2: Implement technical indicator computations in MarketDataManager
Files:
- Modify:
services/signal_generator/market_data.py - Test:
tests/test_indicators.py(new)
Step 1: Write failing tests for EMA computation
Create tests/test_indicators.py:
"""Tests for technical indicator computations in MarketDataManager."""
import pytest
from datetime import datetime, timezone
from services.signal_generator.market_data import MarketDataManager
from shared.schemas.trading import OHLCVBar
def _bar(close: float, volume: float = 1000.0, high: float | None = None, low: float | None = None, open_: float | None = None) -> OHLCVBar:
"""Helper to create an OHLCVBar with sensible defaults."""
return OHLCVBar(
timestamp=datetime.now(timezone.utc),
open=open_ if open_ is not None else close,
high=high if high is not None else close + 1,
low=low if low is not None else close - 1,
close=close,
volume=volume,
)
def _add_bars(mgr: MarketDataManager, ticker: str, closes: list[float], volume: float = 1000.0) -> None:
"""Add multiple bars with given close prices."""
for c in closes:
mgr.add_bar(ticker, _bar(c, volume=volume))
class TestEMA:
"""Tests for exponential moving average computation."""
def test_ema_returns_none_insufficient_data(self) -> None:
mgr = MarketDataManager(max_bars=300)
_add_bars(mgr, "AAPL", [100.0] * 5)
snap = mgr.get_snapshot("AAPL")
assert snap is not None
assert snap.ema_9 is None # Need at least 9 bars
def test_ema_9_with_exact_data(self) -> None:
mgr = MarketDataManager(max_bars=300)
_add_bars(mgr, "AAPL", [100.0] * 9)
snap = mgr.get_snapshot("AAPL")
assert snap is not None
assert snap.ema_9 is not None
assert snap.ema_9 == pytest.approx(100.0, abs=0.01)
def test_ema_responds_to_recent_prices(self) -> None:
mgr = MarketDataManager(max_bars=300)
# 20 bars at 100, then 1 bar at 110
_add_bars(mgr, "AAPL", [100.0] * 20 + [110.0])
snap = mgr.get_snapshot("AAPL")
assert snap is not None
assert snap.ema_9 is not None
# EMA-9 should be > 100 because the most recent price is 110
assert snap.ema_9 > 100.0
assert snap.ema_9 < 110.0
def test_ema_21_returns_none_insufficient_data(self) -> None:
mgr = MarketDataManager(max_bars=300)
_add_bars(mgr, "AAPL", [100.0] * 15)
snap = mgr.get_snapshot("AAPL")
assert snap is not None
assert snap.ema_21 is None
def test_ema_21_computed_with_enough_data(self) -> None:
mgr = MarketDataManager(max_bars=300)
_add_bars(mgr, "AAPL", [100.0] * 25)
snap = mgr.get_snapshot("AAPL")
assert snap is not None
assert snap.ema_21 is not None
assert snap.ema_21 == pytest.approx(100.0, abs=0.01)
class TestSMA200:
"""Tests for SMA-200 computation."""
def test_sma_200_returns_none_insufficient_data(self) -> None:
mgr = MarketDataManager(max_bars=300)
_add_bars(mgr, "AAPL", [100.0] * 100)
snap = mgr.get_snapshot("AAPL")
assert snap is not None
assert snap.sma_200 is None
def test_sma_200_computed_with_enough_data(self) -> None:
mgr = MarketDataManager(max_bars=300)
_add_bars(mgr, "AAPL", [100.0] * 200)
snap = mgr.get_snapshot("AAPL")
assert snap is not None
assert snap.sma_200 is not None
assert snap.sma_200 == pytest.approx(100.0, abs=0.01)
class TestMACD:
"""Tests for MACD computation."""
def test_macd_returns_none_insufficient_data(self) -> None:
mgr = MarketDataManager(max_bars=300)
_add_bars(mgr, "AAPL", [100.0] * 20)
snap = mgr.get_snapshot("AAPL")
assert snap is not None
assert snap.macd is None
def test_macd_computed_with_enough_data(self) -> None:
mgr = MarketDataManager(max_bars=300)
# Need 26 bars for EMA-26, plus 9 more for the signal line = 35 minimum
_add_bars(mgr, "AAPL", [100.0] * 40)
snap = mgr.get_snapshot("AAPL")
assert snap is not None
assert snap.macd is not None
assert snap.macd_signal is not None
assert snap.macd_histogram is not None
# With constant prices, MACD should be ~0
assert snap.macd == pytest.approx(0.0, abs=0.1)
def test_macd_positive_in_uptrend(self) -> None:
mgr = MarketDataManager(max_bars=300)
# Rising prices: EMA-12 > EMA-26 → positive MACD
prices = [100.0 + i * 0.5 for i in range(50)]
_add_bars(mgr, "AAPL", prices)
snap = mgr.get_snapshot("AAPL")
assert snap is not None
assert snap.macd is not None
assert snap.macd > 0
class TestBollingerBands:
"""Tests for Bollinger Bands computation."""
def test_bollinger_returns_none_insufficient_data(self) -> None:
mgr = MarketDataManager(max_bars=300)
_add_bars(mgr, "AAPL", [100.0] * 10)
snap = mgr.get_snapshot("AAPL")
assert snap is not None
assert snap.bollinger_upper is None
def test_bollinger_computed_with_enough_data(self) -> None:
mgr = MarketDataManager(max_bars=300)
_add_bars(mgr, "AAPL", [100.0] * 25)
snap = mgr.get_snapshot("AAPL")
assert snap is not None
assert snap.bollinger_upper is not None
assert snap.bollinger_mid is not None
assert snap.bollinger_lower is not None
# With constant prices, bands should be tight around the price
assert snap.bollinger_mid == pytest.approx(100.0, abs=0.01)
assert snap.bollinger_upper >= snap.bollinger_mid
assert snap.bollinger_lower <= snap.bollinger_mid
def test_bollinger_width_increases_with_volatility(self) -> None:
mgr_stable = MarketDataManager(max_bars=300)
_add_bars(mgr_stable, "AAPL", [100.0] * 25)
snap_stable = mgr_stable.get_snapshot("AAPL")
mgr_volatile = MarketDataManager(max_bars=300)
# Alternating prices = high volatility
_add_bars(mgr_volatile, "AAPL", [90.0 + (i % 2) * 20 for i in range(25)])
snap_volatile = mgr_volatile.get_snapshot("AAPL")
assert snap_stable is not None and snap_volatile is not None
width_stable = snap_stable.bollinger_upper - snap_stable.bollinger_lower
width_volatile = snap_volatile.bollinger_upper - snap_volatile.bollinger_lower
assert width_volatile > width_stable
class TestVWAP:
"""Tests for VWAP computation."""
def test_vwap_computed(self) -> None:
mgr = MarketDataManager(max_bars=300)
_add_bars(mgr, "AAPL", [100.0] * 5, volume=1000.0)
snap = mgr.get_snapshot("AAPL")
assert snap is not None
assert snap.vwap is not None
# With constant prices and volume, VWAP = typical price
# typical = (high + low + close) / 3 = (101 + 99 + 100) / 3 = 100.0
assert snap.vwap == pytest.approx(100.0, abs=0.01)
def test_vwap_weighted_by_volume(self) -> None:
mgr = MarketDataManager(max_bars=300)
# Bar 1: close=100, volume=1000
mgr.add_bar("AAPL", _bar(100.0, volume=1000.0))
# Bar 2: close=200, volume=3000 (should pull VWAP toward 200)
mgr.add_bar("AAPL", _bar(200.0, volume=3000.0))
snap = mgr.get_snapshot("AAPL")
assert snap is not None
assert snap.vwap is not None
assert snap.vwap > 150.0 # Weighted toward 200
class TestATR:
"""Tests for ATR computation."""
def test_atr_returns_none_insufficient_data(self) -> None:
mgr = MarketDataManager(max_bars=300)
_add_bars(mgr, "AAPL", [100.0] * 5)
snap = mgr.get_snapshot("AAPL")
assert snap is not None
assert snap.atr is None
def test_atr_computed_with_enough_data(self) -> None:
mgr = MarketDataManager(max_bars=300)
# 15 bars needed (14-period ATR + 1 for prev close)
_add_bars(mgr, "AAPL", [100.0] * 15)
snap = mgr.get_snapshot("AAPL")
assert snap is not None
assert snap.atr is not None
assert snap.atr >= 0
def test_atr_increases_with_volatility(self) -> None:
mgr_stable = MarketDataManager(max_bars=300)
for i in range(20):
mgr_stable.add_bar("AAPL", OHLCVBar(
timestamp=datetime.now(timezone.utc),
open=100.0, high=101.0, low=99.0, close=100.0, volume=1000.0,
))
snap_stable = mgr_stable.get_snapshot("AAPL")
mgr_volatile = MarketDataManager(max_bars=300)
for i in range(20):
mgr_volatile.add_bar("AAPL", OHLCVBar(
timestamp=datetime.now(timezone.utc),
open=100.0, high=115.0, low=85.0, close=100.0, volume=1000.0,
))
snap_volatile = mgr_volatile.get_snapshot("AAPL")
assert snap_stable is not None and snap_volatile is not None
assert snap_volatile.atr > snap_stable.atr
Step 2: Run tests to verify they fail
Run: python -m pytest tests/test_indicators.py -v
Expected: FAIL — ema_9, macd, etc. don't exist on MarketSnapshot yet (Task 1 must be done first) and computations aren't implemented.
Step 3: Implement indicator computations in MarketDataManager
Modify services/signal_generator/market_data.py:
- Add
_compute_emastatic method:
@staticmethod
def _compute_ema(closes: list[float], period: int) -> float | None:
"""Compute exponential moving average over *closes* with given *period*."""
if len(closes) < period:
return None
multiplier = 2.0 / (period + 1)
ema = sum(closes[:period]) / period # Seed with SMA
for price in closes[period:]:
ema = (price - ema) * multiplier + ema
return round(ema, 6)
- Add
_compute_macdstatic method:
@staticmethod
def _compute_macd(closes: list[float]) -> tuple[float, float, float] | None:
"""Compute MACD (12,26,9). Returns (macd_line, signal_line, histogram) or None."""
if len(closes) < 35: # 26 for slow EMA + 9 for signal
return None
mul_12 = 2.0 / 13
mul_26 = 2.0 / 27
mul_9 = 2.0 / 10
ema_12 = sum(closes[:12]) / 12
ema_26 = sum(closes[:26]) / 26
macd_values: list[float] = []
for i in range(len(closes)):
if i < 12:
continue
if i == 12:
ema_12 = sum(closes[:12]) / 12
else:
ema_12 = (closes[i] - ema_12) * mul_12 + ema_12
if i < 26:
continue
if i == 26:
ema_26 = sum(closes[:26]) / 26
# Recompute ema_12 from scratch for accuracy
ema_12 = sum(closes[:12]) / 12
for j in range(12, i + 1):
ema_12 = (closes[j] - ema_12) * mul_12 + ema_12
else:
ema_26 = (closes[i] - ema_26) * mul_26 + ema_26
macd_values.append(ema_12 - ema_26)
if len(macd_values) < 9:
return None
signal = sum(macd_values[:9]) / 9
for val in macd_values[9:]:
signal = (val - signal) * mul_9 + signal
macd_line = macd_values[-1]
histogram = macd_line - signal
return round(macd_line, 6), round(signal, 6), round(histogram, 6)
- Add
_compute_bollingerstatic method:
@staticmethod
def _compute_bollinger(closes: list[float], period: int = 20, num_std: float = 2.0) -> tuple[float, float, float] | None:
"""Compute Bollinger Bands. Returns (upper, mid, lower) or None."""
if len(closes) < period:
return None
window = closes[-period:]
mid = sum(window) / period
variance = sum((x - mid) ** 2 for x in window) / period
std = variance ** 0.5
return round(mid + num_std * std, 6), round(mid, 6), round(mid - num_std * std, 6)
- Add
_compute_vwapstatic method:
@staticmethod
def _compute_vwap(bars: list[OHLCVBar]) -> float | None:
"""Compute VWAP from bars. Returns None if no bars."""
if not bars:
return None
cum_tp_vol = 0.0
cum_vol = 0.0
for bar in bars:
typical_price = (bar.high + bar.low + bar.close) / 3.0
cum_tp_vol += typical_price * bar.volume
cum_vol += bar.volume
if cum_vol == 0:
return None
return round(cum_tp_vol / cum_vol, 6)
- Add
_compute_atrstatic method:
@staticmethod
def _compute_atr(bars: list[OHLCVBar], period: int = 14) -> float | None:
"""Compute Average True Range over *period*. Needs period+1 bars."""
if len(bars) < period + 1:
return None
relevant = bars[-(period + 1):]
true_ranges: list[float] = []
for i in range(1, len(relevant)):
high_low = relevant[i].high - relevant[i].low
high_prev_close = abs(relevant[i].high - relevant[i - 1].close)
low_prev_close = abs(relevant[i].low - relevant[i - 1].close)
true_ranges.append(max(high_low, high_prev_close, low_prev_close))
return round(sum(true_ranges) / len(true_ranges), 6)
- Update
get_snapshot()to compute and include all new indicators:
def get_snapshot(self, ticker: str) -> MarketSnapshot | None:
bars = self._bars.get(ticker)
if not bars:
return None
latest = bars[-1]
closes = [b.close for b in bars]
bar_list = list(bars)
# MACD
macd_result = self._compute_macd(closes)
macd_val = macd_signal = macd_hist = None
if macd_result:
macd_val, macd_signal, macd_hist = macd_result
# Bollinger
boll_result = self._compute_bollinger(closes)
boll_upper = boll_mid = boll_lower = None
if boll_result:
boll_upper, boll_mid, boll_lower = boll_result
return MarketSnapshot(
ticker=ticker,
current_price=latest.close,
open=latest.open,
high=latest.high,
low=latest.low,
close=latest.close,
volume=latest.volume,
sma_20=self._compute_sma(closes, 20),
sma_50=self._compute_sma(closes, 50),
sma_200=self._compute_sma(closes, 200),
rsi=self._compute_rsi(closes, _RSI_PERIOD),
ema_9=self._compute_ema(closes, 9),
ema_21=self._compute_ema(closes, 21),
macd=macd_val,
macd_signal=macd_signal,
macd_histogram=macd_hist,
bollinger_upper=boll_upper,
bollinger_mid=boll_mid,
bollinger_lower=boll_lower,
vwap=self._compute_vwap(bar_list),
atr=self._compute_atr(bar_list),
bars=[b.model_dump(mode="json") for b in bars],
)
Step 4: Run tests to verify they pass
Run: python -m pytest tests/test_indicators.py -v
Expected: All PASS
Step 5: Run existing tests to confirm no regressions
Run: python -m pytest tests/ -v -m "not integration" --timeout=30
Expected: All 246+ tests PASS
Step 6: Commit
git add services/signal_generator/market_data.py tests/test_indicators.py
git commit -m "feat: add MACD, Bollinger, VWAP, ATR, EMA, SMA-200 indicator computations"
Task 3: Implement fundamental data providers
Files:
- Create:
shared/fundamentals/__init__.py - Create:
shared/fundamentals/base.py - Create:
shared/fundamentals/alpha_vantage.py - Create:
shared/fundamentals/fmp.py - Create:
shared/fundamentals/yahoo.py - Create:
shared/fundamentals/rotating.py - Test:
tests/test_fundamentals.py(new)
Step 1: Write failing tests
Create tests/test_fundamentals.py:
"""Tests for fundamental data providers."""
import pytest
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch, MagicMock
from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot
from shared.fundamentals.alpha_vantage import AlphaVantageProvider
from shared.fundamentals.fmp import FMPProvider
from shared.fundamentals.yahoo import YahooFinanceProvider
from shared.fundamentals.rotating import RotatingProvider
class TestFundamentalsSnapshot:
"""Tests for the FundamentalsSnapshot model."""
def test_snapshot_all_fields(self) -> None:
snap = FundamentalsSnapshot(
ticker="AAPL",
eps_ttm=6.5,
pe_ratio=28.0,
peg_ratio=1.5,
revenue_growth_yoy=0.08,
profit_margin=0.25,
debt_to_equity=1.8,
market_cap=3_000_000_000_000.0,
fetched_at=datetime.now(timezone.utc),
)
assert snap.ticker == "AAPL"
assert snap.eps_ttm == 6.5
def test_snapshot_optional_fields(self) -> None:
snap = FundamentalsSnapshot(
ticker="AAPL",
fetched_at=datetime.now(timezone.utc),
)
assert snap.eps_ttm is None
assert snap.pe_ratio is None
class TestAlphaVantageProvider:
"""Tests for Alpha Vantage provider."""
@pytest.mark.asyncio
async def test_fetch_parses_response(self) -> None:
mock_response = AsyncMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"Symbol": "AAPL",
"EPS": "6.50",
"PERatio": "28.00",
"PEGRatio": "1.50",
"QuarterlyRevenueGrowthYOY": "0.08",
"ProfitMargin": "0.25",
"DebtToEquity": "180", # Alpha Vantage returns as percentage
"MarketCapitalization": "3000000000000",
}
provider = AlphaVantageProvider(api_key="test-key")
with patch.object(provider, "_client") as mock_client:
mock_client.get = AsyncMock(return_value=mock_response)
result = await provider.fetch("AAPL")
assert result is not None
assert result.ticker == "AAPL"
assert result.eps_ttm == 6.5
assert result.pe_ratio == 28.0
@pytest.mark.asyncio
async def test_fetch_handles_rate_limit(self) -> None:
mock_response = AsyncMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"Note": "Thank you for using Alpha Vantage! Our standard API rate limit is 25 requests per day."
}
provider = AlphaVantageProvider(api_key="test-key")
with patch.object(provider, "_client") as mock_client:
mock_client.get = AsyncMock(return_value=mock_response)
result = await provider.fetch("AAPL")
assert result is None
class TestFMPProvider:
"""Tests for Financial Modeling Prep provider."""
@pytest.mark.asyncio
async def test_fetch_parses_response(self) -> None:
mock_response = AsyncMock()
mock_response.status_code = 200
mock_response.json.return_value = [
{
"symbol": "AAPL",
"eps": 6.5,
"pe": 28.0,
"pegRatio": 1.5,
"revenueGrowth": 0.08,
"netProfitMargin": 0.25,
"debtToEquity": 1.8,
"marketCap": 3_000_000_000_000,
}
]
provider = FMPProvider(api_key="test-key")
with patch.object(provider, "_client") as mock_client:
mock_client.get = AsyncMock(return_value=mock_response)
result = await provider.fetch("AAPL")
assert result is not None
assert result.ticker == "AAPL"
assert result.peg_ratio == 1.5
@pytest.mark.asyncio
async def test_fetch_handles_empty_response(self) -> None:
mock_response = AsyncMock()
mock_response.status_code = 200
mock_response.json.return_value = []
provider = FMPProvider(api_key="test-key")
with patch.object(provider, "_client") as mock_client:
mock_client.get = AsyncMock(return_value=mock_response)
result = await provider.fetch("INVALID")
assert result is None
class TestYahooFinanceProvider:
"""Tests for Yahoo Finance provider."""
@pytest.mark.asyncio
async def test_fetch_parses_ticker_info(self) -> None:
mock_ticker = MagicMock()
mock_ticker.info = {
"trailingEps": 6.5,
"trailingPE": 28.0,
"pegRatio": 1.5,
"revenueGrowth": 0.08,
"profitMargins": 0.25,
"debtToEquity": 180.0,
"marketCap": 3_000_000_000_000,
}
provider = YahooFinanceProvider()
with patch("shared.fundamentals.yahoo.yfinance.Ticker", return_value=mock_ticker):
result = await provider.fetch("AAPL")
assert result is not None
assert result.ticker == "AAPL"
assert result.eps_ttm == 6.5
assert result.debt_to_equity == pytest.approx(1.8, abs=0.01)
class TestRotatingProvider:
"""Tests for the rotating provider with failover."""
@pytest.mark.asyncio
async def test_uses_first_provider_on_success(self) -> None:
snap = FundamentalsSnapshot(ticker="AAPL", fetched_at=datetime.now(timezone.utc), eps_ttm=6.5)
p1 = AsyncMock(spec=FundamentalsProvider)
p1.fetch = AsyncMock(return_value=snap)
p2 = AsyncMock(spec=FundamentalsProvider)
rotating = RotatingProvider([p1, p2])
result = await rotating.fetch("AAPL")
assert result is not None
assert result.eps_ttm == 6.5
p1.fetch.assert_awaited_once_with("AAPL")
p2.fetch.assert_not_awaited()
@pytest.mark.asyncio
async def test_falls_back_on_failure(self) -> None:
snap = FundamentalsSnapshot(ticker="AAPL", fetched_at=datetime.now(timezone.utc), eps_ttm=7.0)
p1 = AsyncMock(spec=FundamentalsProvider)
p1.fetch = AsyncMock(return_value=None) # Rate limited
p2 = AsyncMock(spec=FundamentalsProvider)
p2.fetch = AsyncMock(return_value=snap)
rotating = RotatingProvider([p1, p2])
result = await rotating.fetch("AAPL")
assert result is not None
assert result.eps_ttm == 7.0
@pytest.mark.asyncio
async def test_returns_none_when_all_fail(self) -> None:
p1 = AsyncMock(spec=FundamentalsProvider)
p1.fetch = AsyncMock(return_value=None)
p2 = AsyncMock(spec=FundamentalsProvider)
p2.fetch = AsyncMock(return_value=None)
rotating = RotatingProvider([p1, p2])
result = await rotating.fetch("AAPL")
assert result is None
@pytest.mark.asyncio
async def test_handles_exception_and_falls_back(self) -> None:
snap = FundamentalsSnapshot(ticker="AAPL", fetched_at=datetime.now(timezone.utc))
p1 = AsyncMock(spec=FundamentalsProvider)
p1.fetch = AsyncMock(side_effect=Exception("API error"))
p2 = AsyncMock(spec=FundamentalsProvider)
p2.fetch = AsyncMock(return_value=snap)
rotating = RotatingProvider([p1, p2])
result = await rotating.fetch("AAPL")
assert result is not None
Step 2: Run tests to verify they fail
Run: python -m pytest tests/test_fundamentals.py -v
Expected: FAIL — modules don't exist yet
Step 3: Create shared/fundamentals/__init__.py
"""Fundamental data providers for stock financial metrics."""
from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot
from shared.fundamentals.rotating import RotatingProvider
__all__ = ["FundamentalsProvider", "FundamentalsSnapshot", "RotatingProvider"]
Step 4: Create shared/fundamentals/base.py
"""Abstract base class for fundamental data providers."""
from __future__ import annotations
from abc import ABC, abstractmethod
from datetime import datetime
from pydantic import BaseModel
class FundamentalsSnapshot(BaseModel):
"""Fundamental financial data for a single ticker."""
ticker: str
eps_ttm: float | None = None
pe_ratio: float | None = None
peg_ratio: float | None = None
revenue_growth_yoy: float | None = None
profit_margin: float | None = None
debt_to_equity: float | None = None
market_cap: float | None = None
fetched_at: datetime
model_config = {"from_attributes": True}
class FundamentalsProvider(ABC):
"""Abstract base class for fundamental data providers."""
@abstractmethod
async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
"""Fetch fundamental data for *ticker*. Returns None on failure/rate limit."""
...
Step 5: Create shared/fundamentals/alpha_vantage.py
"""Alpha Vantage fundamental data provider."""
from __future__ import annotations
import logging
from datetime import datetime, timezone
import httpx
from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot
logger = logging.getLogger(__name__)
_BASE_URL = "https://www.alphavantage.co/query"
class AlphaVantageProvider(FundamentalsProvider):
"""Fetch fundamentals from the Alpha Vantage OVERVIEW endpoint."""
def __init__(self, api_key: str) -> None:
self._api_key = api_key
self._client = httpx.AsyncClient(timeout=30.0)
async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
try:
resp = await self._client.get(
_BASE_URL,
params={"function": "OVERVIEW", "symbol": ticker, "apikey": self._api_key},
)
data = resp.json()
# Rate limit detection
if "Note" in data or "Information" in data:
logger.warning("Alpha Vantage rate limit hit for %s", ticker)
return None
if "Symbol" not in data:
logger.warning("Alpha Vantage returned no data for %s", ticker)
return None
return FundamentalsSnapshot(
ticker=ticker,
eps_ttm=_safe_float(data.get("EPS")),
pe_ratio=_safe_float(data.get("PERatio")),
peg_ratio=_safe_float(data.get("PEGRatio")),
revenue_growth_yoy=_safe_float(data.get("QuarterlyRevenueGrowthYOY")),
profit_margin=_safe_float(data.get("ProfitMargin")),
debt_to_equity=_safe_float_div100(data.get("DebtToEquity")),
market_cap=_safe_float(data.get("MarketCapitalization")),
fetched_at=datetime.now(timezone.utc),
)
except Exception:
logger.exception("Alpha Vantage fetch failed for %s", ticker)
return None
def _safe_float(val: str | None) -> float | None:
if val is None or val in ("None", "-", ""):
return None
try:
return float(val)
except (ValueError, TypeError):
return None
def _safe_float_div100(val: str | None) -> float | None:
"""Alpha Vantage returns debt-to-equity as a percentage (e.g. 180 = 1.8)."""
f = _safe_float(val)
return f / 100.0 if f is not None else None
Step 6: Create shared/fundamentals/fmp.py
"""Financial Modeling Prep (FMP) fundamental data provider."""
from __future__ import annotations
import logging
from datetime import datetime, timezone
import httpx
from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot
logger = logging.getLogger(__name__)
_BASE_URL = "https://financialmodelingprep.com/api/v3"
class FMPProvider(FundamentalsProvider):
"""Fetch fundamentals from the FMP key-metrics endpoint."""
def __init__(self, api_key: str) -> None:
self._api_key = api_key
self._client = httpx.AsyncClient(timeout=30.0)
async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
try:
resp = await self._client.get(
f"{_BASE_URL}/key-metrics-ttm/{ticker}",
params={"apikey": self._api_key},
)
data = resp.json()
if not data or not isinstance(data, list) or len(data) == 0:
logger.warning("FMP returned no data for %s", ticker)
return None
item = data[0]
return FundamentalsSnapshot(
ticker=ticker,
eps_ttm=item.get("eps") or item.get("netIncomePerShareTTM"),
pe_ratio=item.get("pe") or item.get("peRatioTTM"),
peg_ratio=item.get("pegRatio") or item.get("pegRatioTTM"),
revenue_growth_yoy=item.get("revenueGrowth"),
profit_margin=item.get("netProfitMargin") or item.get("netProfitMarginTTM"),
debt_to_equity=item.get("debtToEquity") or item.get("debtToEquityTTM"),
market_cap=item.get("marketCap") or item.get("marketCapTTM"),
fetched_at=datetime.now(timezone.utc),
)
except Exception:
logger.exception("FMP fetch failed for %s", ticker)
return None
Step 7: Create shared/fundamentals/yahoo.py
"""Yahoo Finance fundamental data provider (via yfinance library)."""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timezone
from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot
logger = logging.getLogger(__name__)
class YahooFinanceProvider(FundamentalsProvider):
"""Fetch fundamentals via yfinance (runs in thread pool to avoid blocking)."""
async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
try:
import yfinance
loop = asyncio.get_running_loop()
info = await loop.run_in_executor(None, _get_ticker_info, ticker)
if not info:
logger.warning("yfinance returned no info for %s", ticker)
return None
d2e = info.get("debtToEquity")
# yfinance returns debt-to-equity as percentage (e.g. 180.0 = 1.8)
if d2e is not None:
d2e = d2e / 100.0
return FundamentalsSnapshot(
ticker=ticker,
eps_ttm=info.get("trailingEps"),
pe_ratio=info.get("trailingPE"),
peg_ratio=info.get("pegRatio"),
revenue_growth_yoy=info.get("revenueGrowth"),
profit_margin=info.get("profitMargins"),
debt_to_equity=d2e,
market_cap=info.get("marketCap"),
fetched_at=datetime.now(timezone.utc),
)
except Exception:
logger.exception("yfinance fetch failed for %s", ticker)
return None
def _get_ticker_info(ticker: str) -> dict:
"""Synchronous helper — called via run_in_executor."""
import yfinance
t = yfinance.Ticker(ticker)
return t.info
Step 8: Create shared/fundamentals/rotating.py
"""Rotating provider that tries multiple providers with failover."""
from __future__ import annotations
import logging
from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot
logger = logging.getLogger(__name__)
class RotatingProvider(FundamentalsProvider):
"""Try each provider in order; return the first successful result."""
def __init__(self, providers: list[FundamentalsProvider]) -> None:
self._providers = providers
async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
for provider in self._providers:
try:
result = await provider.fetch(ticker)
if result is not None:
logger.debug(
"Fetched fundamentals for %s via %s",
ticker,
type(provider).__name__,
)
return result
logger.debug(
"%s returned None for %s, trying next",
type(provider).__name__,
ticker,
)
except Exception:
logger.exception(
"%s failed for %s, trying next",
type(provider).__name__,
ticker,
)
logger.warning("All providers failed for %s", ticker)
return None
Step 9: Run tests
Run: python -m pytest tests/test_fundamentals.py -v
Expected: All PASS
Step 10: Commit
git add shared/fundamentals/ tests/test_fundamentals.py
git commit -m "feat: add fundamental data providers (Alpha Vantage, FMP, Yahoo Finance) with rotation"
Task 4: Add fundamentals DB table and caching
Files:
- Create:
shared/models/fundamentals.py - Modify:
shared/models/__init__.py - Create:
alembic/versions/xxx_add_fundamentals_table.py(via alembic) - Create:
shared/fundamentals/cache.py - Test:
tests/test_fundamentals.py(add cache tests)
Step 1: Create shared/models/fundamentals.py
"""Fundamentals database model for caching fundamental data."""
import uuid
from sqlalchemy import Float, String, DateTime
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from shared.models.base import Base, TimestampMixin
class Fundamentals(TimestampMixin, Base):
__tablename__ = "fundamentals"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
ticker: Mapped[str] = mapped_column(String(20), unique=True, nullable=False, index=True)
eps_ttm: Mapped[float | None] = mapped_column(Float, nullable=True)
pe_ratio: Mapped[float | None] = mapped_column(Float, nullable=True)
peg_ratio: Mapped[float | None] = mapped_column(Float, nullable=True)
revenue_growth_yoy: Mapped[float | None] = mapped_column(Float, nullable=True)
profit_margin: Mapped[float | None] = mapped_column(Float, nullable=True)
debt_to_equity: Mapped[float | None] = mapped_column(Float, nullable=True)
market_cap: Mapped[float | None] = mapped_column(Float, nullable=True)
fetched_at: Mapped[str] = mapped_column(DateTime(timezone=True), nullable=False)
Step 2: Update shared/models/__init__.py
Add from shared.models.fundamentals import Fundamentals and add "Fundamentals" to __all__.
Step 3: Generate Alembic migration
Run: python -m alembic revision --autogenerate -m "add fundamentals table"
Then verify the generated migration looks correct.
Step 4: Create shared/fundamentals/cache.py
"""DB-backed cache for fundamental data."""
from __future__ import annotations
import logging
from datetime import datetime, timezone, timedelta
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker
from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot
from shared.models.fundamentals import Fundamentals
logger = logging.getLogger(__name__)
class CachedFundamentalsProvider:
"""Wraps a FundamentalsProvider with DB-backed caching.
On fetch: checks DB first. If data is fresh (within TTL), returns cached.
Otherwise fetches from the underlying provider and updates the DB.
"""
def __init__(
self,
provider: FundamentalsProvider,
session_factory: async_sessionmaker,
cache_ttl_hours: int = 24,
) -> None:
self._provider = provider
self._session_factory = session_factory
self._cache_ttl = timedelta(hours=cache_ttl_hours)
async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
# Try cache first
cached = await self._load_from_db(ticker)
if cached is not None:
age = datetime.now(timezone.utc) - cached.fetched_at.replace(tzinfo=timezone.utc)
if age < self._cache_ttl:
logger.debug("Cache hit for %s (age=%s)", ticker, age)
return cached
logger.debug("Cache stale for %s (age=%s), refreshing", ticker, age)
# Fetch from provider
result = await self._provider.fetch(ticker)
if result is not None:
await self._save_to_db(result)
return result
async def _load_from_db(self, ticker: str) -> FundamentalsSnapshot | None:
try:
async with self._session_factory() as session:
stmt = select(Fundamentals).where(Fundamentals.ticker == ticker)
row = (await session.execute(stmt)).scalar_one_or_none()
if row is None:
return None
return FundamentalsSnapshot(
ticker=row.ticker,
eps_ttm=row.eps_ttm,
pe_ratio=row.pe_ratio,
peg_ratio=row.peg_ratio,
revenue_growth_yoy=row.revenue_growth_yoy,
profit_margin=row.profit_margin,
debt_to_equity=row.debt_to_equity,
market_cap=row.market_cap,
fetched_at=row.fetched_at,
)
except Exception:
logger.exception("Failed to load fundamentals from DB for %s", ticker)
return None
async def _save_to_db(self, snapshot: FundamentalsSnapshot) -> None:
try:
async with self._session_factory() as session:
stmt = select(Fundamentals).where(Fundamentals.ticker == snapshot.ticker)
existing = (await session.execute(stmt)).scalar_one_or_none()
if existing:
existing.eps_ttm = snapshot.eps_ttm
existing.pe_ratio = snapshot.pe_ratio
existing.peg_ratio = snapshot.peg_ratio
existing.revenue_growth_yoy = snapshot.revenue_growth_yoy
existing.profit_margin = snapshot.profit_margin
existing.debt_to_equity = snapshot.debt_to_equity
existing.market_cap = snapshot.market_cap
existing.fetched_at = snapshot.fetched_at
else:
row = Fundamentals(
ticker=snapshot.ticker,
eps_ttm=snapshot.eps_ttm,
pe_ratio=snapshot.pe_ratio,
peg_ratio=snapshot.peg_ratio,
revenue_growth_yoy=snapshot.revenue_growth_yoy,
profit_margin=snapshot.profit_margin,
debt_to_equity=snapshot.debt_to_equity,
market_cap=snapshot.market_cap,
fetched_at=snapshot.fetched_at,
)
session.add(row)
await session.commit()
logger.debug("Saved fundamentals for %s to DB", snapshot.ticker)
except Exception:
logger.exception("Failed to save fundamentals for %s to DB", snapshot.ticker)
Step 5: Run tests
Run: python -m pytest tests/test_fundamentals.py tests/test_models.py -v
Expected: PASS
Step 6: Commit
git add shared/models/fundamentals.py shared/models/__init__.py shared/fundamentals/cache.py alembic/
git commit -m "feat: add fundamentals DB model and cached provider"
Task 5: Implement the 6 new strategies
Files:
- Create:
shared/strategies/value.py - Create:
shared/strategies/macd_crossover.py - Create:
shared/strategies/bollinger_breakout.py - Create:
shared/strategies/vwap.py - Create:
shared/strategies/liquidity.py - Create:
shared/strategies/ma_stack.py - Modify:
shared/strategies/__init__.py - Test:
tests/test_new_strategies.py(new)
Step 1: Write failing tests
Create tests/test_new_strategies.py:
"""Tests for the 6 new trading strategies."""
import pytest
from datetime import datetime, timezone
from shared.fundamentals.base import FundamentalsSnapshot
from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection
from shared.strategies import BaseStrategy
from shared.strategies.value import ValueStrategy
from shared.strategies.macd_crossover import MACDCrossoverStrategy
from shared.strategies.bollinger_breakout import BollingerBreakoutStrategy
from shared.strategies.vwap import VWAPStrategy
from shared.strategies.liquidity import LiquidityStrategy
from shared.strategies.ma_stack import MAStackStrategy
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _market(
ticker: str = "AAPL",
price: float = 150.0,
volume: float = 1_000_000,
sma_20: float | None = None,
sma_50: float | None = None,
sma_200: float | None = None,
rsi: float | None = None,
ema_9: float | None = None,
ema_21: float | None = None,
macd: float | None = None,
macd_signal: float | None = None,
macd_histogram: float | None = None,
bollinger_upper: float | None = None,
bollinger_mid: float | None = None,
bollinger_lower: float | None = None,
vwap: float | None = None,
atr: float | None = None,
fundamentals: FundamentalsSnapshot | None = None,
bars: list | None = None,
) -> MarketSnapshot:
return MarketSnapshot(
ticker=ticker,
current_price=price,
open=price - 1,
high=price + 2,
low=price - 2,
close=price,
volume=volume,
sma_20=sma_20,
sma_50=sma_50,
sma_200=sma_200,
rsi=rsi,
ema_9=ema_9,
ema_21=ema_21,
macd=macd,
macd_signal=macd_signal,
macd_histogram=macd_histogram,
bollinger_upper=bollinger_upper,
bollinger_mid=bollinger_mid,
bollinger_lower=bollinger_lower,
vwap=vwap,
atr=atr,
fundamentals=fundamentals,
bars=bars or [],
)
def _fundamentals(
ticker: str = "AAPL",
eps_ttm: float | None = 6.5,
pe_ratio: float | None = 28.0,
peg_ratio: float | None = 1.5,
revenue_growth_yoy: float | None = 0.08,
profit_margin: float | None = 0.25,
debt_to_equity: float | None = 1.8,
) -> FundamentalsSnapshot:
return FundamentalsSnapshot(
ticker=ticker,
eps_ttm=eps_ttm,
pe_ratio=pe_ratio,
peg_ratio=peg_ratio,
revenue_growth_yoy=revenue_growth_yoy,
profit_margin=profit_margin,
debt_to_equity=debt_to_equity,
fetched_at=datetime.now(timezone.utc),
)
# ===================================================================
# Value strategy
# ===================================================================
class TestValueStrategy:
@pytest.fixture()
def strategy(self) -> ValueStrategy:
return ValueStrategy()
@pytest.mark.asyncio
async def test_long_signal_undervalued(self, strategy: ValueStrategy) -> None:
f = _fundamentals(peg_ratio=0.8, pe_ratio=15.0, eps_ttm=5.0, revenue_growth_yoy=0.1)
market = _market(fundamentals=f)
signal = await strategy.evaluate("AAPL", market)
assert signal is not None
assert signal.direction == SignalDirection.LONG
@pytest.mark.asyncio
async def test_short_signal_overvalued(self, strategy: ValueStrategy) -> None:
f = _fundamentals(peg_ratio=4.0, pe_ratio=60.0, eps_ttm=-1.0)
market = _market(fundamentals=f)
signal = await strategy.evaluate("AAPL", market)
assert signal is not None
assert signal.direction == SignalDirection.SHORT
@pytest.mark.asyncio
async def test_no_signal_without_fundamentals(self, strategy: ValueStrategy) -> None:
market = _market(fundamentals=None)
signal = await strategy.evaluate("AAPL", market)
assert signal is None
@pytest.mark.asyncio
async def test_no_signal_neutral_fundamentals(self, strategy: ValueStrategy) -> None:
f = _fundamentals(peg_ratio=1.5, pe_ratio=20.0, eps_ttm=3.0)
market = _market(fundamentals=f)
signal = await strategy.evaluate("AAPL", market)
assert signal is None
@pytest.mark.asyncio
async def test_strength_bounded(self, strategy: ValueStrategy) -> None:
f = _fundamentals(peg_ratio=0.1, pe_ratio=5.0, eps_ttm=10.0, revenue_growth_yoy=0.5)
market = _market(fundamentals=f)
signal = await strategy.evaluate("AAPL", market)
assert signal is not None
assert 0 < signal.strength <= 1.0
# ===================================================================
# MACD Crossover strategy
# ===================================================================
class TestMACDCrossoverStrategy:
@pytest.fixture()
def strategy(self) -> MACDCrossoverStrategy:
return MACDCrossoverStrategy()
@pytest.mark.asyncio
async def test_long_signal_bullish_crossover(self, strategy: MACDCrossoverStrategy) -> None:
# Simulate: previous MACD was below signal, now above
# First call sets the state
market_prev = _market(macd=-0.5, macd_signal=0.1, macd_histogram=-0.6, atr=2.0)
await strategy.evaluate("AAPL", market_prev)
# Second call: MACD crossed above signal
market = _market(macd=0.5, macd_signal=0.1, macd_histogram=0.4, atr=2.0)
signal = await strategy.evaluate("AAPL", market)
assert signal is not None
assert signal.direction == SignalDirection.LONG
@pytest.mark.asyncio
async def test_short_signal_bearish_crossover(self, strategy: MACDCrossoverStrategy) -> None:
# Previous: MACD above signal
market_prev = _market(macd=0.5, macd_signal=0.1, macd_histogram=0.4, atr=2.0)
await strategy.evaluate("AAPL", market_prev)
# Now: MACD crossed below signal
market = _market(macd=-0.5, macd_signal=0.1, macd_histogram=-0.6, atr=2.0)
signal = await strategy.evaluate("AAPL", market)
assert signal is not None
assert signal.direction == SignalDirection.SHORT
@pytest.mark.asyncio
async def test_no_signal_without_macd(self, strategy: MACDCrossoverStrategy) -> None:
market = _market(macd=None, macd_signal=None)
signal = await strategy.evaluate("AAPL", market)
assert signal is None
@pytest.mark.asyncio
async def test_no_signal_without_crossover(self, strategy: MACDCrossoverStrategy) -> None:
# Both calls have MACD above signal — no crossover
market1 = _market(macd=0.5, macd_signal=0.1, macd_histogram=0.4, atr=2.0)
await strategy.evaluate("AAPL", market1)
market2 = _market(macd=0.6, macd_signal=0.1, macd_histogram=0.5, atr=2.0)
signal = await strategy.evaluate("AAPL", market2)
assert signal is None
# ===================================================================
# Bollinger Breakout strategy
# ===================================================================
class TestBollingerBreakoutStrategy:
@pytest.fixture()
def strategy(self) -> BollingerBreakoutStrategy:
return BollingerBreakoutStrategy()
@pytest.mark.asyncio
async def test_long_signal_above_upper_band(self, strategy: BollingerBreakoutStrategy) -> None:
market = _market(
price=155.0, bollinger_upper=152.0, bollinger_mid=150.0, bollinger_lower=148.0,
volume=2_000_000, sma_20=150.0,
)
# Need bars to compute average volume
bars = [{"volume": 1_000_000}] * 20
market.bars = bars
signal = await strategy.evaluate("AAPL", market)
assert signal is not None
assert signal.direction == SignalDirection.LONG
@pytest.mark.asyncio
async def test_long_signal_below_lower_band(self, strategy: BollingerBreakoutStrategy) -> None:
market = _market(
price=145.0, bollinger_upper=152.0, bollinger_mid=150.0, bollinger_lower=148.0,
)
signal = await strategy.evaluate("AAPL", market)
assert signal is not None
assert signal.direction == SignalDirection.LONG
@pytest.mark.asyncio
async def test_no_signal_inside_bands(self, strategy: BollingerBreakoutStrategy) -> None:
market = _market(
price=150.0, bollinger_upper=155.0, bollinger_mid=150.0, bollinger_lower=145.0,
)
signal = await strategy.evaluate("AAPL", market)
assert signal is None
@pytest.mark.asyncio
async def test_no_signal_without_bollinger(self, strategy: BollingerBreakoutStrategy) -> None:
market = _market()
signal = await strategy.evaluate("AAPL", market)
assert signal is None
# ===================================================================
# VWAP strategy
# ===================================================================
class TestVWAPStrategy:
@pytest.fixture()
def strategy(self) -> VWAPStrategy:
return VWAPStrategy()
@pytest.mark.asyncio
async def test_long_signal_above_vwap(self, strategy: VWAPStrategy) -> None:
# First call: below VWAP
market_prev = _market(price=148.0, vwap=150.0, volume=1_000_000)
await strategy.evaluate("AAPL", market_prev)
# Second call: crossed above VWAP with volume
market = _market(price=152.0, vwap=150.0, volume=1_500_000)
market.bars = [{"volume": 1_000_000}] * 20
signal = await strategy.evaluate("AAPL", market)
assert signal is not None
assert signal.direction == SignalDirection.LONG
@pytest.mark.asyncio
async def test_short_signal_below_vwap(self, strategy: VWAPStrategy) -> None:
market_prev = _market(price=152.0, vwap=150.0, volume=1_000_000)
await strategy.evaluate("AAPL", market_prev)
market = _market(price=148.0, vwap=150.0, volume=1_500_000)
market.bars = [{"volume": 1_000_000}] * 20
signal = await strategy.evaluate("AAPL", market)
assert signal is not None
assert signal.direction == SignalDirection.SHORT
@pytest.mark.asyncio
async def test_no_signal_without_vwap(self, strategy: VWAPStrategy) -> None:
market = _market(vwap=None)
signal = await strategy.evaluate("AAPL", market)
assert signal is None
# ===================================================================
# Liquidity strategy
# ===================================================================
class TestLiquidityStrategy:
@pytest.fixture()
def strategy(self) -> LiquidityStrategy:
return LiquidityStrategy()
@pytest.mark.asyncio
async def test_long_signal_high_volume_up(self, strategy: LiquidityStrategy) -> None:
bars = [{"close": 100.0, "volume": 1_000_000}] * 20
market = _market(price=105.0, volume=2_500_000, bars=bars)
signal = await strategy.evaluate("AAPL", market)
assert signal is not None
assert signal.direction == SignalDirection.LONG
@pytest.mark.asyncio
async def test_short_signal_high_volume_down(self, strategy: LiquidityStrategy) -> None:
bars = [{"close": 100.0, "volume": 1_000_000}] * 20
market = _market(price=95.0, volume=2_500_000, bars=bars)
signal = await strategy.evaluate("AAPL", market)
assert signal is not None
assert signal.direction == SignalDirection.SHORT
@pytest.mark.asyncio
async def test_no_signal_thin_volume(self, strategy: LiquidityStrategy) -> None:
bars = [{"close": 100.0, "volume": 1_000_000}] * 20
market = _market(price=105.0, volume=400_000, bars=bars)
signal = await strategy.evaluate("AAPL", market)
assert signal is None
@pytest.mark.asyncio
async def test_no_signal_no_bars(self, strategy: LiquidityStrategy) -> None:
market = _market(price=105.0, volume=2_000_000, bars=[])
signal = await strategy.evaluate("AAPL", market)
assert signal is None
# ===================================================================
# MA Stack strategy
# ===================================================================
class TestMAStackStrategy:
@pytest.fixture()
def strategy(self) -> MAStackStrategy:
return MAStackStrategy()
@pytest.mark.asyncio
async def test_full_bull_alignment(self, strategy: MAStackStrategy) -> None:
market = _market(
price=160.0, ema_9=155.0, ema_21=150.0, sma_50=145.0, sma_200=140.0,
)
signal = await strategy.evaluate("AAPL", market)
assert signal is not None
assert signal.direction == SignalDirection.LONG
assert signal.strength > 0.5
@pytest.mark.asyncio
async def test_full_bear_alignment(self, strategy: MAStackStrategy) -> None:
market = _market(
price=130.0, ema_9=135.0, ema_21=140.0, sma_50=145.0, sma_200=150.0,
)
signal = await strategy.evaluate("AAPL", market)
assert signal is not None
assert signal.direction == SignalDirection.SHORT
assert signal.strength > 0.5
@pytest.mark.asyncio
async def test_no_signal_tangled_mas(self, strategy: MAStackStrategy) -> None:
market = _market(
price=150.0, ema_9=151.0, ema_21=149.0, sma_50=152.0, sma_200=148.0,
)
signal = await strategy.evaluate("AAPL", market)
assert signal is None
@pytest.mark.asyncio
async def test_no_signal_missing_mas(self, strategy: MAStackStrategy) -> None:
market = _market(ema_9=155.0) # Missing others
signal = await strategy.evaluate("AAPL", market)
assert signal is None
@pytest.mark.asyncio
async def test_partial_bull(self, strategy: MAStackStrategy) -> None:
# Price > EMA-9 > EMA-21, but below SMA-200 (partial bull)
market = _market(
price=155.0, ema_9=152.0, ema_21=148.0, sma_50=145.0, sma_200=160.0,
)
signal = await strategy.evaluate("AAPL", market)
# Should still produce LONG but with lower strength
if signal is not None:
assert signal.direction == SignalDirection.LONG
assert signal.strength < 0.8
# ===================================================================
# Cross-strategy tests
# ===================================================================
class TestNewStrategyCrossChecks:
def test_all_new_strategies_inherit_base(self) -> None:
for cls in (ValueStrategy, MACDCrossoverStrategy, BollingerBreakoutStrategy,
VWAPStrategy, LiquidityStrategy, MAStackStrategy):
assert issubclass(cls, BaseStrategy)
def test_all_strategy_names_unique(self) -> None:
strategies = [
ValueStrategy(), MACDCrossoverStrategy(), BollingerBreakoutStrategy(),
VWAPStrategy(), LiquidityStrategy(), MAStackStrategy(),
]
names = [s.name for s in strategies]
assert len(names) == len(set(names))
def test_strategy_names_non_empty(self) -> None:
for cls in (ValueStrategy, MACDCrossoverStrategy, BollingerBreakoutStrategy,
VWAPStrategy, LiquidityStrategy, MAStackStrategy):
assert len(cls().name) > 0
Step 2: Run tests to verify they fail
Run: python -m pytest tests/test_new_strategies.py -v
Expected: FAIL — strategy modules don't exist
Step 3: Implement all 6 strategies
Create each strategy file following the BaseStrategy.evaluate() interface. Each strategy:
- Has a
name: strclass attribute - Implements
async def evaluate(self, ticker, market, sentiment) -> TradeSignal | None - Returns
Nonewhen required data is missing
shared/strategies/value.py:
"""Value investing strategy — trade on fundamental data."""
from datetime import datetime, timezone
from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
from shared.strategies.base import BaseStrategy
class ValueStrategy(BaseStrategy):
"""Generate signals from fundamental metrics (EPS, P/E, PEG, etc.)."""
name: str = "value"
async def evaluate(
self,
ticker: str,
market: MarketSnapshot,
sentiment: SentimentContext | None = None,
) -> TradeSignal | None:
f = market.fundamentals
if f is None:
return None
# Need at least PEG and P/E to form an opinion
if f.peg_ratio is None or f.pe_ratio is None:
return None
score = 0.0 # Positive = undervalued, negative = overvalued
# PEG ratio: < 1.0 is undervalued, > 3.0 is overvalued
if f.peg_ratio < 1.0:
score += (1.0 - f.peg_ratio) # max 1.0
elif f.peg_ratio > 3.0:
score -= min((f.peg_ratio - 3.0) / 3.0, 1.0)
# P/E ratio: < 15 is cheap, > 40 is expensive
if f.pe_ratio < 15:
score += (15 - f.pe_ratio) / 15
elif f.pe_ratio > 40:
score -= min((f.pe_ratio - 40) / 40, 1.0)
# EPS: positive is good, negative is bad
if f.eps_ttm is not None:
if f.eps_ttm > 0:
score += 0.2
elif f.eps_ttm < 0:
score -= 0.3
# Revenue growth: positive growth is bullish
if f.revenue_growth_yoy is not None:
if f.revenue_growth_yoy > 0.1:
score += 0.2
elif f.revenue_growth_yoy < -0.1:
score -= 0.2
# Profit margin: healthy margin is bullish
if f.profit_margin is not None:
if f.profit_margin > 0.15:
score += 0.1
elif f.profit_margin < 0:
score -= 0.2
# Debt-to-equity: low is healthy
if f.debt_to_equity is not None:
if f.debt_to_equity > 3.0:
score -= 0.2
elif f.debt_to_equity < 0.5:
score += 0.1
# Determine direction and strength
if score > 0.3:
direction = SignalDirection.LONG
elif score < -0.3:
direction = SignalDirection.SHORT
else:
return None
strength = max(0.0, min(1.0, abs(score) / 2.0))
return TradeSignal(
ticker=ticker,
direction=direction,
strength=strength,
strategy_sources=[self.name],
timestamp=datetime.now(tz=timezone.utc),
)
shared/strategies/macd_crossover.py:
"""MACD crossover strategy — trade on MACD/signal line crossovers."""
from datetime import datetime, timezone
from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
from shared.strategies.base import BaseStrategy
class MACDCrossoverStrategy(BaseStrategy):
"""Detect MACD/signal line crossovers for trade signals."""
name: str = "macd_crossover"
def __init__(self) -> None:
self._prev_macd: dict[str, float] = {}
self._prev_signal: dict[str, float] = {}
async def evaluate(
self,
ticker: str,
market: MarketSnapshot,
sentiment: SentimentContext | None = None,
) -> TradeSignal | None:
if market.macd is None or market.macd_signal is None:
return None
macd = market.macd
sig = market.macd_signal
prev_macd = self._prev_macd.get(ticker)
prev_signal = self._prev_signal.get(ticker)
# Store current state for next evaluation
self._prev_macd[ticker] = macd
self._prev_signal[ticker] = sig
# Need previous state to detect a crossover
if prev_macd is None or prev_signal is None:
return None
prev_diff = prev_macd - prev_signal
curr_diff = macd - sig
# Detect crossover: sign change
if prev_diff <= 0 and curr_diff > 0:
direction = SignalDirection.LONG
elif prev_diff >= 0 and curr_diff < 0:
direction = SignalDirection.SHORT
else:
return None
# Strength: magnitude of histogram normalized by ATR if available
histogram = abs(market.macd_histogram) if market.macd_histogram is not None else abs(curr_diff)
if market.atr and market.atr > 0:
raw_strength = histogram / market.atr
else:
raw_strength = min(histogram / 2.0, 1.0)
strength = max(0.0, min(1.0, raw_strength))
return TradeSignal(
ticker=ticker,
direction=direction,
strength=strength,
strategy_sources=[self.name],
timestamp=datetime.now(tz=timezone.utc),
)
shared/strategies/bollinger_breakout.py:
"""Bollinger Bands breakout strategy."""
from datetime import datetime, timezone
from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
from shared.strategies.base import BaseStrategy
class BollingerBreakoutStrategy(BaseStrategy):
"""Generate signals from Bollinger Band breakouts."""
name: str = "bollinger_breakout"
async def evaluate(
self,
ticker: str,
market: MarketSnapshot,
sentiment: SentimentContext | None = None,
) -> TradeSignal | None:
if market.bollinger_upper is None or market.bollinger_lower is None or market.bollinger_mid is None:
return None
price = market.current_price
upper = market.bollinger_upper
lower = market.bollinger_lower
band_width = upper - lower
if band_width <= 0:
return None
direction: SignalDirection | None = None
if price > upper:
# Breakout above upper band — check for volume confirmation
avg_vol = _avg_volume(market.bars)
if avg_vol and market.volume > avg_vol * 1.5:
direction = SignalDirection.LONG
else:
# Above band but no volume = potential failed breakout, skip
return None
elif price < lower:
# Below lower band — mean reversion bounce expected
direction = SignalDirection.LONG
else:
return None
# Strength: how far price is from the band, relative to band width
if direction == SignalDirection.LONG and price > upper:
raw_strength = (price - upper) / band_width
elif price < lower:
raw_strength = (lower - price) / band_width
else:
raw_strength = 0.3
strength = max(0.0, min(1.0, raw_strength))
return TradeSignal(
ticker=ticker,
direction=direction,
strength=strength,
strategy_sources=[self.name],
timestamp=datetime.now(tz=timezone.utc),
)
def _avg_volume(bars: list[dict]) -> float | None:
"""Compute average volume from bar dicts."""
if not bars:
return None
volumes = [b.get("volume", 0) for b in bars if "volume" in b]
if not volumes:
return None
return sum(volumes) / len(volumes)
shared/strategies/vwap.py:
"""VWAP strategy — trade on price crossing VWAP."""
from datetime import datetime, timezone
from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
from shared.strategies.base import BaseStrategy
class VWAPStrategy(BaseStrategy):
"""Generate signals when price crosses VWAP with volume confirmation."""
name: str = "vwap"
def __init__(self) -> None:
self._prev_price: dict[str, float] = {}
self._prev_vwap: dict[str, float] = {}
async def evaluate(
self,
ticker: str,
market: MarketSnapshot,
sentiment: SentimentContext | None = None,
) -> TradeSignal | None:
if market.vwap is None:
return None
price = market.current_price
vwap = market.vwap
prev_price = self._prev_price.get(ticker)
prev_vwap = self._prev_vwap.get(ticker)
self._prev_price[ticker] = price
self._prev_vwap[ticker] = vwap
if prev_price is None or prev_vwap is None:
return None
prev_diff = prev_price - prev_vwap
curr_diff = price - vwap
# Detect crossover
if prev_diff <= 0 and curr_diff > 0:
direction = SignalDirection.LONG
elif prev_diff >= 0 and curr_diff < 0:
direction = SignalDirection.SHORT
else:
return None
# Strength: distance from VWAP as % of price, amplified by relative volume
distance_pct = abs(curr_diff) / price if price > 0 else 0
vol_ratio = 1.0
avg_vol = _avg_volume(market.bars)
if avg_vol and avg_vol > 0:
vol_ratio = min(market.volume / avg_vol, 3.0) / 3.0
raw_strength = distance_pct * 20 * vol_ratio # Scale up since % distance is small
strength = max(0.0, min(1.0, raw_strength))
return TradeSignal(
ticker=ticker,
direction=direction,
strength=strength,
strategy_sources=[self.name],
timestamp=datetime.now(tz=timezone.utc),
)
def _avg_volume(bars: list[dict]) -> float | None:
if not bars:
return None
volumes = [b.get("volume", 0) for b in bars if "volume" in b]
return sum(volumes) / len(volumes) if volumes else None
shared/strategies/liquidity.py:
"""Liquidity strategy — trade on volume patterns."""
from datetime import datetime, timezone
from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
from shared.strategies.base import BaseStrategy
class LiquidityStrategy(BaseStrategy):
"""Analyze volume-based liquidity to confirm directional moves."""
name: str = "liquidity"
async def evaluate(
self,
ticker: str,
market: MarketSnapshot,
sentiment: SentimentContext | None = None,
) -> TradeSignal | None:
if not market.bars or len(market.bars) < 5:
return None
# Compute average volume from bars
volumes = [b.get("volume", 0) for b in market.bars if "volume" in b]
if not volumes:
return None
avg_vol = sum(volumes) / len(volumes)
if avg_vol <= 0:
return None
relative_volume = market.volume / avg_vol
# Thin liquidity — unreliable, skip
if relative_volume < 1.0:
return None
# Determine price direction from bars
closes = [b.get("close", 0) for b in market.bars if "close" in b]
if len(closes) < 2:
return None
# Compare current price to recent average close
recent_avg = sum(closes[-5:]) / min(len(closes), 5)
price_change = (market.current_price - recent_avg) / recent_avg if recent_avg > 0 else 0
if relative_volume >= 2.0 and price_change > 0.001:
direction = SignalDirection.LONG
elif relative_volume >= 2.0 and price_change < -0.001:
direction = SignalDirection.SHORT
elif price_change > 0.001 and relative_volume < 0.7:
# Price rising on declining volume — weak rally
direction = SignalDirection.SHORT
else:
return None
# Strength based on relative volume magnitude
raw_strength = min(relative_volume / 4.0, 1.0)
strength = max(0.0, min(1.0, raw_strength))
return TradeSignal(
ticker=ticker,
direction=direction,
strength=strength,
strategy_sources=[self.name],
timestamp=datetime.now(tz=timezone.utc),
)
shared/strategies/ma_stack.py:
"""MA Stack strategy — read the full 5-MA stack for trend alignment."""
from datetime import datetime, timezone
from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
from shared.strategies.base import BaseStrategy
class MAStackStrategy(BaseStrategy):
"""Assess trend strength from the alignment of 5 moving averages."""
name: str = "ma_stack"
def __init__(self) -> None:
self._prev_sma50: dict[str, float] = {}
self._prev_sma200: dict[str, float] = {}
async def evaluate(
self,
ticker: str,
market: MarketSnapshot,
sentiment: SentimentContext | None = None,
) -> TradeSignal | None:
# Need all MAs
if any(v is None for v in [market.ema_9, market.ema_21, market.sma_50, market.sma_200]):
return None
price = market.current_price
ema_9 = market.ema_9
ema_21 = market.ema_21
sma_50 = market.sma_50
sma_200 = market.sma_200
# Count bull alignment: each pair in order scores a point
values = [price, ema_9, ema_21, sma_50, sma_200]
bull_score = sum(1 for i in range(len(values) - 1) if values[i] > values[i + 1])
bear_score = sum(1 for i in range(len(values) - 1) if values[i] < values[i + 1])
# Golden/death cross detection
prev_50 = self._prev_sma50.get(ticker)
prev_200 = self._prev_sma200.get(ticker)
self._prev_sma50[ticker] = sma_50
self._prev_sma200[ticker] = sma_200
cross_bonus = 0.0
if prev_50 is not None and prev_200 is not None:
if prev_50 <= prev_200 and sma_50 > sma_200:
cross_bonus = 0.2 # Golden cross
elif prev_50 >= prev_200 and sma_50 < sma_200:
cross_bonus = -0.2 # Death cross
if bull_score >= 3:
direction = SignalDirection.LONG
raw_strength = bull_score / 4.0 + max(cross_bonus, 0)
elif bear_score >= 3:
direction = SignalDirection.SHORT
raw_strength = bear_score / 4.0 + max(-cross_bonus, 0)
else:
# Tangled — no clear trend
return None
strength = max(0.0, min(1.0, raw_strength))
return TradeSignal(
ticker=ticker,
direction=direction,
strength=strength,
strategy_sources=[self.name],
timestamp=datetime.now(tz=timezone.utc),
)
Step 4: Update shared/strategies/__init__.py
Add imports for all 6 new strategies and update __all__.
Step 5: Run tests
Run: python -m pytest tests/test_new_strategies.py tests/test_strategies.py -v
Expected: All PASS
Step 6: Commit
git add shared/strategies/ tests/test_new_strategies.py
git commit -m "feat: add 6 new strategies (value, MACD, Bollinger, VWAP, liquidity, MA stack)"
Task 6: Wire everything into the signal generator
Files:
- Modify:
services/signal_generator/main.py - Modify:
services/signal_generator/config.py - Modify:
.env - Modify:
scripts/seed_strategies.py - Modify:
pyproject.toml
Step 1: Update signal generator config
In services/signal_generator/config.py, add:
alpha_vantage_api_key: str = ""
fmp_api_key: str = ""
fundamentals_cache_ttl_hours: int = 24
Step 2: Update .env
Add:
TRADING_ALPHA_VANTAGE_API_KEY=M0I3TWB6VKU0UF51
TRADING_FMP_API_KEY=34zqbQFeRxYvPtzp3Y5QLKPVPztkZyfK
TRADING_FUNDAMENTALS_CACHE_TTL_HOURS=24
TRADING_HISTORICAL_BARS=250
Step 3: Update services/signal_generator/main.py
- Import the 6 new strategies.
- Update
_DEFAULT_WEIGHTSto include all 9 strategies (equal weight ~0.111). - Add fundamentals initialization:
- Create
RotatingProviderwith all three providers - Wrap with
CachedFundamentalsProvider - Fetch fundamentals for watchlist tickers on startup
- Store in a
dict[str, FundamentalsSnapshot]
- Create
- In
_consume_scored_articles, inject fundamentals intoMarketSnapshotbefore calling ensemble:if fundamentals_cache: snapshot.fundamentals = fundamentals_cache.get(ticker) - Add a daily refresh background task for fundamentals.
Step 4: Update scripts/seed_strategies.py
Add 6 new strategies to DEFAULT_STRATEGIES with descriptions and equal weights (recalculate to ~0.111).
Step 5: Update pyproject.toml
Add yfinance to the trading optional dependency group:
trading = ["alpaca-py>=0.21", "pytz>=2024.1", "yfinance>=0.2"]
Add httpx to the trading group if not already available (it's in news group but signal-generator uses the trading group):
trading = ["alpaca-py>=0.21", "pytz>=2024.1", "yfinance>=0.2", "httpx>=0.27"]
Step 6: Run all tests
Run: python -m pytest tests/ -v -m "not integration" --timeout=30
Expected: All tests PASS (should be ~270+ tests now)
Step 7: Commit
git add services/signal_generator/ scripts/seed_strategies.py .env pyproject.toml
git commit -m "feat: wire 6 new strategies and fundamentals into signal generator"
Task 7: Update MarketDataManager max_bars default
Files:
- Modify:
services/signal_generator/market_data.py:16
Step 1: Update default max bars
Change _DEFAULT_MAX_BARS = 100 to _DEFAULT_MAX_BARS = 250.
Step 2: Run tests
Run: python -m pytest tests/test_indicators.py tests/ -v -m "not integration" --timeout=30
Expected: All PASS
Step 3: Commit
git add services/signal_generator/market_data.py
git commit -m "feat: increase default max bars to 250 for SMA-200 support"
Task 8: Run migration and seed new strategies
Step 1: Apply migration (if docker is running)
Run: docker compose exec api-gateway python -m alembic upgrade head
Step 2: Seed new strategies
Run: docker compose exec api-gateway python -m scripts.seed_strategies
Step 3: Rebuild and restart services
Run: docker compose build signal-generator && docker compose up -d signal-generator
Step 4: Verify
Check logs: docker compose logs signal-generator --tail 50
Expected: All 9 strategies loaded, fundamentals fetched for watchlist tickers.