trading/.planning/codebase/TESTING.md

Testing Patterns

Analysis Date: 2026-02-23

Test Framework

Runner:

  • pytest 8.0+ (pytest>=8.0 in pyproject.toml)
  • Config: pyproject.toml under [tool.pytest.ini_options]
    • asyncio_mode = "auto" — Automatically handles async test discovery and execution
    • testpaths = ["tests"] — Tests located in root-level tests/ directory
    • Test markers: integration marks tests requiring docker services (redis, postgres)

Assertion Library:

  • Built-in pytest assertions (assert, assert x == y)
  • pytest.approx() for floating-point comparisons (e.g., assert outcome.realized_pnl == pytest.approx(100.0))

Run Commands:

python -m pytest tests/ -v                          # Run all tests
python -m pytest tests/ -v -m "not integration"    # Run unit tests only
python -m pytest tests/ -v -m integration          # Run integration tests only (requires docker)
python -m pytest tests/ --cov                      # Run with coverage report
python -m pytest tests/ -x                         # Stop on first failure
python -m pytest tests/ -k test_name               # Run tests matching pattern

Test Execution:

  • Async tests automatically discovered and run via asyncio_mode="auto"
  • No @pytest.mark.asyncio decorator needed (though present in some tests for clarity)
  • Integration tests require docker-compose up -d with Redis and PostgreSQL running

Test File Organization

Location:

  • Centralized tests: all tests live in a root-level tests/ directory that mirrors the services/ and shared/ layout (not co-located with the source files)
  • Structure:
    tests/
    ├── test_redis_streams.py      # Tests for shared/redis_streams.py
    ├── test_models.py             # Tests for shared/models/
    ├── test_schemas.py            # Tests for shared/schemas/
    ├── test_broker.py             # Tests for shared/broker/
    ├── test_strategies.py         # Tests for shared/strategies/
    ├── test_backtester.py         # Tests for backtester/
    ├── services/
    │   ├── test_news_fetcher.py
    │   ├── test_sentiment_analyzer.py
    │   ├── test_signal_generator.py
    │   ├── test_trade_executor.py
    │   ├── test_learning_engine.py
    │   ├── test_api_auth.py
    │   ├── test_api_routes.py
    │   ├── test_market_data.py
    │   └── test_portfolio_sync.py
    └── integration/
        ├── test_news_pipeline.py
        └── test_trading_flow.py
    

Naming:

  • Test files: test_{module}.py (e.g., test_redis_streams.py)
  • Test functions: test_{component}_{scenario} (e.g., test_publisher_publishes_json)
  • Test classes: Test{Scenario} (e.g., TestEvaluateProfitableTrade)
  • Helper functions: _make_{object} (e.g., _make_config, _make_signal, _make_trade_id)

Test Structure

Suite Organization:

# Module docstring describing test scope
"""Tests for the Redis Streams publish/consume helpers."""

# Imports (stdlib first, then third-party/pytest, then project imports)
import json
from unittest.mock import AsyncMock

import pytest
from redis.asyncio import Redis  # needed by the fixture below

from shared.redis_streams import StreamConsumer, StreamPublisher

# Fixtures (if any)
@pytest.fixture
async def redis_client():
    """Provide a clean Redis connection and clean up after."""
    client = Redis.from_url(REDIS_URL)
    yield client
    await client.aclose()

# Test functions or classes
@pytest.mark.asyncio
async def test_publisher_publishes_json():
    """StreamPublisher should XADD a JSON-serialised payload."""
    redis = AsyncMock()
    # ... test implementation

class TestEvaluateProfitableTrade:
    """A long trade that gains in price should have positive PnL and ROI."""

    def test_evaluate_profitable_trade(self):
        # ... test implementation

Section Comments:

  • Use comment separators: # ---------------------------------------------------------------------------
  • Group tests by concern: Enums, Fixtures, RSS tests, Reddit tests, Integration tests, etc.
  • Example from test_models.py:
    # ---------------------------------------------------------------------------
    # Enum tests
    # ---------------------------------------------------------------------------
    
    class TestEnums:
        ...

Patterns:

  • Setup pattern: Create fixtures as pytest @pytest.fixture decorators

    • Can be module-level (reused) or function-level (isolated)
    • Async fixtures use async def with yield (yield from is not valid inside async def)
    • Example:
      @pytest.fixture
      async def redis_client():
          client = Redis.from_url(REDIS_URL)
          yield client
          await client.aclose()
      
  • Teardown pattern: Use yield in fixtures for cleanup

    • Code after yield runs after the test completes
    • Example from test_news_pipeline.py:
      @pytest.fixture
      async def redis_client():
          client = Redis.from_url(REDIS_URL)
          await client.delete(RAW_STREAM, SCORED_STREAM)  # Setup
          yield client
          await client.delete(RAW_STREAM, SCORED_STREAM)  # Teardown
          await client.aclose()
      
  • Assertion pattern: Use pytest assertions directly

    • For equality: assert x == y
    • For calls: redis.xadd.assert_called_once_with(...)
    • For floating point: assert value == pytest.approx(expected)
    • Example from test_redis_streams.py:
      redis.xadd.assert_called_once_with(
          "test:stream",
          {"data": json.dumps({"ticker": "AAPL", "score": 0.8})},
      )
      assert msg_id == b"1-0"
      

Mocking

Framework: unittest.mock (built-in)

Patterns:

  • AsyncMock for async functions: AsyncMock(return_value=...)
  • MagicMock for sync functions and objects: MagicMock()
  • SimpleNamespace for lightweight objects: SimpleNamespace(title=..., score=...)
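A minimal sketch combining the three (all names here are invented for illustration, not taken from the codebase):

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock

# SimpleNamespace: a lightweight stand-in for an object with attributes
post = SimpleNamespace(title="AAPL beats earnings estimates", score=120)

# MagicMock for sync collaborators, AsyncMock for awaitable ones
logger = MagicMock()
fetch_posts = AsyncMock(return_value=[post])

async def run_once() -> int:
    posts = await fetch_posts("wallstreetbets")  # awaiting an AsyncMock yields its return_value
    logger.info("fetched %d posts", len(posts))
    return len(posts)

count = asyncio.run(run_once())
fetch_posts.assert_awaited_once_with("wallstreetbets")
logger.info.assert_called_once()
```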

Example from test_redis_streams.py:

redis = AsyncMock()
redis.xadd = AsyncMock(return_value=b"1-0")

pub = StreamPublisher(redis, "test:stream")
msg_id = await pub.publish({"ticker": "AAPL"})

redis.xadd.assert_called_once_with(...)
assert msg_id == b"1-0"

Example from test_news_fetcher.py (multi-call behavior):

redis.xreadgroup = AsyncMock(
    side_effect=[
        [("test:stream", [(b"1-0", {b"data": json.dumps(payload).encode()})])],
        KeyboardInterrupt,  # Break loop on second call
    ]
)

What to Mock:

  • External services: Redis, database (use AsyncMock with return values)
  • API calls: HTTP requests, OpenTelemetry counters
  • ML models: FinBERT and Ollama analysis (patch and return synthetic scores)
  • Broker connections: Alpaca API (return fake order results)
  • File I/O and network operations

What NOT to Mock:

  • Core business logic (RiskManager, TradeEvaluator, WeightAdjuster)
  • Data structures and schemas
  • Internal function calls within a module
  • Time-based operations in unit tests (use fixtures for time-dependent tests)
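The reason core logic needs no mocks is that it is pure computation with no I/O. A contrived sketch of the idea (hypothetical names, not the real TradeEvaluator API):

```python
from dataclasses import dataclass

@dataclass
class Fill:
    """Hypothetical fill record for illustration only."""
    entry_price: float
    exit_price: float
    qty: int

def realized_pnl(fill: Fill) -> float:
    # Long-side PnL: (exit - entry) * quantity. Pure math, nothing to mock.
    return (fill.exit_price - fill.entry_price) * fill.qty

def test_realized_pnl_profitable_long():
    assert realized_pnl(Fill(100.0, 110.0, 10)) == 100.0
```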

Patching Example from test_sentiment_analyzer.py:

with patch("services.sentiment_analyzer.analyzers.finbert.FinBERTAnalyzer") as mock_finbert:
    mock_instance = AsyncMock()
    mock_instance.analyze = AsyncMock(return_value=(0.8, 0.95))
    mock_finbert.return_value = mock_instance
    # ... run test

Fixtures and Factories

Test Data Patterns:

Helper functions to create test objects:

def _make_config(**overrides) -> LearningEngineConfig:
    """Create a LearningEngineConfig with sensible defaults + overrides."""
    defaults = dict(
        learning_rate=0.1,
        min_trades_before_adjustment=20,
        max_weight_shift_pct=0.10,
    )
    defaults.update(overrides)
    return LearningEngineConfig(**defaults)

def _make_signal(
    ticker: str = "AAPL",
    direction: SignalDirection = SignalDirection.LONG,
) -> TradeSignal:
    return TradeSignal(
        ticker=ticker,
        direction=direction,
        strength=0.8,
        strategy_sources=["test"],
        timestamp=datetime.now(timezone.utc),
    )

Pytest Fixtures:

@pytest.fixture
def sample_article() -> RawArticle:
    """Return a sample RawArticle mentioning AAPL."""
    return RawArticle(
        source="rss",
        url="https://example.com/aapl-news",
        title="Apple Inc AAPL reports record quarterly earnings",
        content="...",
        published_at=datetime.now(timezone.utc),
        fetched_at=datetime.now(timezone.utc),
        content_hash="test-hash-aapl-001",
    )

@pytest.fixture()
def config() -> ApiGatewayConfig:
    return ApiGatewayConfig(
        jwt_secret_key="test-secret-for-routes",
        database_url="sqlite+aiosqlite:///:memory:",
        redis_url="redis://localhost:6379/0",
    )

Location:

  • Helper functions: Top of test file, marked with _make_ prefix, after docstring and imports
  • Pytest fixtures: After helpers, before test classes/functions, decorated with @pytest.fixture
  • Shared fixtures: In separate test files if reused across multiple tests
  • Integration test fixtures: redis_client (cleanup with delete and close), database fixtures
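If shared fixtures accumulate, pytest's conventional home for them is a conftest.py at the tests/ root; fixtures defined there are auto-discovered by every test module below that directory with no imports. A sketch under that assumption (this file does not currently exist in the codebase, and the fixture is illustrative):

```python
# tests/conftest.py -- fixtures here are visible to all test modules
# under tests/ without any import statement in the test files.
import pytest

@pytest.fixture
def jwt_secret() -> str:
    """Hypothetical shared constant; adjust to the real config values."""
    return "test-secret-for-routes"
```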

Coverage

Requirements: Not enforced by default; 246 unit tests pass with zero failures (as of last sprint)

View Coverage:

python -m pytest tests/ --cov=shared --cov=services --cov-report=term-missing
python -m pytest tests/ --cov --cov-report=html  # Generate HTML report

Coverage Statistics (approximate):

  • tests/test_redis_streams.py — 5 tests (complete coverage of StreamPublisher/Consumer)
  • tests/test_models.py — 21 tests (enums, relationships)
  • tests/test_schemas.py — 49 tests (Pydantic schema validation)
  • tests/test_broker.py — 18 tests (AlpacaBroker implementation)
  • tests/test_strategies.py — 24 tests (RSI, EMA, Momentum strategies)
  • tests/test_backtester.py — 13 tests (backtest simulation)
  • tests/services/test_news_fetcher.py — 10 tests (RSS, Reddit, deduplication)
  • tests/services/test_sentiment_analyzer.py — 19 tests (FinBERT, Ollama, tickers)
  • tests/services/test_signal_generator.py — 17 tests (weighted ensemble)
  • tests/services/test_trade_executor.py — 16 tests (RiskManager, order flow)
  • tests/services/test_learning_engine.py — 28 tests (trade evaluation, weight adjustment)
  • tests/services/test_api_auth.py — 13 tests (WebAuthn, JWT)
  • tests/services/test_api_routes.py — 13 tests (endpoint responses)
  • tests/integration/ — 9 integration tests (news pipeline, trading flow)

Test Types

Unit Tests:

  • Scope: Single function, class, or module
  • Strategy: Mock all external dependencies (Redis, DB, API calls, ML models)
  • Location: tests/test_*.py and tests/services/test_*.py
  • Execution: Runs in isolation without services
  • Examples: test_publisher_publishes_json, test_evaluate_profitable_trade

Integration Tests:

  • Scope: Multi-service interaction (e.g., news fetcher → sentiment analyzer pipeline)
  • Strategy: Real Redis streams, real database, mocked ML models and external APIs
  • Location: tests/integration/test_*.py
  • Execution: Requires docker-compose up -d with Redis and PostgreSQL running
  • Marker: @pytest.mark.integration (separate via pytest -m integration)
  • Examples: test_news_pipeline.py (publishes to news:raw, reads from news:scored), test_trading_flow.py
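The marker is applied as a plain decorator; a hedged sketch (the test name and body are illustrative, not copied from test_news_pipeline.py):

```python
import pytest

@pytest.mark.integration
async def test_news_pipeline_round_trip():
    """Requires `docker-compose up -d` (Redis reachable on localhost)."""
    ...  # publish to news:raw, then await an entry on news:scored
```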

E2E Tests:

  • Not implemented; would require running full docker-compose stack with live Alpaca paper trading
  • Could be added for smoke testing production deployments

Common Patterns

Async Testing:

@pytest.mark.asyncio
async def test_consumer_consume_yields_and_acks() -> None:
    """consume() should yield deserialised data and ACK each message."""
    payload = {"ticker": "AAPL", "score": 0.8}
    redis = AsyncMock()
    redis.xgroup_create = AsyncMock()
    redis.xreadgroup = AsyncMock(side_effect=[
        [("test:stream", [(b"1-0", {b"data": json.dumps(payload).encode()})])],
        KeyboardInterrupt,
    ])

    consumer = StreamConsumer(redis, "test:stream", "grp", "c1")
    results = []
    try:
        async for msg_id, data in consumer.consume():
            results.append((msg_id, data))
    except KeyboardInterrupt:
        pass

    assert len(results) == 1
    assert results[0] == (b"1-0", payload)

Error Testing:

async def test_consumer_ensure_group_ignores_existing() -> None:
    """If the group already exists the exception should be swallowed."""
    redis = AsyncMock()
    redis.xgroup_create = AsyncMock(side_effect=Exception("BUSYGROUP"))

    consumer = StreamConsumer(redis, "test:stream", "my-group", "worker-1")
    # Should not raise
    await consumer.ensure_group()  # No assertion; test passes if no exception
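For code that is expected to raise, pytest.raises is the complementary pattern. A sketch with an invented validation function (not the real RiskManager API):

```python
import pytest

def position_size(equity: float, risk_pct: float) -> float:
    # Illustrative guard only; hypothetical signature.
    if not 0 < risk_pct <= 1:
        raise ValueError("risk_pct must be in (0, 1]")
    return equity * risk_pct

def test_position_size_rejects_bad_risk():
    with pytest.raises(ValueError, match="risk_pct"):
        position_size(10_000.0, 1.5)
```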

Parametrized Tests:

  • Not heavily used in current codebase
  • Could be added for testing multiple input scenarios (e.g., different signal directions)
  • Use @pytest.mark.parametrize if needed
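A sketch of what that could look like for signal directions (the helper is hypothetical, named only for illustration):

```python
import pytest

def sign_for(direction: str) -> int:
    """Hypothetical helper: map a signal direction to a position sign."""
    return {"long": 1, "short": -1}[direction]

@pytest.mark.parametrize(
    ("direction", "expected"),
    [("long", 1), ("short", -1)],
)
def test_sign_for(direction: str, expected: int) -> None:
    assert sign_for(direction) == expected
```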

Floating-Point Assertions:

assert outcome.realized_pnl == pytest.approx(100.0)  # Allows small differences
assert outcome.roi_pct == pytest.approx(10.0, rel=1e-5)  # With tolerance

Class-Based Test Organization:

class TestEvaluateProfitableTrade:
    """A long trade that gains in price should have positive PnL and ROI."""

    def test_evaluate_profitable_trade(self):
        evaluator = TradeEvaluator()
        outcome = evaluator.evaluate_trade(...)

        assert outcome.realized_pnl == pytest.approx(100.0)
        assert outcome.was_profitable is True

class TestEvaluateLosingTrade:
    """A long trade that drops should have negative PnL."""

    def test_evaluate_losing_trade(self):
        # ... different scenario

Test Configuration

pytest.ini_options (from pyproject.toml):

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
markers = ["integration: marks tests requiring docker services (redis, postgres)"]

Environment:

  • Database URL: Tests use sqlite+aiosqlite:///:memory: for in-memory databases
  • Redis: Tests use redis://localhost:6379/1 (DB 1) for integration tests to avoid conflicts
  • Async mode: "auto" handles all async test discovery automatically

Testing analysis: 2026-02-23