import asyncio
import logging
import time
from collections import deque
from typing import Any

from celery import Task
from celery.schedules import crontab

from celery_app import app
from config.schedule_config import SchedulesConfig
from config.scraper_config import ScraperConfig
from listing_processor import ListingProcessor
from models.listing import Listing, QueryParameters
from rec.query import create_session, listing_query
from rec.exceptions import CircuitBreakerOpenError, ThrottlingError
from rec.throttle_detector import get_throttle_metrics, reset_throttle_metrics
from repositories.listing_repository import ListingRepository
from database import engine
from services.query_splitter import QuerySplitter, SubQuery
from utils.redis_lock import redis_lock
from services.listing_cache import invalidate_cache

logger = logging.getLogger("uvicorn.error")

# Also configure a celery-specific logger with its own console handler so
# task logs are emitted even when nothing else has configured it.
# NOTE: logging.StreamHandler() with no argument writes to sys.stderr,
# not stdout.
celery_logger = logging.getLogger("celery.task")
if not celery_logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(
        "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    ))
    celery_logger.addHandler(handler)
    celery_logger.setLevel(logging.INFO)

# Name passed to redis_lock; a task that cannot acquire it skips its run,
# so only one scrape executes at a time.
SCRAPE_LOCK_NAME = "scrape_listings"
# Maximum number of captured log lines kept (and attached to state updates).
LOG_BUFFER_MAX_LINES = 200

# Module-level log buffer — active only during task execution.
# The TaskLogHandler appends here; _update_task_state reads from here.
_active_log_buffer: deque[str] | None = None


class TaskLogHandler(logging.Handler):
    """Capture formatted log records into a deque.

    The deque is attached to every task state update by ``_update_task_state``
    while a scrape is running, so recent log lines travel with the progress
    metadata.
    """

    def __init__(self, buffer: deque[str]) -> None:
        super().__init__()
        self.buffer = buffer

    def emit(self, record: logging.LogRecord) -> None:
        try:
            self.buffer.append(self.format(record))
        except Exception:
            # emit() must never raise; report the failure through the standard
            # logging error path (honours logging.raiseExceptions) instead of
            # discarding it silently.
            self.handleError(record)


def _update_task_state(task: Task, state: str, meta: dict[str, Any]) -> None:
    """Call ``task.update_state``, injecting logs from the active log buffer.

    A shallow copy of ``meta`` is sent, so the caller's dict is left untouched.
    """
    payload = dict(meta)
    if _active_log_buffer is not None:
        payload["logs"] = list(_active_log_buffer)
    task.update_state(state=state, meta=payload)


@app.task(bind=True, pydantic=True)
def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]:
    """Celery entry point: run a full scrape unless another one holds the lock.

    Args:
        parameters_json: JSON-serialised ``QueryParameters``.

    Returns:
        ``{"status": "skipped", ...}`` when the lock is held elsewhere,
        otherwise ``{"phase": "completed", "progress": 1}``.
    """
    with redis_lock(SCRAPE_LOCK_NAME) as acquired:
        if not acquired:
            msg = "Another scrape job is already running, skipping this execution"
            logger.warning(msg)
            celery_logger.warning(msg)
            self.update_state(state="SKIPPED", meta={"reason": "Another scrape job is running"})
            return {"status": "skipped", "reason": "another_job_running"}

        celery_logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
        logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
        parsed_parameters = QueryParameters.model_validate_json(parameters_json)
        celery_logger.info(f"Starting scrape with parameters: {parsed_parameters}")
        self.update_state(state="Starting...", meta={"phase": "splitting", "progress": 0})
        asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters))
        return {"phase": "completed", "progress": 1}


