Stream-process listings as IDs arrive via asyncio.Queue

Replace the sequential fetch-all-then-process pipeline with a streaming
architecture where listing processing starts as soon as IDs become
available from each subquery. A producer task fetches pages and enqueues
new IDs (filtered inline against DB), while 20 consumer workers process
listings concurrently from the queue.

- Add ListingRepository.get_listing_ids() for fast ID-only projection
- Refactor listing_tasks.py: remove get_ids_to_process/dump_listings_and_monitor,
  replace with unified producer/worker/monitor pipeline
- Apply same pattern to CLI path in listing_fetcher.py
- Remove 'filtering' phase from frontend, show combined fetch+process metrics
- Add fetching_done flag to TaskResult for phase transition tracking
This commit is contained in:
Viktor Barzin 2026-02-06 23:43:54 +00:00
parent 7e8f1f0339
commit b9f576ae2b
No known key found for this signature in database
GPG key ID: 0EB088298288D958
6 changed files with 372 additions and 420 deletions

View file

@ -134,144 +134,303 @@ async def dump_listings_full(
async def _dump_listings_full_inner(
*, task: Task, parameters: QueryParameters
) -> list[Listing]:
"""Inner implementation with log capture active."""
"""Inner implementation with log capture active.
Uses a streaming pipeline: an asyncio.Queue bridges the fetcher (producer)
and processor workers (consumers) so that listing processing starts as
soon as IDs become available from each subquery.
"""
start_time = time.time()
NUM_WORKERS = 20
celery_logger.info("=" * 60)
celery_logger.info("PHASE 1: Initializing listing fetch")
celery_logger.info("PHASE 1: Splitting queries")
celery_logger.info("=" * 60)
repository = ListingRepository(engine)
config = ScraperConfig.from_env()
splitter = QuerySplitter(config)
_update_task_state(task, "Identifying missing listings", {"phase": "splitting", "progress": 0})
celery_logger.info("Querying Rightmove API to identify new listings...")
ids_to_process = await get_ids_to_process(
parameters=parameters, repository=repository, task=task
)
# Reset throttle metrics
reset_throttle_metrics()
celery_logger.info(f"Found {len(ids_to_process)} new listings to process")
logger.info(f"Found {len(ids_to_process)} listings to process")
def on_progress(phase: str, message: str, **kwargs: Any) -> None:
meta: dict[str, Any] = {"phase": phase, "message": message}
meta.update(kwargs)
_update_task_state(task, message, meta)
celery_logger.info(f"[{phase}] {message}")
if len(ids_to_process) == 0:
elapsed = time.time() - start_time
celery_logger.info(f"No new listings found. Completed in {elapsed:.1f}s")
invalidate_cache()
_update_task_state(task, "No new listings found", {
"phase": "completed", "progress": 1, "processed": 0, "total": 0,
"message": "All listings are up to date",
})
_update_task_state(task, "Analyzing query and splitting by price bands...", {
"phase": "splitting", "progress": 0,
})
celery_logger.info("Starting query splitting and probing...")
try:
async with create_session(config) as session:
subqueries = await splitter.split(parameters, session, on_progress)
total_estimated = splitter.calculate_total_estimated_results(subqueries)
celery_logger.info(
f"Query split complete: {len(subqueries)} subqueries, "
f"~{total_estimated} estimated total results"
)
# Load existing IDs (fast, ID-only projection)
celery_logger.info("Loading existing listing IDs from database...")
existing_ids = repository.get_listing_ids(parameters.listing_type)
celery_logger.info(f"Found {len(existing_ids)} existing listings in DB")
celery_logger.info("=" * 60)
celery_logger.info("PHASE 2: Streaming fetch & process")
celery_logger.info("=" * 60)
# Shared state for the streaming pipeline
queue: asyncio.Queue[int | None] = asyncio.Queue()
ids_collected = 0
completed_subqueries = 0
total_pages_fetched = 0
fetching_done = False
processed_count = 0
failed_count = 0
details_fetched = 0
images_downloaded = 0
ocr_completed = 0
processed_listings: list[Listing] = []
semaphore = asyncio.Semaphore(config.max_concurrent_requests)
_update_task_state(
task,
f"Fetching listings from {len(subqueries)} subqueries...",
{
"phase": "fetching",
"subqueries_completed": 0,
"subqueries_total": len(subqueries),
"ids_collected": 0,
"pages_fetched": 0,
"estimated_results": total_estimated,
"fetching_done": False,
},
)
listing_processor = ListingProcessor(repository)
# --- Producer: fetch subquery pages and enqueue new IDs ---
async def producer() -> None:
nonlocal ids_collected, completed_subqueries, total_pages_fetched
nonlocal fetching_done
async def fetch_subquery(sq: SubQuery) -> None:
nonlocal ids_collected, completed_subqueries, total_pages_fetched
estimated = sq.estimated_results or 0
if estimated == 0:
completed_subqueries += 1
return
page_size = parameters.page_size
max_pages = min(
config.max_pages_per_query,
(estimated // page_size) + 1,
)
for page_id in range(1, max_pages + 1):
async with semaphore:
await asyncio.sleep(config.request_delay_ms / 1000)
try:
result = await listing_query(
page=page_id,
channel=parameters.listing_type,
min_bedrooms=sq.min_bedrooms,
max_bedrooms=sq.max_bedrooms,
radius=parameters.radius,
min_price=sq.min_price,
max_price=sq.max_price,
district=sq.district,
page_size=page_size,
max_days_since_added=parameters.max_days_since_added,
furnish_types=parameters.furnish_types or [],
session=session,
config=config,
)
total_pages_fetched += 1
# Extract and enqueue new IDs inline
properties = result.get("properties", [])
for prop in properties:
identifier = prop.get("identifier")
if identifier and identifier not in existing_ids:
existing_ids.add(identifier)
ids_collected += 1
await queue.put(identifier)
if len(properties) < page_size:
break
except CircuitBreakerOpenError as e:
celery_logger.error(f"Circuit breaker open: {e}")
break
except ThrottlingError as e:
celery_logger.warning(
f"Throttling on {sq.district} page {page_id}: {e}"
)
break
except Exception as e:
if "GENERIC_ERROR" in str(e):
logger.debug(
f"Max page for {sq.district}: {page_id - 1}"
)
break
logger.warning(
f"Error fetching page {page_id} for "
f"{sq.district}: {e}"
)
break
completed_subqueries += 1
# Fetch all subqueries concurrently
await asyncio.gather(
*[fetch_subquery(sq) for sq in subqueries]
)
celery_logger.info(
f"Fetch complete: {total_pages_fetched} pages from "
f"{completed_subqueries} subqueries, "
f"{ids_collected} new IDs"
)
fetching_done = True
# Send sentinel values to stop workers
for _ in range(NUM_WORKERS):
await queue.put(None)
# --- Consumer workers: process listings from queue ---
async def worker() -> None:
nonlocal processed_count, failed_count
nonlocal details_fetched, images_downloaded, ocr_completed
while True:
listing_id = await queue.get()
if listing_id is None:
break
def step_callback(step_name: str) -> None:
nonlocal details_fetched, images_downloaded, ocr_completed
if step_name == "details":
details_fetched += 1
elif step_name == "images":
images_downloaded += 1
elif step_name == "ocr":
ocr_completed += 1
listing = await listing_processor.process_listing(
listing_id, on_step_complete=step_callback
)
if listing is not None:
processed_count += 1
processed_listings.append(listing)
else:
failed_count += 1
# --- Monitor: reports combined progress ---
async def monitor() -> None:
last_progress = 0.0
while True:
total = ids_collected
done = processed_count + failed_count
if fetching_done and done >= total and total > 0:
break
if fetching_done and total == 0:
break
# Determine phase label
phase = "processing" if fetching_done else "fetching"
if total > 0:
progress_ratio = round(done / total, 2)
else:
progress_ratio = 0.0
elapsed = time.time() - start_time
rate = done / elapsed if elapsed > 0 else 0
remaining = (total - done) if total > 0 else 0
eta = remaining / rate if rate > 0 else 0
if progress_ratio >= last_progress + 0.1 or done == 1:
celery_logger.info(
f"Progress: {progress_ratio * 100:.0f}% "
f"({done}/{total}) "
f"| Elapsed: {elapsed:.0f}s "
f"| Rate: {rate:.1f}/s "
f"| ETA: {eta:.0f}s"
)
last_progress = progress_ratio
_update_task_state(
task,
f"{'Processing' if fetching_done else 'Fetching & processing'}: "
f"{done}/{total}",
{
"phase": phase,
"progress": progress_ratio,
"processed": done,
"total": total,
"subqueries_completed": completed_subqueries,
"subqueries_total": len(subqueries),
"ids_collected": ids_collected,
"pages_fetched": total_pages_fetched,
"fetching_done": fetching_done,
"details_fetched": details_fetched,
"images_downloaded": images_downloaded,
"ocr_completed": ocr_completed,
"failed": failed_count,
"elapsed_seconds": round(elapsed, 1),
"rate_per_second": round(rate, 2),
"eta_seconds": round(eta, 1),
},
)
await asyncio.sleep(1)
# Run producer, workers, and monitor concurrently
await asyncio.gather(
producer(),
*[worker() for _ in range(NUM_WORKERS)],
monitor(),
)
except CircuitBreakerOpenError as e:
celery_logger.error(f"Circuit breaker prevented query: {e}")
metrics = get_throttle_metrics()
if metrics.total_requests > 0:
celery_logger.info(metrics.summary())
return []
celery_logger.info("=" * 60)
celery_logger.info("PHASE 2: Processing listings (fetch details, images, OCR)")
celery_logger.info("=" * 60)
listing_processor = ListingProcessor(repository)
celery_logger.info(f"Starting processing {len(ids_to_process)} listings")
logger.info(f"Starting processing {len(ids_to_process)} listings")
result = await dump_listings_and_monitor(
task=task, listing_processor=listing_processor, missing_ids=ids_to_process
)
finally:
metrics = get_throttle_metrics()
if metrics.total_requests > 0:
celery_logger.info(
f"API Stats: {metrics.total_requests} requests, "
f"avg {metrics.average_response_time:.2f}s, "
f"{metrics.total_throttling_events} throttled"
)
elapsed = time.time() - start_time
celery_logger.info("=" * 60)
celery_logger.info(f"COMPLETED: Processed {len(result)} listings in {elapsed:.1f}s")
celery_logger.info(
f"COMPLETED: Processed {len(processed_listings)} listings in {elapsed:.1f}s"
)
celery_logger.info("=" * 60)
invalidate_cache()
# Send final state so the frontend has rich data even after task completes
_update_task_state(task, "Completed", {
"phase": "completed", "progress": 1,
"processed": len(result), "total": len(ids_to_process),
"message": f"Processed {len(result)} listings in {elapsed:.0f}s",
"processed": len(processed_listings), "total": ids_collected,
"message": f"Processed {len(processed_listings)} listings in {elapsed:.0f}s",
})
return result
async def dump_listings_and_monitor(
*, task: Task, listing_processor: ListingProcessor, missing_ids: set[int]
) -> list[Listing]:
task_progress = {missing_id: 0 for missing_id in missing_ids}
processed_count = 0
failed_count = 0
details_fetched = 0
images_downloaded = 0
ocr_completed = 0
start_time = time.time()
async def process(missing_id: int) -> Listing | None:
nonlocal processed_count, failed_count
def step_callback(step_name: str) -> None:
nonlocal details_fetched, images_downloaded, ocr_completed
if step_name == "details":
details_fetched += 1
elif step_name == "images":
images_downloaded += 1
elif step_name == "ocr":
ocr_completed += 1
listing = await listing_processor.process_listing(
missing_id, on_step_complete=step_callback
)
task_progress[missing_id] = 1
if listing is not None:
processed_count += 1
else:
failed_count += 1
return listing
async def monitor() -> None:
last_progress = 0
while (progress := sum(task_progress.values())) < len(missing_ids):
progress_ratio = round(progress / len(missing_ids), 2)
elapsed = time.time() - start_time
rate = progress / elapsed if elapsed > 0 else 0
eta = (len(missing_ids) - progress) / rate if rate > 0 else 0
# Log every 10% progress or at least every update
if progress_ratio >= last_progress + 0.1 or progress == 1:
celery_logger.info(
f"Progress: {progress_ratio * 100:.0f}% "
f"({progress}/{len(missing_ids)}) "
f"| Elapsed: {elapsed:.0f}s "
f"| Rate: {rate:.1f}/s "
f"| ETA: {eta:.0f}s"
)
last_progress = progress_ratio
_update_task_state(
task,
f"Processing: {progress_ratio * 100:.0f}% ({progress}/{len(missing_ids)})",
{
"phase": "processing",
"progress": progress_ratio,
"processed": progress,
"total": len(missing_ids),
"details_fetched": details_fetched,
"images_downloaded": images_downloaded,
"ocr_completed": ocr_completed,
"failed": failed_count,
"elapsed_seconds": round(elapsed, 1),
"rate_per_second": round(rate, 2),
"eta_seconds": round(eta, 1),
},
)
await asyncio.sleep(1)
processed_listings = await asyncio.gather(
*[process(id) for id in missing_ids], *[monitor()]
)
filtered_listings = [listing for listing in processed_listings if listing is not None]
celery_logger.info(
f"Processing complete: {processed_count} successful, {failed_count} failed"
)
return filtered_listings
return processed_listings
@app.on_after_finalize.connect
@ -297,227 +456,3 @@ def setup_periodic_tasks(sender, **kwargs):
dump_listings_task.s(schedule.to_query_parameters().model_dump_json()),
name=schedule.name,
)
async def get_ids_to_process(
*,
parameters: QueryParameters,
repository: ListingRepository,
task: Task,
) -> set[int]:
"""Fetch all listing IDs using intelligent query splitting.
Uses the QuerySplitter to adaptively split large queries and maximize
data extraction while respecting Rightmove's result caps.
Args:
parameters: Query parameters for the search.
repository: Repository for checking existing listings.
task: Celery task for progress updates.
Returns:
Set of new listing IDs that need to be processed.
"""
config = ScraperConfig.from_env()
splitter = QuerySplitter(config)
# Reset throttle metrics
reset_throttle_metrics()
def on_progress(phase: str, message: str, **kwargs: Any) -> None:
meta: dict[str, Any] = {"phase": phase, "message": message}
meta.update(kwargs)
_update_task_state(task, message, meta)
celery_logger.info(f"[{phase}] {message}")
celery_logger.info("Starting query splitting and probing...")
try:
async with create_session(config) as session:
# Phase 1 & 2: Split and probe queries
_update_task_state(task, "Analyzing query and splitting by price bands...", {
"phase": "splitting", "progress": 0,
})
subqueries = await splitter.split(parameters, session, on_progress)
total_estimated = splitter.calculate_total_estimated_results(subqueries)
celery_logger.info(
f"Query split complete: {len(subqueries)} subqueries, "
f"~{total_estimated} estimated total results"
)
logger.info(
f"Split into {len(subqueries)} subqueries, "
f"estimated {total_estimated} total results"
)
# Phase 3: Fetch all pages for each subquery
_update_task_state(
task,
f"Fetching listings from {len(subqueries)} subqueries...",
{
"phase": "fetching",
"subqueries_completed": 0,
"subqueries_total": len(subqueries),
"ids_collected": 0,
"pages_fetched": 0,
"estimated_results": total_estimated,
},
)
celery_logger.info(f"Fetching pages from {len(subqueries)} subqueries...")
semaphore = asyncio.Semaphore(config.max_concurrent_requests)
identifiers: set[int] = set()
completed_subqueries = 0
total_pages_fetched = 0
async def fetch_subquery(sq: SubQuery) -> list[dict[str, Any]]:
"""Fetch all pages for a single subquery."""
nonlocal completed_subqueries, total_pages_fetched
results: list[dict[str, Any]] = []
# Calculate how many pages we need based on estimated results
estimated = sq.estimated_results or 0
if estimated == 0:
completed_subqueries += 1
_update_task_state(
task,
f"Fetching: {completed_subqueries}/{len(subqueries)} subqueries",
{
"phase": "fetching",
"subqueries_completed": completed_subqueries,
"subqueries_total": len(subqueries),
"ids_collected": len(identifiers),
"pages_fetched": total_pages_fetched,
},
)
return results
# Fetch pages up to max_pages_per_query or until no more results
page_size = parameters.page_size
max_pages = min(
config.max_pages_per_query,
(estimated // page_size) + 1,
)
for page_id in range(1, max_pages + 1):
async with semaphore:
await asyncio.sleep(config.request_delay_ms / 1000)
try:
result = await listing_query(
page=page_id,
channel=parameters.listing_type,
min_bedrooms=sq.min_bedrooms,
max_bedrooms=sq.max_bedrooms,
radius=parameters.radius,
min_price=sq.min_price,
max_price=sq.max_price,
district=sq.district,
page_size=page_size,
max_days_since_added=parameters.max_days_since_added,
furnish_types=parameters.furnish_types or [],
session=session,
config=config,
)
results.append(result)
total_pages_fetched += 1
# Check if we've received all results
properties = result.get("properties", [])
if len(properties) < page_size:
# No more results on next page
break
except CircuitBreakerOpenError as e:
celery_logger.error(f"Circuit breaker open: {e}")
break
except ThrottlingError as e:
celery_logger.warning(
f"Throttling on {sq.district} page {page_id}: {e}"
)
break
except Exception as e:
if "GENERIC_ERROR" in str(e):
# Reached end of results
logger.debug(
f"Max page for {sq.district}: {page_id - 1}"
)
break
logger.warning(
f"Error fetching page {page_id} for {sq.district}: {e}"
)
break
completed_subqueries += 1
_update_task_state(
task,
f"Fetching: {completed_subqueries}/{len(subqueries)} subqueries",
{
"phase": "fetching",
"subqueries_completed": completed_subqueries,
"subqueries_total": len(subqueries),
"ids_collected": len(identifiers),
"pages_fetched": total_pages_fetched,
},
)
return results
# Fetch all subqueries concurrently
all_results = await asyncio.gather(
*[fetch_subquery(sq) for sq in subqueries]
)
celery_logger.info(
f"Fetch complete: {total_pages_fetched} pages from "
f"{completed_subqueries} subqueries"
)
# Extract identifiers from all results
for subquery_results in all_results:
for response_json in subquery_results:
if not response_json:
continue
if response_json.get("totalAvailableResults", 0) == 0:
continue
for property_data in response_json.get("properties", []):
identifier = property_data.get("identifier")
if identifier:
identifiers.add(identifier)
except CircuitBreakerOpenError as e:
celery_logger.error(f"Circuit breaker prevented query: {e}")
# Log throttle metrics
metrics = get_throttle_metrics()
if metrics.total_requests > 0:
celery_logger.info(metrics.summary())
return set()
finally:
# Log throttle metrics
metrics = get_throttle_metrics()
if metrics.total_requests > 0:
celery_logger.info(f"API Stats: {metrics.total_requests} requests, "
f"avg {metrics.average_response_time:.2f}s, "
f"{metrics.total_throttling_events} throttled")
celery_logger.info(f"Found {len(identifiers)} unique listing IDs from API")
logger.info(f"Found {len(identifiers)} unique listings")
# Filter out listings already in the database
celery_logger.info("Checking database for existing listings...")
all_listing_ids = {listing.id for listing in await repository.get_listings()}
new_ids = identifiers - all_listing_ids
celery_logger.info(
f"Filtering: {len(identifiers)} total, "
f"{len(all_listing_ids)} existing in DB, "
f"{len(new_ids)} new to process"
)
_update_task_state(task, f"Found {len(new_ids)} new listings to process", {
"phase": "filtering",
"total_found": len(identifiers),
"existing_in_db": len(all_listing_ids),
"new_listings": len(new_ids),
})
return new_ids