Add throttling detection and circuit breaker for Rightmove scraper
This commit is contained in:
parent
e8293c6042
commit
f880664a98
10 changed files with 1428 additions and 86 deletions
|
|
@ -6,6 +6,8 @@ from typing import Any
|
|||
from config.scraper_config import ScraperConfig
|
||||
from listing_processor import ListingProcessor
|
||||
from rec.query import create_session, listing_query
|
||||
from rec.exceptions import CircuitBreakerOpenError, ThrottlingError
|
||||
from rec.throttle_detector import get_throttle_metrics, reset_throttle_metrics
|
||||
from models.listing import QueryParameters
|
||||
from repositories import ListingRepository
|
||||
from tqdm.asyncio import tqdm
|
||||
|
|
@ -40,76 +42,98 @@ async def dump_listings(
|
|||
config = ScraperConfig.from_env()
|
||||
splitter = QuerySplitter(config)
|
||||
|
||||
async with create_session(config) as session:
|
||||
# Phase 1 & 2: Split and probe queries
|
||||
logger.info("Splitting query and probing result counts...")
|
||||
subqueries = await splitter.split(parameters, session)
|
||||
# Reset throttle metrics at start
|
||||
reset_throttle_metrics()
|
||||
|
||||
total_estimated = splitter.calculate_total_estimated_results(subqueries)
|
||||
logger.info(
|
||||
f"Split into {len(subqueries)} subqueries, "
|
||||
f"estimated {total_estimated} total results"
|
||||
)
|
||||
try:
|
||||
async with create_session(config) as session:
|
||||
# Phase 1 & 2: Split and probe queries
|
||||
logger.info("Splitting query and probing result counts...")
|
||||
subqueries = await splitter.split(parameters, session)
|
||||
|
||||
# Phase 3: Fetch all pages for each subquery
|
||||
semaphore = asyncio.Semaphore(config.max_concurrent_requests)
|
||||
|
||||
async def fetch_subquery(sq: SubQuery) -> list[dict[str, Any]]:
|
||||
"""Fetch all pages for a single subquery."""
|
||||
results: list[dict[str, Any]] = []
|
||||
|
||||
estimated = sq.estimated_results or 0
|
||||
if estimated == 0:
|
||||
return results
|
||||
|
||||
page_size = parameters.page_size
|
||||
max_pages = min(
|
||||
config.max_pages_per_query,
|
||||
(estimated // page_size) + 1,
|
||||
total_estimated = splitter.calculate_total_estimated_results(subqueries)
|
||||
logger.info(
|
||||
f"Split into {len(subqueries)} subqueries, "
|
||||
f"estimated {total_estimated} total results"
|
||||
)
|
||||
|
||||
for page_id in range(1, max_pages + 1):
|
||||
async with semaphore:
|
||||
await asyncio.sleep(config.request_delay_ms / 1000)
|
||||
try:
|
||||
result = await listing_query(
|
||||
page=page_id,
|
||||
channel=parameters.listing_type,
|
||||
min_bedrooms=sq.min_bedrooms,
|
||||
max_bedrooms=sq.max_bedrooms,
|
||||
radius=parameters.radius,
|
||||
min_price=sq.min_price,
|
||||
max_price=sq.max_price,
|
||||
district=sq.district,
|
||||
page_size=page_size,
|
||||
max_days_since_added=parameters.max_days_since_added,
|
||||
furnish_types=parameters.furnish_types or [],
|
||||
session=session,
|
||||
)
|
||||
results.append(result)
|
||||
# Phase 3: Fetch all pages for each subquery
|
||||
semaphore = asyncio.Semaphore(config.max_concurrent_requests)
|
||||
|
||||
properties = result.get("properties", [])
|
||||
if len(properties) < page_size:
|
||||
async def fetch_subquery(sq: SubQuery) -> list[dict[str, Any]]:
|
||||
"""Fetch all pages for a single subquery."""
|
||||
results: list[dict[str, Any]] = []
|
||||
|
||||
estimated = sq.estimated_results or 0
|
||||
if estimated == 0:
|
||||
return results
|
||||
|
||||
page_size = parameters.page_size
|
||||
max_pages = min(
|
||||
config.max_pages_per_query,
|
||||
(estimated // page_size) + 1,
|
||||
)
|
||||
|
||||
for page_id in range(1, max_pages + 1):
|
||||
async with semaphore:
|
||||
await asyncio.sleep(config.request_delay_ms / 1000)
|
||||
try:
|
||||
result = await listing_query(
|
||||
page=page_id,
|
||||
channel=parameters.listing_type,
|
||||
min_bedrooms=sq.min_bedrooms,
|
||||
max_bedrooms=sq.max_bedrooms,
|
||||
radius=parameters.radius,
|
||||
min_price=sq.min_price,
|
||||
max_price=sq.max_price,
|
||||
district=sq.district,
|
||||
page_size=page_size,
|
||||
max_days_since_added=parameters.max_days_since_added,
|
||||
furnish_types=parameters.furnish_types or [],
|
||||
session=session,
|
||||
config=config,
|
||||
)
|
||||
results.append(result)
|
||||
|
||||
properties = result.get("properties", [])
|
||||
if len(properties) < page_size:
|
||||
break
|
||||
|
||||
except CircuitBreakerOpenError as e:
|
||||
logger.error(f"Circuit breaker open: {e}")
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
if "GENERIC_ERROR" in str(e):
|
||||
logger.debug(
|
||||
f"Max page for {sq.district}: {page_id - 1}"
|
||||
except ThrottlingError as e:
|
||||
logger.warning(
|
||||
f"Throttling error on page {page_id} for {sq.district}: {e}"
|
||||
)
|
||||
break
|
||||
except Exception as e:
|
||||
if "GENERIC_ERROR" in str(e):
|
||||
logger.debug(
|
||||
f"Max page for {sq.district}: {page_id - 1}"
|
||||
)
|
||||
break
|
||||
logger.warning(
|
||||
f"Error fetching page {page_id} for {sq.district}: {e}"
|
||||
)
|
||||
break
|
||||
logger.warning(
|
||||
f"Error fetching page {page_id} for {sq.district}: {e}"
|
||||
)
|
||||
break
|
||||
|
||||
return results
|
||||
return results
|
||||
|
||||
# Fetch all subqueries with progress bar
|
||||
all_results = await tqdm.gather(
|
||||
*[fetch_subquery(sq) for sq in subqueries],
|
||||
desc="Fetching listings",
|
||||
)
|
||||
# Fetch all subqueries with progress bar
|
||||
all_results = await tqdm.gather(
|
||||
*[fetch_subquery(sq) for sq in subqueries],
|
||||
desc="Fetching listings",
|
||||
)
|
||||
except CircuitBreakerOpenError as e:
|
||||
logger.error(f"Circuit breaker prevented listing fetch: {e}")
|
||||
logger.info(get_throttle_metrics().summary())
|
||||
return []
|
||||
finally:
|
||||
# Log throttle metrics at end
|
||||
metrics = get_throttle_metrics()
|
||||
if metrics.total_requests > 0:
|
||||
logger.info("\n" + metrics.summary())
|
||||
|
||||
# Extract listing identifiers from results
|
||||
listing_ids: list[int] = []
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ import aiohttp
|
|||
from config.scraper_config import ScraperConfig
|
||||
from models.listing import ListingType, QueryParameters
|
||||
from rec.districts import get_districts
|
||||
from rec.exceptions import CircuitBreakerOpenError, ThrottlingError
|
||||
|
||||
logger = logging.getLogger("uvicorn.error")
|
||||
|
||||
|
|
@ -113,6 +114,9 @@ class QuerySplitter:
|
|||
|
||||
Returns:
|
||||
Total available results for this subquery.
|
||||
|
||||
Raises:
|
||||
CircuitBreakerOpenError: If the circuit breaker is open.
|
||||
"""
|
||||
from rec.query import probe_query
|
||||
|
||||
|
|
@ -128,8 +132,17 @@ class QuerySplitter:
|
|||
district=subquery.district,
|
||||
max_days_since_added=parameters.max_days_since_added,
|
||||
furnish_types=parameters.furnish_types or [],
|
||||
config=self.config,
|
||||
)
|
||||
return result.get("totalAvailableResults", 0)
|
||||
except CircuitBreakerOpenError:
|
||||
logger.error("Circuit breaker is open, stopping probe operations")
|
||||
raise
|
||||
except ThrottlingError as e:
|
||||
logger.warning(
|
||||
f"Throttling detected during probe for {subquery.district}: {e}"
|
||||
)
|
||||
return 0
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to probe subquery {subquery}: {e}")
|
||||
return 0
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue