trading/docs/plans/2026-02-23-strategies-fundamentals-plan.md
Viktor Barzin 5cb65e8e4f
docs: add implementation plan for extended strategies + fundamentals
8 tasks covering: MarketSnapshot schema changes, indicator computations,
fundamental data providers, DB caching, 6 new strategies, signal generator
wiring, and deployment steps.
2026-02-23 21:34:12 +00:00

74 KiB

Extended Strategies + Fundamental Data — Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Add 6 new trading strategies (Value, MACD Crossover, Bollinger Breakout, VWAP, Liquidity, MA Stack) with a fundamental data pipeline (3 providers with rotation + DB caching) and 7 new technical indicators.

Architecture: New shared/fundamentals/ module provides fundamental data via a rotating provider abstraction (Alpha Vantage → FMP → Yahoo Finance). Technical indicators (MACD, Bollinger, VWAP, ATR, EMA-9, EMA-21, SMA-200) are computed in MarketDataManager from existing OHLCV bars. Strategies implement the existing BaseStrategy.evaluate() interface and plug into the WeightedEnsemble.

Tech Stack: Python 3.12, Pydantic v2, SQLAlchemy 2.0 async, httpx (REST calls), yfinance, pytest

Design doc: docs/plans/2026-02-23-strategies-fundamentals-design.md


Task 1: Add new technical indicator fields to MarketSnapshot

Files:

  • Modify: shared/schemas/trading.py:127-140
  • Test: tests/test_schemas.py (existing — new fields are optional, won't break)

Step 1: Add indicator fields to MarketSnapshot

In shared/schemas/trading.py, add after line 137 (rsi: float | None = None):

# Technical indicators — computed by MarketDataManager
ema_9: float | None = None
ema_21: float | None = None
sma_200: float | None = None
macd: float | None = None
macd_signal: float | None = None
macd_histogram: float | None = None
bollinger_upper: float | None = None
bollinger_mid: float | None = None
bollinger_lower: float | None = None
vwap: float | None = None
atr: float | None = None

Step 2: Add FundamentalsSnapshot schema

Add after SentimentContext in the same file:

class FundamentalsSnapshot(BaseModel):
    """Fundamental financial data for a single ticker — cached daily."""

    ticker: str
    eps_ttm: float | None = None
    pe_ratio: float | None = None
    peg_ratio: float | None = None
    revenue_growth_yoy: float | None = None
    profit_margin: float | None = None
    debt_to_equity: float | None = None
    market_cap: float | None = None
    fetched_at: datetime

    model_config = {"from_attributes": True}

Also add a fundamentals field to MarketSnapshot:

fundamentals: FundamentalsSnapshot | None = None

Step 3: Run existing tests to confirm nothing breaks

Run: python -m pytest tests/test_schemas.py -v Expected: All 49 tests PASS (new fields are optional with defaults)

Step 4: Commit

git add shared/schemas/trading.py
git commit -m "feat: add technical indicator and fundamentals fields to MarketSnapshot"

Task 2: Implement technical indicator computations in MarketDataManager

Files:

  • Modify: services/signal_generator/market_data.py
  • Test: tests/test_indicators.py (new)

Step 1: Write failing tests for EMA computation

Create tests/test_indicators.py:

"""Tests for technical indicator computations in MarketDataManager."""

import pytest
from datetime import datetime, timezone

from services.signal_generator.market_data import MarketDataManager
from shared.schemas.trading import OHLCVBar


def _bar(close: float, volume: float = 1000.0, high: float | None = None, low: float | None = None, open_: float | None = None) -> OHLCVBar:
    """Helper to create an OHLCVBar with sensible defaults."""
    return OHLCVBar(
        timestamp=datetime.now(timezone.utc),
        open=open_ if open_ is not None else close,
        high=high if high is not None else close + 1,
        low=low if low is not None else close - 1,
        close=close,
        volume=volume,
    )


def _add_bars(mgr: MarketDataManager, ticker: str, closes: list[float], volume: float = 1000.0) -> None:
    """Add multiple bars with given close prices."""
    for c in closes:
        mgr.add_bar(ticker, _bar(c, volume=volume))


class TestEMA:
    """Tests for exponential moving average computation."""

    def test_ema_returns_none_insufficient_data(self) -> None:
        mgr = MarketDataManager(max_bars=300)
        _add_bars(mgr, "AAPL", [100.0] * 5)
        snap = mgr.get_snapshot("AAPL")
        assert snap is not None
        assert snap.ema_9 is None  # Need at least 9 bars

    def test_ema_9_with_exact_data(self) -> None:
        mgr = MarketDataManager(max_bars=300)
        _add_bars(mgr, "AAPL", [100.0] * 9)
        snap = mgr.get_snapshot("AAPL")
        assert snap is not None
        assert snap.ema_9 is not None
        assert snap.ema_9 == pytest.approx(100.0, abs=0.01)

    def test_ema_responds_to_recent_prices(self) -> None:
        mgr = MarketDataManager(max_bars=300)
        # 20 bars at 100, then 1 bar at 110
        _add_bars(mgr, "AAPL", [100.0] * 20 + [110.0])
        snap = mgr.get_snapshot("AAPL")
        assert snap is not None
        assert snap.ema_9 is not None
        # EMA-9 should be > 100 because the most recent price is 110
        assert snap.ema_9 > 100.0
        assert snap.ema_9 < 110.0

    def test_ema_21_returns_none_insufficient_data(self) -> None:
        mgr = MarketDataManager(max_bars=300)
        _add_bars(mgr, "AAPL", [100.0] * 15)
        snap = mgr.get_snapshot("AAPL")
        assert snap is not None
        assert snap.ema_21 is None

    def test_ema_21_computed_with_enough_data(self) -> None:
        mgr = MarketDataManager(max_bars=300)
        _add_bars(mgr, "AAPL", [100.0] * 25)
        snap = mgr.get_snapshot("AAPL")
        assert snap is not None
        assert snap.ema_21 is not None
        assert snap.ema_21 == pytest.approx(100.0, abs=0.01)


class TestSMA200:
    """Tests for SMA-200 computation."""

    def test_sma_200_returns_none_insufficient_data(self) -> None:
        mgr = MarketDataManager(max_bars=300)
        _add_bars(mgr, "AAPL", [100.0] * 100)
        snap = mgr.get_snapshot("AAPL")
        assert snap is not None
        assert snap.sma_200 is None

    def test_sma_200_computed_with_enough_data(self) -> None:
        mgr = MarketDataManager(max_bars=300)
        _add_bars(mgr, "AAPL", [100.0] * 200)
        snap = mgr.get_snapshot("AAPL")
        assert snap is not None
        assert snap.sma_200 is not None
        assert snap.sma_200 == pytest.approx(100.0, abs=0.01)


class TestMACD:
    """Tests for MACD computation."""

    def test_macd_returns_none_insufficient_data(self) -> None:
        mgr = MarketDataManager(max_bars=300)
        _add_bars(mgr, "AAPL", [100.0] * 20)
        snap = mgr.get_snapshot("AAPL")
        assert snap is not None
        assert snap.macd is None

    def test_macd_computed_with_enough_data(self) -> None:
        mgr = MarketDataManager(max_bars=300)
        # Need 26 bars for EMA-26, plus 9 more for the signal line = 35 minimum
        _add_bars(mgr, "AAPL", [100.0] * 40)
        snap = mgr.get_snapshot("AAPL")
        assert snap is not None
        assert snap.macd is not None
        assert snap.macd_signal is not None
        assert snap.macd_histogram is not None
        # With constant prices, MACD should be ~0
        assert snap.macd == pytest.approx(0.0, abs=0.1)

    def test_macd_positive_in_uptrend(self) -> None:
        mgr = MarketDataManager(max_bars=300)
        # Rising prices: EMA-12 > EMA-26 → positive MACD
        prices = [100.0 + i * 0.5 for i in range(50)]
        _add_bars(mgr, "AAPL", prices)
        snap = mgr.get_snapshot("AAPL")
        assert snap is not None
        assert snap.macd is not None
        assert snap.macd > 0


class TestBollingerBands:
    """Tests for Bollinger Bands computation."""

    def test_bollinger_returns_none_insufficient_data(self) -> None:
        mgr = MarketDataManager(max_bars=300)
        _add_bars(mgr, "AAPL", [100.0] * 10)
        snap = mgr.get_snapshot("AAPL")
        assert snap is not None
        assert snap.bollinger_upper is None

    def test_bollinger_computed_with_enough_data(self) -> None:
        mgr = MarketDataManager(max_bars=300)
        _add_bars(mgr, "AAPL", [100.0] * 25)
        snap = mgr.get_snapshot("AAPL")
        assert snap is not None
        assert snap.bollinger_upper is not None
        assert snap.bollinger_mid is not None
        assert snap.bollinger_lower is not None
        # With constant prices, bands should be tight around the price
        assert snap.bollinger_mid == pytest.approx(100.0, abs=0.01)
        assert snap.bollinger_upper >= snap.bollinger_mid
        assert snap.bollinger_lower <= snap.bollinger_mid

    def test_bollinger_width_increases_with_volatility(self) -> None:
        mgr_stable = MarketDataManager(max_bars=300)
        _add_bars(mgr_stable, "AAPL", [100.0] * 25)
        snap_stable = mgr_stable.get_snapshot("AAPL")

        mgr_volatile = MarketDataManager(max_bars=300)
        # Alternating prices = high volatility
        _add_bars(mgr_volatile, "AAPL", [90.0 + (i % 2) * 20 for i in range(25)])
        snap_volatile = mgr_volatile.get_snapshot("AAPL")

        assert snap_stable is not None and snap_volatile is not None
        width_stable = snap_stable.bollinger_upper - snap_stable.bollinger_lower
        width_volatile = snap_volatile.bollinger_upper - snap_volatile.bollinger_lower
        assert width_volatile > width_stable


class TestVWAP:
    """Tests for VWAP computation."""

    def test_vwap_computed(self) -> None:
        mgr = MarketDataManager(max_bars=300)
        _add_bars(mgr, "AAPL", [100.0] * 5, volume=1000.0)
        snap = mgr.get_snapshot("AAPL")
        assert snap is not None
        assert snap.vwap is not None
        # With constant prices and volume, VWAP = typical price
        # typical = (high + low + close) / 3 = (101 + 99 + 100) / 3 = 100.0
        assert snap.vwap == pytest.approx(100.0, abs=0.01)

    def test_vwap_weighted_by_volume(self) -> None:
        mgr = MarketDataManager(max_bars=300)
        # Bar 1: close=100, volume=1000
        mgr.add_bar("AAPL", _bar(100.0, volume=1000.0))
        # Bar 2: close=200, volume=3000 (should pull VWAP toward 200)
        mgr.add_bar("AAPL", _bar(200.0, volume=3000.0))
        snap = mgr.get_snapshot("AAPL")
        assert snap is not None
        assert snap.vwap is not None
        assert snap.vwap > 150.0  # Weighted toward 200


class TestATR:
    """Tests for ATR computation."""

    def test_atr_returns_none_insufficient_data(self) -> None:
        mgr = MarketDataManager(max_bars=300)
        _add_bars(mgr, "AAPL", [100.0] * 5)
        snap = mgr.get_snapshot("AAPL")
        assert snap is not None
        assert snap.atr is None

    def test_atr_computed_with_enough_data(self) -> None:
        mgr = MarketDataManager(max_bars=300)
        # 15 bars needed (14-period ATR + 1 for prev close)
        _add_bars(mgr, "AAPL", [100.0] * 15)
        snap = mgr.get_snapshot("AAPL")
        assert snap is not None
        assert snap.atr is not None
        assert snap.atr >= 0

    def test_atr_increases_with_volatility(self) -> None:
        mgr_stable = MarketDataManager(max_bars=300)
        for i in range(20):
            mgr_stable.add_bar("AAPL", OHLCVBar(
                timestamp=datetime.now(timezone.utc),
                open=100.0, high=101.0, low=99.0, close=100.0, volume=1000.0,
            ))
        snap_stable = mgr_stable.get_snapshot("AAPL")

        mgr_volatile = MarketDataManager(max_bars=300)
        for i in range(20):
            mgr_volatile.add_bar("AAPL", OHLCVBar(
                timestamp=datetime.now(timezone.utc),
                open=100.0, high=115.0, low=85.0, close=100.0, volume=1000.0,
            ))
        snap_volatile = mgr_volatile.get_snapshot("AAPL")

        assert snap_stable is not None and snap_volatile is not None
        assert snap_volatile.atr > snap_stable.atr

Step 2: Run tests to verify they fail

Run: python -m pytest tests/test_indicators.py -v Expected: FAIL — ema_9, macd, etc. don't exist on MarketSnapshot yet (Task 1 must be done first) and computations aren't implemented.

Step 3: Implement indicator computations in MarketDataManager

Modify services/signal_generator/market_data.py:

  1. Add _compute_ema static method:
@staticmethod
def _compute_ema(closes: list[float], period: int) -> float | None:
    """Compute exponential moving average over *closes* with given *period*."""
    if len(closes) < period:
        return None
    multiplier = 2.0 / (period + 1)
    ema = sum(closes[:period]) / period  # Seed with SMA
    for price in closes[period:]:
        ema = (price - ema) * multiplier + ema
    return round(ema, 6)
  1. Add _compute_macd static method:
@staticmethod
def _compute_macd(closes: list[float]) -> tuple[float, float, float] | None:
    """Compute MACD (12,26,9). Returns (macd_line, signal_line, histogram) or None."""
    if len(closes) < 35:  # 26 for slow EMA + 9 for signal
        return None
    mul_12 = 2.0 / 13
    mul_26 = 2.0 / 27
    mul_9 = 2.0 / 10

    ema_12 = sum(closes[:12]) / 12
    ema_26 = sum(closes[:26]) / 26

    macd_values: list[float] = []
    for i in range(len(closes)):
        if i < 12:
            continue
        if i == 12:
            ema_12 = sum(closes[:12]) / 12
        else:
            ema_12 = (closes[i] - ema_12) * mul_12 + ema_12
        if i < 26:
            continue
        if i == 26:
            ema_26 = sum(closes[:26]) / 26
            # Recompute ema_12 from scratch for accuracy
            ema_12 = sum(closes[:12]) / 12
            for j in range(12, i + 1):
                ema_12 = (closes[j] - ema_12) * mul_12 + ema_12
        else:
            ema_26 = (closes[i] - ema_26) * mul_26 + ema_26
        macd_values.append(ema_12 - ema_26)

    if len(macd_values) < 9:
        return None

    signal = sum(macd_values[:9]) / 9
    for val in macd_values[9:]:
        signal = (val - signal) * mul_9 + signal

    macd_line = macd_values[-1]
    histogram = macd_line - signal
    return round(macd_line, 6), round(signal, 6), round(histogram, 6)
  1. Add _compute_bollinger static method:
@staticmethod
def _compute_bollinger(closes: list[float], period: int = 20, num_std: float = 2.0) -> tuple[float, float, float] | None:
    """Compute Bollinger Bands. Returns (upper, mid, lower) or None."""
    if len(closes) < period:
        return None
    window = closes[-period:]
    mid = sum(window) / period
    variance = sum((x - mid) ** 2 for x in window) / period
    std = variance ** 0.5
    return round(mid + num_std * std, 6), round(mid, 6), round(mid - num_std * std, 6)
  1. Add _compute_vwap static method:
@staticmethod
def _compute_vwap(bars: list[OHLCVBar]) -> float | None:
    """Compute VWAP from bars. Returns None if no bars."""
    if not bars:
        return None
    cum_tp_vol = 0.0
    cum_vol = 0.0
    for bar in bars:
        typical_price = (bar.high + bar.low + bar.close) / 3.0
        cum_tp_vol += typical_price * bar.volume
        cum_vol += bar.volume
    if cum_vol == 0:
        return None
    return round(cum_tp_vol / cum_vol, 6)
  1. Add _compute_atr static method:
@staticmethod
def _compute_atr(bars: list[OHLCVBar], period: int = 14) -> float | None:
    """Compute Average True Range over *period*. Needs period+1 bars."""
    if len(bars) < period + 1:
        return None
    relevant = bars[-(period + 1):]
    true_ranges: list[float] = []
    for i in range(1, len(relevant)):
        high_low = relevant[i].high - relevant[i].low
        high_prev_close = abs(relevant[i].high - relevant[i - 1].close)
        low_prev_close = abs(relevant[i].low - relevant[i - 1].close)
        true_ranges.append(max(high_low, high_prev_close, low_prev_close))
    return round(sum(true_ranges) / len(true_ranges), 6)
  1. Update get_snapshot() to compute and include all new indicators:
def get_snapshot(self, ticker: str) -> MarketSnapshot | None:
    bars = self._bars.get(ticker)
    if not bars:
        return None

    latest = bars[-1]
    closes = [b.close for b in bars]
    bar_list = list(bars)

    # MACD
    macd_result = self._compute_macd(closes)
    macd_val = macd_signal = macd_hist = None
    if macd_result:
        macd_val, macd_signal, macd_hist = macd_result

    # Bollinger
    boll_result = self._compute_bollinger(closes)
    boll_upper = boll_mid = boll_lower = None
    if boll_result:
        boll_upper, boll_mid, boll_lower = boll_result

    return MarketSnapshot(
        ticker=ticker,
        current_price=latest.close,
        open=latest.open,
        high=latest.high,
        low=latest.low,
        close=latest.close,
        volume=latest.volume,
        sma_20=self._compute_sma(closes, 20),
        sma_50=self._compute_sma(closes, 50),
        sma_200=self._compute_sma(closes, 200),
        rsi=self._compute_rsi(closes, _RSI_PERIOD),
        ema_9=self._compute_ema(closes, 9),
        ema_21=self._compute_ema(closes, 21),
        macd=macd_val,
        macd_signal=macd_signal,
        macd_histogram=macd_hist,
        bollinger_upper=boll_upper,
        bollinger_mid=boll_mid,
        bollinger_lower=boll_lower,
        vwap=self._compute_vwap(bar_list),
        atr=self._compute_atr(bar_list),
        bars=[b.model_dump(mode="json") for b in bars],
    )

Step 4: Run tests to verify they pass

Run: python -m pytest tests/test_indicators.py -v Expected: All PASS

Step 5: Run existing tests to confirm no regressions

Run: python -m pytest tests/ -v -m "not integration" --timeout=30 Expected: All 246+ tests PASS

Step 6: Commit

git add services/signal_generator/market_data.py tests/test_indicators.py
git commit -m "feat: add MACD, Bollinger, VWAP, ATR, EMA, SMA-200 indicator computations"

Task 3: Implement fundamental data providers

Files:

  • Create: shared/fundamentals/__init__.py
  • Create: shared/fundamentals/base.py
  • Create: shared/fundamentals/alpha_vantage.py
  • Create: shared/fundamentals/fmp.py
  • Create: shared/fundamentals/yahoo.py
  • Create: shared/fundamentals/rotating.py
  • Test: tests/test_fundamentals.py (new)

Step 1: Write failing tests

Create tests/test_fundamentals.py:

"""Tests for fundamental data providers."""

import pytest
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch, MagicMock

from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot
from shared.fundamentals.alpha_vantage import AlphaVantageProvider
from shared.fundamentals.fmp import FMPProvider
from shared.fundamentals.yahoo import YahooFinanceProvider
from shared.fundamentals.rotating import RotatingProvider


class TestFundamentalsSnapshot:
    """Tests for the FundamentalsSnapshot model."""

    def test_snapshot_all_fields(self) -> None:
        snap = FundamentalsSnapshot(
            ticker="AAPL",
            eps_ttm=6.5,
            pe_ratio=28.0,
            peg_ratio=1.5,
            revenue_growth_yoy=0.08,
            profit_margin=0.25,
            debt_to_equity=1.8,
            market_cap=3_000_000_000_000.0,
            fetched_at=datetime.now(timezone.utc),
        )
        assert snap.ticker == "AAPL"
        assert snap.eps_ttm == 6.5

    def test_snapshot_optional_fields(self) -> None:
        snap = FundamentalsSnapshot(
            ticker="AAPL",
            fetched_at=datetime.now(timezone.utc),
        )
        assert snap.eps_ttm is None
        assert snap.pe_ratio is None


class TestAlphaVantageProvider:
    """Tests for Alpha Vantage provider."""

    @pytest.mark.asyncio
    async def test_fetch_parses_response(self) -> None:
        mock_response = AsyncMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "Symbol": "AAPL",
            "EPS": "6.50",
            "PERatio": "28.00",
            "PEGRatio": "1.50",
            "QuarterlyRevenueGrowthYOY": "0.08",
            "ProfitMargin": "0.25",
            "DebtToEquity": "180",  # Alpha Vantage returns as percentage
            "MarketCapitalization": "3000000000000",
        }

        provider = AlphaVantageProvider(api_key="test-key")
        with patch.object(provider, "_client") as mock_client:
            mock_client.get = AsyncMock(return_value=mock_response)
            result = await provider.fetch("AAPL")

        assert result is not None
        assert result.ticker == "AAPL"
        assert result.eps_ttm == 6.5
        assert result.pe_ratio == 28.0

    @pytest.mark.asyncio
    async def test_fetch_handles_rate_limit(self) -> None:
        mock_response = AsyncMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "Note": "Thank you for using Alpha Vantage! Our standard API rate limit is 25 requests per day."
        }

        provider = AlphaVantageProvider(api_key="test-key")
        with patch.object(provider, "_client") as mock_client:
            mock_client.get = AsyncMock(return_value=mock_response)
            result = await provider.fetch("AAPL")

        assert result is None


