diff --git a/crawler/1_dump_listings.py b/crawler/1_dump_listings.py
deleted file mode 100644
index f31f1b5..0000000
--- a/crawler/1_dump_listings.py
+++ /dev/null
@@ -1,157 +0,0 @@
-import asyncio
-import importlib
-import itertools
-import json
-import logging
-import pathlib
-from typing import Any
-from listing_processor import ListingProcessor
-from rec.query import listing_query
-from models.listing import QueryParameters
-from rec.districts import get_districts
-from repositories import ListingRepository
-from tqdm.asyncio import tqdm
-from data_access import Listing
-from models import Listing as modelListing
-
-dump_images_module = importlib.import_module("3_dump_images")
-detect_floorplan_module = importlib.import_module("4_detect_floorplan")
-
-logger = logging.getLogger("uvicorn.error")
-
-
-async def dump_listings_full(
-    parameters: QueryParameters,
-    repository: ListingRepository,
-    data_dir: pathlib.Path = pathlib.Path("data/rs/"),
-) -> list[modelListing]:
-    """Fetches all listings, images as well as detects floorplans"""
-    new_listings = await dump_listings(parameters, repository, data_dir)
-    logger.debug(f"Upserted {len(new_listings)} new listings")
-    # logger.debug("Starting to fetch floorplans")
-    # await dump_images_module.dump_images(repository, image_base_path=data_dir)
-    # logger.debug("Completed fetching floorplans")
-    # logger.debug("Starting floorplan detection")
-    # await detect_floorplan_module.detect_floorplan(repository)
-    # logger.debug("Completed floorplan detection")
-    # refresh listings
-    listings = await repository.get_listings(parameters)  # this can be better
-    new_listings = [x for x in listings if x.id in new_listings]
-    return new_listings
-
-
-async def dump_listings(
-    parameters: QueryParameters,
-    repository: ListingRepository,
-    data_dir: pathlib.Path = pathlib.Path("data/rs/"),
-) -> list[modelListing]:
-    if parameters.district_names:
-        districts = {
-            district: locid
-            for district, locid in get_districts().items()
-            if district in parameters.district_names
-        }
-    else:
-        districts = get_districts()
-    logger.debug("Valid districts to scrape:", districts.keys())
-
-    semaphore = asyncio.Semaphore(5)  # if too high, rightmove drops connections
-    json_responses: list[list[dict[str, Any]]] = await tqdm.gather(
-        *[
-            _fetch_listings_with_semaphore(semaphore, parameters, district)
-            for district in districts.keys()
-        ],
-        desc="Fetching listings",
-    )
-    json_responses_flat = list(itertools.chain.from_iterable(json_responses))
-    logger.debug(f"Total listings fetched {len(json_responses_flat)}")
-    listings: list[Listing] = []
-    for response_json in json_responses_flat:
-        if response_json == {}:
-            continue
-        if response_json["totalAvailableResults"] == 0:
-            continue
-        for property in response_json["properties"]:
-            identifier = property["identifier"]
-
-            listing = Listing(identifier, data_dir=data_dir, _listing_object=property)
-            listings.append(listing)
-
-    # if listing is already in db, do not fetch details again
-    all_listing_ids = [x.id for x in await repository.get_listings()]
-    missing_listing = [
-        listing for listing in listings if listing.identifier not in all_listing_ids
-    ]
-    missing_ids = [listing.identifier for listing in missing_listing]
-    missing_ids = [missing_ids[0]]
-    listing_processor = ListingProcessor(repository)
-    logger.info(f"Starting processing {len(missing_listing)} new listings")
-    processed_listings = await tqdm.gather(
-        *[listing_processor.process_listing(id) for id in missing_ids]
-    )
-    filtered_listings = [x for x in processed_listings if x is not None]
-
-    return filtered_listings
-
-
-async def _fetch_listings_with_semaphore(
-    semaphore: asyncio.Semaphore,
-    parameters: QueryParameters,
-    district: str,
-) -> list[dict[str, Any]]:
-    result = []
-    # split the price in N bands to avoid the 1.5k capping by rightmove
-    # basically instead of 1 query with price between 1k and 5k that is capped at 1500 results
-    # we do 10 queries each with an increment in price range so we send more queries but each
-    # has a smaller chance of returning more than 1.5k results
-
-    number_of_steps = 1
-    price_step = parameters.max_price // number_of_steps
-
-    for step in range(number_of_steps):
-        min_price = step * price_step
-        max_price = (step + 1) * price_step
-        logger.debug(
-            f"Step {step} of {number_of_steps} with {min_price=} and {max_price=}"
-        )
-
-        for num_bedrooms in range(parameters.min_bedrooms, parameters.max_bedrooms + 1):
-            for page_id in range(
-                1,
-                3,  # seems like all searches stop at 1500 entries (page_id * page_size)
-            ):
-                logger.debug(f"Processing {page_id=} for {district=}")
-
-                async with semaphore:
-                    try:
-                        listing_query_result = await listing_query(
-                            page=page_id,
-                            channel=parameters.listing_type,
-                            # min_bedrooms=parameters.min_bedrooms,
-                            # max_bedrooms=parameters.max_bedrooms,
-                            min_bedrooms=num_bedrooms,
-                            max_bedrooms=num_bedrooms,
-                            radius=parameters.radius,
-                            min_price=min_price,
-                            max_price=max_price,
-                            district=district,
-                            page_size=parameters.page_size,
-                            max_days_since_added=parameters.max_days_since_added,
-                            furnish_types=parameters.furnish_types or [],
-                        )
-
-                    except Exception as e:
-                        if "GENERIC_ERROR" in str(e):  # Too big page id
-                            logger.debug(f"Max page id for {district=}: {page_id-1}")
-                            break
-                        raise e
-                    result.append(listing_query_result)
-    return result
-
-
-async def dump_listings_to_fs(listings: list[Listing]) -> None:
-    for listing in tqdm(listings, desc="Dumping listings to FS"):
-        listing.dump_listing()
-        # if not listing.path_detail_json().exists():
-        with open(listing.path_detail_json(), "w") as f:
-            json.dump(listing._details_object, f, indent=4)
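
A minimal sketch of the price-banding workaround described in the comment inside the deleted _fetch_listings_with_semaphore: instead of one query over the full price range, which Rightmove caps at roughly 1,500 results, split the range into N contiguous bands and issue one narrower query per band. build_price_bands is a hypothetical helper for illustration only, not part of this codebase; it assumes integer prices starting at 0, as the deleted loop did.

    def build_price_bands(max_price: int, number_of_steps: int) -> list[tuple[int, int]]:
        """Split [0, max_price] into number_of_steps contiguous (min, max) bands."""
        price_step = max_price // number_of_steps
        return [
            (step * price_step, (step + 1) * price_step)
            for step in range(number_of_steps)
        ]

    # build_price_bands(5000, 10) -> [(0, 500), (500, 1000), ..., (4500, 5000)]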