async def async_dump_listings_task(parameters_json: str) -> dict[str, Any]:
    """Run a scrape from inside an existing event loop (not via Celery).

    NOTE(review): a bare ``Task()`` is used as the progress sink; whether its
    ``update_state`` works without a bound request is not visible here —
    confirm before relying on progress from this path.
    """
    with redis_lock(SCRAPE_LOCK_NAME) as acquired:
        if not acquired:
            logger.warning("Another scrape job is already running, skipping this execution")
            return {"status": "skipped", "reason": "another_job_running"}
        logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
        parsed_parameters = QueryParameters.model_validate_json(parameters_json)
        await dump_listings_full(task=Task(), parameters=parsed_parameters)
        return {"progress": 0}


async def dump_listings_full(
    *, task: Task, parameters: QueryParameters
) -> list[Listing]:
    """Fetches all listings, images as well as detects floorplans.

    Wraps ``_dump_listings_full_inner`` with log capture: records from
    ``celery_logger`` and ``logger`` are copied into a bounded buffer that
    ``_update_task_state`` attaches to every state update. Handlers, levels and
    the module-level buffer are restored in ``finally``.
    """
    global _active_log_buffer

    # Set up log capture into a module-level buffer so _update_task_state
    # can inject logs into every state update.
    log_buffer: deque[str] = deque(maxlen=LOG_BUFFER_MAX_LINES)
    log_handler = TaskLogHandler(log_buffer)
    log_handler.setFormatter(
        logging.Formatter("%(asctime)s %(message)s", datefmt="%H:%M:%S")
    )

    # Attach handler to both loggers used in the codebase, and ensure
    # they accept INFO-level messages (Celery's worker setup may leave
    # the celery.task logger at WARNING).
    _prev_celery_level = celery_logger.level
    _prev_logger_level = logger.level
    celery_logger.addHandler(log_handler)
    logger.addHandler(log_handler)
    if celery_logger.level == logging.NOTSET or celery_logger.level > logging.INFO:
        celery_logger.setLevel(logging.INFO)
    if logger.level == logging.NOTSET or logger.level > logging.INFO:
        logger.setLevel(logging.INFO)
    _active_log_buffer = log_buffer

    try:
        return await _dump_listings_full_inner(task=task, parameters=parameters)
    finally:
        _active_log_buffer = None
        celery_logger.removeHandler(log_handler)
        logger.removeHandler(log_handler)
        celery_logger.setLevel(_prev_celery_level)
        logger.setLevel(_prev_logger_level)


async def _dump_listings_full_inner(
    *, task: Task, parameters: QueryParameters
) -> list[Listing]:
    """Inner implementation with log capture active.

    Phase 1 finds listing ids not yet in the database; phase 2 processes them.
    The listing cache is invalidated and a final "completed" state is
    published in both the empty and non-empty cases.
    """
    start_time = time.time()

    celery_logger.info("=" * 60)
    celery_logger.info("PHASE 1: Initializing listing fetch")
    celery_logger.info("=" * 60)

    repository = ListingRepository(engine)

    _update_task_state(task, "Identifying missing listings", {"phase": "splitting", "progress": 0})
    celery_logger.info("Querying Rightmove API to identify new listings...")

    ids_to_process = await get_ids_to_process(
        parameters=parameters, repository=repository, task=task
    )
    celery_logger.info(f"Found {len(ids_to_process)} new listings to process")
    logger.info(f"Found {len(ids_to_process)} listings to process")

    if not ids_to_process:
        elapsed = time.time() - start_time
        celery_logger.info(f"No new listings found. Completed in {elapsed:.1f}s")
        invalidate_cache()
        _update_task_state(task, "No new listings found", {
            "phase": "completed",
            "progress": 1,
            "processed": 0,
            "total": 0,
            "message": "All listings are up to date",
        })
        return []

    celery_logger.info("=" * 60)
    celery_logger.info("PHASE 2: Processing listings (fetch details, images, OCR)")
    celery_logger.info("=" * 60)

    listing_processor = ListingProcessor(repository)
    celery_logger.info(f"Starting processing {len(ids_to_process)} listings")
    logger.info(f"Starting processing {len(ids_to_process)} listings")

    result = await dump_listings_and_monitor(
        task=task, listing_processor=listing_processor, missing_ids=ids_to_process
    )

    elapsed = time.time() - start_time
    celery_logger.info("=" * 60)
    celery_logger.info(f"COMPLETED: Processed {len(result)} listings in {elapsed:.1f}s")
    celery_logger.info("=" * 60)

    invalidate_cache()

    # Send final state so the frontend has rich data even after task completes
    _update_task_state(task, "Completed", {
        "phase": "completed",
        "progress": 1,
        "processed": len(result),
        "total": len(ids_to_process),
        "message": f"Processed {len(result)} listings in {elapsed:.0f}s",
    })

    return result


