# Testing Patterns
Analysis Date: 2026-02-23
## Test Framework
**Runner:**
- pytest 8.0+ (`pytest>=8.0` in `pyproject.toml`)
- Config: `pyproject.toml` under `[tool.pytest.ini_options]`
  - `asyncio_mode = "auto"` — automatically handles async test discovery and execution
  - `testpaths = ["tests"]` — tests located in the root-level `tests/` directory
- Test markers: `integration` marks tests requiring docker services (redis, postgres)
**Assertion Library:**
- Built-in pytest assertions (`assert`, `assert x == y`)
- `pytest.approx()` for floating-point comparisons (e.g., `assert outcome.realized_pnl == pytest.approx(100.0)`)
**Run Commands:**

```bash
python -m pytest tests/ -v                       # Run all tests
python -m pytest tests/ -v -m "not integration"  # Run unit tests only
python -m pytest tests/ -v -m integration        # Run integration tests only (requires docker)
python -m pytest tests/ --cov                    # Run with coverage report
python -m pytest tests/ -x                       # Stop on first failure
python -m pytest tests/ -k test_name             # Run tests matching pattern
```
**Test Execution:**
- Async tests are automatically discovered and run via `asyncio_mode = "auto"`
- No `@pytest.mark.asyncio` decorator needed (though present in some tests for clarity; see the sketch below)
- Integration tests require `docker-compose up -d` with Redis and PostgreSQL running
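With auto mode enabled, any plain `async def` test is collected and awaited without a decorator. A minimal illustration (not taken from this codebase):

```python
import asyncio

async def test_plain_async_test_runs() -> None:
    """Collected and awaited automatically under asyncio_mode = "auto"."""
    await asyncio.sleep(0)  # any awaitable works; no @pytest.mark.asyncio required
    assert True
```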
## Test File Organization
**Location:**
- Mirrored tests pattern: tests live in a top-level `tests/` directory that mirrors the `services/` and `shared/` structure
- Structure:

```
tests/
├── test_redis_streams.py        # Tests for shared/redis_streams.py
├── test_models.py               # Tests for shared/models/
├── test_schemas.py              # Tests for shared/schemas/
├── test_broker.py               # Tests for shared/broker/
├── test_strategies.py           # Tests for shared/strategies/
├── test_backtester.py           # Tests for backtester/
├── services/
│   ├── test_news_fetcher.py
│   ├── test_sentiment_analyzer.py
│   ├── test_signal_generator.py
│   ├── test_trade_executor.py
│   ├── test_learning_engine.py
│   ├── test_api_auth.py
│   ├── test_api_routes.py
│   ├── test_market_data.py
│   └── test_portfolio_sync.py
└── integration/
    ├── test_news_pipeline.py
    └── test_trading_flow.py
```
**Naming:**
- Test files: `test_{module}.py` (e.g., `test_redis_streams.py`)
- Test functions: `test_{component}_{scenario}` (e.g., `test_publisher_publishes_json`)
- Test classes: `Test{Scenario}` (e.g., `TestEvaluateProfitableTrade`)
- Helper functions: `_make_{object}` (e.g., `_make_config`, `_make_signal`, `_make_trade_id`)
## Test Structure
**Suite Organization:**

```python
# Module docstring describing test scope
"""Tests for the Redis Streams publish/consume helpers."""

# Imports (standard library first, then third-party, then project imports)
import json
from unittest.mock import AsyncMock

import pytest
from redis.asyncio import Redis  # used by the fixture below

from shared.redis_streams import StreamConsumer, StreamPublisher

# Fixtures (if any); REDIS_URL is a module-level constant in the real test file
@pytest.fixture
async def redis_client():
    """Provide a clean Redis connection and clean up after."""
    client = Redis.from_url(REDIS_URL)
    yield client
    await client.aclose()

# Test functions or classes
@pytest.mark.asyncio
async def test_publisher_publishes_json():
    """StreamPublisher should XADD a JSON-serialised payload."""
    redis = AsyncMock()
    # ... test implementation

class TestEvaluateProfitableTrade:
    """A long trade that gains in price should have positive PnL and ROI."""

    def test_evaluate_profitable_trade(self):
        # ... test implementation
```
**Section Comments:**
- Use comment separators: `# ---------------------------------------------------------------------------`
- Group tests by concern: Enums, Fixtures, RSS tests, Reddit tests, Integration tests, etc.
- Example from `test_models.py`:

```python
# ---------------------------------------------------------------------------
# Enum tests
# ---------------------------------------------------------------------------
class TestEnums:
```
**Patterns:**
- **Setup pattern:** create fixtures with the `@pytest.fixture` decorator
  - Can be module-level (reused) or function-level (isolated)
  - Async fixtures use `async def` and `yield`
  - Example:

    ```python
    @pytest.fixture
    async def redis_client():
        client = Redis.from_url(REDIS_URL)
        yield client
        await client.aclose()
    ```

- **Teardown pattern:** use `yield` in fixtures for cleanup
  - Code after `yield` runs after the test completes
  - Example from `test_news_pipeline.py`:

    ```python
    @pytest.fixture
    async def redis_client():
        client = Redis.from_url(REDIS_URL)
        await client.delete(RAW_STREAM, SCORED_STREAM)  # Setup
        yield client
        await client.delete(RAW_STREAM, SCORED_STREAM)  # Teardown
        await client.aclose()
    ```

- **Assertion pattern:** use pytest assertions directly
  - For equality: `assert x == y`
  - For calls: `redis.xadd.assert_called_once_with(...)`
  - For floating point: `assert value == pytest.approx(expected)`
  - Example from `test_redis_streams.py`:

    ```python
    redis.xadd.assert_called_once_with(
        "test:stream",
        {"data": json.dumps({"ticker": "AAPL", "score": 0.8})},
    )
    assert msg_id == b"1-0"
    ```
## Mocking

**Framework:** `unittest.mock` (built-in)

**Patterns:**
- `AsyncMock` for async functions: `AsyncMock(return_value=...)`
- `MagicMock` for sync functions and objects: `MagicMock()`
- `SimpleNamespace` for lightweight objects: `SimpleNamespace(title=..., score=...)` (see the sketch below)
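`SimpleNamespace` gives a plain attribute bag when the code under test only reads attributes. A hedged sketch (the attribute set here is illustrative, not lifted from the real Reddit tests):

```python
from types import SimpleNamespace

# Fake Reddit submission: only the attributes the code under test reads.
fake_post = SimpleNamespace(
    title="AAPL beats earnings expectations",
    score=512,
    url="https://example.com/post",
)

assert fake_post.title.startswith("AAPL")
assert fake_post.score > 0
```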
Example from `test_redis_streams.py`:

```python
redis = AsyncMock()
redis.xadd = AsyncMock(return_value=b"1-0")

pub = StreamPublisher(redis, "test:stream")
msg_id = await pub.publish({"ticker": "AAPL"})

redis.xadd.assert_called_once_with(...)
assert msg_id == b"1-0"
```
Example from `test_news_fetcher.py` (multi-call behavior):

```python
redis.xreadgroup = AsyncMock(
    side_effect=[
        [("test:stream", [(b"1-0", {b"data": json.dumps(payload).encode()})])],
        KeyboardInterrupt,  # Break loop on second call
    ]
)
```
**What to Mock:**
- External services: Redis, database (use `AsyncMock` with return values)
- API calls: HTTP requests, OpenTelemetry counters
- ML models: FinBERT and Ollama analysis (patch and return synthetic scores)
- Broker connections: Alpaca API (return fake order results)
- File I/O and network operations

**What NOT to Mock:**
- Core business logic (RiskManager, TradeEvaluator, WeightAdjuster)
- Data structures and schemas
- Internal function calls within a module
- Time-based operations in unit tests (use fixtures for time-dependent tests; see the sketch below)
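One way to keep time-dependent tests deterministic (an assumption about approach, not confirmed from the codebase) is a fixture that pins "now":

```python
from datetime import datetime, timezone

import pytest

@pytest.fixture
def frozen_now() -> datetime:
    """A fixed timestamp for time-dependent assertions (hypothetical fixture)."""
    return datetime(2026, 2, 23, 12, 0, 0, tzinfo=timezone.utc)

def test_timestamp_is_utc(frozen_now: datetime) -> None:
    assert frozen_now.tzinfo is timezone.utc
```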
Patching example from `test_sentiment_analyzer.py`:

```python
from unittest.mock import AsyncMock, patch

with patch("services.sentiment_analyzer.analyzers.finbert.FinBERTAnalyzer") as mock_finbert:
    mock_instance = AsyncMock()
    mock_instance.analyze = AsyncMock(return_value=(0.8, 0.95))
    mock_finbert.return_value = mock_instance
    # ... run test
```
## Fixtures and Factories
**Test Data Patterns:**

Helper functions to create test objects:

```python
from datetime import datetime, timezone

# Project types (LearningEngineConfig, TradeSignal, SignalDirection) are
# imported from the application modules under test.

def _make_config(**overrides) -> LearningEngineConfig:
    """Create a LearningEngineConfig with sensible defaults + overrides."""
    defaults = dict(
        learning_rate=0.1,
        min_trades_before_adjustment=20,
        max_weight_shift_pct=0.10,
    )
    defaults.update(overrides)
    return LearningEngineConfig(**defaults)

def _make_signal(
    ticker: str = "AAPL",
    direction: SignalDirection = SignalDirection.LONG,
) -> TradeSignal:
    return TradeSignal(
        ticker=ticker,
        direction=direction,
        strength=0.8,
        strategy_sources=["test"],
        timestamp=datetime.now(timezone.utc),
    )
```
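Usage is then one call per test, overriding only what the scenario cares about (illustrative values):

```python
config = _make_config(learning_rate=0.5)  # factory defaults apply except learning_rate
signal = _make_signal(ticker="TSLA")      # direction defaults to SignalDirection.LONG

assert config.learning_rate == 0.5
assert signal.ticker == "TSLA"
```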
**Pytest Fixtures:**

```python
@pytest.fixture
def sample_article() -> RawArticle:
    """Return a sample RawArticle mentioning AAPL."""
    return RawArticle(
        source="rss",
        url="https://example.com/aapl-news",
        title="Apple Inc AAPL reports record quarterly earnings",
        content="...",
        published_at=datetime.now(timezone.utc),
        fetched_at=datetime.now(timezone.utc),
        content_hash="test-hash-aapl-001",
    )

@pytest.fixture()
def config() -> ApiGatewayConfig:
    return ApiGatewayConfig(
        jwt_secret_key="test-secret-for-routes",
        database_url="sqlite+aiosqlite:///:memory:",
        redis_url="redis://localhost:6379/0",
    )
```
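A test receives a fixture by declaring it as a parameter; pytest resolves it by name. A minimal sketch using `sample_article` from above:

```python
def test_sample_article_mentions_aapl(sample_article: RawArticle) -> None:
    assert "AAPL" in sample_article.title
    assert sample_article.source == "rss"
```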
**Location:**
- Helper functions: top of the test file, marked with the `_make_` prefix, after docstring and imports
- Pytest fixtures: after helpers, before test classes/functions, decorated with `@pytest.fixture`
- Shared fixtures: in separate test files if reused across multiple tests
- Integration test fixtures: `redis_client` (cleanup with delete and close), database fixtures
## Coverage
**Requirements:** not enforced by default; 246 unit tests pass with zero failures (as of the last sprint).
**View Coverage:**

```bash
python -m pytest tests/ --cov=shared --cov=services --cov-report=term-missing
python -m pytest tests/ --cov --cov-report=html  # Generate HTML report
```
**Coverage Statistics (approximate):**
- `tests/test_redis_streams.py` — 5 tests (complete coverage of StreamPublisher/Consumer)
- `tests/test_models.py` — 21 tests (enums, relationships)
- `tests/test_schemas.py` — 49 tests (Pydantic schema validation)
- `tests/test_broker.py` — 18 tests (AlpacaBroker implementation)
- `tests/test_strategies.py` — 24 tests (RSI, EMA, Momentum strategies)
- `tests/test_backtester.py` — 13 tests (backtest simulation)
- `tests/services/test_news_fetcher.py` — 10 tests (RSS, Reddit, deduplication)
- `tests/services/test_sentiment_analyzer.py` — 19 tests (FinBERT, Ollama, tickers)
- `tests/services/test_signal_generator.py` — 17 tests (weighted ensemble)
- `tests/services/test_trade_executor.py` — 16 tests (RiskManager, order flow)
- `tests/services/test_learning_engine.py` — 28 tests (trade evaluation, weight adjustment)
- `tests/services/test_api_auth.py` — 13 tests (WebAuthn, JWT)
- `tests/services/test_api_routes.py` — 13 tests (endpoint responses)
- `tests/integration/` — 9 integration tests (news pipeline, trading flow)
## Test Types
**Unit Tests:**
- Scope: single function, class, or module
- Strategy: mock all external dependencies (Redis, DB, API calls, ML models)
- Location: `tests/test_*.py` and `tests/services/test_*.py`
- Execution: runs in isolation without services
- Examples: `test_publisher_publishes_json`, `test_evaluate_profitable_trade`
**Integration Tests:**
- Scope: multi-service interaction (e.g., news fetcher → sentiment analyzer pipeline)
- Strategy: real Redis streams, real database, mocked ML models and external APIs
- Location: `tests/integration/test_*.py`
- Execution: requires `docker-compose up -d` with Redis and PostgreSQL running
- Marker: `@pytest.mark.integration` (select with `pytest -m integration`)
- Examples: `test_news_pipeline.py` (publishes to `news:raw`, reads from `news:scored`), `test_trading_flow.py` (a skeleton follows this list)
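A hedged skeleton of how the marker and a real Redis client combine (names follow the conventions above; the actual test bodies differ):

```python
import json

import pytest

RAW_STREAM = "news:raw"  # stream name used by the news pipeline tests

@pytest.mark.integration
async def test_raw_article_reaches_stream(redis_client) -> None:
    """Needs real Redis from docker-compose; deselect with -m "not integration"."""
    msg_id = await redis_client.xadd(RAW_STREAM, {"data": json.dumps({"title": "AAPL"})})
    assert msg_id  # XADD returns the new entry ID
```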
**E2E Tests:**
- Not implemented; would require running the full docker-compose stack with live Alpaca paper trading
- Could be added for smoke testing production deployments
## Common Patterns
**Async Testing:**

```python
payload = {"ticker": "AAPL", "score": 0.8}  # example message body (matches the publisher example above)

@pytest.mark.asyncio
async def test_consumer_consume_yields_and_acks() -> None:
    """consume() should yield deserialised data and ACK each message."""
    redis = AsyncMock()
    redis.xgroup_create = AsyncMock()
    redis.xreadgroup = AsyncMock(side_effect=[
        [("test:stream", [(b"1-0", {b"data": json.dumps(payload).encode()})])],
        KeyboardInterrupt,
    ])
    consumer = StreamConsumer(redis, "test:stream", "grp", "c1")

    results = []
    try:
        async for msg_id, data in consumer.consume():
            results.append((msg_id, data))
    except KeyboardInterrupt:
        pass

    assert len(results) == 1
    assert results[0] == (b"1-0", payload)
```
**Error Testing:**

```python
async def test_consumer_ensure_group_ignores_existing() -> None:
    """If the group already exists the exception should be swallowed."""
    redis = AsyncMock()
    redis.xgroup_create = AsyncMock(side_effect=Exception("BUSYGROUP"))
    consumer = StreamConsumer(redis, "test:stream", "my-group", "worker-1")

    # Should not raise
    await consumer.ensure_group()  # No assertion; test passes if no exception
```
**Parametrized Tests:**
- Not heavily used in the current codebase
- Could be added for testing multiple input scenarios (e.g., different signal directions; see the sketch below)
- Use `@pytest.mark.parametrize` if needed
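If adopted, a parametrized version of the signal-direction scenario might look like this (a sketch reusing the `_make_signal` factory from above; it assumes `SignalDirection` has a `SHORT` member alongside `LONG`):

```python
import pytest

@pytest.mark.parametrize(
    "direction",
    [SignalDirection.LONG, SignalDirection.SHORT],  # SHORT assumed to exist
)
def test_make_signal_preserves_direction(direction: SignalDirection) -> None:
    signal = _make_signal(direction=direction)
    assert signal.direction is direction
```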
**Floating-Point Assertions:**

```python
assert outcome.realized_pnl == pytest.approx(100.0)      # Allows small differences
assert outcome.roi_pct == pytest.approx(10.0, rel=1e-5)  # With explicit tolerance
```
**Class-Based Test Organization:**

```python
class TestEvaluateProfitableTrade:
    """A long trade that gains in price should have positive PnL and ROI."""

    def test_evaluate_profitable_trade(self):
        evaluator = TradeEvaluator()
        outcome = evaluator.evaluate_trade(...)
        assert outcome.realized_pnl == pytest.approx(100.0)
        assert outcome.was_profitable is True

class TestEvaluateLosingTrade:
    """A long trade that drops should have negative PnL."""

    def test_evaluate_losing_trade(self):
        # ... different scenario
```
## Test Configuration
**pytest.ini_options (from `pyproject.toml`):**

```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
markers = ["integration: marks tests requiring docker services (redis, postgres)"]
```
**Environment:**
- Database URL: tests use `sqlite+aiosqlite:///:memory:` for in-memory databases
- Redis: integration tests use `redis://localhost:6379/1` (DB 1) to avoid conflicts
- Async mode: `"auto"` handles all async test discovery automatically
Testing analysis: 2026-02-23