diff --git a/crawler/1_dump_listings.py b/crawler/1_dump_listings.py
index aacb787..aeef2c5 100644
--- a/crawler/1_dump_listings.py
+++ b/crawler/1_dump_listings.py
@@ -5,6 +5,7 @@ import json
 import logging
 import pathlib
 from typing import Any
+from listing_processor import ListingProcessor
 from rec.query import detail_query, listing_query, QueryParameters
 from rec.districts import get_districts
 from repositories import ListingRepository
@@ -27,12 +28,12 @@ async def dump_listings_full(
     """Fetches all listings, images as well as detects floorplans"""
     new_listings = await dump_listings(parameters, repository, data_dir)
     logger.debug(f"Upserted {len(new_listings)} new listings")
-    logger.debug("Starting to fetch floorplans")
-    await dump_images_module.dump_images(repository, image_base_path=data_dir)
-    logger.debug("Completed fetching floorplans")
-    logger.debug("Starting floorplan detection")
-    await detect_floorplan_module.detect_floorplan(repository)
-    logger.debug("Completed floorplan detection")
+    # logger.debug("Starting to fetch floorplans")
+    # await dump_images_module.dump_images(repository, image_base_path=data_dir)
+    # logger.debug("Completed fetching floorplans")
+    # logger.debug("Starting floorplan detection")
+    # await detect_floorplan_module.detect_floorplan(repository)
+    # logger.debug("Completed floorplan detection")
     # refresh listings
     listings = await repository.get_listings(parameters)  # this can be better
     new_listings = [l for l in listings if l.id in new_listings]
@@ -81,25 +82,16 @@
     missing_listing = [
         listing for listing in listings if listing.identifier not in all_listing_ids
     ]
-    logger.debug(f"Fetching details for {len(missing_listing)} missing listings")
-    listing_details = await tqdm.gather(
-        *[
-            _fetch_detail_with_semaphore(semaphore, listing.identifier)
-            for listing in missing_listing
-        ],
-        desc="Fetching details (only missing)",
+    missing_ids = [listing.identifier for listing in missing_listing]
+    missing_ids = [missing_ids[0]]
+    listing_processor = ListingProcessor(repository)
+    logger.info(f"Starting processing {len(missing_listing)} new listings")
+    processed_listings = await tqdm.gather(
+        *[listing_processor.process_listing(id) for id in missing_ids]
     )
-    for listing, detail in zip(missing_listing, listing_details):
-        listing._details_object = detail
+    filtered_listings = [l for l in processed_listings if l is not None]
 
-    logger.debug("Dumping listings to fs")
-    await dump_listings_to_fs(missing_listing)
-    logger.debug("Upserting listings in db")
-    model_listings = await repository.upsert_listings_legacy(
-        missing_listing
-    )  # upsert in db
-
-    return model_listings
+    return filtered_listings
 
 
 async def _fetch_listings_with_semaphore(
@@ -113,7 +105,7 @@
 
     # we do 10 queries each with an increment in price range so we send more queries but each
     # has a smaller chance of returning more than 1.5k results
-    number_of_steps = 10
+    number_of_steps = 1
     price_step = parameters.max_price // number_of_steps
 
     for step in range(number_of_steps):
@@ -157,14 +149,6 @@
     return result
 
 
-async def _fetch_detail_with_semaphore(
-    semaphore: asyncio.Semaphore, listing_id: int
-) -> dict[str, Any]:
-    async with semaphore:
-        d = await detail_query(listing_id)
-        return d
-
-
 async def dump_listings_to_fs(listings: list[Listing]) -> None:
     for listing in tqdm(listings, desc="Dumping listings to FS"):
         listing.dump_listing()
diff --git a/crawler/listing_processor.py b/crawler/listing_processor.py
index 91943b3..7414a0a 100644
--- a/crawler/listing_processor.py
+++ b/crawler/listing_processor.py
@@ -33,7 +33,11 @@ class ListingProcessor:
         for step in self.process_steps:
             if await step.needs_processing(listing_id):
                 async with self.semaphore:
-                    listing = await step.process(listing_id)
+                    try:
+                        listing = await step.process(listing_id)
+                    except Exception as e:
+                        logger.error(f"Failed to process {listing_id=}: {e}")
+                        return None
         return listing
 
     async def listing_exists(self, listing_id: int) -> bool: ...
@@ -145,6 +149,7 @@ class FetchImagesStep(Step):
         all_floorplans = listing.additional_info.get("property", {}).get(
             "floorplans", []
         )
+        client_timeout = aiohttp.ClientTimeout(total=30)
         for floorplan in all_floorplans:
             url = floorplan["url"]
             picname = url.split("/")[-1]
@@ -152,7 +157,7 @@
             if floorplan_path.exists():
                 continue
             async with aiohttp.ClientSession() as session:
-                async with session.get(url) as response:
+                async with session.get(url, timeout=client_timeout) as response:
                     if response.status == 404:
                         return listing
                     if response.status != 200: