refactor dump listings to extract all pages with data

This commit is contained in:
Viktor Barzin 2025-06-22 12:43:24 +00:00
parent fbd39bb67f
commit 03d24aff99
No known key found for this signature in database
GPG key ID: 4056458DBDBF8863

View file

@ -1,11 +1,14 @@
import asyncio import asyncio
import importlib import importlib
import itertools
import json import json
import pathlib import pathlib
from typing import Any from typing import Any
from logger import get_logger
from rec.query import detail_query, listing_query, QueryParameters from rec.query import detail_query, listing_query, QueryParameters
from rec.districts import get_districts from rec.districts import get_districts
from repositories import ListingRepository from repositories import ListingRepository
import requests
from tqdm.asyncio import tqdm from tqdm.asyncio import tqdm
from data_access import Listing from data_access import Listing
from models import Listing as modelListing from models import Listing as modelListing
@ -13,6 +16,8 @@ from models import Listing as modelListing
dump_images_module = importlib.import_module("3_dump_images") dump_images_module = importlib.import_module("3_dump_images")
detect_floorplan_module = importlib.import_module("4_detect_floorplan") detect_floorplan_module = importlib.import_module("4_detect_floorplan")
logger = get_logger(__file__)
async def dump_listings_full( async def dump_listings_full(
parameters: QueryParameters, parameters: QueryParameters,
@ -45,16 +50,18 @@ async def dump_listings(
print("Valid districts to scrape:", districts.keys()) print("Valid districts to scrape:", districts.keys())
semaphore = asyncio.Semaphore(5) # if too high, rightmove drops connections semaphore = asyncio.Semaphore(5) # if too high, rightmove drops connections
json_responses = await tqdm.gather( json_responses: list[list[dict[str, Any]]] = await tqdm.gather(
*[ *[
_fetch_listings_with_semaphore(semaphore, i, parameters, locid) _fetch_listings_with_semaphore(semaphore, parameters, district)
for locid in districts.values() for district in districts.keys()
for i in [1, 2]
], ],
desc="Fetching listings", desc="Fetching listings",
) )
json_responses_flat = list(itertools.chain.from_iterable(json_responses))
listings: list[Listing] = [] listings: list[Listing] = []
for response_json in json_responses: for response_json in json_responses_flat:
if response_json == {}:
continue
if response_json["totalAvailableResults"] == 0: if response_json["totalAvailableResults"] == 0:
continue continue
for property in response_json["properties"]: for property in response_json["properties"]:
@ -88,26 +95,35 @@ async def dump_listings(
async def _fetch_listings_with_semaphore(
    semaphore: asyncio.Semaphore,
    parameters: QueryParameters,
    district: str,
    max_pages: int = 999,
) -> list[dict[str, Any]]:
    """Fetch every available listing-results page for a single district.

    The total page count is not known up front, so pages are requested one
    after another until ``listing_query`` fails with an error whose text
    contains ``"GENERIC_ERROR"`` (interpreted as "page id too big"), or
    until ``max_pages`` pages have been fetched.

    Args:
        semaphore: Shared concurrency limiter. It is acquired separately for
            each page request, not for the whole pagination loop.
        parameters: Search filters forwarded field by field to
            ``listing_query``.
        district: Value forwarded as the ``district`` argument of
            ``listing_query``.
        max_pages: Upper bound on the number of pages requested. Defaults to
            999, the cap that was previously hard-coded.

    Returns:
        The raw ``listing_query`` results, one entry per fetched page, in
        page order. Empty if the very first request reports the
        out-of-range error.

    Raises:
        Exception: Any error raised by ``listing_query`` whose text does not
            contain ``"GENERIC_ERROR"`` is propagated unchanged.
    """
    pages: list[dict[str, Any]] = []
    # NOTE(review): paging starts at 0 here — confirm that listing_query
    # treats ``page`` as zero-based.
    for page_id in range(max_pages):
        async with semaphore:
            try:
                page = await listing_query(
                    page=page_id,
                    channel=parameters.listing_type,
                    min_bedrooms=parameters.min_bedrooms,
                    max_bedrooms=parameters.max_bedrooms,
                    radius=parameters.radius,
                    min_price=parameters.min_price,
                    max_price=parameters.max_price,
                    district=district,
                    page_size=parameters.page_size,
                    max_days_since_added=parameters.max_days_since_added,
                    furnish_types=parameters.furnish_types or [],
                )
            except Exception as e:
                # Only the "page id too big" error ends pagination; anything
                # else is a real failure and must reach the caller intact.
                if "GENERIC_ERROR" not in str(e):
                    raise
                logger.debug(f"Max page id for {district=}: {page_id-1}")
                break
        pages.append(page)
    else:
        # The loop ran out of page ids without ever seeing the out-of-range
        # error, so later pages may exist that were not fetched.
        logger.warning(
            f"Stopped after {max_pages} pages for {district=}; "
            "results may be truncated"
        )
    return pages
async def _fetch_detail_with_semaphore( async def _fetch_detail_with_semaphore(