class TestFMPProvider:
    """Tests for Financial Modeling Prep provider."""

    @pytest.mark.asyncio
    async def test_fetch_parses_response(self) -> None:
        mock_response = AsyncMock()
        mock_response.status_code = 200
        mock_response.json.return_value = [
            {
                "symbol": "AAPL",
                "eps": 6.5,
                "pe": 28.0,
                "pegRatio": 1.5,
                "revenueGrowth": 0.08,
                "netProfitMargin": 0.25,
                "debtToEquity": 1.8,
                "marketCap": 3_000_000_000_000,
            }
        ]

        provider = FMPProvider(api_key="test-key")
        with patch.object(provider, "_client") as mock_client:
            mock_client.get = AsyncMock(return_value=mock_response)
            result = await provider.fetch("AAPL")

        assert result is not None
        assert result.ticker == "AAPL"
        assert result.peg_ratio == 1.5

    @pytest.mark.asyncio
    async def test_fetch_handles_empty_response(self) -> None:
        mock_response = AsyncMock()
        mock_response.status_code = 200
        mock_response.json.return_value = []

        provider = FMPProvider(api_key="test-key")
        with patch.object(provider, "_client") as mock_client:
            mock_client.get = AsyncMock(return_value=mock_response)
            result = await provider.fetch("INVALID")

        assert result is None


class TestYahooFinanceProvider:
    """Tests for Yahoo Finance provider."""

    @pytest.mark.asyncio
    async def test_fetch_parses_ticker_info(self) -> None:
        mock_ticker = MagicMock()
        mock_ticker.info = {
            "trailingEps": 6.5,
            "trailingPE": 28.0,
            "pegRatio": 1.5,
            "revenueGrowth": 0.08,
            "profitMargins": 0.25,
            "debtToEquity": 180.0,
            "marketCap": 3_000_000_000_000,
        }

        provider = YahooFinanceProvider()
        with patch("shared.fundamentals.yahoo.yfinance.Ticker", return_value=mock_ticker):
            result = await provider.fetch("AAPL")

        assert result is not None
        assert result.ticker == "AAPL"
        assert result.eps_ttm == 6.5
        assert result.debt_to_equity == pytest.approx(1.8, abs=0.01)


class TestRotatingProvider:
    """Tests for the rotating provider with failover."""

    @pytest.mark.asyncio
    async def test_uses_first_provider_on_success(self) -> None:
        snap = FundamentalsSnapshot(ticker="AAPL", fetched_at=datetime.now(timezone.utc), eps_ttm=6.5)
        p1 = AsyncMock(spec=FundamentalsProvider)
        p1.fetch = AsyncMock(return_value=snap)
        p2 = AsyncMock(spec=FundamentalsProvider)

        rotating = RotatingProvider([p1, p2])
        result = await rotating.fetch("AAPL")

        assert result is not None
        assert result.eps_ttm == 6.5
        p1.fetch.assert_awaited_once_with("AAPL")
        p2.fetch.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_falls_back_on_failure(self) -> None:
        snap = FundamentalsSnapshot(ticker="AAPL", fetched_at=datetime.now(timezone.utc), eps_ttm=7.0)
        p1 = AsyncMock(spec=FundamentalsProvider)
        p1.fetch = AsyncMock(return_value=None)  # Rate limited
        p2 = AsyncMock(spec=FundamentalsProvider)
        p2.fetch = AsyncMock(return_value=snap)

        rotating = RotatingProvider([p1, p2])
        result = await rotating.fetch("AAPL")

        assert result is not None
        assert result.eps_ttm == 7.0

    @pytest.mark.asyncio
    async def test_returns_none_when_all_fail(self) -> None:
        p1 = AsyncMock(spec=FundamentalsProvider)
        p1.fetch = AsyncMock(return_value=None)
        p2 = AsyncMock(spec=FundamentalsProvider)
        p2.fetch = AsyncMock(return_value=None)

        rotating = RotatingProvider([p1, p2])
        result = await rotating.fetch("AAPL")
        assert result is None

    @pytest.mark.asyncio
    async def test_handles_exception_and_falls_back(self) -> None:
        snap = FundamentalsSnapshot(ticker="AAPL", fetched_at=datetime.now(timezone.utc))
        p1 = AsyncMock(spec=FundamentalsProvider)
        p1.fetch = AsyncMock(side_effect=Exception("API error"))
        p2 = AsyncMock(spec=FundamentalsProvider)
        p2.fetch = AsyncMock(return_value=snap)

        rotating = RotatingProvider([p1, p2])
        result = await rotating.fetch("AAPL")
        assert result is not None

Step 2: Run tests to verify they fail

Run: python -m pytest tests/test_fundamentals.py -v Expected: FAIL — modules don't exist yet

Step 3: Create shared/fundamentals/__init__.py

"""Fundamental data providers for stock financial metrics."""

from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot
from shared.fundamentals.rotating import RotatingProvider

__all__ = ["FundamentalsProvider", "FundamentalsSnapshot", "RotatingProvider"]

Step 4: Create shared/fundamentals/base.py

"""Abstract base class for fundamental data providers."""

from __future__ import annotations

from abc import ABC, abstractmethod
from datetime import datetime

from pydantic import BaseModel


class FundamentalsSnapshot(BaseModel):
    """Fundamental financial data for a single ticker."""

    ticker: str
    eps_ttm: float | None = None
    pe_ratio: float | None = None
    peg_ratio: float | None = None
    revenue_growth_yoy: float | None = None
    profit_margin: float | None = None
    debt_to_equity: float | None = None
    market_cap: float | None = None
    fetched_at: datetime

    model_config = {"from_attributes": True}


class FundamentalsProvider(ABC):
    """Abstract base class for fundamental data providers."""

    @abstractmethod
    async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
        """Fetch fundamental data for *ticker*. Returns None on failure/rate limit."""
        ...

Step 5: Create shared/fundamentals/alpha_vantage.py

"""Alpha Vantage fundamental data provider."""

from __future__ import annotations

import logging
from datetime import datetime, timezone

import httpx

from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot

logger = logging.getLogger(__name__)

_BASE_URL = "https://www.alphavantage.co/query"


class AlphaVantageProvider(FundamentalsProvider):
    """Fetch fundamentals from the Alpha Vantage OVERVIEW endpoint."""

    def __init__(self, api_key: str) -> None:
        self._api_key = api_key
        self._client = httpx.AsyncClient(timeout=30.0)

    async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
        try:
            resp = await self._client.get(
                _BASE_URL,
                params={"function": "OVERVIEW", "symbol": ticker, "apikey": self._api_key},
            )
            data = resp.json()

            # Rate limit detection
            if "Note" in data or "Information" in data:
                logger.warning("Alpha Vantage rate limit hit for %s", ticker)
                return None

            if "Symbol" not in data:
                logger.warning("Alpha Vantage returned no data for %s", ticker)
                return None

            return FundamentalsSnapshot(
                ticker=ticker,
                eps_ttm=_safe_float(data.get("EPS")),
                pe_ratio=_safe_float(data.get("PERatio")),
                peg_ratio=_safe_float(data.get("PEGRatio")),
                revenue_growth_yoy=_safe_float(data.get("QuarterlyRevenueGrowthYOY")),
                profit_margin=_safe_float(data.get("ProfitMargin")),
                debt_to_equity=_safe_float_div100(data.get("DebtToEquity")),
                market_cap=_safe_float(data.get("MarketCapitalization")),
                fetched_at=datetime.now(timezone.utc),
            )
        except Exception:
            logger.exception("Alpha Vantage fetch failed for %s", ticker)
            return None


def _safe_float(val: str | None) -> float | None:
    if val is None or val in ("None", "-", ""):
        return None
    try:
        return float(val)
    except (ValueError, TypeError):
        return None


def _safe_float_div100(val: str | None) -> float | None:
    """Alpha Vantage returns debt-to-equity as a percentage (e.g. 180 = 1.8)."""
    f = _safe_float(val)
    return f / 100.0 if f is not None else None

Step 6: Create shared/fundamentals/fmp.py

"""Financial Modeling Prep (FMP) fundamental data provider."""

from __future__ import annotations

import logging
from datetime import datetime, timezone

import httpx

from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot

logger = logging.getLogger(__name__)

_BASE_URL = "https://financialmodelingprep.com/api/v3"


class FMPProvider(FundamentalsProvider):
    """Fetch fundamentals from the FMP key-metrics endpoint."""

    def __init__(self, api_key: str) -> None:
        self._api_key = api_key
        self._client = httpx.AsyncClient(timeout=30.0)

    async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
        try:
            resp = await self._client.get(
                f"{_BASE_URL}/key-metrics-ttm/{ticker}",
                params={"apikey": self._api_key},
            )
            data = resp.json()

            if not data or not isinstance(data, list) or len(data) == 0:
                logger.warning("FMP returned no data for %s", ticker)
                return None

            item = data[0]
            return FundamentalsSnapshot(
                ticker=ticker,
                eps_ttm=item.get("eps") or item.get("netIncomePerShareTTM"),
                pe_ratio=item.get("pe") or item.get("peRatioTTM"),
                peg_ratio=item.get("pegRatio") or item.get("pegRatioTTM"),
                revenue_growth_yoy=item.get("revenueGrowth"),
                profit_margin=item.get("netProfitMargin") or item.get("netProfitMarginTTM"),
                debt_to_equity=item.get("debtToEquity") or item.get("debtToEquityTTM"),
                market_cap=item.get("marketCap") or item.get("marketCapTTM"),
                fetched_at=datetime.now(timezone.utc),
            )
        except Exception:
            logger.exception("FMP fetch failed for %s", ticker)
            return None

Step 7: Create shared/fundamentals/yahoo.py

"""Yahoo Finance fundamental data provider (via yfinance library)."""

from __future__ import annotations

import asyncio
import logging
from datetime import datetime, timezone

from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot

logger = logging.getLogger(__name__)


class YahooFinanceProvider(FundamentalsProvider):
    """Fetch fundamentals via yfinance (runs in thread pool to avoid blocking)."""

    async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
        try:
            import yfinance

            loop = asyncio.get_running_loop()
            info = await loop.run_in_executor(None, _get_ticker_info, ticker)

            if not info:
                logger.warning("yfinance returned no info for %s", ticker)
                return None

            d2e = info.get("debtToEquity")
            # yfinance returns debt-to-equity as percentage (e.g. 180.0 = 1.8)
            if d2e is not None:
                d2e = d2e / 100.0

            return FundamentalsSnapshot(
                ticker=ticker,
                eps_ttm=info.get("trailingEps"),
                pe_ratio=info.get("trailingPE"),
                peg_ratio=info.get("pegRatio"),
                revenue_growth_yoy=info.get("revenueGrowth"),
                profit_margin=info.get("profitMargins"),
                debt_to_equity=d2e,
                market_cap=info.get("marketCap"),
                fetched_at=datetime.now(timezone.utc),
            )
        except Exception:
            logger.exception("yfinance fetch failed for %s", ticker)
            return None


def _get_ticker_info(ticker: str) -> dict:
    """Synchronous helper — called via run_in_executor."""
    import yfinance

    t = yfinance.Ticker(ticker)
    return t.info

Step 8: Create shared/fundamentals/rotating.py

"""Rotating provider that tries multiple providers with failover."""

from __future__ import annotations

import logging

from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot

logger = logging.getLogger(__name__)


class RotatingProvider(FundamentalsProvider):
    """Try each provider in order; return the first successful result."""

    def __init__(self, providers: list[FundamentalsProvider]) -> None:
        self._providers = providers

    async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
        for provider in self._providers:
            try:
                result = await provider.fetch(ticker)
                if result is not None:
                    logger.debug(
                        "Fetched fundamentals for %s via %s",
                        ticker,
                        type(provider).__name__,
                    )
                    return result
                logger.debug(
                    "%s returned None for %s, trying next",
                    type(provider).__name__,
                    ticker,
                )
            except Exception:
                logger.exception(
                    "%s failed for %s, trying next",
                    type(provider).__name__,
                    ticker,
                )
        logger.warning("All providers failed for %s", ticker)
        return None

Step 9: Run tests

Run: python -m pytest tests/test_fundamentals.py -v Expected: All PASS

Step 10: Commit

git add shared/fundamentals/ tests/test_fundamentals.py
git commit -m "feat: add fundamental data providers (Alpha Vantage, FMP, Yahoo Finance) with rotation"

Task 4: Add fundamentals DB table and caching

Files:

  • Create: shared/models/fundamentals.py
  • Modify: shared/models/__init__.py
  • Create: alembic/versions/xxx_add_fundamentals_table.py (via alembic)
  • Create: shared/fundamentals/cache.py
  • Test: tests/test_fundamentals.py (add cache tests)

Step 1: Create shared/models/fundamentals.py

"""Fundamentals database model for caching fundamental data."""

import uuid

from sqlalchemy import Float, String, DateTime
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column

from shared.models.base import Base, TimestampMixin


class Fundamentals(TimestampMixin, Base):
    __tablename__ = "fundamentals"

    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    ticker: Mapped[str] = mapped_column(String(20), unique=True, nullable=False, index=True)
    eps_ttm: Mapped[float | None] = mapped_column(Float, nullable=True)
    pe_ratio: Mapped[float | None] = mapped_column(Float, nullable=True)
    peg_ratio: Mapped[float | None] = mapped_column(Float, nullable=True)
    revenue_growth_yoy: Mapped[float | None] = mapped_column(Float, nullable=True)
    profit_margin: Mapped[float | None] = mapped_column(Float, nullable=True)
    debt_to_equity: Mapped[float | None] = mapped_column(Float, nullable=True)
    market_cap: Mapped[float | None] = mapped_column(Float, nullable=True)
    fetched_at: Mapped[str] = mapped_column(DateTime(timezone=True), nullable=False)

Step 2: Update shared/models/__init__.py

Add from shared.models.fundamentals import Fundamentals and add "Fundamentals" to __all__.

Step 3: Generate Alembic migration

Run: python -m alembic revision --autogenerate -m "add fundamentals table" Then verify the generated migration looks correct.

Step 4: Create shared/fundamentals/cache.py

"""DB-backed cache for fundamental data."""

from __future__ import annotations

import logging
from datetime import datetime, timezone, timedelta

from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker

from shared.fundamentals.base import FundamentalsProvider, FundamentalsSnapshot
from shared.models.fundamentals import Fundamentals

logger = logging.getLogger(__name__)


