trading/tests/services/test_signal_generator.py
Viktor Barzin e2a3bd456d
feat: real data pipeline — market data, DB persistence, portfolio sync, signal-trade linkage
Wire the trading bot to real Alpaca market data and persist pipeline
state to the database so the dashboard displays live information.

- Add market-data service fetching OHLCV bars from Alpaca, publishing
  to market:bars Redis Stream; signal generator consumes bars and
  injects current_price into signals for position sizing
- Sentiment analyzer now persists Article + ArticleSentiment rows to
  DB after scoring, with duplicate and error handling
- API gateway runs a background portfolio sync task that snapshots
  Alpaca account state into PortfolioSnapshot/Position DB tables
  during market hours
- TradeSignal carries a signal_id UUID; signal generator and trade
  executor both persist their records to DB with cross-references
- 303 unit tests pass (57 new tests added)
2026-02-22 19:52:45 +00:00

557 lines
21 KiB
Python

"""Tests for the Signal Generator service.
Covers MarketDataManager (SMA, RSI, snapshot) and WeightedEnsemble
(signal combination, threshold filtering, strategy source tagging).
"""
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from uuid import UUID

import pytest

from services.signal_generator.ensemble import WeightedEnsemble
from services.signal_generator.market_data import MarketDataManager
from shared.schemas.trading import (
    MarketSnapshot,
    OHLCVBar,
    SentimentContext,
    SignalDirection,
    TradeSignal,
)
from shared.strategies.base import BaseStrategy
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_bar(close: float, *, ts_offset: int = 0) -> OHLCVBar:
    """Create an ``OHLCVBar`` with the given close price.

    Args:
        close: Closing price. Open, high and low are derived from it using
            fixed offsets of -0.5, +1.0 and -1.0 respectively.
        ts_offset: Number of minutes added to the base timestamp of
            2026-01-01 10:00 UTC, so successive bars can be given distinct
            timestamps.

    Returns:
        A bar with the derived prices and a constant volume of 1000.0.
    """
    # Offset via timedelta instead of feeding ``ts_offset`` into the
    # ``minute`` field: ``datetime(..., minute=60)`` raises ValueError, which
    # would limit any test using this helper to at most 60 bars.
    base_timestamp = datetime(2026, 1, 1, 10, 0, tzinfo=timezone.utc)
    return OHLCVBar(
        timestamp=base_timestamp + timedelta(minutes=ts_offset),
        open=close - 0.5,
        high=close + 1.0,
        low=close - 1.0,
        close=close,
        volume=1000.0,
    )
class _StubStrategy(BaseStrategy):
    """Strategy double whose ``evaluate`` always yields one fixed result.

    The result is whatever was handed to the constructor: either a
    ``TradeSignal`` or ``None``.
    """

    def __init__(self, name: str, signal: TradeSignal | None) -> None:
        # Store the canned result first, then the label; the tests use the
        # same label as the key in their ``weights`` dicts.
        self._signal = signal
        self.name = name

    async def evaluate(self, ticker, market, sentiment=None):
        # All arguments are ignored on purpose: the stub is input-independent.
        return self._signal
def _make_signal(
    direction: SignalDirection = SignalDirection.LONG,
    strength: float = 0.8,
    sources: list[str] | None = None,
) -> TradeSignal:
    """Build an AAPL ``TradeSignal`` stamped with the current UTC time.

    ``sources`` falls back to ``["test"]`` whenever it is falsy, i.e. when it
    is omitted, ``None`` or an empty list.
    """
    strategy_sources = sources if sources else ["test"]
    created_at = datetime.now(timezone.utc)
    return TradeSignal(
        ticker="AAPL",
        direction=direction,
        strength=strength,
        strategy_sources=strategy_sources,
        timestamp=created_at,
    )
