diff --git a/crawler/listing_processor.py b/crawler/listing_processor.py
index 020016f..e1151ca 100644
--- a/crawler/listing_processor.py
+++ b/crawler/listing_processor.py
@@ -13,6 +13,9 @@ from repositories.listing_repository import ListingRepository
 
 logger = logging.getLogger("uvicorn.error")
 
+# Also use celery task logger for visibility in worker output
+celery_logger = logging.getLogger("celery.task")
+
 
 class ListingProcessor:
     semaphore: asyncio.Semaphore
@@ -36,15 +39,16 @@ class ListingProcessor:
         for step in self.process_steps:
             if await step.needs_processing(listing_id):
                 async with self.semaphore:
+                    step_name = step.__class__.__name__
                     try:
                         listing = await step.process(listing_id)
+                        logger.debug(f"[{listing_id}] {step_name} completed")
                     except Exception as e:
-                        logger.error(f"Failed to process {listing_id=}: {e}")
+                        logger.error(f"[{listing_id}] {step_name} failed: {e}")
+                        celery_logger.error(f"[{listing_id}] {step_name} failed: {e}")
                         return None
         return listing
 
-    async def listing_exists(self, listing_id: int) -> bool: ...
-
 
 class Step:
     listing_repository: ListingRepository
@@ -65,19 +69,23 @@ class FetchListingDetailsStep(Step):
         existing_listings = await self.listing_repository.get_listings(
             only_ids=[listing_id]
         )
-        if (existing_listings) == 0:
+        if len(existing_listings) == 0:
             return True
         return False
 
     async def process(self, listing_id: int) -> Listing:
-        logger.debug(f"Fetching details for {listing_id=}")
+        logger.debug(f"[{listing_id}] Fetching property details from API")
+        celery_logger.info(f"[{listing_id}] Fetching details...")
+
         existing_listings = await self.listing_repository.get_listings(
             only_ids=[listing_id]
         )
         now = datetime.now()
         if len(existing_listings) > 0:
             # listing exists, do not refresh
+            logger.debug(f"[{listing_id}] Already exists, skipping refresh")
             return existing_listings[0]
+
         listing_details = await detail_query(listing_id)
 
         furnish_type_str = listing_details["property"].get("letFurnishType", "unknown")
@@ -124,7 +132,12 @@ class FetchListingDetailsStep(Step):
             additional_info=listing_details,
         )
         await self.listing_repository.upsert_listings([listing])
-        logger.debug(f"Completed fetching details for {listing_id=}")
+
+        celery_logger.info(
+            f"[{listing_id}] Details fetched: £{listing.price}, "
+            f"{listing.number_of_bedrooms}BR, {listing.agency}"
+        )
+        logger.debug(f"[{listing_id}] Details fetch complete")
         # TODO: dump to filesystem
         return listing
 
@@ -140,7 +153,8 @@ class FetchImagesStep(Step):
         return len(listing.floorplan_image_paths) == 0
 
     async def process(self, listing_id: int) -> Listing:
-        logger.debug(f"Fetching images for {listing_id=}")
+        logger.debug(f"[{listing_id}] Fetching floorplan images")
+
         existing_listings = await self.listing_repository.get_listings(
             only_ids=[listing_id]
         )
@@ -152,6 +166,12 @@ class FetchImagesStep(Step):
         all_floorplans = listing.additional_info.get("property", {}).get(
             "floorplans", []
         )
+
+        if len(all_floorplans) == 0:
+            logger.debug(f"[{listing_id}] No floorplans available")
+            return listing
+
+        downloaded = 0
         client_timeout = aiohttp.ClientTimeout(total=30)
         for floorplan_obj in all_floorplans:
             url = floorplan_obj["url"]
@@ -169,8 +189,12 @@ class FetchImagesStep(Step):
                     with open(floorplan_path, "wb") as f:
                         f.write(await response.read())
                     listing.floorplan_image_paths.append(str(floorplan_path))
+                    downloaded += 1
+
         await self.listing_repository.upsert_listings([listing])
-        logger.debug(f"Completed fetching images for {listing_id=}")
+
+        celery_logger.info(f"[{listing_id}] Downloaded {downloaded} floorplan images")
+        logger.debug(f"[{listing_id}] Image fetch complete")
         return listing
 
 
@@ -188,11 +212,19 @@ class DetectFloorplanStep(Step):
         return listings[0].square_meters is None
 
     async def process(self, listing_id: int) -> Listing:
