311 lines
10 KiB
Python
311 lines
10 KiB
Python
"""Integration tests for throttle detection and circuit breaker."""
|
|
import asyncio
|
|
import pytest
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
from aiohttp import ClientResponse
|
|
|
|
from config.scraper_config import ScraperConfig
|
|
from rec.exceptions import (
|
|
CircuitBreakerOpenError,
|
|
RateLimitError,
|
|
ServiceUnavailableError,
|
|
ThrottlingError,
|
|
)
|
|
from rec.query import (
|
|
detail_query,
|
|
listing_query,
|
|
probe_query,
|
|
get_circuit_breaker,
|
|
reset_circuit_breaker,
|
|
)
|
|
from rec.throttle_detector import reset_throttle_metrics, get_throttle_metrics
|
|
from rec.circuit_breaker import CircuitBreaker, CircuitState
|
|
from models.listing import ListingType
|
|
|
|
|
|
@pytest.fixture
def config() -> ScraperConfig:
    """Build the ScraperConfig shared by the tests in this module.

    The circuit breaker is enabled with a failure threshold of 3 and a
    recovery timeout of 0.5, alongside small concurrency/delay values.
    """
    settings = {
        "max_concurrent_requests": 5,
        "request_delay_ms": 10,
        "slow_response_threshold": 2.0,
        "enable_circuit_breaker": True,
        "circuit_breaker_failure_threshold": 3,
        "circuit_breaker_recovery_timeout": 0.5,
    }
    return ScraperConfig(**settings)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def reset_globals() -> None:
    """Reset the shared throttle metrics and circuit breaker for every test.

    Declared ``autouse`` so each test in this module receives it without
    having to request it explicitly.
    """
    # Same order as always: metrics first, then the circuit breaker.
    for reset in (reset_throttle_metrics, reset_circuit_breaker):
        reset()
|
|
|
|
|
|
class MockResponse:
|
|
"""Mock aiohttp response."""
|
|
|
|
def __init__(
|
|
self,
|
|
status: int = 200,
|
|
json_data: dict | None = None,
|
|
text: str = "",
|
|
):
|
|
self.status = status
|
|
self._json_data = json_data or {}
|
|
self._text = text
|
|
|
|
async def json(self) -> dict:
|
|
return self._json_data
|
|
|
|
async def text(self) -> str:
|
|
return self._text
|
|
|
|
async def __aenter__(self) -> "MockResponse":
|
|
return self
|
|
|
|
async def __aexit__(self, *args: object) -> None:
|
|
pass
|
|
|
|
|
|
class TestThrottlingRetryBehavior:
    """Test retry behavior for throttling errors."""

    @pytest.mark.asyncio
    async def test_rate_limit_triggers_retry(self, config: ScraperConfig) -> None:
        """Test that 429 responses trigger retry with backoff.

        The mocked session answers 429 on the first two calls and 200 on the
        third, so a successful probe must have issued exactly three requests.
        """
        call_count = 0

        async def mock_get(*args: object, **kwargs: object) -> MockResponse:
            nonlocal call_count
            call_count += 1
            if call_count < 3:
                return MockResponse(status=429)
            return MockResponse(
                status=200,
                json_data={"totalAvailableResults": 10, "properties": []},
            )

        mock_session = MagicMock()
        mock_session.get = mock_get

        # Mock district lookup.
        districts_patch = patch(
            "rec.query.districts.get_districts", return_value={"Test": "LOC1"}
        )
        # The retry decorator will catch RateLimitError and retry; patch the
        # tenacity wait so the retries do not slow the test down.
        no_wait_patch = patch("tenacity.wait_exponential.__call__", return_value=0)

        with districts_patch, no_wait_patch:
            result = await probe_query(
                session=mock_session,
                channel=ListingType.RENT,
                min_bedrooms=1,
                max_bedrooms=2,
                radius=1.0,
                min_price=1000,
                max_price=2000,
                district="Test",
                config=config,
            )

        assert result["totalAvailableResults"] == 10
        assert call_count == 3

    @pytest.mark.asyncio
    async def test_service_unavailable_triggers_retry(
        self, config: ScraperConfig
    ) -> None:
        """Test that 503 responses trigger retry.

        The mocked session answers 503 once and then 200, so a successful
        probe must have issued exactly two requests and returned the payload
        of the second one.
        """
        call_count = 0

        async def mock_get(*args: object, **kwargs: object) -> MockResponse:
            nonlocal call_count
            call_count += 1
            if call_count < 2:
                return MockResponse(status=503)
            return MockResponse(
                status=200,
                json_data={"totalAvailableResults": 5, "properties": []},
            )

        mock_session = MagicMock()
        mock_session.get = mock_get

        districts_patch = patch(
            "rec.query.districts.get_districts", return_value={"Test": "LOC1"}
        )
        # Skip the backoff delay between retries.
        no_wait_patch = patch("tenacity.wait_exponential.__call__", return_value=0)

        with districts_patch, no_wait_patch:
            result = await probe_query(
                session=mock_session,
                channel=ListingType.RENT,
                min_bedrooms=1,
                max_bedrooms=2,
                radius=1.0,
                min_price=1000,
                max_price=2000,
                district="Test",
                config=config,
            )

        # Mirror the 429 test: verify the payload from the successful retry,
        # not just the number of requests that were made.
        assert result["totalAvailableResults"] == 5
        assert call_count == 2