# ---------------------------------------------------------------------------
# MarketDataManager — SMA
# ---------------------------------------------------------------------------
class TestMarketDataManagerSMA:
    """Simple-moving-average fields exposed on ``MarketDataManager`` snapshots."""

    def test_sma_basic(self):
        """With closes 1..20 loaded, ``sma_20`` equals their arithmetic mean."""
        close_prices = list(range(1, 21))
        manager = MarketDataManager()
        for minute, price in enumerate(close_prices):
            manager.add_bar("AAPL", _make_bar(float(price), ts_offset=minute))

        snapshot = manager.get_snapshot("AAPL")

        assert snapshot is not None
        assert snapshot.sma_20 == pytest.approx(sum(close_prices) / 20)

    def test_sma_returns_none_insufficient_data(self):
        """Ten bars are too few for a 20-period average: ``sma_20`` is None."""
        manager = MarketDataManager()
        for minute in range(10):
            manager.add_bar("AAPL", _make_bar(100.0, ts_offset=minute))

        snapshot = manager.get_snapshot("AAPL")

        assert snapshot is not None
        assert snapshot.sma_20 is None

    def test_sma_50_requires_50_bars(self):
        """``sma_50`` is None at 30 bars and populated once 50 are loaded."""
        manager = MarketDataManager()

        # Phase 1: closes 1..30 -- not enough history for SMA-50.
        for minute in range(30):
            manager.add_bar("AAPL", _make_bar(float(minute + 1), ts_offset=minute))
        partial = manager.get_snapshot("AAPL")
        assert partial is not None
        assert partial.sma_50 is None

        # Phase 2: top up with closes 31..50 to reach exactly 50 bars.
        for minute in range(30, 50):
            manager.add_bar("AAPL", _make_bar(float(minute + 1), ts_offset=minute))
        complete = manager.get_snapshot("AAPL")
        assert complete is not None
        assert complete.sma_50 is not None
        assert complete.sma_50 == pytest.approx(sum(range(1, 51)) / 50)
# ---------------------------------------------------------------------------
# MarketDataManager — RSI
# ---------------------------------------------------------------------------
class TestMarketDataManagerRSI:
    """RSI field exposed on ``MarketDataManager`` snapshots."""

    def test_rsi_all_gains(self):
        """Twenty strictly rising closes are expected to give an RSI of 100."""
        manager = MarketDataManager()
        for step in range(20):
            manager.add_bar("AAPL", _make_bar(100.0 + step, ts_offset=step))

        snapshot = manager.get_snapshot("AAPL")

        assert snapshot is not None
        assert snapshot.rsi == pytest.approx(100.0)

    def test_rsi_all_losses(self):
        """Twenty strictly falling closes are expected to give an RSI of 0."""
        manager = MarketDataManager()
        for step in range(20):
            manager.add_bar("AAPL", _make_bar(200.0 - step, ts_offset=step))

        snapshot = manager.get_snapshot("AAPL")

        assert snapshot is not None
        assert snapshot.rsi == pytest.approx(0.0)

    def test_rsi_mixed(self):
        """A series with both up and down moves gives an RSI strictly in (0, 100)."""
        close_series = [
            44, 44.34, 44.09, 43.61, 44.33, 44.83, 45.10, 45.42,
            45.84, 46.08, 45.89, 46.03, 45.61, 46.28, 46.28, 46.00,
        ]
        manager = MarketDataManager()
        for step, price in enumerate(close_series):
            manager.add_bar("AAPL", _make_bar(price, ts_offset=step))

        snapshot = manager.get_snapshot("AAPL")

        assert snapshot is not None
        assert snapshot.rsi is not None
        assert 0 < snapshot.rsi < 100

    def test_rsi_returns_none_insufficient_data(self):
        """With only 10 bars (fewer than the 14+1 needed) ``rsi`` is None."""
        manager = MarketDataManager()
        for step in range(10):
            manager.add_bar("AAPL", _make_bar(100.0, ts_offset=step))

        snapshot = manager.get_snapshot("AAPL")

        assert snapshot is not None
        assert snapshot.rsi is None