async def dump_listings_and_monitor(
    *, task: Task, listing_processor: ListingProcessor, missing_ids: set[int]
) -> list[Listing]:
    """Process every id in ``missing_ids`` concurrently while reporting progress.

    A monitor coroutine publishes a state update about once per second until
    every id has finished (successfully or not).

    Returns:
        The listings for which ``process_listing`` returned a non-None value.

    Raises:
        Whatever ``process_listing`` raises; the failing id is still marked as
        finished first so the monitor loop can terminate.
    """
    total = len(missing_ids)
    task_progress = {missing_id: 0 for missing_id in missing_ids}
    processed_count = 0
    failed_count = 0
    details_fetched = 0
    images_downloaded = 0
    ocr_completed = 0
    start_time = time.time()

    def step_callback(step_name: str) -> None:
        """Tally per-step completions reported by the listing processor."""
        nonlocal details_fetched, images_downloaded, ocr_completed
        if step_name == "details":
            details_fetched += 1
        elif step_name == "images":
            images_downloaded += 1
        elif step_name == "ocr":
            ocr_completed += 1

    async def process(missing_id: int) -> Listing | None:
        nonlocal processed_count, failed_count
        try:
            listing = await listing_processor.process_listing(
                missing_id, on_step_complete=step_callback
            )
        except Exception:
            failed_count += 1
            raise
        finally:
            # Always mark the id finished — even on error — otherwise the
            # monitor's completion condition can never be met and it would
            # keep polling forever (gather does not cancel sibling tasks).
            task_progress[missing_id] = 1
        if listing is not None:
            processed_count += 1
        else:
            failed_count += 1
        return listing

    async def monitor() -> None:
        last_logged_decile = 0
        while (progress := sum(task_progress.values())) < total:
            progress_ratio = round(progress / total, 2)
            elapsed = time.time() - start_time
            rate = progress / elapsed if elapsed > 0 else 0
            eta = (total - progress) / rate if rate > 0 else 0

            # Log on entering each new 10% band, plus once after the first
            # listing completes. Integer deciles avoid float drift in the
            # threshold (0.2 + 0.1 > 0.3).
            decile = progress * 10 // total
            if decile > last_logged_decile or progress == 1:
                celery_logger.info(
                    f"Progress: {progress_ratio * 100:.0f}% "
                    f"({progress}/{total}) "
                    f"| Elapsed: {elapsed:.0f}s "
                    f"| Rate: {rate:.1f}/s "
                    f"| ETA: {eta:.0f}s"
                )
                last_logged_decile = decile

            _update_task_state(
                task,
                f"Processing: {progress_ratio * 100:.0f}% ({progress}/{total})",
                {
                    "phase": "processing",
                    "progress": progress_ratio,
                    "processed": progress,
                    "total": total,
                    "details_fetched": details_fetched,
                    "images_downloaded": images_downloaded,
                    "ocr_completed": ocr_completed,
                    "failed": failed_count,
                    "elapsed_seconds": round(elapsed, 1),
                    "rate_per_second": round(rate, 2),
                    "eta_seconds": round(eta, 1),
                },
            )
            await asyncio.sleep(1)

    # The monitor's result (None) is appended last and filtered out below.
    processed_listings = await asyncio.gather(
        *(process(missing_id) for missing_id in missing_ids), monitor()
    )
    filtered_listings = [listing for listing in processed_listings if listing is not None]

    celery_logger.info(
        f"Processing complete: {processed_count} successful, {failed_count} failed"
    )
    return filtered_listings