|
|
|
|
|
|
class TestCircuitBreakerIntegration:
    """Exercise the circuit breaker through the query entry point."""

    @pytest.mark.asyncio
    async def test_circuit_breaker_opens_after_failures(
        self, config: ScraperConfig
    ) -> None:
        """A session that always answers 429 drives the failure count to the threshold."""
        call_count = 0

        async def always_throttled(*args: object, **kwargs: object) -> MockResponse:
            nonlocal call_count
            call_count += 1
            return MockResponse(status=429)

        session = MagicMock()
        session.get = always_throttled

        districts_patch = patch(
            "rec.query.districts.get_districts", return_value={"Test": "LOC1"}
        )
        # Either error is acceptable: the plain rate-limit error, or the
        # breaker refusing the call once it has opened.
        expected_errors = pytest.raises((RateLimitError, CircuitBreakerOpenError))
        no_wait_patch = patch("tenacity.wait_exponential.__call__", return_value=0)

        with districts_patch, expected_errors, no_wait_patch:
            await probe_query(
                session=session,
                channel=ListingType.RENT,
                min_bedrooms=1,
                max_bedrooms=2,
                radius=1.0,
                min_price=1000,
                max_price=2000,
                district="Test",
                config=config,
            )

        # Inspect the breaker that the query used.
        breaker = get_circuit_breaker(config)
        assert breaker is not None
        # Enough failures must have been recorded to reach the threshold.
        assert breaker.failure_count >= config.circuit_breaker_failure_threshold

    @pytest.mark.asyncio
    async def test_circuit_breaker_blocks_requests_when_open(
        self, config: ScraperConfig
    ) -> None:
        """Once the breaker is open, a query raises CircuitBreakerOpenError."""
        breaker = get_circuit_breaker(config)
        assert breaker is not None

        # Trip the breaker by recording exactly the threshold number of failures.
        for _ in range(config.circuit_breaker_failure_threshold):
            breaker.record_failure()
        assert breaker.is_open

        session = MagicMock()

        districts_patch = patch(
            "rec.query.districts.get_districts", return_value={"Test": "LOC1"}
        )
        with districts_patch, pytest.raises(CircuitBreakerOpenError):
            await probe_query(
                session=session,
                channel=ListingType.RENT,
                min_bedrooms=1,
                max_bedrooms=2,
                radius=1.0,
                min_price=1000,
                max_price=2000,
                district="Test",
                config=config,
            )
|
|
|
|
|
|
class TestMetricsTracking:
    """Check that throttle metrics reflect what the queries encountered."""

    @pytest.mark.asyncio
    async def test_metrics_tracked_on_rate_limit(self, config: ScraperConfig) -> None:
        """A query that only ever sees 429 leaves a non-zero rate-limit count."""

        async def always_throttled(*args: object, **kwargs: object) -> MockResponse:
            return MockResponse(status=429)

        session = MagicMock()
        session.get = always_throttled

        districts_patch = patch(
            "rec.query.districts.get_districts", return_value={"Test": "LOC1"}
        )
        no_wait_patch = patch("tenacity.wait_exponential.__call__", return_value=0)

        with districts_patch, pytest.raises(RateLimitError), no_wait_patch:
            await probe_query(
                session=session,
                channel=ListingType.RENT,
                min_bedrooms=1,
                max_bedrooms=2,
                radius=1.0,
                min_price=1000,
                max_price=2000,
                district="Test",
                config=config,
            )

        recorded = get_throttle_metrics()
        assert recorded.rate_limit_count > 0

    @pytest.mark.asyncio
    async def test_metrics_tracked_on_success(self, config: ScraperConfig) -> None:
        """A single successful query counts one request and no throttling events."""

        async def always_ok(*args: object, **kwargs: object) -> MockResponse:
            payload = {"totalAvailableResults": 10, "properties": []}
            return MockResponse(status=200, json_data=payload)

        session = MagicMock()
        session.get = always_ok

        districts_patch = patch(
            "rec.query.districts.get_districts", return_value={"Test": "LOC1"}
        )
        with districts_patch:
            await probe_query(
                session=session,
                channel=ListingType.RENT,
                min_bedrooms=1,
                max_bedrooms=2,
                radius=1.0,
                min_price=1000,
                max_price=2000,
                district="Test",
                config=config,
            )

        recorded = get_throttle_metrics()
        assert recorded.total_requests == 1
        assert recorded.total_throttling_events == 0
|
|
|
|
|
|
class TestConfigIntegration:
    """Test configuration integration."""

    def test_config_from_env_includes_throttle_settings(self) -> None:
        """Test that config loads throttle settings from environment.

        Four ``RIGHTMOVE_*`` variables are set for the duration of the
        ``ScraperConfig.from_env()`` call and the resulting fields are
        compared against the values that were supplied.
        """
        import os

        env_overrides = {
            "RIGHTMOVE_SLOW_RESPONSE_THRESHOLD": "5.0",
            "RIGHTMOVE_ENABLE_CIRCUIT_BREAKER": "false",
            "RIGHTMOVE_CIRCUIT_BREAKER_FAILURES": "10",
            "RIGHTMOVE_CIRCUIT_BREAKER_TIMEOUT": "120.0",
        }

        # patch.dict puts os.environ back to its prior contents on exit, even
        # if from_env() raises, replacing the manual copy/clear/update dance.
        with patch.dict(os.environ, env_overrides):
            config = ScraperConfig.from_env()

        assert config.slow_response_threshold == 5.0
        assert config.enable_circuit_breaker is False
        assert config.circuit_breaker_failure_threshold == 10
        assert config.circuit_breaker_recovery_timeout == 120.0

    def test_circuit_breaker_disabled_returns_none(self) -> None:
        """Test that disabled circuit breaker returns None."""
        config = ScraperConfig(
            enable_circuit_breaker=False,
        )
        # Reset explicitly before asking for the breaker with the disabled config.
        reset_circuit_breaker()
        cb = get_circuit_breaker(config)
        assert cb is None
|