# Testing Patterns

**Analysis Date:** 2026-02-23

## Test Framework

**Runner:**
- pytest 8.0+ (`pytest>=8.0` in `pyproject.toml`)
- Config: `pyproject.toml` under `[tool.pytest.ini_options]`
- `asyncio_mode = "auto"`: a `pytest-asyncio` setting that collects and runs async tests without per-test markers
- `testpaths = ["tests"]`: tests live in the root-level `tests/` directory
- Test markers: `integration` marks tests requiring docker services (redis, postgres)

**Assertion Library:**
- Built-in pytest assertions (`assert`, `assert x == y`)
- `pytest.approx()` for floating-point comparisons (e.g., `assert outcome.realized_pnl == pytest.approx(100.0)`)

**Run Commands:**
```bash
python -m pytest tests/ -v                         # Run all tests
python -m pytest tests/ -v -m "not integration"    # Run unit tests only
python -m pytest tests/ -v -m integration          # Run integration tests only (requires docker)
python -m pytest tests/ --cov                      # Run with coverage report (requires pytest-cov)
python -m pytest tests/ -x                         # Stop on first failure
python -m pytest tests/ -k test_name               # Run tests matching pattern
```

**Test Execution:**
- Async tests are discovered and run automatically via `asyncio_mode = "auto"`
- No `@pytest.mark.asyncio` decorator needed (though present in some tests for clarity)
- Integration tests require `docker-compose up -d` with Redis and PostgreSQL running

## Test File Organization

**Location:**
- Separate test tree: tests live in the root-level `tests/` directory, mirroring the `services/` and `shared/` structure
- Structure:
```
tests/
├── test_redis_streams.py        # Tests for shared/redis_streams.py
├── test_models.py               # Tests for shared/models/
├── test_schemas.py              # Tests for shared/schemas/
├── test_broker.py               # Tests for shared/broker/
├── test_strategies.py           # Tests for shared/strategies/
├── test_backtester.py           # Tests for backtester/
├── services/
│   ├── test_news_fetcher.py
│   ├── test_sentiment_analyzer.py
│   ├── test_signal_generator.py
│   ├── test_trade_executor.py
│   ├── test_learning_engine.py
│   ├── test_api_auth.py
│   ├── test_api_routes.py
│   ├── test_market_data.py
│   └── test_portfolio_sync.py
└── integration/
    ├── test_news_pipeline.py
    └── test_trading_flow.py
```

**Naming:**
- Test files: `test_{module}.py` (e.g., `test_redis_streams.py`)
- Test functions: `test_{component}_{scenario}` (e.g., `test_publisher_publishes_json`)
- Test classes: `Test{Scenario}` (e.g., `TestEvaluateProfitableTrade`)
- Helper functions: `_make_{object}` (e.g., `_make_config`, `_make_signal`, `_make_trade_id`)

## Test Structure

**Suite Organization:**
```python
# Module docstring describing test scope
"""Tests for the Redis Streams publish/consume helpers."""

# Imports (standard library first, then pytest, then project imports)
import json
from unittest.mock import AsyncMock
import pytest
from shared.redis_streams import StreamConsumer, StreamPublisher

# Fixtures (if any); the Redis and REDIS_URL imports are omitted in this outline
@pytest.fixture
async def redis_client():
    """Provide a clean Redis connection and clean up after."""
    client = Redis.from_url(REDIS_URL)
    yield client
    await client.aclose()

# Test functions or classes
@pytest.mark.asyncio
async def test_publisher_publishes_json():
    """StreamPublisher should XADD a JSON-serialised payload."""
    redis = AsyncMock()
    # ... test implementation

class TestEvaluateProfitableTrade:
    """A long trade that gains in price should have positive PnL and ROI."""

    def test_evaluate_profitable_trade(self):
        ...  # test implementation
```

**Section Comments:**
- Use comment separators: `# ---------------------------------------------------------------------------`
- Group tests by concern: Enums, Fixtures, RSS tests, Reddit tests, Integration tests, etc.
- Example from `test_models.py`:
```python
# ---------------------------------------------------------------------------
# Enum tests
# ---------------------------------------------------------------------------

class TestEnums:
    ...  # test methods omitted
```

**Patterns:**
- **Setup pattern**: Define fixtures as functions decorated with `@pytest.fixture`
  - Can be module-level (reused) or function-level (isolated)
  - Async fixtures use `async def` with a plain `yield` (`yield from` is a syntax error inside `async def`)
  - Example:
  ```python
  @pytest.fixture
  async def redis_client():
      client = Redis.from_url(REDIS_URL)
      yield client
      await client.aclose()
  ```

- **Teardown pattern**: Use `yield` in fixtures for cleanup
  - Code after `yield` runs after the test completes, whether it passed or failed
  - Example from `test_news_pipeline.py`:
  ```python
  @pytest.fixture
  async def redis_client():
      client = Redis.from_url(REDIS_URL)
      await client.delete(RAW_STREAM, SCORED_STREAM)  # Setup
      yield client
      await client.delete(RAW_STREAM, SCORED_STREAM)  # Teardown
      await client.aclose()
  ```

- **Assertion pattern**: Use pytest assertions directly
  - For equality: `assert x == y`
  - For calls: `redis.xadd.assert_called_once_with(...)`
  - For floating point: `assert value == pytest.approx(expected)`
  - Example from `test_redis_streams.py`:
  ```python
  redis.xadd.assert_called_once_with(
      "test:stream",
      {"data": json.dumps({"ticker": "AAPL", "score": 0.8})},
  )
  assert msg_id == b"1-0"
  ```

## Mocking

**Framework:** `unittest.mock` (built-in)

**Patterns:**
- `AsyncMock` for async functions: `AsyncMock(return_value=...)`
- `MagicMock` for sync functions and objects: `MagicMock()`
- `SimpleNamespace` (from `types`) for lightweight objects: `SimpleNamespace(title=..., score=...)`

**Example from `test_redis_streams.py`:**
```python
redis = AsyncMock()
redis.xadd = AsyncMock(return_value=b"1-0")

pub = StreamPublisher(redis, "test:stream")
msg_id = await pub.publish({"ticker": "AAPL"})

redis.xadd.assert_called_once_with(...)  # expected arguments elided
assert msg_id == b"1-0"
```

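To make the three mock types listed above concrete, the sketch below exercises each one without touching project code. `fetch_price`, `broker.is_market_open`, and `article` are hypothetical names chosen for illustration; only the `unittest.mock` and `types` calls are real.

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock


async def demo() -> tuple:
    # AsyncMock: calling it returns an awaitable that resolves to return_value.
    fetch_price = AsyncMock(return_value=101.5)  # hypothetical async dependency
    price = await fetch_price("AAPL")

    # MagicMock: ordinary synchronous call; attributes are created on demand.
    broker = MagicMock()  # hypothetical sync collaborator
    broker.is_market_open.return_value = True
    is_open = broker.is_market_open()

    # SimpleNamespace: a lightweight object with fixed attributes and no call tracking.
    article = SimpleNamespace(title="AAPL beats estimates", score=0.8)

    # AsyncMock records awaits separately from plain calls.
    fetch_price.assert_awaited_once_with("AAPL")
    return price, is_open, article.title, article.score


result = asyncio.run(demo())
```
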
**Example from `test_news_fetcher.py` (multi-call behavior):**
```python
redis.xreadgroup = AsyncMock(
    side_effect=[
        [("test:stream", [(b"1-0", {b"data": json.dumps(payload).encode()})])],
        KeyboardInterrupt,  # Raised on the second call to break the loop
    ]
)
```

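How a `side_effect` list drives a polling loop can be seen in isolation with a minimal sketch. `read_all` and `StopLoop` are hypothetical stand-ins, not project code; a custom exception replaces `KeyboardInterrupt` here only to keep the sketch safe to run as a plain script.

```python
import asyncio
import json
from unittest.mock import AsyncMock


class StopLoop(Exception):
    """Hypothetical sentinel used to break out of the polling loop."""


async def read_all(redis) -> list:
    """Hypothetical loop: keep reading batches until the mock raises."""
    seen = []
    try:
        while True:
            batches = await redis.xreadgroup()
            for _stream, messages in batches:
                for msg_id, fields in messages:
                    seen.append((msg_id, json.loads(fields[b"data"])))
    except StopLoop:
        pass
    return seen


payload = {"ticker": "AAPL", "score": 0.8}
redis = AsyncMock()
redis.xreadgroup = AsyncMock(
    side_effect=[
        # First await returns one batch containing one message.
        [("test:stream", [(b"1-0", {b"data": json.dumps(payload).encode()})])],
        # Second await raises instead of returning, which ends the loop.
        StopLoop,
    ]
)

seen = asyncio.run(read_all(redis))
```
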
**What to Mock:**
- External services: Redis, database (use AsyncMock with return values)
- API calls: HTTP requests, OpenTelemetry counters
- ML models: FinBERT and Ollama analysis (patch and return synthetic scores)
- Broker connections: Alpaca API (return fake order results)
- File I/O and network operations

**What NOT to Mock:**
- Core business logic (RiskManager, TradeEvaluator, WeightAdjuster)
- Data structures and schemas
- Internal function calls within a module
- Time-based operations in unit tests (use fixtures for time-dependent tests)

**Patching Example from `test_sentiment_analyzer.py`:**
```python
from unittest.mock import AsyncMock, patch

with patch("services.sentiment_analyzer.analyzers.finbert.FinBERTAnalyzer") as mock_finbert:
    mock_instance = AsyncMock()
    mock_instance.analyze = AsyncMock(return_value=(0.8, 0.95))
    mock_finbert.return_value = mock_instance
    # ... run test
```

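The split between what to mock and what to leave real can be sketched in a self-contained form. Everything named here (`SlowModelAnalyzer`, `weighted_score`, `score_headline`) is hypothetical; the sketch uses `patch.object` rather than a dotted-path `patch` so it runs without the project package. Only the model call is replaced, while the scoring logic executes unmocked.

```python
import asyncio
from unittest.mock import AsyncMock, patch


class SlowModelAnalyzer:
    """Hypothetical stand-in for an ML-backed analyzer."""

    async def analyze(self, text: str) -> tuple[float, float]:
        raise RuntimeError("would load a real model")


def weighted_score(score: float, confidence: float) -> float:
    """Hypothetical business logic that stays unmocked."""
    return round(score * confidence, 4)


async def score_headline(text: str) -> float:
    analyzer = SlowModelAnalyzer()
    score, confidence = await analyzer.analyze(text)
    return weighted_score(score, confidence)


async def run_with_patched_model() -> float:
    fake_analyze = AsyncMock(return_value=(0.8, 0.95))
    # Replace only the model call; the original method is restored on exit.
    with patch.object(SlowModelAnalyzer, "analyze", new=fake_analyze):
        result = await score_headline("Apple beats estimates")
    fake_analyze.assert_awaited_once_with("Apple beats estimates")
    return result


patched_result = asyncio.run(run_with_patched_model())
```
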
## Fixtures and Factories

**Test Data Patterns:**

Helper functions to create test objects:
```python
from datetime import datetime, timezone

def _make_config(**overrides) -> LearningEngineConfig:
    """Create a LearningEngineConfig with sensible defaults + overrides."""
    defaults = dict(
        learning_rate=0.1,
        min_trades_before_adjustment=20,
        max_weight_shift_pct=0.10,
    )
    defaults.update(overrides)
    return LearningEngineConfig(**defaults)

def _make_signal(
    ticker: str = "AAPL",
    direction: SignalDirection = SignalDirection.LONG,
) -> TradeSignal:
    return TradeSignal(
        ticker=ticker,
        direction=direction,
        strength=0.8,
        strategy_sources=["test"],
        timestamp=datetime.now(timezone.utc),
    )
```

**Pytest Fixtures:**
```python
@pytest.fixture
def sample_article() -> RawArticle:
    """Return a sample RawArticle mentioning AAPL."""
    return RawArticle(
        source="rss",
        url="https://example.com/aapl-news",
        title="Apple Inc AAPL reports record quarterly earnings",
        content="...",
        published_at=datetime.now(timezone.utc),
        fetched_at=datetime.now(timezone.utc),
        content_hash="test-hash-aapl-001",
    )

@pytest.fixture()
def config() -> ApiGatewayConfig:
    return ApiGatewayConfig(
        jwt_secret_key="test-secret-for-routes",
        database_url="sqlite+aiosqlite:///:memory:",
        redis_url="redis://localhost:6379/0",
    )
```

**Location:**
- Helper functions: Top of test file, marked with `_make_` prefix, after docstring and imports
- Pytest fixtures: After helpers, before test classes/functions, decorated with `@pytest.fixture`
- Shared fixtures: In separate test files if reused across multiple tests
- Integration test fixtures: `redis_client` (cleanup with delete and close), database fixtures

## Coverage

**Requirements:** No coverage threshold is enforced by default; 246 unit tests pass with zero failures (as of last sprint)

**View Coverage:**
```bash
python -m pytest tests/ --cov=shared --cov=services --cov-report=term-missing
python -m pytest tests/ --cov --cov-report=html    # Generate HTML report
```

**Coverage Statistics (approximate test counts per file):**
- `tests/test_redis_streams.py`: 5 tests (complete coverage of StreamPublisher/Consumer)
- `tests/test_models.py`: 21 tests (enums, relationships)
- `tests/test_schemas.py`: 49 tests (Pydantic schema validation)
- `tests/test_broker.py`: 18 tests (AlpacaBroker implementation)
- `tests/test_strategies.py`: 24 tests (RSI, EMA, Momentum strategies)
- `tests/test_backtester.py`: 13 tests (backtest simulation)
- `tests/services/test_news_fetcher.py`: 10 tests (RSS, Reddit, deduplication)
- `tests/services/test_sentiment_analyzer.py`: 19 tests (FinBERT, Ollama, tickers)
- `tests/services/test_signal_generator.py`: 17 tests (weighted ensemble)
- `tests/services/test_trade_executor.py`: 16 tests (RiskManager, order flow)
- `tests/services/test_learning_engine.py`: 28 tests (trade evaluation, weight adjustment)
- `tests/services/test_api_auth.py`: 13 tests (WebAuthn, JWT)
- `tests/services/test_api_routes.py`: 13 tests (endpoint responses)
- `tests/integration/`: 9 integration tests (news pipeline, trading flow)

## Test Types

**Unit Tests:**
- Scope: Single function, class, or module
- Strategy: Mock all external dependencies (Redis, DB, API calls, ML models)
- Location: `tests/test_*.py` and `tests/services/test_*.py`
- Execution: Runs in isolation without services
- Examples: `test_publisher_publishes_json`, `test_evaluate_profitable_trade`

**Integration Tests:**
- Scope: Multi-service interaction (e.g., news fetcher → sentiment analyzer pipeline)
- Strategy: Real Redis streams, real database, mocked ML models and external APIs
- Location: `tests/integration/test_*.py`
- Execution: Requires `docker-compose up -d` with Redis and PostgreSQL running
- Marker: `@pytest.mark.integration` (select with `pytest -m integration`)
- Examples: `test_news_pipeline.py` (publishes to `news:raw`, reads from `news:scored`), `test_trading_flow.py`

**E2E Tests:**
- Not implemented; would require running the full docker-compose stack with live Alpaca paper trading
- Could be added for smoke testing production deployments

## Common Patterns

**Async Testing:**
```python
@pytest.mark.asyncio
async def test_consumer_consume_yields_and_acks() -> None:
    """consume() should yield deserialised data and ACK each message."""
    payload = {"ticker": "AAPL", "score": 0.8}  # sample message body
    redis = AsyncMock()
    redis.xgroup_create = AsyncMock()
    redis.xreadgroup = AsyncMock(side_effect=[
        [("test:stream", [(b"1-0", {b"data": json.dumps(payload).encode()})])],
        KeyboardInterrupt,  # raised on the second read to end the loop
    ])

    consumer = StreamConsumer(redis, "test:stream", "grp", "c1")
    results = []
    try:
        async for msg_id, data in consumer.consume():
            results.append((msg_id, data))
    except KeyboardInterrupt:
        pass

    assert len(results) == 1
    assert results[0] == (b"1-0", payload)
```

**Error Testing:**
```python
async def test_consumer_ensure_group_ignores_existing() -> None:
    """If the group already exists the exception should be swallowed."""
    redis = AsyncMock()
    redis.xgroup_create = AsyncMock(side_effect=Exception("BUSYGROUP"))

    consumer = StreamConsumer(redis, "test:stream", "my-group", "worker-1")
    # Should not raise
    await consumer.ensure_group()  # No assertion; test passes if no exception
```

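A test that only checks "no exception" cannot tell a targeted handler from a blanket `except`. The sketch below pairs the swallowed case with a case that must still propagate. `ensure_group` here is a hypothetical helper written for illustration; whether the real `StreamConsumer.ensure_group` filters on the `BUSYGROUP` message is not shown in this document.

```python
import asyncio
from unittest.mock import AsyncMock


async def ensure_group(redis, stream: str, group: str) -> None:
    """Hypothetical helper: create the group, tolerating 'already exists'."""
    try:
        await redis.xgroup_create(stream, group, id="0", mkstream=True)
    except Exception as exc:
        # Swallow only the "group already exists" reply; re-raise anything else.
        if "BUSYGROUP" not in str(exc):
            raise


async def demo() -> tuple:
    existing = AsyncMock()
    existing.xgroup_create = AsyncMock(
        side_effect=Exception("BUSYGROUP Consumer Group name already exists")
    )
    await ensure_group(existing, "test:stream", "my-group")  # must not raise

    broken = AsyncMock()
    broken.xgroup_create = AsyncMock(side_effect=Exception("connection refused"))
    try:
        await ensure_group(broken, "test:stream", "my-group")
    except Exception as exc:
        other_error = str(exc)
    else:
        other_error = None
    return existing.xgroup_create.await_count, other_error


calls, other_error = asyncio.run(demo())
```
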
**Parametrized Tests:**
- Not heavily used in current codebase
- Could be added for testing multiple input scenarios (e.g., different signal directions)
- Use `@pytest.mark.parametrize` if needed

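
If parametrization is adopted, a direction-based case table might look like the sketch below. `realized_pnl` is a hypothetical stand-in for the project's evaluation logic, and the case values are illustrative rather than taken from the test suite.

```python
import pytest


def realized_pnl(direction: str, entry: float, exit_price: float, qty: float) -> float:
    """Hypothetical stand-in for the real PnL calculation."""
    sign = 1.0 if direction == "long" else -1.0
    return sign * (exit_price - entry) * qty


# One row per scenario: direction, entry, exit, quantity, expected PnL.
CASES = [
    ("long", 100.0, 110.0, 10.0, 100.0),
    ("long", 100.0, 90.0, 10.0, -100.0),
    ("short", 100.0, 90.0, 10.0, 100.0),
    ("short", 100.0, 110.0, 10.0, -100.0),
]


@pytest.mark.parametrize("direction, entry, exit_price, qty, expected", CASES)
def test_realized_pnl(direction, entry, exit_price, qty, expected):
    # pytest generates one test item per row in CASES.
    assert realized_pnl(direction, entry, exit_price, qty) == pytest.approx(expected)
```
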
**Floating-Point Assertions:**
```python
assert outcome.realized_pnl == pytest.approx(100.0)  # Default relative tolerance (1e-6)
assert outcome.roi_pct == pytest.approx(10.0, rel=1e-5)  # Explicit relative tolerance
```

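The effect of the tolerance arguments is easiest to see on plain numbers. The values below are illustrative only; the comparisons rely on the documented `pytest.approx` defaults (`rel=1e-6`, `abs=1e-12`).

```python
import pytest

total = 0.1 + 0.2

# Exact equality fails because 0.1 + 0.2 is not exactly 0.3 in binary floating point.
exact_match = total == 0.3
# approx passes: the difference is far below the default relative tolerance.
approx_match = total == pytest.approx(0.3)

# rel=1e-5 on an expected value of 10.0 allows a difference of up to 1e-4.
loose_match = 10.00004 == pytest.approx(10.0, rel=1e-5)
# rel=1e-6 allows only 1e-5, so the same value is rejected.
tight_match = 10.00004 == pytest.approx(10.0, rel=1e-6)

# A relative tolerance is useless when the expected value is 0.0; use abs instead.
zero_default = 1e-9 == pytest.approx(0.0)
zero_abs = 1e-9 == pytest.approx(0.0, abs=1e-8)
```
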
**Class-Based Test Organization:**
```python
class TestEvaluateProfitableTrade:
    """A long trade that gains in price should have positive PnL and ROI."""

    def test_evaluate_profitable_trade(self):
        evaluator = TradeEvaluator()
        outcome = evaluator.evaluate_trade(...)

        assert outcome.realized_pnl == pytest.approx(100.0)
        assert outcome.was_profitable is True

class TestEvaluateLosingTrade:
    """A long trade that drops should have negative PnL."""

    def test_evaluate_losing_trade(self):
        ...  # different scenario
```

## Test Configuration

**pytest.ini_options (from pyproject.toml):**
```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
markers = ["integration: marks tests requiring docker services (redis, postgres)"]
```

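One way the registered `integration` marker could be applied to a whole test module is sketched below. The `skipif` guard and the `service_is_up` helper are assumptions added for illustration and are not described in the current suite; `test_stream_round_trip` is a placeholder name, and `redis_client` refers to the fixture shown earlier in this document.

```python
import socket

import pytest


def service_is_up(host: str, port: int, timeout: float = 0.2) -> bool:
    """Hypothetical helper: True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Module-level marks apply to every test collected from this file.
pytestmark = [
    pytest.mark.integration,
    # Optional guard (an assumption, not current practice): skip when Redis is down.
    pytest.mark.skipif(
        not service_is_up("localhost", 6379),
        reason="Redis is not reachable on localhost:6379",
    ),
]


async def test_stream_round_trip(redis_client):
    """Placeholder body; a real test would publish and read back a message."""
    ...
```
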
**Environment:**
- Database URL: Tests use `sqlite+aiosqlite:///:memory:` for in-memory databases
- Redis: Integration tests use `redis://localhost:6379/1` (DB 1) to avoid conflicts
- Async mode: `"auto"` handles all async test discovery automatically

---

*Testing analysis: 2026-02-23*