class CachedFundamentalsProvider:
    """Wraps a FundamentalsProvider with DB-backed caching.

    On fetch: checks DB first. If data is fresh (within TTL), returns cached.
    Otherwise fetches from the underlying provider and updates the DB.
    """

    def __init__(
        self,
        provider: FundamentalsProvider,
        session_factory: async_sessionmaker,
        cache_ttl_hours: int = 24,
    ) -> None:
        self._provider = provider
        self._session_factory = session_factory
        self._cache_ttl = timedelta(hours=cache_ttl_hours)

    async def fetch(self, ticker: str) -> FundamentalsSnapshot | None:
        # Try cache first
        cached = await self._load_from_db(ticker)
        if cached is not None:
            age = datetime.now(timezone.utc) - cached.fetched_at.replace(tzinfo=timezone.utc)
            if age < self._cache_ttl:
                logger.debug("Cache hit for %s (age=%s)", ticker, age)
                return cached
            logger.debug("Cache stale for %s (age=%s), refreshing", ticker, age)

        # Fetch from provider
        result = await self._provider.fetch(ticker)
        if result is not None:
            await self._save_to_db(result)
        return result

    async def _load_from_db(self, ticker: str) -> FundamentalsSnapshot | None:
        try:
            async with self._session_factory() as session:
                stmt = select(Fundamentals).where(Fundamentals.ticker == ticker)
                row = (await session.execute(stmt)).scalar_one_or_none()
                if row is None:
                    return None
                return FundamentalsSnapshot(
                    ticker=row.ticker,
                    eps_ttm=row.eps_ttm,
                    pe_ratio=row.pe_ratio,
                    peg_ratio=row.peg_ratio,
                    revenue_growth_yoy=row.revenue_growth_yoy,
                    profit_margin=row.profit_margin,
                    debt_to_equity=row.debt_to_equity,
                    market_cap=row.market_cap,
                    fetched_at=row.fetched_at,
                )
        except Exception:
            logger.exception("Failed to load fundamentals from DB for %s", ticker)
            return None

    async def _save_to_db(self, snapshot: FundamentalsSnapshot) -> None:
        try:
            async with self._session_factory() as session:
                stmt = select(Fundamentals).where(Fundamentals.ticker == snapshot.ticker)
                existing = (await session.execute(stmt)).scalar_one_or_none()

                if existing:
                    existing.eps_ttm = snapshot.eps_ttm
                    existing.pe_ratio = snapshot.pe_ratio
                    existing.peg_ratio = snapshot.peg_ratio
                    existing.revenue_growth_yoy = snapshot.revenue_growth_yoy
                    existing.profit_margin = snapshot.profit_margin
                    existing.debt_to_equity = snapshot.debt_to_equity
                    existing.market_cap = snapshot.market_cap
                    existing.fetched_at = snapshot.fetched_at
                else:
                    row = Fundamentals(
                        ticker=snapshot.ticker,
                        eps_ttm=snapshot.eps_ttm,
                        pe_ratio=snapshot.pe_ratio,
                        peg_ratio=snapshot.peg_ratio,
                        revenue_growth_yoy=snapshot.revenue_growth_yoy,
                        profit_margin=snapshot.profit_margin,
                        debt_to_equity=snapshot.debt_to_equity,
                        market_cap=snapshot.market_cap,
                        fetched_at=snapshot.fetched_at,
                    )
                    session.add(row)

                await session.commit()
                logger.debug("Saved fundamentals for %s to DB", snapshot.ticker)
        except Exception:
            logger.exception("Failed to save fundamentals for %s to DB", snapshot.ticker)

Step 5: Run tests

Run: python -m pytest tests/test_fundamentals.py tests/test_models.py -v Expected: PASS

Step 6: Commit

git add shared/models/fundamentals.py shared/models/__init__.py shared/fundamentals/cache.py alembic/
git commit -m "feat: add fundamentals DB model and cached provider"

Task 5: Implement the 6 new strategies

Files:

  • Create: shared/strategies/value.py
  • Create: shared/strategies/macd_crossover.py
  • Create: shared/strategies/bollinger_breakout.py
  • Create: shared/strategies/vwap.py
  • Create: shared/strategies/liquidity.py
  • Create: shared/strategies/ma_stack.py
  • Modify: shared/strategies/__init__.py
  • Test: tests/test_new_strategies.py (new)

Step 1: Write failing tests

Create tests/test_new_strategies.py:

"""Tests for the 6 new trading strategies."""

import pytest
from datetime import datetime, timezone

from shared.fundamentals.base import FundamentalsSnapshot
from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection
from shared.strategies import BaseStrategy
from shared.strategies.value import ValueStrategy
from shared.strategies.macd_crossover import MACDCrossoverStrategy
from shared.strategies.bollinger_breakout import BollingerBreakoutStrategy
from shared.strategies.vwap import VWAPStrategy
from shared.strategies.liquidity import LiquidityStrategy
from shared.strategies.ma_stack import MAStackStrategy


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _market(
    ticker: str = "AAPL",
    price: float = 150.0,
    volume: float = 1_000_000,
    sma_20: float | None = None,
    sma_50: float | None = None,
    sma_200: float | None = None,
    rsi: float | None = None,
    ema_9: float | None = None,
    ema_21: float | None = None,
    macd: float | None = None,
    macd_signal: float | None = None,
    macd_histogram: float | None = None,
    bollinger_upper: float | None = None,
    bollinger_mid: float | None = None,
    bollinger_lower: float | None = None,
    vwap: float | None = None,
    atr: float | None = None,
    fundamentals: FundamentalsSnapshot | None = None,
    bars: list | None = None,
) -> MarketSnapshot:
    return MarketSnapshot(
        ticker=ticker,
        current_price=price,
        open=price - 1,
        high=price + 2,
        low=price - 2,
        close=price,
        volume=volume,
        sma_20=sma_20,
        sma_50=sma_50,
        sma_200=sma_200,
        rsi=rsi,
        ema_9=ema_9,
        ema_21=ema_21,
        macd=macd,
        macd_signal=macd_signal,
        macd_histogram=macd_histogram,
        bollinger_upper=bollinger_upper,
        bollinger_mid=bollinger_mid,
        bollinger_lower=bollinger_lower,
        vwap=vwap,
        atr=atr,
        fundamentals=fundamentals,
        bars=bars or [],
    )


def _fundamentals(
    ticker: str = "AAPL",
    eps_ttm: float | None = 6.5,
    pe_ratio: float | None = 28.0,
    peg_ratio: float | None = 1.5,
    revenue_growth_yoy: float | None = 0.08,
    profit_margin: float | None = 0.25,
    debt_to_equity: float | None = 1.8,
) -> FundamentalsSnapshot:
    return FundamentalsSnapshot(
        ticker=ticker,
        eps_ttm=eps_ttm,
        pe_ratio=pe_ratio,
        peg_ratio=peg_ratio,
        revenue_growth_yoy=revenue_growth_yoy,
        profit_margin=profit_margin,
        debt_to_equity=debt_to_equity,
        fetched_at=datetime.now(timezone.utc),
    )


# ===================================================================
# Value strategy
# ===================================================================


class TestValueStrategy:

    @pytest.fixture()
    def strategy(self) -> ValueStrategy:
        return ValueStrategy()

    @pytest.mark.asyncio
    async def test_long_signal_undervalued(self, strategy: ValueStrategy) -> None:
        f = _fundamentals(peg_ratio=0.8, pe_ratio=15.0, eps_ttm=5.0, revenue_growth_yoy=0.1)
        market = _market(fundamentals=f)
        signal = await strategy.evaluate("AAPL", market)
        assert signal is not None
        assert signal.direction == SignalDirection.LONG

    @pytest.mark.asyncio
    async def test_short_signal_overvalued(self, strategy: ValueStrategy) -> None:
        f = _fundamentals(peg_ratio=4.0, pe_ratio=60.0, eps_ttm=-1.0)
        market = _market(fundamentals=f)
        signal = await strategy.evaluate("AAPL", market)
        assert signal is not None
        assert signal.direction == SignalDirection.SHORT

    @pytest.mark.asyncio
    async def test_no_signal_without_fundamentals(self, strategy: ValueStrategy) -> None:
        market = _market(fundamentals=None)
        signal = await strategy.evaluate("AAPL", market)
        assert signal is None

    @pytest.mark.asyncio
    async def test_no_signal_neutral_fundamentals(self, strategy: ValueStrategy) -> None:
        f = _fundamentals(peg_ratio=1.5, pe_ratio=20.0, eps_ttm=3.0)
        market = _market(fundamentals=f)
        signal = await strategy.evaluate("AAPL", market)
        assert signal is None

    @pytest.mark.asyncio
    async def test_strength_bounded(self, strategy: ValueStrategy) -> None:
        f = _fundamentals(peg_ratio=0.1, pe_ratio=5.0, eps_ttm=10.0, revenue_growth_yoy=0.5)
        market = _market(fundamentals=f)
        signal = await strategy.evaluate("AAPL", market)
        assert signal is not None
        assert 0 < signal.strength <= 1.0


# ===================================================================
# MACD Crossover strategy
# ===================================================================


class TestMACDCrossoverStrategy:

    @pytest.fixture()
    def strategy(self) -> MACDCrossoverStrategy:
        return MACDCrossoverStrategy()

    @pytest.mark.asyncio
    async def test_long_signal_bullish_crossover(self, strategy: MACDCrossoverStrategy) -> None:
        # Simulate: previous MACD was below signal, now above
        # First call sets the state
        market_prev = _market(macd=-0.5, macd_signal=0.1, macd_histogram=-0.6, atr=2.0)
        await strategy.evaluate("AAPL", market_prev)

        # Second call: MACD crossed above signal
        market = _market(macd=0.5, macd_signal=0.1, macd_histogram=0.4, atr=2.0)
        signal = await strategy.evaluate("AAPL", market)
        assert signal is not None
        assert signal.direction == SignalDirection.LONG

    @pytest.mark.asyncio
    async def test_short_signal_bearish_crossover(self, strategy: MACDCrossoverStrategy) -> None:
        # Previous: MACD above signal
        market_prev = _market(macd=0.5, macd_signal=0.1, macd_histogram=0.4, atr=2.0)
        await strategy.evaluate("AAPL", market_prev)

        # Now: MACD crossed below signal
        market = _market(macd=-0.5, macd_signal=0.1, macd_histogram=-0.6, atr=2.0)
        signal = await strategy.evaluate("AAPL", market)
        assert signal is not None
        assert signal.direction == SignalDirection.SHORT

    @pytest.mark.asyncio
    async def test_no_signal_without_macd(self, strategy: MACDCrossoverStrategy) -> None:
        market = _market(macd=None, macd_signal=None)
        signal = await strategy.evaluate("AAPL", market)
        assert signal is None

    @pytest.mark.asyncio
    async def test_no_signal_without_crossover(self, strategy: MACDCrossoverStrategy) -> None:
        # Both calls have MACD above signal — no crossover
        market1 = _market(macd=0.5, macd_signal=0.1, macd_histogram=0.4, atr=2.0)
        await strategy.evaluate("AAPL", market1)
        market2 = _market(macd=0.6, macd_signal=0.1, macd_histogram=0.5, atr=2.0)
        signal = await strategy.evaluate("AAPL", market2)
        assert signal is None


# ===================================================================
# Bollinger Breakout strategy
# ===================================================================


class TestBollingerBreakoutStrategy:

    @pytest.fixture()
    def strategy(self) -> BollingerBreakoutStrategy:
        return BollingerBreakoutStrategy()

    @pytest.mark.asyncio
    async def test_long_signal_above_upper_band(self, strategy: BollingerBreakoutStrategy) -> None:
        market = _market(
            price=155.0, bollinger_upper=152.0, bollinger_mid=150.0, bollinger_lower=148.0,
            volume=2_000_000, sma_20=150.0,
        )
        # Need bars to compute average volume
        bars = [{"volume": 1_000_000}] * 20
        market.bars = bars
        signal = await strategy.evaluate("AAPL", market)
        assert signal is not None
        assert signal.direction == SignalDirection.LONG

    @pytest.mark.asyncio
    async def test_long_signal_below_lower_band(self, strategy: BollingerBreakoutStrategy) -> None:
        market = _market(
            price=145.0, bollinger_upper=152.0, bollinger_mid=150.0, bollinger_lower=148.0,
        )
        signal = await strategy.evaluate("AAPL", market)
        assert signal is not None
        assert signal.direction == SignalDirection.LONG

    @pytest.mark.asyncio
    async def test_no_signal_inside_bands(self, strategy: BollingerBreakoutStrategy) -> None:
        market = _market(
            price=150.0, bollinger_upper=155.0, bollinger_mid=150.0, bollinger_lower=145.0,
        )
        signal = await strategy.evaluate("AAPL", market)
        assert signal is None

    @pytest.mark.asyncio
    async def test_no_signal_without_bollinger(self, strategy: BollingerBreakoutStrategy) -> None:
        market = _market()
        signal = await strategy.evaluate("AAPL", market)
        assert signal is None


# ===================================================================
# VWAP strategy
# ===================================================================


class TestVWAPStrategy:

    @pytest.fixture()
    def strategy(self) -> VWAPStrategy:
        return VWAPStrategy()

    @pytest.mark.asyncio
    async def test_long_signal_above_vwap(self, strategy: VWAPStrategy) -> None:
        # First call: below VWAP
        market_prev = _market(price=148.0, vwap=150.0, volume=1_000_000)
        await strategy.evaluate("AAPL", market_prev)

        # Second call: crossed above VWAP with volume
        market = _market(price=152.0, vwap=150.0, volume=1_500_000)
        market.bars = [{"volume": 1_000_000}] * 20
        signal = await strategy.evaluate("AAPL", market)
        assert signal is not None
        assert signal.direction == SignalDirection.LONG

    @pytest.mark.asyncio
    async def test_short_signal_below_vwap(self, strategy: VWAPStrategy) -> None:
        market_prev = _market(price=152.0, vwap=150.0, volume=1_000_000)
        await strategy.evaluate("AAPL", market_prev)

        market = _market(price=148.0, vwap=150.0, volume=1_500_000)
        market.bars = [{"volume": 1_000_000}] * 20
        signal = await strategy.evaluate("AAPL", market)
        assert signal is not None
        assert signal.direction == SignalDirection.SHORT

    @pytest.mark.asyncio
    async def test_no_signal_without_vwap(self, strategy: VWAPStrategy) -> None:
        market = _market(vwap=None)
        signal = await strategy.evaluate("AAPL", market)
        assert signal is None


# ===================================================================
# Liquidity strategy
# ===================================================================


class TestLiquidityStrategy:

    @pytest.fixture()
    def strategy(self) -> LiquidityStrategy:
        return LiquidityStrategy()

    @pytest.mark.asyncio
    async def test_long_signal_high_volume_up(self, strategy: LiquidityStrategy) -> None:
        bars = [{"close": 100.0, "volume": 1_000_000}] * 20
        market = _market(price=105.0, volume=2_500_000, bars=bars)
        signal = await strategy.evaluate("AAPL", market)
        assert signal is not None
        assert signal.direction == SignalDirection.LONG

    @pytest.mark.asyncio
    async def test_short_signal_high_volume_down(self, strategy: LiquidityStrategy) -> None:
        bars = [{"close": 100.0, "volume": 1_000_000}] * 20
        market = _market(price=95.0, volume=2_500_000, bars=bars)
        signal = await strategy.evaluate("AAPL", market)
        assert signal is not None
        assert signal.direction == SignalDirection.SHORT

    @pytest.mark.asyncio
    async def test_no_signal_thin_volume(self, strategy: LiquidityStrategy) -> None:
        bars = [{"close": 100.0, "volume": 1_000_000}] * 20
        market = _market(price=105.0, volume=400_000, bars=bars)
        signal = await strategy.evaluate("AAPL", market)
        assert signal is None

    @pytest.mark.asyncio
    async def test_no_signal_no_bars(self, strategy: LiquidityStrategy) -> None:
        market = _market(price=105.0, volume=2_000_000, bars=[])
        signal = await strategy.evaluate("AAPL", market)
        assert signal is None


# ===================================================================
# MA Stack strategy
# ===================================================================


class TestMAStackStrategy:

    @pytest.fixture()
    def strategy(self) -> MAStackStrategy:
        return MAStackStrategy()

    @pytest.mark.asyncio
    async def test_full_bull_alignment(self, strategy: MAStackStrategy) -> None:
        market = _market(
            price=160.0, ema_9=155.0, ema_21=150.0, sma_50=145.0, sma_200=140.0,
        )
        signal = await strategy.evaluate("AAPL", market)
        assert signal is not None
        assert signal.direction == SignalDirection.LONG
        assert signal.strength > 0.5

    @pytest.mark.asyncio
    async def test_full_bear_alignment(self, strategy: MAStackStrategy) -> None:
        market = _market(
            price=130.0, ema_9=135.0, ema_21=140.0, sma_50=145.0, sma_200=150.0,
        )
        signal = await strategy.evaluate("AAPL", market)
        assert signal is not None
        assert signal.direction == SignalDirection.SHORT
        assert signal.strength > 0.5

    @pytest.mark.asyncio
    async def test_no_signal_tangled_mas(self, strategy: MAStackStrategy) -> None:
        market = _market(
            price=150.0, ema_9=151.0, ema_21=149.0, sma_50=152.0, sma_200=148.0,
        )
        signal = await strategy.evaluate("AAPL", market)
        assert signal is None

    @pytest.mark.asyncio
    async def test_no_signal_missing_mas(self, strategy: MAStackStrategy) -> None:
        market = _market(ema_9=155.0)  # Missing others
        signal = await strategy.evaluate("AAPL", market)
        assert signal is None

    @pytest.mark.asyncio
    async def test_partial_bull(self, strategy: MAStackStrategy) -> None:
        # Price > EMA-9 > EMA-21, but below SMA-200 (partial bull)
        market = _market(
            price=155.0, ema_9=152.0, ema_21=148.0, sma_50=145.0, sma_200=160.0,
        )
        signal = await strategy.evaluate("AAPL", market)
        # Should still produce LONG but with lower strength
        if signal is not None:
            assert signal.direction == SignalDirection.LONG
            assert signal.strength < 0.8


# ===================================================================
# Cross-strategy tests
# ===================================================================


