import asyncio
import importlib
import json
import pathlib
from typing import Any

from rec.query import detail_query, listing_query, QueryParameters
from rec.districts import get_districts
from repositories import ListingRepository
from tqdm.asyncio import tqdm
from data_access import Listing
from models import Listing as modelListing

# These pipeline stages live in modules whose names start with a digit, so
# they cannot be imported with a regular ``import`` statement.
dump_images_module = importlib.import_module("3_dump_images")
detect_floorplan_module = importlib.import_module("4_detect_floorplan")


async def dump_listings_full(
    parameters: QueryParameters,
    repository: ListingRepository,
    data_dir: pathlib.Path = pathlib.Path("data/rs/"),
) -> list[modelListing]:
    """Fetch new listings, then run the image-dump and floorplan stages.

    Calls :func:`dump_listings`, then ``dump_images`` and ``detect_floorplan``
    from the ``3_dump_images`` / ``4_detect_floorplan`` modules on
    ``repository``. Finally it re-reads the listings matching ``parameters``,
    so the returned objects reflect whatever those stages stored.

    Args:
        parameters: Search parameters, used both for scraping and for the
            final repository query.
        repository: Repository the listings are stored in.
        data_dir: Base directory for listing dumps and images.

    Returns:
        The refreshed repository listings whose ids were upserted by this run.
    """
    new_listings = await dump_listings(parameters, repository, data_dir)
    await dump_images_module.dump_images(repository, image_base_path=data_dir)
    await detect_floorplan_module.detect_floorplan(repository)

    # Compare by id: ``new_listings`` holds model objects, so testing an id
    # for membership in that list directly would never match. A set also
    # keeps the lookup O(1) per listing.
    new_ids = {listing.id for listing in new_listings}
    # Refresh listings so the results of the later stages are included.
    listings = await repository.get_listings(parameters)
    return [listing for listing in listings if listing.id in new_ids]


async def dump_listings(
    parameters: QueryParameters,
    repository: ListingRepository,
    data_dir: pathlib.Path = pathlib.Path("data/rs/"),
) -> list[modelListing]:
    """Scrape listings for the selected districts and store the new ones.

    Queries result pages 1 and 2 for every district named in
    ``parameters.district_names`` (all known districts when that is empty).
    Listings whose identifier is already in ``repository`` are skipped. For
    the remaining listings this function:

    * fetches the detail payload;
    * dumps the listings to the filesystem;
    * upserts them into ``repository``.

    Args:
        parameters: Search parameters forwarded to the listing query.
        repository: Repository used to detect known listings and to upsert.
        data_dir: Base directory passed to each ``Listing``.

    Returns:
        Whatever ``repository.upsert_listings_legacy`` returns for the newly
        scraped listings.
    """
    if parameters.district_names:
        districts = {
            district: locid
            for district, locid in get_districts().items()
            if district in parameters.district_names
        }
    else:
        districts = get_districts()
    print("Valid districts to scrape:", districts.keys())

    semaphore = asyncio.Semaphore(5)  # if too high, rightmove drops connections
    json_responses = await tqdm.gather(
        *[
            _fetch_listings_with_semaphore(semaphore, page, parameters, locid)
            for locid in districts.values()
            for page in [1, 2]
        ],
        desc="Fetching listings",
    )

    # The same property can appear in several districts' results. Keep only
    # the first occurrence so its details are fetched and upserted once.
    listings_by_id: dict[Any, Listing] = {}
    for response_json in json_responses:
        if response_json["totalAvailableResults"] == 0:
            continue
        for property_json in response_json["properties"]:
            identifier = property_json["identifier"]
            if identifier in listings_by_id:
                continue
            listings_by_id[identifier] = Listing(
                identifier, data_dir=data_dir, _listing_object=property_json
            )

    # If a listing is already in the db, do not fetch its details again.
    known_ids = {listing.id for listing in await repository.get_listings()}
    missing_listing = [
        listing
        for listing in listings_by_id.values()
        if listing.identifier not in known_ids
    ]

    listing_details = await tqdm.gather(
        *[
            _fetch_detail_with_semaphore(semaphore, listing.identifier)
            for listing in missing_listing
        ],
        desc="Fetching details (only missing)",
    )
    for listing, detail in zip(missing_listing, listing_details):
        listing._details_object = detail

    await dump_listings_to_fs(missing_listing)
    # upsert in db
    return await repository.upsert_listings_legacy(missing_listing)


async def _fetch_listings_with_semaphore(
    semaphore: asyncio.Semaphore,
    page_id: int,
    parameters: QueryParameters,
    location_id: str,
) -> dict[str, Any]:
    """Run one listing-search page query for ``location_id``.

    The query runs while holding ``semaphore``, which limits how many
    requests are in flight at once.
    """
    async with semaphore:
        return await listing_query(
            page=page_id,
            channel=parameters.listing_type,
            min_bedrooms=parameters.min_bedrooms,
            max_bedrooms=parameters.max_bedrooms,
            radius=parameters.radius,
            min_price=parameters.min_price,
            max_price=parameters.max_price,
            location_id=location_id,
            page_size=parameters.page_size,
            max_days_since_added=parameters.max_days_since_added,
            furnish_types=parameters.furnish_types or [],
        )


async def _fetch_detail_with_semaphore(
    semaphore: asyncio.Semaphore, listing_id: int
) -> dict[str, Any]:
    """Fetch the detail payload of one listing while holding ``semaphore``."""
    async with semaphore:
        return await detail_query(listing_id)


async def dump_listings_to_fs(listings: list[Listing]) -> None:
    """Dump each listing and write its detail payload to disk.

    For every listing this calls ``listing.dump_listing()``. It then writes
    ``listing._details_object`` as indented JSON to
    ``listing.path_detail_json()``, overwriting any existing file.
    """
    for listing in tqdm(listings, desc="Dumping listings to FS"):
        listing.dump_listing()
        with open(listing.path_detail_json(), "w") as f:
            json.dump(listing._details_object, f, indent=4)