# wrongmove/crawler/1_dump_listings.py
import asyncio
import json
import pathlib
from typing import Any
from rec.query import detail_query, listing_query, QueryParameters
from rec.districts import get_districts
from repositories import ListingRepository
from tqdm.asyncio import tqdm
from data_access import Listing
from models import Listing as modelListing
async def dump_listings(
    parameters: QueryParameters,
    repository: ListingRepository,
    data_dir: pathlib.Path = pathlib.Path("data/rs/"),
) -> list[modelListing]:
    """Scrape Rightmove listing pages, fetch details only for unknown listings,
    dump them to the filesystem, and upsert them into the repository.

    Args:
        parameters: Search filters; ``district_names`` (when truthy) restricts
            which districts from ``get_districts()`` are scraped.
        repository: Persistence layer used both to detect already-known
            listings and to upsert the newly fetched ones.
        data_dir: Directory under which listing/detail JSON files are written.

    Returns:
        The model listings returned by the repository upsert (missing/new
        listings only).
    """
    if parameters.district_names:
        districts = {
            district: locid
            for district, locid in get_districts().items()
            if district in parameters.district_names
        }
    else:
        districts = get_districts()
    print("Valid districts to scrape:", districts.keys())
    semaphore = asyncio.Semaphore(5)  # if too high, rightmove drops connections
    # Fetch the first two result pages for every district concurrently.
    json_responses = await tqdm.gather(
        *[
            _fetch_listings_with_semaphore(semaphore, i, parameters, locid)
            for locid in districts.values()
            for i in [1, 2]
        ],
        desc="Fetching listings",
    )
    listings: list[Listing] = []
    for response_json in json_responses:
        if response_json["totalAvailableResults"] == 0:
            continue
        # 'prop' rather than 'property' to avoid shadowing the builtin.
        for prop in response_json["properties"]:
            identifier = prop["identifier"]
            listing = Listing(identifier, data_dir=data_dir, _listing_object=prop)
            listings.append(listing)
    # If a listing is already in the db, do not fetch its details again.
    # A set gives O(1) membership tests instead of scanning a list per listing.
    all_listing_ids = {known.id for known in await repository.get_listings()}
    missing_listing = [
        listing for listing in listings if listing.identifier not in all_listing_ids
    ]
    listing_details = await tqdm.gather(
        *[
            _fetch_detail_with_semaphore(semaphore, listing.identifier)
            for listing in missing_listing
        ],
        desc="Fetching details (only missing)",
    )
    for listing, detail in zip(missing_listing, listing_details):
        listing._details_object = detail
    await dump_listings_to_fs(missing_listing)
    model_listings = await repository.upsert_listings_legacy(
        missing_listing
    )  # upsert in db
    return model_listings
async def _fetch_listings_with_semaphore(
    semaphore: asyncio.Semaphore,
    page_id: int,
    parameters: QueryParameters,
    location_id: str,
) -> dict[str, Any]:
    """Run one listing-page query while holding the shared semaphore.

    The semaphore caps how many requests are in flight at once; the query
    itself simply forwards the relevant fields of *parameters*.
    """
    async with semaphore:
        return await listing_query(
            page=page_id,
            channel=parameters.listing_type,
            min_bedrooms=parameters.min_bedrooms,
            max_bedrooms=parameters.max_bedrooms,
            radius=parameters.radius,
            min_price=parameters.min_price,
            max_price=parameters.max_price,
            location_id=location_id,
            page_size=parameters.page_size,
            max_days_since_added=parameters.max_days_since_added,
            furnish_types=parameters.furnish_types or [],
        )
async def _fetch_detail_with_semaphore(
    semaphore: asyncio.Semaphore, listing_id: int
) -> dict[str, Any]:
    """Fetch the detail payload for a single listing, gated by the semaphore."""
    async with semaphore:
        return await detail_query(listing_id)
async def dump_listings_to_fs(listings: list[Listing]) -> None:
    """Write each listing and its detail payload to the filesystem as JSON.

    ``listing.dump_listing()`` persists the listing object itself; the detail
    payload is then written (unconditionally overwritten) next to it at
    ``listing.path_detail_json()``.
    """
    for listing in tqdm(listings, desc="Dumping listings to FS"):
        listing.dump_listing()
        with open(listing.path_detail_json(), "w") as f:
            json.dump(listing._details_object, f, indent=4)