# ---------------------------------------------------------------------------
# MarketDataManager — snapshot
# ---------------------------------------------------------------------------
class TestMarketDataManagerSnapshot:
    """General behaviour of ``MarketDataManager.get_snapshot``."""

    def test_snapshot_returns_none_for_unknown_ticker(self):
        """A ticker that never received a bar has no snapshot."""
        manager = MarketDataManager()

        result = manager.get_snapshot("UNKNOWN")

        assert result is None

    def test_snapshot_uses_latest_bar_for_price(self):
        """``current_price`` reflects the close of the most recently added bar."""
        manager = MarketDataManager()
        for minute, close in enumerate((100.0, 105.0)):
            manager.add_bar("AAPL", _make_bar(close, ts_offset=minute))

        snapshot = manager.get_snapshot("AAPL")

        assert snapshot is not None
        assert snapshot.current_price == 105.0

    def test_snapshot_contains_bars(self):
        """Every bar added for the ticker is present in ``snapshot.bars``."""
        manager = MarketDataManager()
        for minute in range(5):
            manager.add_bar("AAPL", _make_bar(100.0 + minute, ts_offset=minute))

        snapshot = manager.get_snapshot("AAPL")

        assert snapshot is not None
        assert len(snapshot.bars) == 5
# ---------------------------------------------------------------------------
# WeightedEnsemble — combines signals
# ---------------------------------------------------------------------------
class TestEnsembleCombinesSignals:
    """How ``WeightedEnsemble`` merges the signals of several strategies."""

    @pytest.mark.asyncio
    async def test_combines_two_long_signals(self):
        """Two equally weighted LONG signals merge into one LONG signal."""
        strategies = [
            _StubStrategy("alpha", _make_signal(SignalDirection.LONG, 0.8)),
            _StubStrategy("beta", _make_signal(SignalDirection.LONG, 0.6)),
        ]
        ensemble = WeightedEnsemble(strategies, threshold=0.0)
        market = MarketSnapshot(
            ticker="AAPL",
            current_price=150.0,
            open=149.0,
            high=151.0,
            low=148.0,
            close=150.0,
            volume=1000,
        )
        weights = {"alpha": 0.5, "beta": 0.5}

        combined = await ensemble.evaluate("AAPL", market, None, weights)

        assert combined is not None
        assert combined.direction == SignalDirection.LONG
        # Expected weighted average: (0.8*0.5 + 0.6*0.5) / (0.5 + 0.5) = 0.7
        assert combined.strength == pytest.approx(0.7, abs=0.01)

    @pytest.mark.asyncio
    async def test_opposing_signals_net_direction(self):
        """With disagreeing strategies the stronger weighted side wins."""
        strategies = [
            _StubStrategy("alpha", _make_signal(SignalDirection.LONG, 0.9)),
            _StubStrategy("beta", _make_signal(SignalDirection.SHORT, 0.3)),
        ]
        ensemble = WeightedEnsemble(strategies, threshold=0.0)
        market = MarketSnapshot(
            ticker="AAPL",
            current_price=150.0,
            open=149.0,
            high=151.0,
            low=148.0,
            close=150.0,
            volume=1000,
        )
        weights = {"alpha": 0.5, "beta": 0.5}

        combined = await ensemble.evaluate("AAPL", market, None, weights)

        assert combined is not None
        # alpha (LONG, 0.9) outweighs beta (SHORT, 0.3) at equal weights.
        assert combined.direction == SignalDirection.LONG
# ---------------------------------------------------------------------------
# WeightedEnsemble — threshold filtering
# ---------------------------------------------------------------------------
class TestEnsembleThresholdFiltering:
    """The ensemble's ``threshold`` suppresses weak combined signals."""

    @pytest.mark.asyncio
    async def test_below_threshold_returns_none(self):
        """A combined strength under the threshold produces no signal."""
        # LONG 0.5 against SHORT 0.45 at equal weights is expected to almost
        # cancel out, leaving far less than the 0.5 threshold.
        strategies = [
            _StubStrategy("alpha", _make_signal(SignalDirection.LONG, 0.5)),
            _StubStrategy("beta", _make_signal(SignalDirection.SHORT, 0.45)),
        ]
        ensemble = WeightedEnsemble(strategies, threshold=0.5)
        market = MarketSnapshot(
            ticker="AAPL",
            current_price=150.0,
            open=149.0,
            high=151.0,
            low=148.0,
            close=150.0,
            volume=1000,
        )
        weights = {"alpha": 0.5, "beta": 0.5}

        combined = await ensemble.evaluate("AAPL", market, None, weights)

        assert combined is None

    @pytest.mark.asyncio
    async def test_above_threshold_returns_signal(self):
        """A single strong signal clears a 0.3 threshold and is returned."""
        strategies = [
            _StubStrategy("alpha", _make_signal(SignalDirection.LONG, 0.9)),
        ]
        ensemble = WeightedEnsemble(strategies, threshold=0.3)
        market = MarketSnapshot(
            ticker="AAPL",
            current_price=150.0,
            open=149.0,
            high=151.0,
            low=148.0,
            close=150.0,
            volume=1000,
        )
        weights = {"alpha": 1.0}

        combined = await ensemble.evaluate("AAPL", market, None, weights)

        assert combined is not None
        assert combined.strength >= 0.3
