- Add phase-aware progress reporting across all crawl phases (splitting, fetching, filtering, processing) with per-step counters
- Add TaskProgressDrawer component with phase timeline stepper, detail counters, progress bar with ETA, and live worker log viewer
- Add on_step_complete callback to ListingProcessor for granular tracking of details/images/OCR steps
- Extend QuerySplitter on_progress callback with structured counter data
- Capture celery worker logs via ring buffer handler and inject into task state updates for frontend display
- Guard taskResult updates with phase presence check to prevent drawer from blanking during state transitions
335 lines
11 KiB
Python
335 lines
11 KiB
Python
"""Query splitting service for handling Rightmove's result cap.

This module provides intelligent query splitting to work around Rightmove's
~1,500 listing cap per search. It adaptively splits queries by price bands
based on actual result counts.
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from dataclasses import dataclass, replace
|
|
from typing import Any
|
|
|
|
import aiohttp
|
|
|
|
from config.scraper_config import ScraperConfig
|
|
from models.listing import ListingType, QueryParameters
|
|
from rec.districts import get_districts
|
|
from rec.exceptions import CircuitBreakerOpenError, ThrottlingError
|
|
|
|
# Module-wide logger. It is looked up under the "uvicorn.error" name rather
# than __name__ — presumably so messages go through uvicorn's configured
# handlers; NOTE(review): confirm this is still intended for non-uvicorn workers.
logger = logging.getLogger("uvicorn.error")
|
|
|
|
|
|
@dataclass
class SubQuery:
    """One slice of a search: a district, a bedroom range and a price band.

    Attributes:
        district: District identifier string.
        min_bedrooms: Lower bound on the number of bedrooms.
        max_bedrooms: Upper bound on the number of bedrooms.
        min_price: Lower bound of the price band, in currency units.
        max_price: Upper bound of the price band, in currency units.
        estimated_results: Result count recorded when the slice was probed;
            stays ``None`` until a probe has been made.
    """

    district: str
    min_bedrooms: int
    max_bedrooms: int
    min_price: int
    max_price: int
    estimated_results: int | None = None

    @property
    def price_range(self) -> int:
        """Width of the price band, i.e. ``max_price - min_price``."""
        lower, upper = self.min_price, self.max_price
        return upper - lower
|
|
|
|
|
|
class QuerySplitter:
    """Splits large queries into smaller subqueries to avoid result caps.

    The query is first divided by district and bedroom count. Any slice whose
    probed result count exceeds ``config.split_threshold`` is then halved on
    its price band, recursively, until every slice is under the threshold or
    its band can no longer be narrowed.
    """

    def __init__(self, config: ScraperConfig | None = None) -> None:
        """Initialize the splitter with configuration.

        Args:
            config: Scraper configuration. Loaded via
                ``ScraperConfig.from_env()`` when not provided.
        """
        self.config = config or ScraperConfig.from_env()

    def create_initial_subqueries(
        self,
        parameters: QueryParameters,
        districts: dict[str, str],
    ) -> list[SubQuery]:
        """Create initial subqueries by splitting on district and bedrooms.

        This is the split made before any result counts are probed. Each
        bedroom count gets its own subquery (``min_bedrooms == max_bedrooms``)
        so later price splitting works on finer-grained slices.

        Args:
            parameters: Original query parameters.
            districts: Dictionary of district name to location ID. Only the
                keys are used here.

        Returns:
            List of initial SubQuery objects, ordered by district and then by
            bedroom count, all spanning the full requested price range.
        """
        bedroom_counts = range(
            parameters.min_bedrooms, parameters.max_bedrooms + 1
        )
        return [
            SubQuery(
                district=district,
                min_bedrooms=num_bedrooms,
                max_bedrooms=num_bedrooms,
                min_price=parameters.min_price,
                max_price=parameters.max_price,
            )
            for district in districts
            for num_bedrooms in bedroom_counts
        ]

    async def probe_result_count(
        self,
        subquery: SubQuery,
        session: aiohttp.ClientSession,
        parameters: QueryParameters,
    ) -> int:
        """Probe the API to get the total result count for a subquery.

        Delegates to ``rec.query.probe_query`` and reads
        ``totalAvailableResults`` from its response.

        Args:
            subquery: The subquery to probe.
            session: aiohttp session for making requests.
            parameters: Original query parameters for additional settings
                (listing type, radius, recency and furnish filters).

        Returns:
            Total available results for this subquery. Returns 0 when the
            response lacks the count, when throttling is detected, or when the
            probe fails for any other reason (best effort: the failure is
            logged and the subquery is treated as empty).

        Raises:
            CircuitBreakerOpenError: If the circuit breaker is open.
        """
        # Imported at call time rather than module level — presumably to avoid
        # a circular import between rec.query and this module; TODO confirm.
        from rec.query import probe_query

        try:
            result = await probe_query(
                session=session,
                channel=parameters.listing_type,
                min_bedrooms=subquery.min_bedrooms,
                max_bedrooms=subquery.max_bedrooms,
                radius=parameters.radius,
                min_price=subquery.min_price,
                max_price=subquery.max_price,
                district=subquery.district,
                max_days_since_added=parameters.max_days_since_added,
                furnish_types=parameters.furnish_types or [],
                config=self.config,
            )
            return result.get("totalAvailableResults", 0)
        except CircuitBreakerOpenError:
            # An open breaker must stop the whole run, so it is never swallowed.
            logger.error("Circuit breaker is open, stopping probe operations")
            raise
        except ThrottlingError as e:
            logger.warning(
                "Throttling detected during probe for %s: %s",
                subquery.district,
                e,
            )
            return 0
        except Exception as e:
            logger.warning("Failed to probe subquery %s: %s", subquery, e)
            return 0

    def split_by_price(self, subquery: SubQuery) -> list[SubQuery]:
        """Split a subquery into two by halving the price range.

        The midpoint is computed with integer division and is used as the
        upper bound of the first half and the lower bound of the second, so
        the two bands meet at the midpoint price.

        Args:
            subquery: The subquery to split.

        Returns:
            List of two subqueries covering the same price range, with
            ``estimated_results`` reset to ``None`` because the parent's count
            no longer applies.
        """
        mid_price = (subquery.min_price + subquery.max_price) // 2
        lower_half = replace(
            subquery, max_price=mid_price, estimated_results=None
        )
        upper_half = replace(
            subquery, min_price=mid_price, estimated_results=None
        )
        return [lower_half, upper_half]

    def _can_split(self, subquery: SubQuery) -> bool:
        """Return True if halving the price band makes real progress.

        A band is splittable only when it is wider than the configured
        ``min_price_band``. It must also be at least 2 wide: with an integer
        midpoint, a band of width 0 or 1 produces a half identical to its
        parent, which would make ``adaptive_split`` recurse forever if
        ``min_price_band`` were configured as 0 or negative.
        """
        return subquery.price_range > max(self.config.min_price_band, 1)

    async def adaptive_split(
        self,
        subquery: SubQuery,
        session: aiohttp.ClientSession,
        parameters: QueryParameters,
        semaphore: asyncio.Semaphore,
    ) -> list[SubQuery]:
        """Recursively split a subquery until all parts are under threshold.

        Halves the price band, probes each half, and recurses into any half
        whose count still exceeds ``config.split_threshold``.

        Args:
            subquery: The subquery to split.
            session: aiohttp session for making requests.
            parameters: Original query parameters.
            semaphore: Semaphore limiting concurrent probe requests.

        Returns:
            List of subqueries that are under the split threshold, plus any
            subquery whose price band could not be narrowed further (returned
            unchanged after a warning is logged).

        Raises:
            CircuitBreakerOpenError: Propagated from ``probe_result_count``.
        """
        if not self._can_split(subquery):
            logger.warning(
                "Cannot split further, price band at minimum: %s", subquery
            )
            return [subquery]

        result: list[SubQuery] = []
        for half in self.split_by_price(subquery):
            # Hold the semaphore only for the delay + probe. It is released
            # before recursing because the recursive call acquires it again.
            async with semaphore:
                await asyncio.sleep(self.config.request_delay_ms / 1000)
                count = await self.probe_result_count(half, session, parameters)

            half = replace(half, estimated_results=count)

            if count > self.config.split_threshold:
                result.extend(
                    await self.adaptive_split(
                        half, session, parameters, semaphore
                    )
                )
            else:
                result.append(half)

        return result

    async def split(
        self,
        parameters: QueryParameters,
        session: aiohttp.ClientSession,
        on_progress: Any = None,
    ) -> list[SubQuery]:
        """Split query parameters into optimized subqueries.

        Performs the full splitting algorithm:
        1. Create initial splits by district and bedroom count
        2. Probe each to get result counts
        3. Adaptively split any that exceed the threshold

        Args:
            parameters: Original query parameters to split.
            session: aiohttp session for making requests.
            on_progress: Optional callback for progress updates.
                Called with keyword arguments only: ``phase``, ``message``
                and structured counters (``subqueries_initial`` and
                ``subqueries_probed`` while splitting; ``subqueries_total``
                and ``estimated_results`` on completion).

        Returns:
            List of SubQuery objects, each under the result threshold unless
            its price band could not be narrowed any further.

        Raises:
            CircuitBreakerOpenError: Propagated from probing.
        """
        # Restrict to the requested districts, or use all known districts.
        if parameters.district_names:
            districts = {
                district: locid
                for district, locid in get_districts().items()
                if district in parameters.district_names
            }
        else:
            districts = get_districts()

        # Phase 1: Create initial subqueries
        initial_subqueries = self.create_initial_subqueries(parameters, districts)
        initial_total = len(initial_subqueries)
        logger.info("Created %s initial subqueries", initial_total)

        if on_progress:
            on_progress(
                phase="splitting",
                message=f"Created {initial_total} initial subqueries",
                subqueries_initial=initial_total,
                subqueries_probed=0,
            )

        # Phase 2: Probe and adaptively split
        semaphore = asyncio.Semaphore(self.config.max_concurrent_requests)
        probed_count = 0

        async def probe_and_split(sq: SubQuery) -> list[SubQuery]:
            """Probe one initial subquery and split it if it is over threshold."""
            nonlocal probed_count
            async with semaphore:
                await asyncio.sleep(self.config.request_delay_ms / 1000)
                count = await self.probe_result_count(sq, session, parameters)

            sq = replace(sq, estimated_results=count)
            probed_count += 1

            if on_progress:
                on_progress(
                    phase="splitting",
                    message=f"Probed {probed_count}/{initial_total} subqueries",
                    subqueries_initial=initial_total,
                    subqueries_probed=probed_count,
                )

            if count > self.config.split_threshold:
                logger.info(
                    "Subquery %s/%sBR has %s results, splitting...",
                    sq.district,
                    sq.min_bedrooms,
                    count,
                )
                return await self.adaptive_split(
                    sq, session, parameters, semaphore
                )
            return [sq]

        # All initial subqueries are probed concurrently; the semaphore bounds
        # how many probes are in flight at once. gather() preserves input
        # order, so the flattened result follows the initial subquery order.
        results = await asyncio.gather(
            *(probe_and_split(sq) for sq in initial_subqueries)
        )
        refined_subqueries: list[SubQuery] = [
            sq for subquery_list in results for sq in subquery_list
        ]

        logger.info(
            "Refined to %s subqueries after splitting", len(refined_subqueries)
        )

        total_estimated = self.calculate_total_estimated_results(refined_subqueries)

        if on_progress:
            on_progress(
                phase="splitting_complete",
                message=f"Refined to {len(refined_subqueries)} subqueries",
                subqueries_total=len(refined_subqueries),
                estimated_results=total_estimated,
            )

        return refined_subqueries

    def calculate_total_estimated_results(
        self, subqueries: list[SubQuery]
    ) -> int:
        """Calculate total estimated results across all subqueries.

        Args:
            subqueries: List of subqueries with estimated_results set.
                Entries whose ``estimated_results`` is ``None`` count as 0.

        Returns:
            Sum of all estimated results.
        """
        return sum(sq.estimated_results or 0 for sq in subqueries)
|