@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register periodic tasks from environment configuration.

    Each enabled schedule becomes a crontab entry that enqueues
    ``dump_listings_task`` with that schedule's query parameters. If the
    configuration cannot be parsed, the error is logged and nothing is
    registered.
    """
    try:
        config = SchedulesConfig.from_env()
    except ValueError as e:
        logger.error(f"Failed to load schedule configuration: {e}")
        return

    for schedule in config.get_enabled_schedules():
        logger.info(
            f"Registering periodic task: {schedule.name} at {schedule.hour}:{schedule.minute}"
        )
        sender.add_periodic_task(
            crontab(
                minute=schedule.minute,
                hour=schedule.hour,
                day_of_week=schedule.day_of_week,
            ),
            dump_listings_task.s(schedule.to_query_parameters().model_dump_json()),
            name=schedule.name,
        )


async def get_ids_to_process(
    *,
    parameters: QueryParameters,
    repository: ListingRepository,
    task: Task,
) -> set[int]:
    """Fetch all listing IDs using intelligent query splitting.

    Uses the QuerySplitter to adaptively split large queries and maximize
    data extraction while respecting Rightmove's result caps.

    Args:
        parameters: Query parameters for the search.
        repository: Repository for checking existing listings.
        task: Celery task for progress updates.

    Returns:
        Set of new listing IDs that need to be processed. An empty set is
        returned if a CircuitBreakerOpenError escapes the split/fetch phase.
    """
    config = ScraperConfig.from_env()
    splitter = QuerySplitter(config)

    # Reset throttle metrics so the stats logged below cover this run only.
    reset_throttle_metrics()

    def on_progress(phase: str, message: str, **kwargs: Any) -> None:
        meta: dict[str, Any] = {"phase": phase, "message": message}
        meta.update(kwargs)
        _update_task_state(task, message, meta)
        celery_logger.info(f"[{phase}] {message}")

    celery_logger.info("Starting query splitting and probing...")

    # Filled page by page during fetching so progress updates report a live
    # "ids_collected" count.
    identifiers: set[int] = set()

    try:
        async with create_session(config) as session:
            # Phase 1 & 2: Split and probe queries
            _update_task_state(task, "Analyzing query and splitting by price bands...", {
                "phase": "splitting",
                "progress": 0,
            })

            subqueries = await splitter.split(parameters, session, on_progress)
            total_estimated = splitter.calculate_total_estimated_results(subqueries)

            celery_logger.info(
                f"Query split complete: {len(subqueries)} subqueries, "
                f"~{total_estimated} estimated total results"
            )
            logger.info(
                f"Split into {len(subqueries)} subqueries, "
                f"estimated {total_estimated} total results"
            )

            # Phase 3: Fetch all pages for each subquery
            _update_task_state(
                task,
                f"Fetching listings from {len(subqueries)} subqueries...",
                {
                    "phase": "fetching",
                    "subqueries_completed": 0,
                    "subqueries_total": len(subqueries),
                    "ids_collected": 0,
                    "pages_fetched": 0,
                    "estimated_results": total_estimated,
                },
            )
            celery_logger.info(f"Fetching pages from {len(subqueries)} subqueries...")

            semaphore = asyncio.Semaphore(config.max_concurrent_requests)
            completed_subqueries = 0
            total_pages_fetched = 0

            def collect_identifiers(response_json: dict[str, Any]) -> None:
                """Add the listing identifiers found in one results page."""
                if not response_json:
                    return
                if response_json.get("totalAvailableResults", 0) == 0:
                    return
                for property_data in response_json.get("properties", []):
                    identifier = property_data.get("identifier")
                    if identifier:
                        identifiers.add(identifier)

            def report_subquery_done() -> None:
                """Count one finished subquery and publish fetch progress."""
                nonlocal completed_subqueries
                completed_subqueries += 1
                _update_task_state(
                    task,
                    f"Fetching: {completed_subqueries}/{len(subqueries)} subqueries",
                    {
                        "phase": "fetching",
                        "subqueries_completed": completed_subqueries,
                        "subqueries_total": len(subqueries),
                        "ids_collected": len(identifiers),
                        "pages_fetched": total_pages_fetched,
                    },
                )

            async def fetch_subquery(sq: SubQuery) -> None:
                """Fetch all pages for a single subquery."""
                nonlocal total_pages_fetched

                # Calculate how many pages we need based on estimated results
                estimated = sq.estimated_results or 0
                if estimated == 0:
                    report_subquery_done()
                    return

                # Fetch pages up to max_pages_per_query or until no more results
                page_size = parameters.page_size
                max_pages = min(
                    config.max_pages_per_query,
                    (estimated // page_size) + 1,
                )

                for page_id in range(1, max_pages + 1):
                    async with semaphore:
                        await asyncio.sleep(config.request_delay_ms / 1000)
                        try:
                            result = await listing_query(
                                page=page_id,
                                channel=parameters.listing_type,
                                min_bedrooms=sq.min_bedrooms,
                                max_bedrooms=sq.max_bedrooms,
                                radius=parameters.radius,
                                min_price=sq.min_price,
                                max_price=sq.max_price,
                                district=sq.district,
                                page_size=page_size,
                                max_days_since_added=parameters.max_days_since_added,
                                furnish_types=parameters.furnish_types or [],
                                session=session,
                                config=config,
                            )
                            total_pages_fetched += 1
                            collect_identifiers(result)

                            # Check if we've received all results
                            properties = result.get("properties", [])
                            if len(properties) < page_size:
                                # No more results on next page
                                break

                        except CircuitBreakerOpenError as e:
                            celery_logger.error(f"Circuit breaker open: {e}")
                            break
                        except ThrottlingError as e:
                            celery_logger.warning(
                                f"Throttling on {sq.district} page {page_id}: {e}"
                            )
                            break
                        except Exception as e:
                            if "GENERIC_ERROR" in str(e):
                                # Reached end of results
                                logger.debug(
                                    f"Max page for {sq.district}: {page_id - 1}"
                                )
                                break
                            logger.warning(
                                f"Error fetching page {page_id} for {sq.district}: {e}"
                            )
                            break

                report_subquery_done()

            # Fetch all subqueries concurrently
            await asyncio.gather(*(fetch_subquery(sq) for sq in subqueries))

            celery_logger.info(
                f"Fetch complete: {total_pages_fetched} pages from "
                f"{completed_subqueries} subqueries"
            )

    except CircuitBreakerOpenError as e:
        celery_logger.error(f"Circuit breaker prevented query: {e}")
        # Log throttle metrics
        metrics = get_throttle_metrics()
        if metrics.total_requests > 0:
            celery_logger.info(metrics.summary())
        return set()
    finally:
        # Log throttle metrics
        metrics = get_throttle_metrics()
        if metrics.total_requests > 0:
            celery_logger.info(f"API Stats: {metrics.total_requests} requests, "
                               f"avg {metrics.average_response_time:.2f}s, "
                               f"{metrics.total_throttling_events} throttled")

    celery_logger.info(f"Found {len(identifiers)} unique listing IDs from API")
    logger.info(f"Found {len(identifiers)} unique listings")

    # Filter out listings already in the database
    celery_logger.info("Checking database for existing listings...")
    all_listing_ids = {listing.id for listing in await repository.get_listings()}
    new_ids = identifiers - all_listing_ids

    celery_logger.info(
        f"Filtering: {len(identifiers)} total, "
        f"{len(all_listing_ids)} existing in DB, "
        f"{len(new_ids)} new to process"
    )

    _update_task_state(task, f"Found {len(new_ids)} new listings to process", {
        "phase": "filtering",
        "total_found": len(identifiers),
        "existing_in_db": len(all_listing_ids),
        "new_listings": len(new_ids),
    })

    return new_ids