# ---------------------------------------------------------------------------
# WeightedEnsemble — no signals returns None
# ---------------------------------------------------------------------------
class TestEnsembleNoSignals:
    """The ensemble yields nothing when none of its strategies fire."""

    @pytest.mark.asyncio
    async def test_all_strategies_return_none(self):
        """Two silent strategies result in ``None`` from the ensemble."""
        silent_strategies = [
            _StubStrategy("alpha", None),
            _StubStrategy("beta", None),
        ]
        ensemble = WeightedEnsemble(silent_strategies, threshold=0.3)
        market = MarketSnapshot(
            ticker="AAPL",
            current_price=150.0,
            open=149.0,
            high=151.0,
            low=148.0,
            close=150.0,
            volume=1000,
        )
        weights = {"alpha": 0.5, "beta": 0.5}

        combined = await ensemble.evaluate("AAPL", market, None, weights)

        assert combined is None
# ---------------------------------------------------------------------------
# WeightedEnsemble — tags strategy sources
# ---------------------------------------------------------------------------
class TestEnsembleTagsStrategySources:
    """The combined signal records which strategies contributed to it."""

    @pytest.mark.asyncio
    async def test_strategy_sources_contains_all_contributors(self):
        """Only strategies that produced a signal are listed as sources."""
        momentum = _StubStrategy(
            "momentum", _make_signal(SignalDirection.LONG, 0.7, ["momentum"])
        )
        news_driven = _StubStrategy(
            "news_driven", _make_signal(SignalDirection.LONG, 0.6, ["news_driven"])
        )
        # Returns no signal, so it must not show up in the sources.
        mean_reversion = _StubStrategy("mean_reversion", None)
        ensemble = WeightedEnsemble(
            [momentum, news_driven, mean_reversion], threshold=0.0
        )
        market = MarketSnapshot(
            ticker="AAPL",
            current_price=150.0,
            open=149.0,
            high=151.0,
            low=148.0,
            close=150.0,
            volume=1000,
        )
        weights = {"momentum": 0.5, "news_driven": 0.3, "mean_reversion": 0.2}

        combined = await ensemble.evaluate("AAPL", market, None, weights)

        assert combined is not None
        # Exactly the two firing strategies are expected.
        assert len(combined.strategy_sources) == 2
        contributor_names = [
            tag.partition(":")[0] for tag in combined.strategy_sources
        ]
        assert "momentum" in contributor_names
        assert "news_driven" in contributor_names
        assert "mean_reversion" not in contributor_names

    @pytest.mark.asyncio
    async def test_strategy_sources_contain_direction_and_strength(self):
        """Each source tag has the shape ``name:DIRECTION:strength``."""
        strategies = [
            _StubStrategy("alpha", _make_signal(SignalDirection.LONG, 0.75)),
        ]
        ensemble = WeightedEnsemble(strategies, threshold=0.0)
        market = MarketSnapshot(
            ticker="AAPL",
            current_price=150.0,
            open=149.0,
            high=151.0,
            low=148.0,
            close=150.0,
            volume=1000,
        )
        weights = {"alpha": 1.0}

        combined = await ensemble.evaluate("AAPL", market, None, weights)

        assert combined is not None
        assert len(combined.strategy_sources) == 1
        tag_fields = combined.strategy_sources[0].split(":")
        assert tag_fields[0] == "alpha"
        assert tag_fields[1] == "LONG"
        assert float(tag_fields[2]) == pytest.approx(0.75, abs=0.01)