class TestNewStrategyCrossChecks:

    def test_all_new_strategies_inherit_base(self) -> None:
        for cls in (ValueStrategy, MACDCrossoverStrategy, BollingerBreakoutStrategy,
                    VWAPStrategy, LiquidityStrategy, MAStackStrategy):
            assert issubclass(cls, BaseStrategy)

    def test_all_strategy_names_unique(self) -> None:
        strategies = [
            ValueStrategy(), MACDCrossoverStrategy(), BollingerBreakoutStrategy(),
            VWAPStrategy(), LiquidityStrategy(), MAStackStrategy(),
        ]
        names = [s.name for s in strategies]
        assert len(names) == len(set(names))

    def test_strategy_names_non_empty(self) -> None:
        for cls in (ValueStrategy, MACDCrossoverStrategy, BollingerBreakoutStrategy,
                    VWAPStrategy, LiquidityStrategy, MAStackStrategy):
            assert len(cls().name) > 0

Step 2: Run tests to verify they fail

Run: python -m pytest tests/test_new_strategies.py -v Expected: FAIL — strategy modules don't exist

Step 3: Implement all 6 strategies

Create each strategy file following the BaseStrategy.evaluate() interface. Each strategy:

  • Has a name: str class attribute
  • Implements async def evaluate(self, ticker, market, sentiment) -> TradeSignal | None
  • Returns None when required data is missing

shared/strategies/value.py:

"""Value investing strategy — trade on fundamental data."""

from datetime import datetime, timezone

from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
from shared.strategies.base import BaseStrategy


class ValueStrategy(BaseStrategy):
    """Generate signals from fundamental metrics (EPS, P/E, PEG, etc.)."""

    name: str = "value"

    async def evaluate(
        self,
        ticker: str,
        market: MarketSnapshot,
        sentiment: SentimentContext | None = None,
    ) -> TradeSignal | None:
        f = market.fundamentals
        if f is None:
            return None

        # Need at least PEG and P/E to form an opinion
        if f.peg_ratio is None or f.pe_ratio is None:
            return None

        score = 0.0  # Positive = undervalued, negative = overvalued

        # PEG ratio: < 1.0 is undervalued, > 3.0 is overvalued
        if f.peg_ratio < 1.0:
            score += (1.0 - f.peg_ratio)  # max 1.0
        elif f.peg_ratio > 3.0:
            score -= min((f.peg_ratio - 3.0) / 3.0, 1.0)

        # P/E ratio: < 15 is cheap, > 40 is expensive
        if f.pe_ratio < 15:
            score += (15 - f.pe_ratio) / 15
        elif f.pe_ratio > 40:
            score -= min((f.pe_ratio - 40) / 40, 1.0)

        # EPS: positive is good, negative is bad
        if f.eps_ttm is not None:
            if f.eps_ttm > 0:
                score += 0.2
            elif f.eps_ttm < 0:
                score -= 0.3

        # Revenue growth: positive growth is bullish
        if f.revenue_growth_yoy is not None:
            if f.revenue_growth_yoy > 0.1:
                score += 0.2
            elif f.revenue_growth_yoy < -0.1:
                score -= 0.2

        # Profit margin: healthy margin is bullish
        if f.profit_margin is not None:
            if f.profit_margin > 0.15:
                score += 0.1
            elif f.profit_margin < 0:
                score -= 0.2

        # Debt-to-equity: low is healthy
        if f.debt_to_equity is not None:
            if f.debt_to_equity > 3.0:
                score -= 0.2
            elif f.debt_to_equity < 0.5:
                score += 0.1

        # Determine direction and strength
        if score > 0.3:
            direction = SignalDirection.LONG
        elif score < -0.3:
            direction = SignalDirection.SHORT
        else:
            return None

        strength = max(0.0, min(1.0, abs(score) / 2.0))

        return TradeSignal(
            ticker=ticker,
            direction=direction,
            strength=strength,
            strategy_sources=[self.name],
            timestamp=datetime.now(tz=timezone.utc),
        )

shared/strategies/macd_crossover.py:

"""MACD crossover strategy — trade on MACD/signal line crossovers."""

from datetime import datetime, timezone

from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
from shared.strategies.base import BaseStrategy


class MACDCrossoverStrategy(BaseStrategy):
    """Detect MACD/signal line crossovers for trade signals."""

    name: str = "macd_crossover"

    def __init__(self) -> None:
        self._prev_macd: dict[str, float] = {}
        self._prev_signal: dict[str, float] = {}

    async def evaluate(
        self,
        ticker: str,
        market: MarketSnapshot,
        sentiment: SentimentContext | None = None,
    ) -> TradeSignal | None:
        if market.macd is None or market.macd_signal is None:
            return None

        macd = market.macd
        sig = market.macd_signal
        prev_macd = self._prev_macd.get(ticker)
        prev_signal = self._prev_signal.get(ticker)

        # Store current state for next evaluation
        self._prev_macd[ticker] = macd
        self._prev_signal[ticker] = sig

        # Need previous state to detect a crossover
        if prev_macd is None or prev_signal is None:
            return None

        prev_diff = prev_macd - prev_signal
        curr_diff = macd - sig

        # Detect crossover: sign change
        if prev_diff <= 0 and curr_diff > 0:
            direction = SignalDirection.LONG
        elif prev_diff >= 0 and curr_diff < 0:
            direction = SignalDirection.SHORT
        else:
            return None

        # Strength: magnitude of histogram normalized by ATR if available
        histogram = abs(market.macd_histogram) if market.macd_histogram is not None else abs(curr_diff)
        if market.atr and market.atr > 0:
            raw_strength = histogram / market.atr
        else:
            raw_strength = min(histogram / 2.0, 1.0)

        strength = max(0.0, min(1.0, raw_strength))

        return TradeSignal(
            ticker=ticker,
            direction=direction,
            strength=strength,
            strategy_sources=[self.name],
            timestamp=datetime.now(tz=timezone.utc),
        )

shared/strategies/bollinger_breakout.py:

"""Bollinger Bands breakout strategy."""

from datetime import datetime, timezone

from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
from shared.strategies.base import BaseStrategy


class BollingerBreakoutStrategy(BaseStrategy):
    """Generate signals from Bollinger Band breakouts."""

    name: str = "bollinger_breakout"

    async def evaluate(
        self,
        ticker: str,
        market: MarketSnapshot,
        sentiment: SentimentContext | None = None,
    ) -> TradeSignal | None:
        if market.bollinger_upper is None or market.bollinger_lower is None or market.bollinger_mid is None:
            return None

        price = market.current_price
        upper = market.bollinger_upper
        lower = market.bollinger_lower
        band_width = upper - lower
        if band_width <= 0:
            return None

        direction: SignalDirection | None = None

        if price > upper:
            # Breakout above upper band — check for volume confirmation
            avg_vol = _avg_volume(market.bars)
            if avg_vol and market.volume > avg_vol * 1.5:
                direction = SignalDirection.LONG
            else:
                # Above band but no volume = potential failed breakout, skip
                return None
        elif price < lower:
            # Below lower band — mean reversion bounce expected
            direction = SignalDirection.LONG
        else:
            return None

        # Strength: how far price is from the band, relative to band width
        if direction == SignalDirection.LONG and price > upper:
            raw_strength = (price - upper) / band_width
        elif price < lower:
            raw_strength = (lower - price) / band_width
        else:
            raw_strength = 0.3

        strength = max(0.0, min(1.0, raw_strength))

        return TradeSignal(
            ticker=ticker,
            direction=direction,
            strength=strength,
            strategy_sources=[self.name],
            timestamp=datetime.now(tz=timezone.utc),
        )


def _avg_volume(bars: list[dict]) -> float | None:
    """Compute average volume from bar dicts."""
    if not bars:
        return None
    volumes = [b.get("volume", 0) for b in bars if "volume" in b]
    if not volumes:
        return None
    return sum(volumes) / len(volumes)

shared/strategies/vwap.py:

"""VWAP strategy — trade on price crossing VWAP."""

from datetime import datetime, timezone

from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
from shared.strategies.base import BaseStrategy


class VWAPStrategy(BaseStrategy):
    """Generate signals when price crosses VWAP with volume confirmation."""

    name: str = "vwap"

    def __init__(self) -> None:
        self._prev_price: dict[str, float] = {}
        self._prev_vwap: dict[str, float] = {}

    async def evaluate(
        self,
        ticker: str,
        market: MarketSnapshot,
        sentiment: SentimentContext | None = None,
    ) -> TradeSignal | None:
        if market.vwap is None:
            return None

        price = market.current_price
        vwap = market.vwap
        prev_price = self._prev_price.get(ticker)
        prev_vwap = self._prev_vwap.get(ticker)

        self._prev_price[ticker] = price
        self._prev_vwap[ticker] = vwap

        if prev_price is None or prev_vwap is None:
            return None

        prev_diff = prev_price - prev_vwap
        curr_diff = price - vwap

        # Detect crossover
        if prev_diff <= 0 and curr_diff > 0:
            direction = SignalDirection.LONG
        elif prev_diff >= 0 and curr_diff < 0:
            direction = SignalDirection.SHORT
        else:
            return None

        # Strength: distance from VWAP as % of price, amplified by relative volume
        distance_pct = abs(curr_diff) / price if price > 0 else 0
        vol_ratio = 1.0
        avg_vol = _avg_volume(market.bars)
        if avg_vol and avg_vol > 0:
            vol_ratio = min(market.volume / avg_vol, 3.0) / 3.0

        raw_strength = distance_pct * 20 * vol_ratio  # Scale up since % distance is small
        strength = max(0.0, min(1.0, raw_strength))

        return TradeSignal(
            ticker=ticker,
            direction=direction,
            strength=strength,
            strategy_sources=[self.name],
            timestamp=datetime.now(tz=timezone.utc),
        )


