wrongmove/crawler/tests/integration/test_throttle_integration.py

312 lines
10 KiB
Python
Raw Permalink Normal View History

"""Integration tests for throttle detection and circuit breaker."""
import asyncio
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from aiohttp import ClientResponse
from config.scraper_config import ScraperConfig
from rec.exceptions import (
CircuitBreakerOpenError,
RateLimitError,
ServiceUnavailableError,
ThrottlingError,
)
from rec.query import (
detail_query,
listing_query,
probe_query,
get_circuit_breaker,
reset_circuit_breaker,
)
from rec.throttle_detector import reset_throttle_metrics, get_throttle_metrics
from rec.circuit_breaker import CircuitBreaker, CircuitState
from models.listing import ListingType
@pytest.fixture
def config() -> ScraperConfig:
    """Build the scraper configuration used by the tests in this module.

    The circuit breaker is enabled with a failure threshold of 3 and a
    0.5 second recovery timeout; the request delay is 10 ms.
    """
    settings = {
        "max_concurrent_requests": 5,
        "request_delay_ms": 10,
        "slow_response_threshold": 2.0,
        "enable_circuit_breaker": True,
        "circuit_breaker_failure_threshold": 3,
        "circuit_breaker_recovery_timeout": 0.5,
    }
    return ScraperConfig(**settings)
@pytest.fixture(autouse=True)
def reset_globals() -> None:
    """Reset the global throttle state before every test.

    Declared autouse, so each test begins after the throttle metrics and
    the circuit breaker have been reset, in that order.
    """
    for reset in (reset_throttle_metrics, reset_circuit_breaker):
        reset()
class MockResponse:
"""Mock aiohttp response."""
def __init__(
self,
status: int = 200,
json_data: dict | None = None,
text: str = "",
):
self.status = status
self._json_data = json_data or {}
self._text = text
async def json(self) -> dict:
return self._json_data
async def text(self) -> str:
return self._text
async def __aenter__(self) -> "MockResponse":
return self
async def __aexit__(self, *args: object) -> None:
pass
class TestThrottlingRetryBehavior:
    """Test retry behavior for throttling errors."""

    @pytest.mark.asyncio
    async def test_rate_limit_triggers_retry(self, config: ScraperConfig) -> None:
        """Test that 429 responses trigger retry with backoff.

        The fake session answers 429 on the first two calls and 200 on the
        third, so a successful result requires exactly three calls.
        """
        call_count = 0

        async def mock_get(*args: object, **kwargs: object) -> MockResponse:
            nonlocal call_count
            call_count += 1
            if call_count < 3:
                return MockResponse(status=429)
            return MockResponse(
                status=200,
                json_data={"totalAvailableResults": 10, "properties": []},
            )

        mock_session = MagicMock()
        mock_session.get = mock_get

        # Mock the district lookup, and patch tenacity's exponential wait to
        # return 0 so the retries do not slow the test down.
        with (
            patch("rec.query.districts.get_districts", return_value={"Test": "LOC1"}),
            patch("tenacity.wait_exponential.__call__", return_value=0),
        ):
            result = await probe_query(
                session=mock_session,
                channel=ListingType.RENT,
                min_bedrooms=1,
                max_bedrooms=2,
                radius=1.0,
                min_price=1000,
                max_price=2000,
                district="Test",
                config=config,
            )

        assert result["totalAvailableResults"] == 10
        assert call_count == 3

    @pytest.mark.asyncio
    async def test_service_unavailable_triggers_retry(
        self, config: ScraperConfig
    ) -> None:
        """Test that 503 responses trigger retry.

        The fake session answers 503 once and then 200, so the query must
        have been issued exactly twice and must return the 200 payload.
        """
        call_count = 0

        async def mock_get(*args: object, **kwargs: object) -> MockResponse:
            nonlocal call_count
            call_count += 1
            if call_count < 2:
                return MockResponse(status=503)
            return MockResponse(
                status=200,
                json_data={"totalAvailableResults": 5, "properties": []},
            )

        mock_session = MagicMock()
        mock_session.get = mock_get

        with (
            patch("rec.query.districts.get_districts", return_value={"Test": "LOC1"}),
            patch("tenacity.wait_exponential.__call__", return_value=0),
        ):
            result = await probe_query(
                session=mock_session,
                channel=ListingType.RENT,
                min_bedrooms=1,
                max_bedrooms=2,
                radius=1.0,
                min_price=1000,
                max_price=2000,
                district="Test",
                config=config,
            )

        # Check the payload as well as the call count, mirroring the 429 test,
        # so the result of the retried request is actually verified.
        assert result["totalAvailableResults"] == 5
        assert call_count == 2