# ---------------------------------------------------------------------------
# Signal Generator — current_price injection into sentiment_context
# ---------------------------------------------------------------------------
class TestCurrentPriceInjection:
    """Verify that current_price flows into sentiment_context on published signals.

    These tests run the ensemble and then apply the price-injection step
    themselves via ``_inject_current_price``; they do not call the signal
    generator's own main loop.
    """

    @staticmethod
    def _inject_current_price(signal: TradeSignal, market: MarketSnapshot) -> None:
        """Apply the simulated current_price injection to ``signal`` in place.

        ``market.current_price`` is copied into
        ``signal.sentiment_context["current_price"]`` only when the price is
        positive; the context dict is created on demand in that case.
        Otherwise ``signal`` is left untouched.

        NOTE(review): this is a test-side copy of the injection step the
        signal generator main loop is said to perform -- keep it in sync with
        the service code, or call the real helper if one is exposed.
        """
        if market.current_price > 0:
            if signal.sentiment_context is None:
                signal.sentiment_context = {}
            signal.sentiment_context["current_price"] = market.current_price

    @pytest.mark.asyncio
    async def test_current_price_set_when_snapshot_has_price(self):
        """When snapshot has a positive current_price, it should appear in sentiment_context."""
        s1 = _StubStrategy("alpha", _make_signal(SignalDirection.LONG, 0.9))
        ensemble = WeightedEnsemble([s1], threshold=0.0)
        market = MarketSnapshot(
            ticker="AAPL",
            current_price=155.50,
            open=154.0,
            high=156.0,
            low=153.0,
            close=155.50,
            volume=5000,
        )
        sentiment = SentimentContext(
            ticker="AAPL",
            avg_score=0.7,
            article_count=5,
            recent_scores=[0.6, 0.7, 0.8],
            avg_confidence=0.85,
        )
        weights = {"alpha": 1.0}

        signal_result = await ensemble.evaluate("AAPL", market, sentiment, weights)
        assert signal_result is not None

        self._inject_current_price(signal_result, market)

        assert signal_result.sentiment_context is not None
        assert "current_price" in signal_result.sentiment_context
        assert signal_result.sentiment_context["current_price"] == 155.50

    @pytest.mark.asyncio
    async def test_current_price_not_set_when_snapshot_is_none(self):
        """With the zero-priced fallback snapshot and no sentiment, nothing is injected.

        The test name refers to the case where no real snapshot exists; that
        case is represented here by a minimal ``MarketSnapshot`` whose fields
        are all zero, not by a literal ``None``.
        """
        s1 = _StubStrategy("alpha", _make_signal(SignalDirection.LONG, 0.9))
        ensemble = WeightedEnsemble([s1], threshold=0.0)
        # Minimal fallback snapshot with current_price=0.0
        market = MarketSnapshot(
            ticker="AAPL",
            current_price=0.0,
            open=0.0,
            high=0.0,
            low=0.0,
            close=0.0,
            volume=0.0,
        )
        weights = {"alpha": 1.0}

        signal_result = await ensemble.evaluate("AAPL", market, None, weights)
        assert signal_result is not None

        self._inject_current_price(signal_result, market)

        # No sentiment was passed and the price is 0, so the context must
        # still be absent.
        assert signal_result.sentiment_context is None

    @pytest.mark.asyncio
    async def test_current_price_not_set_when_price_is_zero(self):
        """current_price should NOT be injected when snapshot.current_price is exactly 0."""
        s1 = _StubStrategy("alpha", _make_signal(SignalDirection.LONG, 0.8))
        ensemble = WeightedEnsemble([s1], threshold=0.0)
        sentiment = SentimentContext(
            ticker="AAPL",
            avg_score=0.5,
            article_count=3,
            recent_scores=[0.5],
            avg_confidence=0.9,
        )
        market = MarketSnapshot(
            ticker="AAPL",
            current_price=0.0,
            open=0.0,
            high=0.0,
            low=0.0,
            close=0.0,
            volume=0.0,
        )
        weights = {"alpha": 1.0}

        signal_result = await ensemble.evaluate("AAPL", market, sentiment, weights)
        assert signal_result is not None

        self._inject_current_price(signal_result, market)

        # sentiment_context should exist (from ensemble) but WITHOUT current_price
        assert signal_result.sentiment_context is not None
        assert "current_price" not in signal_result.sentiment_context

    @pytest.mark.asyncio
    async def test_current_price_preserves_existing_sentiment_context(self):
        """Injecting current_price should not overwrite other sentiment_context fields."""
        s1 = _StubStrategy("alpha", _make_signal(SignalDirection.LONG, 0.9))
        ensemble = WeightedEnsemble([s1], threshold=0.0)
        sentiment = SentimentContext(
            ticker="AAPL",
            avg_score=0.75,
            article_count=10,
            recent_scores=[0.7, 0.8],
            avg_confidence=0.9,
        )
        market = MarketSnapshot(
            ticker="AAPL",
            current_price=200.0,
            open=198.0,
            high=202.0,
            low=197.0,
            close=200.0,
            volume=8000,
        )
        weights = {"alpha": 1.0}

        signal_result = await ensemble.evaluate("AAPL", market, sentiment, weights)
        assert signal_result is not None
        assert signal_result.sentiment_context is not None
        # Remember which keys the ensemble produced before injecting.
        original_keys = set(signal_result.sentiment_context.keys())

        self._inject_current_price(signal_result, market)

        # All original keys should still be present
        for key in original_keys:
            assert key in signal_result.sentiment_context
        # Plus the new one
        assert signal_result.sentiment_context["current_price"] == 200.0
