From 03d24aff9952ce4ffeab8e48c2c92fe527e45e68 Mon Sep 17 00:00:00 2001
From: Viktor Barzin
Date: Sun, 22 Jun 2025 12:43:24 +0000
Subject: [PATCH] refactor dump listings to extract all pages with data

---
 crawler/1_dump_listings.py | 62 ++++++++++++++++++++++++--------------
 1 file changed, 39 insertions(+), 23 deletions(-)

diff --git a/crawler/1_dump_listings.py b/crawler/1_dump_listings.py
index 537c997..8379940 100644
--- a/crawler/1_dump_listings.py
+++ b/crawler/1_dump_listings.py
@@ -1,11 +1,14 @@
 import asyncio
 import importlib
+import itertools
 import json
 import pathlib
 from typing import Any
+from logger import get_logger
 from rec.query import detail_query, listing_query, QueryParameters
 from rec.districts import get_districts
 from repositories import ListingRepository
+import requests
 from tqdm.asyncio import tqdm
 from data_access import Listing
 from models import Listing as modelListing
@@ -13,6 +16,8 @@ from models import Listing as modelListing
 dump_images_module = importlib.import_module("3_dump_images")
 detect_floorplan_module = importlib.import_module("4_detect_floorplan")
 
+logger = get_logger(__file__)
+
 
 async def dump_listings_full(
     parameters: QueryParameters,
@@ -45,16 +50,18 @@ async def dump_listings(
     print("Valid districts to scrape:", districts.keys())
 
     semaphore = asyncio.Semaphore(5)  # if too high, rightmove drops connections
-    json_responses = await tqdm.gather(
+    json_responses: list[list[dict[str, Any]]] = await tqdm.gather(
         *[
-            _fetch_listings_with_semaphore(semaphore, i, parameters, locid)
-            for locid in districts.values()
-            for i in [1, 2]
+            _fetch_listings_with_semaphore(semaphore, parameters, district)
+            for district in districts.keys()
         ],
         desc="Fetching listings",
     )
+    json_responses_flat = list(itertools.chain.from_iterable(json_responses))
     listings: list[Listing] = []
-    for response_json in json_responses:
+    for response_json in json_responses_flat:
+        if response_json == {}:
+            continue
         if response_json["totalAvailableResults"] == 0:
             continue
         for property in response_json["properties"]:
@@ -88,26 +95,35 @@ async def dump_listings(
 
 async def _fetch_listings_with_semaphore(
     semaphore: asyncio.Semaphore,
-    page_id: int,
     parameters: QueryParameters,
-    location_id: str,
-) -> dict[str, Any]:
-    async with semaphore:
-        listing_query_result = await listing_query(
-            page=page_id,
-            channel=parameters.listing_type,
-            min_bedrooms=parameters.min_bedrooms,
-            max_bedrooms=parameters.max_bedrooms,
-            radius=parameters.radius,
-            min_price=parameters.min_price,
-            max_price=parameters.max_price,
-            location_id=location_id,
-            page_size=parameters.page_size,
-            max_days_since_added=parameters.max_days_since_added,
-            furnish_types=parameters.furnish_types or [],
-        )
+    district: str,
+) -> list[dict[str, Any]]:
+    result = []
+    # we don't know how many pages there are, so keep fetching until the API reports no more
+    for page_id in range(999):
+        async with semaphore:
+            try:
+                listing_query_result = await listing_query(
+                    page=page_id,
+                    channel=parameters.listing_type,
+                    min_bedrooms=parameters.min_bedrooms,
+                    max_bedrooms=parameters.max_bedrooms,
+                    radius=parameters.radius,
+                    min_price=parameters.min_price,
+                    max_price=parameters.max_price,
+                    district=district,
+                    page_size=parameters.page_size,
+                    max_days_since_added=parameters.max_days_since_added,
+                    furnish_types=parameters.furnish_types or [],
+                )
 
-    return listing_query_result
+            except Exception as e:
+                if "GENERIC_ERROR" in str(e):  # page id past the last page
+                    logger.debug(f"Max page id for {district=}: {page_id-1}")
+                    break
+                raise e
+            result.append(listing_query_result)
+    return result
 
 
 async def _fetch_detail_with_semaphore(
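
Editor's note (not part of the patch): the core change replaces the fixed two-page fetch per district with an open-ended loop that keeps requesting pages until the server rejects the page id. Below is a minimal, self-contained sketch of that pattern, assuming a hypothetical fetch_page coroutine that raises an exception containing "GENERIC_ERROR" once the page id runs past the last page; the name and error convention mirror the patch but are illustrative, not the repo's API.

    import asyncio
    from typing import Any, Awaitable, Callable

    async def fetch_all_pages(
        semaphore: asyncio.Semaphore,
        fetch_page: Callable[[int], Awaitable[dict[str, Any]]],  # hypothetical fetcher
    ) -> list[dict[str, Any]]:
        results: list[dict[str, Any]] = []
        for page_id in range(999):  # hard upper bound as a safety net, as in the patch
            async with semaphore:  # cap concurrent requests across all district tasks
                try:
                    results.append(await fetch_page(page_id))
                except Exception as e:
                    if "GENERIC_ERROR" in str(e):  # server signals an out-of-range page
                        break  # page_id - 1 was the last valid page
                    raise  # unrelated errors still propagate
        return results

Each district becomes one such task; gathering them yields a list of per-district page lists, which is why dump_listings now flattens the results with itertools.chain.from_iterable before filtering out empty responses.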