class TestCircuitBreakerIntegration:
    """Test circuit breaker integration with queries."""

    @pytest.mark.asyncio
    async def test_circuit_breaker_opens_after_failures(
        self, config: ScraperConfig
    ) -> None:
        """Test that consecutive 429 responses accumulate breaker failures.

        Every request is answered with 429; the query is expected to end in
        either RateLimitError or CircuitBreakerOpenError, after which the
        breaker's failure count must have reached the configured threshold.
        """

        async def mock_get(*args: object, **kwargs: object) -> MockResponse:
            return MockResponse(status=429)

        mock_session = MagicMock()
        mock_session.get = mock_get

        with (
            patch("rec.query.districts.get_districts", return_value={"Test": "LOC1"}),
            pytest.raises((RateLimitError, CircuitBreakerOpenError)),
            # Zero out tenacity's exponential wait to keep the test fast.
            patch("tenacity.wait_exponential.__call__", return_value=0),
        ):
            await probe_query(
                session=mock_session,
                channel=ListingType.RENT,
                min_bedrooms=1,
                max_bedrooms=2,
                radius=1.0,
                min_price=1000,
                max_price=2000,
                district="Test",
                config=config,
            )

        # Check circuit breaker state: every response was a 429, so the
        # recorded failures must have reached the configured threshold.
        cb = get_circuit_breaker(config)
        assert cb is not None
        assert cb.failure_count >= config.circuit_breaker_failure_threshold

    @pytest.mark.asyncio
    async def test_circuit_breaker_blocks_requests_when_open(
        self, config: ScraperConfig
    ) -> None:
        """Test that open circuit breaker blocks requests immediately.

        The breaker is forced open by recording failures up to the
        threshold; the query must then raise CircuitBreakerOpenError.
        """
        # Force open the circuit breaker
        cb = get_circuit_breaker(config)
        assert cb is not None
        for _ in range(config.circuit_breaker_failure_threshold):
            cb.record_failure()
        assert cb.is_open

        # The session is a bare MagicMock with no ``get`` behaviour configured.
        mock_session = MagicMock()

        with (
            patch("rec.query.districts.get_districts", return_value={"Test": "LOC1"}),
            pytest.raises(CircuitBreakerOpenError),
        ):
            await probe_query(
                session=mock_session,
                channel=ListingType.RENT,
                min_bedrooms=1,
                max_bedrooms=2,
                radius=1.0,
                min_price=1000,
                max_price=2000,
                district="Test",
                config=config,
            )
class TestMetricsTracking:
    """Verify that throttle metrics are recorded for probe queries."""

    @pytest.mark.asyncio
    async def test_metrics_tracked_on_rate_limit(self, config: ScraperConfig) -> None:
        """A query that only ever sees 429 must bump the rate-limit counter."""

        async def always_limited(*args: object, **kwargs: object) -> MockResponse:
            return MockResponse(status=429)

        session = MagicMock()
        session.get = always_limited
        probe_args = {
            "session": session,
            "channel": ListingType.RENT,
            "min_bedrooms": 1,
            "max_bedrooms": 2,
            "radius": 1.0,
            "min_price": 1000,
            "max_price": 2000,
            "district": "Test",
            "config": config,
        }

        with (
            patch("rec.query.districts.get_districts", return_value={"Test": "LOC1"}),
            pytest.raises(RateLimitError),
            # Tenacity's exponential wait is patched to return 0.
            patch("tenacity.wait_exponential.__call__", return_value=0),
        ):
            await probe_query(**probe_args)

        recorded = get_throttle_metrics()
        assert recorded.rate_limit_count > 0

    @pytest.mark.asyncio
    async def test_metrics_tracked_on_success(self, config: ScraperConfig) -> None:
        """A single successful query counts one request and no throttling."""

        async def always_ok(*args: object, **kwargs: object) -> MockResponse:
            payload = {"totalAvailableResults": 10, "properties": []}
            return MockResponse(status=200, json_data=payload)

        session = MagicMock()
        session.get = always_ok
        probe_args = {
            "session": session,
            "channel": ListingType.RENT,
            "min_bedrooms": 1,
            "max_bedrooms": 2,
            "radius": 1.0,
            "min_price": 1000,
            "max_price": 2000,
            "district": "Test",
            "config": config,
        }

        with patch("rec.query.districts.get_districts", return_value={"Test": "LOC1"}):
            await probe_query(**probe_args)

        recorded = get_throttle_metrics()
        assert recorded.total_requests == 1
        assert recorded.total_throttling_events == 0
class TestConfigIntegration:
    """Test configuration integration."""

    def test_config_from_env_includes_throttle_settings(self) -> None:
        """Test that config loads throttle settings from environment.

        Four RIGHTMOVE_* variables are set for the duration of the
        ``ScraperConfig.from_env()`` call and the parsed values are checked.
        """
        import os

        overrides = {
            "RIGHTMOVE_SLOW_RESPONSE_THRESHOLD": "5.0",
            "RIGHTMOVE_ENABLE_CIRCUIT_BREAKER": "false",
            "RIGHTMOVE_CIRCUIT_BREAKER_FAILURES": "10",
            "RIGHTMOVE_CIRCUIT_BREAKER_TIMEOUT": "120.0",
        }

        # patch.dict restores os.environ when the block exits, replacing the
        # manual copy / clear / update sequence.
        with patch.dict(os.environ, overrides):
            config = ScraperConfig.from_env()

        assert config.slow_response_threshold == 5.0
        assert config.enable_circuit_breaker is False
        assert config.circuit_breaker_failure_threshold == 10
        assert config.circuit_breaker_recovery_timeout == 120.0

    def test_circuit_breaker_disabled_returns_none(self) -> None:
        """Test that disabled circuit breaker returns None."""
        config = ScraperConfig(
            enable_circuit_breaker=False,
        )
        # Reset the global breaker before asking for one under the disabled
        # configuration.
        reset_circuit_breaker()
        cb = get_circuit_breaker(config)
        assert cb is None