# ---------------------------------------------------------------------------
# Signal Generator — signal_id generation
# ---------------------------------------------------------------------------
class TestSignalIdGeneration:
    """Verify that TradeSignal includes a signal_id UUID."""

    @pytest.mark.asyncio
    async def test_signal_has_uuid(self):
        """Each generated signal should have a signal_id UUID."""
        s1 = _StubStrategy("alpha", _make_signal(SignalDirection.LONG, 0.9))
        ensemble = WeightedEnsemble([s1], threshold=0.0)
        market = MarketSnapshot(
            ticker="AAPL",
            current_price=150.0,
            open=149.0,
            high=151.0,
            low=148.0,
            close=150.0,
            volume=1000,
        )
        weights = {"alpha": 1.0}

        signal_result = await ensemble.evaluate("AAPL", market, None, weights)

        assert signal_result is not None
        assert signal_result.signal_id is not None
        # ``UUID`` is imported at module level rather than mid-test.
        assert isinstance(signal_result.signal_id, UUID)

    @pytest.mark.asyncio
    async def test_signal_ids_are_unique(self):
        """Multiple signals should get different signal_id values."""
        s1 = _StubStrategy("alpha", _make_signal(SignalDirection.LONG, 0.9))
        ensemble = WeightedEnsemble([s1], threshold=0.0)
        market = MarketSnapshot(
            ticker="AAPL",
            current_price=150.0,
            open=149.0,
            high=151.0,
            low=148.0,
            close=150.0,
            volume=1000,
        )
        weights = {"alpha": 1.0}

        sig1 = await ensemble.evaluate("AAPL", market, None, weights)
        sig2 = await ensemble.evaluate("AAPL", market, None, weights)

        # Separate asserts so a failure shows which evaluation returned None.
        assert sig1 is not None
        assert sig2 is not None
        assert sig1.signal_id != sig2.signal_id

    def test_signal_id_serializes_in_json(self):
        """signal_id should serialize properly in JSON mode."""
        signal = _make_signal(SignalDirection.LONG, 0.8)

        data = signal.model_dump(mode="json")

        assert "signal_id" in data
        # In JSON mode the id is expected as a plain string.
        assert isinstance(data["signal_id"], str)