def _avg_volume(bars: list[dict]) -> float | None:
    if not bars:
        return None
    volumes = [b.get("volume", 0) for b in bars if "volume" in b]
    return sum(volumes) / len(volumes) if volumes else None

shared/strategies/liquidity.py:

"""Liquidity strategy — trade on volume patterns."""

from datetime import datetime, timezone

from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
from shared.strategies.base import BaseStrategy


class LiquidityStrategy(BaseStrategy):
    """Analyze volume-based liquidity to confirm directional moves."""

    name: str = "liquidity"

    async def evaluate(
        self,
        ticker: str,
        market: MarketSnapshot,
        sentiment: SentimentContext | None = None,
    ) -> TradeSignal | None:
        if not market.bars or len(market.bars) < 5:
            return None

        # Compute average volume from bars
        volumes = [b.get("volume", 0) for b in market.bars if "volume" in b]
        if not volumes:
            return None

        avg_vol = sum(volumes) / len(volumes)
        if avg_vol <= 0:
            return None

        relative_volume = market.volume / avg_vol

        # Thin liquidity — unreliable, skip
        if relative_volume < 1.0:
            return None

        # Determine price direction from bars
        closes = [b.get("close", 0) for b in market.bars if "close" in b]
        if len(closes) < 2:
            return None

        # Compare current price to recent average close
        recent_avg = sum(closes[-5:]) / min(len(closes), 5)
        price_change = (market.current_price - recent_avg) / recent_avg if recent_avg > 0 else 0

        if relative_volume >= 2.0 and price_change > 0.001:
            direction = SignalDirection.LONG
        elif relative_volume >= 2.0 and price_change < -0.001:
            direction = SignalDirection.SHORT
        elif price_change > 0.001 and relative_volume < 0.7:
            # Price rising on declining volume — weak rally
            direction = SignalDirection.SHORT
        else:
            return None

        # Strength based on relative volume magnitude
        raw_strength = min(relative_volume / 4.0, 1.0)
        strength = max(0.0, min(1.0, raw_strength))

        return TradeSignal(
            ticker=ticker,
            direction=direction,
            strength=strength,
            strategy_sources=[self.name],
            timestamp=datetime.now(tz=timezone.utc),
        )

shared/strategies/ma_stack.py:

"""MA Stack strategy — read the full 5-MA stack for trend alignment."""

from datetime import datetime, timezone

from shared.schemas.trading import MarketSnapshot, SentimentContext, SignalDirection, TradeSignal
from shared.strategies.base import BaseStrategy


class MAStackStrategy(BaseStrategy):
    """Assess trend strength from the alignment of 5 moving averages."""

    name: str = "ma_stack"

    def __init__(self) -> None:
        self._prev_sma50: dict[str, float] = {}
        self._prev_sma200: dict[str, float] = {}

    async def evaluate(
        self,
        ticker: str,
        market: MarketSnapshot,
        sentiment: SentimentContext | None = None,
    ) -> TradeSignal | None:
        # Need all MAs
        if any(v is None for v in [market.ema_9, market.ema_21, market.sma_50, market.sma_200]):
            return None

        price = market.current_price
        ema_9 = market.ema_9
        ema_21 = market.ema_21
        sma_50 = market.sma_50
        sma_200 = market.sma_200

        # Count bull alignment: each pair in order scores a point
        values = [price, ema_9, ema_21, sma_50, sma_200]
        bull_score = sum(1 for i in range(len(values) - 1) if values[i] > values[i + 1])
        bear_score = sum(1 for i in range(len(values) - 1) if values[i] < values[i + 1])

        # Golden/death cross detection
        prev_50 = self._prev_sma50.get(ticker)
        prev_200 = self._prev_sma200.get(ticker)
        self._prev_sma50[ticker] = sma_50
        self._prev_sma200[ticker] = sma_200

        cross_bonus = 0.0
        if prev_50 is not None and prev_200 is not None:
            if prev_50 <= prev_200 and sma_50 > sma_200:
                cross_bonus = 0.2  # Golden cross
            elif prev_50 >= prev_200 and sma_50 < sma_200:
                cross_bonus = -0.2  # Death cross

        if bull_score >= 3:
            direction = SignalDirection.LONG
            raw_strength = bull_score / 4.0 + max(cross_bonus, 0)
        elif bear_score >= 3:
            direction = SignalDirection.SHORT
            raw_strength = bear_score / 4.0 + max(-cross_bonus, 0)
        else:
            # Tangled — no clear trend
            return None

        strength = max(0.0, min(1.0, raw_strength))

        return TradeSignal(
            ticker=ticker,
            direction=direction,
            strength=strength,
            strategy_sources=[self.name],
            timestamp=datetime.now(tz=timezone.utc),
        )

Step 4: Update shared/strategies/__init__.py

Add imports for all 6 new strategies and update __all__.

Step 5: Run tests

Run: python -m pytest tests/test_new_strategies.py tests/test_strategies.py -v Expected: All PASS

Step 6: Commit

git add shared/strategies/ tests/test_new_strategies.py
git commit -m "feat: add 6 new strategies (value, MACD, Bollinger, VWAP, liquidity, MA stack)"

Task 6: Wire everything into the signal generator

Files:

  • Modify: services/signal_generator/main.py
  • Modify: services/signal_generator/config.py
  • Modify: .env
  • Modify: scripts/seed_strategies.py
  • Modify: pyproject.toml

Step 1: Update signal generator config

In services/signal_generator/config.py, add:

alpha_vantage_api_key: str = ""
fmp_api_key: str = ""
fundamentals_cache_ttl_hours: int = 24

Step 2: Update .env

Add:

TRADING_ALPHA_VANTAGE_API_KEY=M0I3TWB6VKU0UF51
TRADING_FMP_API_KEY=34zqbQFeRxYvPtzp3Y5QLKPVPztkZyfK
TRADING_FUNDAMENTALS_CACHE_TTL_HOURS=24
TRADING_HISTORICAL_BARS=250

Step 3: Update services/signal_generator/main.py

  1. Import the 6 new strategies.
  2. Update _DEFAULT_WEIGHTS to include all 9 strategies (equal weight ~0.111).
  3. Add fundamentals initialization:
    • Create RotatingProvider with all three providers
    • Wrap with CachedFundamentalsProvider
    • Fetch fundamentals for watchlist tickers on startup
    • Store in a dict[str, FundamentalsSnapshot]
  4. In _consume_scored_articles, inject fundamentals into MarketSnapshot before calling ensemble:
    if fundamentals_cache:
        snapshot.fundamentals = fundamentals_cache.get(ticker)
    
  5. Add a daily refresh background task for fundamentals.

Step 4: Update scripts/seed_strategies.py

Add 6 new strategies to DEFAULT_STRATEGIES with descriptions and equal weights (recalculate to ~0.111).

Step 5: Update pyproject.toml

Add yfinance to the trading optional dependency group:

trading = ["alpaca-py>=0.21", "pytz>=2024.1", "yfinance>=0.2"]

Add httpx to the trading group if not already available (it's in news group but signal-generator uses the trading group):

trading = ["alpaca-py>=0.21", "pytz>=2024.1", "yfinance>=0.2", "httpx>=0.27"]

Step 6: Run all tests

Run: python -m pytest tests/ -v -m "not integration" --timeout=30 Expected: All tests PASS (should be ~270+ tests now)

Step 7: Commit

git add services/signal_generator/ scripts/seed_strategies.py .env pyproject.toml
git commit -m "feat: wire 6 new strategies and fundamentals into signal generator"

Task 7: Update MarketDataManager max_bars default

Files:

  • Modify: services/signal_generator/market_data.py:16

Step 1: Update default max bars

Change _DEFAULT_MAX_BARS = 100 to _DEFAULT_MAX_BARS = 250.

Step 2: Run tests

Run: python -m pytest tests/test_indicators.py tests/ -v -m "not integration" --timeout=30 Expected: All PASS

Step 3: Commit

git add services/signal_generator/market_data.py
git commit -m "feat: increase default max bars to 250 for SMA-200 support"

Task 8: Run migration and seed new strategies

Step 1: Apply migration (if docker is running)

Run: docker compose exec api-gateway python -m alembic upgrade head

Step 2: Seed new strategies

Run: docker compose exec api-gateway python -m scripts.seed_strategies

Step 3: Rebuild and restart services

Run: docker compose build signal-generator && docker compose up -d signal-generator

Step 4: Verify

Check logs: docker compose logs signal-generator --tail 50 Expected: All 9 strategies loaded, fundamentals fetched for watchlist tickers.