From d1cef99c5a34d8d14ddc2bbf052bb660e1910a1d Mon Sep 17 00:00:00 2001
From: Viktor Barzin
Date: Sun, 27 Jul 2025 20:09:41 +0000
Subject: [PATCH] make task processing a bit better. still doing 1 query to
 check if needs processing; will fix later

---
 crawler/listing_processor.py   | 17 +++++++++++++++++
 crawler/tasks/listing_tasks.py | 31 ++++++++++++++++++++++---------
 2 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/crawler/listing_processor.py b/crawler/listing_processor.py
index 6955050..91943b3 100644
--- a/crawler/listing_processor.py
+++ b/crawler/listing_processor.py
@@ -54,6 +54,14 @@ class Step:
 
 
 class FetchListingDetailsStep(Step):
+    async def needs_processing(self, listing_id: int) -> bool:
+        existing_listings = await self.listing_repository.get_listings(
+            only_ids=[listing_id]
+        )
+        if len(existing_listings) == 0:
+            return True
+        return False
+
     async def process(self, listing_id: int) -> Listing:
         logger.debug(f"Fetching details for {listing_id=}")
         existing_listings = await self.listing_repository.get_listings(
@@ -115,6 +123,15 @@ class FetchListingDetailsStep(Step):
 
 
 class FetchImagesStep(Step):
+    async def needs_processing(self, listing_id: int) -> bool:
+        existing_listings = await self.listing_repository.get_listings(
+            only_ids=[listing_id]
+        )
+        if len(existing_listings) == 0:
+            return False  # if listing doesn't exist, we can't process it
+        listing = existing_listings[0]
+        return len(listing.floorplan_image_paths) == 0
+
     async def process(self, listing_id: int) -> Listing:
         logger.debug(f"Fetching images for {listing_id=}")
         existing_listings = await self.listing_repository.get_listings(
diff --git a/crawler/tasks/listing_tasks.py b/crawler/tasks/listing_tasks.py
index ecf3626..3738d32 100644
--- a/crawler/tasks/listing_tasks.py
+++ b/crawler/tasks/listing_tasks.py
@@ -39,13 +39,15 @@ async def dump_listings_full(
     repository = ListingRepository(engine)
 
     task.update_state(state="Identifying missing listings", meta={"progress": 0})
-    missing_ids = await get_missing_listing_ids(parameters, repository)
-    logger.info(f"Found {len(missing_ids)} missing listings")
+    ids_to_process = await get_ids_to_process(
+        parameters=parameters, repository=repository, task=task
+    )
+    logger.info(f"Found {len(ids_to_process)} listings to process")
 
     listing_processor = ListingProcessor(repository)
-    logger.info(f"Starting processing {len(missing_ids)} new listings")
+    logger.info(f"Starting processing {len(ids_to_process)} listings")
     return await dump_listings_and_monitor(
-        task=task, listing_processor=listing_processor, missing_ids=missing_ids
+        task=task, listing_processor=listing_processor, missing_ids=ids_to_process
     )
 
 
@@ -63,7 +65,7 @@ async def dump_listings_and_monitor(
     while (progress := sum(task_progress.values())) < len(missing_ids):
         progress_ratio = round(progress / len(missing_ids), 2)
         logger.error(
-            f"Task progress: {task_progress}% ({progress} out of {len(missing_ids)})"
+            f"Task progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})"
         )
         task.update_state(
             state=f"Progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})",
@@ -96,15 +98,20 @@ def setup_periodic_tasks(sender, **kwargs):
     )
 
 
-async def get_missing_listing_ids(
+async def get_ids_to_process(
+    *,
     parameters: QueryParameters,
     repository: ListingRepository,
+    task: Task,
 ) -> set[int]:
     semaphore = asyncio.Semaphore(5)  # if too high, rightmove drops connections
     districts = await get_valid_districts_to_scrape(parameters.district_names)
+    task.update_state(state="Fetching listings to scrape", meta={"progress": 0})
     json_responses: list[list[dict[str, Any]]] = await asyncio.gather(
         *[
-            _fetch_listings_with_semaphore(semaphore, parameters, district)
+            _fetch_listings_with_semaphore(
+                task=task, semaphore=semaphore, parameters=parameters, district=district
+            )
             for district in districts.keys()
         ],
     )
@@ -123,8 +130,8 @@
 
     # if listing is already in db, do not fetch details again
     all_listing_ids = {l.id for l in await repository.get_listings()}
-    missing_ids = all_listing_ids - identifiers
-    return missing_ids
+    all_ids = all_listing_ids.union(identifiers)
+    return all_ids
 
 
 async def get_valid_districts_to_scrape(
@@ -142,6 +149,8 @@
 
 
 async def _fetch_listings_with_semaphore(
+    *,
+    task: Task,
     semaphore: asyncio.Semaphore,
     parameters: QueryParameters,
     district: str,
@@ -156,6 +165,10 @@ async def _fetch_listings_with_semaphore(
     price_step = parameters.max_price // number_of_steps
 
     for step in range(number_of_steps):
+        task.update_state(
+            state=f"Fetching listings ({step} out of {number_of_steps})",
+            meta={"progress": step / number_of_steps},
+        )
        min_price = step * price_step
        max_price = (step + 1) * price_step
        logger.debug(