import asyncio
import importlib
import itertools
import json
import logging
import pathlib
from typing import Any

from listing_processor import ListingProcessor
from rec.query import listing_query, QueryParameters
from rec.districts import get_districts
from repositories import ListingRepository
from tqdm.asyncio import tqdm

from data_access import Listing
from models import Listing as modelListing

# These sibling modules have numeric-prefixed filenames, so they cannot be
# imported with a plain `import` statement.
dump_images_module = importlib.import_module("3_dump_images")
detect_floorplan_module = importlib.import_module("4_detect_floorplan")

logger = logging.getLogger("uvicorn.error")


async def dump_listings_full(
    parameters: QueryParameters,
    repository: ListingRepository,
    data_dir: pathlib.Path = pathlib.Path("data/rs/"),
) -> list[modelListing]:
    """Fetches all listings, images as well as detects floorplans."""
    new_listings = await dump_listings(parameters, repository, data_dir)
    logger.debug(f"Upserted {len(new_listings)} new listings")
    # Image fetching and floorplan detection are currently disabled pipeline
    # steps; re-enable once the downstream modules are ready.
    # logger.debug("Starting to fetch floorplans")
    # await dump_images_module.dump_images(repository, image_base_path=data_dir)
    # logger.debug("Completed fetching floorplans")
    # logger.debug("Starting floorplan detection")
    # await detect_floorplan_module.detect_floorplan(repository)
    # logger.debug("Completed floorplan detection")

    # refresh listings
    listings = await repository.get_listings(parameters)
    # NOTE(review): this tests `l.id` for membership in a list of processed
    # listing objects (per the annotations), which looks like it relies on
    # model `__eq__` semantics — verify against `process_listing`'s return type.
    # this can be better
    new_listings = [l for l in listings if l.id in new_listings]
    return new_listings


async def dump_listings(
    parameters: QueryParameters,
    repository: ListingRepository,
    data_dir: pathlib.Path = pathlib.Path("data/rs/"),
) -> list[modelListing]:
    """Fetch raw listing pages per district, then process only the listings
    that are not already in the repository.

    Returns the newly processed listings (``None`` results filtered out).
    """
    if parameters.district_names:
        # Restrict the scrape to the districts the caller asked for.
        districts = {
            district: locid
            for district, locid in get_districts().items()
            if district in parameters.district_names
        }
    else:
        districts = get_districts()
    # Lazy %s formatting: the original passed keys as a stray positional arg,
    # which logging silently dropped.
    logger.debug("Valid districts to scrape: %s", list(districts.keys()))

    semaphore = asyncio.Semaphore(5)  # if too high, rightmove drops connections
    json_responses: list[list[dict[str, Any]]] = await tqdm.gather(
        *[
            _fetch_listings_with_semaphore(semaphore, parameters, district)
            for district in districts.keys()
        ],
        desc="Fetching listings",
    )
    json_responses_flat = list(itertools.chain.from_iterable(json_responses))
    logger.debug(f"Total listings fetched {len(json_responses_flat)}")

    listings: list[Listing] = []
    for response_json in json_responses_flat:
        if response_json == {}:
            continue
        if response_json["totalAvailableResults"] == 0:
            continue
        for prop in response_json["properties"]:
            identifier = prop["identifier"]
            listing = Listing(identifier, data_dir=data_dir, _listing_object=prop)
            listings.append(listing)

    # if listing is already in db, do not fetch details again
    # (set for O(1) membership tests)
    all_listing_ids = {l.id for l in await repository.get_listings()}
    missing_listing = [
        listing for listing in listings if listing.identifier not in all_listing_ids
    ]
    missing_ids = [listing.identifier for listing in missing_listing]
    listing_processor = ListingProcessor(repository)
    logger.info(f"Starting processing {len(missing_listing)} new listings")
    processed_listings = await tqdm.gather(
        *[listing_processor.process_listing(id) for id in missing_ids]
    )
    filtered_listings = [l for l in processed_listings if l is not None]
    return filtered_listings


async def _fetch_listings_with_semaphore(
    semaphore: asyncio.Semaphore,
    parameters: QueryParameters,
    district: str,
) -> list[dict[str, Any]]:
    """Query rightmove for one district, bounded by *semaphore*.

    Queries are issued per price band, per bedroom count, per page, to stay
    under rightmove's 1.5k-results-per-search cap.
    """
    result = []
    # split the price in N bands to avoid the 1.5k capping by rightmove
    # basically instead of 1 query with price between 1k and 5k that is capped at 1500 results
    # we do 10 queries each with an increment in price range so we send more queries but each
    # has a smaller chance of returning more than 1.5k results
    # NOTE: with number_of_steps = 1 the banding is effectively disabled;
    # raise it if searches start hitting the cap.
    number_of_steps = 1
    price_step = parameters.max_price // number_of_steps
    for step in range(number_of_steps):
        min_price = step * price_step
        max_price = (step + 1) * price_step
        logger.debug(
            f"Step {step} of {number_of_steps} with {min_price=} and {max_price=}"
        )
        # Query per bedroom count so each search stays small.
        for num_bedrooms in range(parameters.min_bedrooms, parameters.max_bedrooms + 1):
            for page_id in range(
                1,
                3,  # seems like all searches stop at 1500 entries (page_id * page_size)
            ):
                logger.debug(f"Processing {page_id=} for {district=}")
                async with semaphore:
                    try:
                        listing_query_result = await listing_query(
                            page=page_id,
                            channel=parameters.listing_type,
                            # min_bedrooms=parameters.min_bedrooms,
                            # max_bedrooms=parameters.max_bedrooms,
                            min_bedrooms=num_bedrooms,
                            max_bedrooms=num_bedrooms,
                            radius=parameters.radius,
                            min_price=min_price,
                            max_price=max_price,
                            district=district,
                            page_size=parameters.page_size,
                            max_days_since_added=parameters.max_days_since_added,
                            furnish_types=parameters.furnish_types or [],
                        )
                    except Exception as e:
                        if "GENERIC_ERROR" in str(e):
                            # Too big page id
                            logger.debug(f"Max page id for {district=}: {page_id-1}")
                            break
                        # Bare raise preserves the original traceback.
                        raise
                    result.append(listing_query_result)
    return result


async def dump_listings_to_fs(listings: list[Listing]) -> None:
    """Persist each listing and its raw detail JSON to the filesystem."""
    for listing in tqdm(listings, desc="Dumping listings to FS"):
        listing.dump_listing()
        with open(listing.path_detail_json(), "w", encoding="utf-8") as f:
            # NOTE(review): reaches into the private `_details_object` attribute;
            # consider exposing an accessor on Listing.
            json.dump(listing._details_object, f, indent=4)