-        logger.debug(f"Running floorplan detection for {listing_id=}")
+        logger.debug(f"[{listing_id}] Running OCR on floorplans")
+
         listings = await self.listing_repository.get_listings(only_ids=[listing_id])
         if len(listings) == 0:
             raise ValueError(f"Listing {listing_id} does not exist")
         listing = listings[0]
+
+        if len(listing.floorplan_image_paths) == 0:
+            logger.debug(f"[{listing_id}] No floorplan images to process")
+            listing.square_meters = 0
+            await self.listing_repository.upsert_listings([listing])
+            return listing
+
         sqms = []
         for floorplan_path in listing.floorplan_image_paths:
             async with self.ocr_semaphore:
@@ -201,9 +233,15 @@ class DetectFloorplanStep(Step):
                 )
                 if estimated_sqm is not None:
                     sqms.append(estimated_sqm)
+
         max_sqm = max(sqms, default=0)  # try once, if we fail, keep as 0
-        # if max_sqm is not None:
         listing.square_meters = max_sqm
         await self.listing_repository.upsert_listings([listing])
-        logger.debug(f"Completed running floorplan detection for {listing_id=}")
+
+        if max_sqm > 0:
+            celery_logger.info(f"[{listing_id}] OCR detected {max_sqm} sqm")
+        else:
+            logger.debug(f"[{listing_id}] OCR: no square meters detected")
+
+        logger.debug(f"[{listing_id}] OCR complete")
         return listing
diff --git a/crawler/tasks/listing_tasks.py b/crawler/tasks/listing_tasks.py
index 1fb3041..713a56d 100644
--- a/crawler/tasks/listing_tasks.py
+++ b/crawler/tasks/listing_tasks.py
@@ -1,5 +1,6 @@
 import asyncio
 import logging
+import time
 from typing import Any
 from celery import Task
 from celery.schedules import crontab
@@ -9,6 +10,8 @@ from config.scraper_config import ScraperConfig
 from listing_processor import ListingProcessor
 from models.listing import Listing, QueryParameters
 from rec.query import create_session, listing_query
+from rec.exceptions import CircuitBreakerOpenError, ThrottlingError
+from rec.throttle_detector import get_throttle_metrics, reset_throttle_metrics
 from repositories.listing_repository import ListingRepository
 from database import engine
 from services.query_splitter import QuerySplitter, SubQuery
@@ -16,6 +19,16 @@ from utils.redis_lock import redis_lock
 
 logger = logging.getLogger("uvicorn.error")
 
+# Also configure a celery-specific logger that always outputs to stdout
+celery_logger = logging.getLogger("celery.task")
+if not celery_logger.handlers:
+    handler = logging.StreamHandler()
+    handler.setFormatter(logging.Formatter(
+        "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
+    ))
+    celery_logger.addHandler(handler)
+    celery_logger.setLevel(logging.INFO)
+
 SCRAPE_LOCK_NAME = "scrape_listings"
 
 
@@ -23,12 +36,18 @@ SCRAPE_LOCK_NAME = "scrape_listings"
 def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]:
     with redis_lock(SCRAPE_LOCK_NAME) as acquired:
         if not acquired:
-            logger.warning("Another scrape job is already running, skipping this execution")
+            msg = "Another scrape job is already running, skipping this execution"
+            logger.warning(msg)
+            celery_logger.warning(msg)
             self.update_state(state="SKIPPED", meta={"reason": "Another scrape job is running"})
             return {"status": "skipped", "reason": "another_job_running"}
 
+        celery_logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
         logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
+
         parsed_parameters = QueryParameters.model_validate_json(parameters_json)
+        celery_logger.info(f"Starting scrape with parameters: {parsed_parameters}")
+
         self.update_state(state="Starting...", meta={"progress": 0})
         asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters))
         return {"progress": 0}
@@ -50,46 +69,91 @@ async def dump_listings_full(
     *, task: Task, parameters: QueryParameters
 ) -> list[Listing]:
     """Fetches all listings, images as well as detects floorplans"""
+    start_time = time.time()
+    celery_logger.info("=" * 60)
+    celery_logger.info("PHASE 1: Initializing listing fetch")
+    celery_logger.info("=" * 60)
+
     repository = ListingRepository(engine)
     task.update_state(state="Identifying missing listings", meta={"progress": 0})
+    celery_logger.info("Querying Rightmove API to identify new listings...")
     ids_to_process = await get_ids_to_process(
         parameters=parameters, repository=repository, task=task
     )
+
+    celery_logger.info(f"Found {len(ids_to_process)} new listings to process")
     logger.info(f"Found {len(ids_to_process)} listings to process")
 
     if len(ids_to_process) == 0:
+        elapsed = time.time() - start_time
+        celery_logger.info(f"No new listings found. Completed in {elapsed:.1f}s")
         task.update_state(
             state="No new listings found",
             meta={"progress": 1, "processed": 0, "total": 0, "message": "All listings are up to date"},
         )
         return []
 
+    celery_logger.info("=" * 60)
+    celery_logger.info("PHASE 2: Processing listings (fetch details, images, OCR)")
+    celery_logger.info("=" * 60)
+
     listing_processor = ListingProcessor(repository)
+    celery_logger.info(f"Starting processing {len(ids_to_process)} listings")
     logger.info(f"Starting processing {len(ids_to_process)} listings")
-    return await dump_listings_and_monitor(
+
+    result = await dump_listings_and_monitor(
         task=task, listing_processor=listing_processor, missing_ids=ids_to_process
     )
+
+    elapsed = time.time() - start_time
+    celery_logger.info("=" * 60)
+    celery_logger.info(f"COMPLETED: Processed {len(result)} listings in {elapsed:.1f}s")
+    celery_logger.info("=" * 60)
+
+    return result
+
 
 async def dump_listings_and_monitor(
     *, task: Task, listing_processor: ListingProcessor, missing_ids: set[int]
 ) -> list[Listing]:
     task_progress = {missing_id: 0 for missing_id in missing_ids}
+    processed_count = 0
+    failed_count = 0
+    start_time = time.time()
 
     async def process(missing_id: int) -> Listing | None:
+        nonlocal processed_count, failed_count
         listing = await listing_processor.process_listing(missing_id)
         task_progress[missing_id] = 1
+        if listing is not None:
+            processed_count += 1
+        else:
+            failed_count += 1
         return listing
 
     async def monitor() -> None:
+        last_progress = 0
         while (progress := sum(task_progress.values())) < len(missing_ids):
             progress_ratio = round(progress / len(missing_ids), 2)
-            logger.error(
-                f"Task progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})"
-            )
+
+            # Log the first completed item, then roughly every 10% of progress
+            if progress_ratio >= last_progress + 0.1 or progress == 1:
+                elapsed = time.time() - start_time
+                rate = progress / elapsed if elapsed > 0 else 0
+                eta = (len(missing_ids) - progress) / rate if rate > 0 else 0
+
+                celery_logger.info(
+                    f"Progress: {progress_ratio * 100:.0f}% "
+                    f"({progress}/{len(missing_ids)}) "
+                    f"| Elapsed: {elapsed:.0f}s "
+                    f"| Rate: {rate:.1f}/s "
+                    f"| ETA: {eta:.0f}s"
+                )
+                last_progress = progress_ratio
+
             task.update_state(
-                state=f"Progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})",
+                state=f"Processing: {progress_ratio * 100:.0f}% ({progress}/{len(missing_ids)})",
                 meta={"progress": progress_ratio, "processed": progress, "total": len(missing_ids)},
             )
             await asyncio.sleep(1)
@@ -97,7 +161,11 @@ async def dump_listings_and_monitor(
     processed_listings = await asyncio.gather(
         *[process(id) for id in missing_ids], *[monitor()]
     )
-    filtered_listings = [l for l in processed_listings if l is not None]
+    filtered_listings = [listing for listing in processed_listings if listing is not None]
+
+    celery_logger.info(
+        f"Processing complete: {processed_count} successful, {failed_count} failed"
+    )
     return filtered_listings
 
 
@@ -149,115 +217,171 @@ async def get_ids_to_process(
     config = ScraperConfig.from_env()
     splitter = QuerySplitter(config)
 
+    # Reset throttle metrics
+    reset_throttle_metrics()
+
     def on_progress(phase: str, message: str) -> None:
         task.update_state(state=message, meta={"phase": phase})
+        celery_logger.info(f"[{phase}] {message}")
 
-    async with create_session(config) as session:
-        # Phase 1 & 2: Split and probe queries
-        task.update_state(
-            state="Analyzing query and splitting by price bands...",
-            meta={"phase": "splitting", "progress": 0},
-        )
-        subqueries = await splitter.split(parameters, session, on_progress)
+    celery_logger.info("Starting query splitting and probing...")
 
-        total_estimated = splitter.calculate_total_estimated_results(subqueries)
-        logger.info(
-            f"Split into {len(subqueries)} subqueries, "
-            f"estimated {total_estimated} total results"
-        )
+    try:
+        async with create_session(config) as session:
+            # Phase 1 & 2: Split and probe queries
+            task.update_state(
+                state="Analyzing query and splitting by price bands...",
+                meta={"phase": "splitting", "progress": 0},
+            )
+            subqueries = await splitter.split(parameters, session, on_progress)
 
-        # Phase 3: Fetch all pages for each subquery
-        task.update_state(
-            state=f"Fetching listings from {len(subqueries)} subqueries...",
-            meta={
-                "phase": "fetching",
-                "subqueries": len(subqueries),
-                "estimated_results": total_estimated,
-            },
-        )
-
-        semaphore = asyncio.Semaphore(config.max_concurrent_requests)
-        identifiers: set[int] = set()
-
-        async def fetch_subquery(sq: SubQuery) -> list[dict[str, Any]]:
-            """Fetch all pages for a single subquery."""
-            results: list[dict[str, Any]] = []
-
-            # Calculate how many pages we need based on estimated results
-            estimated = sq.estimated_results or 0
-            if estimated == 0:
-                return results
-
-            # Fetch pages up to max_pages_per_query or until no more results
-            page_size = parameters.page_size
-            max_pages = min(
-                config.max_pages_per_query,
-                (estimated // page_size) + 1,
+            total_estimated = splitter.calculate_total_estimated_results(subqueries)
+            celery_logger.info(
+                f"Query split complete: {len(subqueries)} subqueries, "
+                f"~{total_estimated} estimated total results"
+            )
+            logger.info(
+                f"Split into {len(subqueries)} subqueries, "
+                f"estimated {total_estimated} total results"
             )
 
-            for page_id in range(1, max_pages + 1):
-                async with semaphore:
-                    await asyncio.sleep(config.request_delay_ms / 1000)
-                    try:
-                        result = await listing_query(
-                            page=page_id,
-                            channel=parameters.listing_type,
-                            min_bedrooms=sq.min_bedrooms,
-                            max_bedrooms=sq.max_bedrooms,
-                            radius=parameters.radius,
-                            min_price=sq.min_price,
-                            max_price=sq.max_price,
-                            district=sq.district,
-                            page_size=page_size,
-                            max_days_since_added=parameters.max_days_since_added,
-                            furnish_types=parameters.furnish_types or [],
-                            session=session,
-                        )
-                        results.append(result)
+            # Phase 3: Fetch all pages for each subquery
+            task.update_state(
+                state=f"Fetching listings from {len(subqueries)} subqueries...",
+                meta={
+                    "phase": "fetching",
+                    "subqueries": len(subqueries),
+                    "estimated_results": total_estimated,
+                },
+            )
-
-                        # Check if we've received all results
-                        properties = result.get("properties", [])
-                        if len(properties) < page_size:
-                            # No more results on next page
-                            break
+
+            celery_logger.info(f"Fetching pages from {len(subqueries)} subqueries...")
+
+            semaphore = asyncio.Semaphore(config.max_concurrent_requests)
+            identifiers: set[int] = set()
+            completed_subqueries = 0
+            total_pages_fetched = 0
+
+            async def fetch_subquery(sq: SubQuery) -> list[dict[str, Any]]:
+                """Fetch all pages for a single subquery."""
+                nonlocal completed_subqueries, total_pages_fetched
+                results: list[dict[str, Any]] = []
+
+                # Calculate how many pages we need based on estimated results
+                estimated = sq.estimated_results or 0
+                if estimated == 0:
+                    completed_subqueries += 1
+                    return results
+
+                # Fetch pages up to max_pages_per_query or until no more results
+                page_size = parameters.page_size
+                max_pages = min(
+                    config.max_pages_per_query,
+                    (estimated // page_size) + 1,
+                )
+
+                for page_id in range(1, max_pages + 1):
+                    async with semaphore:
+                        await asyncio.sleep(config.request_delay_ms / 1000)
+                        try:
+                            result = await listing_query(
+                                page=page_id,
+                                channel=parameters.listing_type,
+                                min_bedrooms=sq.min_bedrooms,
+                                max_bedrooms=sq.max_bedrooms,
+                                radius=parameters.radius,
+                                min_price=sq.min_price,
+                                max_price=sq.max_price,
+                                district=sq.district,
+                                page_size=page_size,
+                                max_days_since_added=parameters.max_days_since_added,
+                                furnish_types=parameters.furnish_types or [],
+                                session=session,
+                                config=config,
+                            )
+                            results.append(result)
+                            total_pages_fetched += 1
+
+                            # Check if we've received all results
+                            properties = result.get("properties", [])
+                            if len(properties) < page_size:
+                                # No more results on next page
+                                break
+
+                        except CircuitBreakerOpenError as e:
+                            celery_logger.error(f"Circuit breaker open: {e}")
+                            break
-
-                    except Exception as e:
-                        if "GENERIC_ERROR" in str(e):
-                            # Reached end of results
-                            logger.debug(
-                                f"Max page for {sq.district}: {page_id - 1}"
-                            )
-                            break
-                        logger.warning(
-                            f"Error fetching page {page_id} for {sq.district}: {e}"
-                        )
-                        break
-            return results
+                        except ThrottlingError as e:
+                            celery_logger.warning(
+                                f"Throttling on {sq.district} page {page_id}: {e}"
+                            )
+                            break
+                        except Exception as e:
+                            if "GENERIC_ERROR" in str(e):
+                                # Reached end of results
+                                logger.debug(
+                                    f"Max page for {sq.district}: {page_id - 1}"
+                                )
+                                break
+                            logger.warning(
+                                f"Error fetching page {page_id} for {sq.district}: {e}"
+                            )
+                            break
+
+                completed_subqueries += 1
+                return results
 
-        # Fetch all subqueries concurrently
-        all_results = await asyncio.gather(
-            *[fetch_subquery(sq) for sq in subqueries]
-        )
+            # Fetch all subqueries concurrently
+            all_results = await asyncio.gather(
+                *[fetch_subquery(sq) for sq in subqueries]
+            )
 
-        # Extract identifiers from all results
-        for subquery_results in all_results:
-            for response_json in subquery_results:
-                if not response_json:
-                    continue
-                if response_json.get("totalAvailableResults", 0) == 0:
-                    continue
-                for property_data in response_json.get("properties", []):
-                    identifier = property_data.get("identifier")
-                    if identifier:
-                        identifiers.add(identifier)
+            celery_logger.info(
+                f"Fetch complete: {total_pages_fetched} pages from "
+                f"{completed_subqueries} subqueries"
+            )
+
+            # Extract identifiers from all results
+            for subquery_results in all_results:
+                for response_json in subquery_results:
+                    if not response_json:
+                        continue
+                    if response_json.get("totalAvailableResults", 0) == 0:
+                        continue
+                    for property_data in response_json.get("properties", []):
+                        identifier = property_data.get("identifier")
+                        if identifier:
+                            identifiers.add(identifier)
+
+    except CircuitBreakerOpenError as e:
+        celery_logger.error(f"Circuit breaker prevented query: {e}")
+        # Log throttle metrics
+        metrics = get_throttle_metrics()
+        if metrics.total_requests > 0:
+            celery_logger.info(metrics.summary())
+        return set()
+    finally:
+        # Log throttle metrics
+        metrics = get_throttle_metrics()
+        if metrics.total_requests > 0:
+            celery_logger.info(f"API Stats: {metrics.total_requests} requests, "
+                               f"avg {metrics.average_response_time:.2f}s, "
+                               f"{metrics.total_throttling_events} throttled")
+
+    celery_logger.info(f"Found {len(identifiers)} unique listing IDs from API")
     logger.info(f"Found {len(identifiers)} unique listings")
 
     # Filter out listings already in the database
-    all_listing_ids = {l.id for l in await repository.get_listings()}
+    celery_logger.info("Checking database for existing listings...")
+    all_listing_ids = {listing.id for listing in await repository.get_listings()}
     new_ids = identifiers - all_listing_ids
 
+    celery_logger.info(
+        f"Filtering: {len(identifiers)} total, "
+        f"{len(all_listing_ids)} existing in DB, "
+        f"{len(new_ids)} new to process"
+    )
+
     task.update_state(
         state=f"Found {len(new_ids)} new listings to process",
         meta={