Add a timeout when fetching details and use the new entrypoint for task processing

This commit is contained in:
Viktor Barzin 2025-08-23 22:20:42 +00:00
parent e5c68f6bb7
commit 480957dc72
No known key found for this signature in database
GPG key ID: 4056458DBDBF8863
2 changed files with 23 additions and 34 deletions

View file

@ -5,6 +5,7 @@ import json
import logging import logging
import pathlib import pathlib
from typing import Any from typing import Any
from listing_processor import ListingProcessor
from rec.query import detail_query, listing_query, QueryParameters from rec.query import detail_query, listing_query, QueryParameters
from rec.districts import get_districts from rec.districts import get_districts
from repositories import ListingRepository from repositories import ListingRepository
@ -27,12 +28,12 @@ async def dump_listings_full(
"""Fetches all listings, images as well as detects floorplans""" """Fetches all listings, images as well as detects floorplans"""
new_listings = await dump_listings(parameters, repository, data_dir) new_listings = await dump_listings(parameters, repository, data_dir)
logger.debug(f"Upserted {len(new_listings)} new listings") logger.debug(f"Upserted {len(new_listings)} new listings")
logger.debug("Starting to fetch floorplans") # logger.debug("Starting to fetch floorplans")
await dump_images_module.dump_images(repository, image_base_path=data_dir) # await dump_images_module.dump_images(repository, image_base_path=data_dir)
logger.debug("Completed fetching floorplans") # logger.debug("Completed fetching floorplans")
logger.debug("Starting floorplan detection") # logger.debug("Starting floorplan detection")
await detect_floorplan_module.detect_floorplan(repository) # await detect_floorplan_module.detect_floorplan(repository)
logger.debug("Completed floorplan detection") # logger.debug("Completed floorplan detection")
# refresh listings # refresh listings
listings = await repository.get_listings(parameters) # this can be better listings = await repository.get_listings(parameters) # this can be better
new_listings = [l for l in listings if l.id in new_listings] new_listings = [l for l in listings if l.id in new_listings]
@ -81,25 +82,16 @@ async def dump_listings(
missing_listing = [ missing_listing = [
listing for listing in listings if listing.identifier not in all_listing_ids listing for listing in listings if listing.identifier not in all_listing_ids
] ]
logger.debug(f"Fetching details for {len(missing_listing)} missing listings") missing_ids = [listing.identifier for listing in missing_listing]
listing_details = await tqdm.gather( missing_ids = [missing_ids[0]]
*[ listing_processor = ListingProcessor(repository)
_fetch_detail_with_semaphore(semaphore, listing.identifier) logger.info(f"Starting processing {len(missing_listing)} new listings")
for listing in missing_listing processed_listings = await tqdm.gather(
], *[listing_processor.process_listing(id) for id in missing_ids]
desc="Fetching details (only missing)",
) )
for listing, detail in zip(missing_listing, listing_details): filtered_listings = [l for l in processed_listings if l is not None]
listing._details_object = detail
logger.debug("Dumping listings to fs") return filtered_listings
await dump_listings_to_fs(missing_listing)
logger.debug("Upserting listings in db")
model_listings = await repository.upsert_listings_legacy(
missing_listing
) # upsert in db
return model_listings
async def _fetch_listings_with_semaphore( async def _fetch_listings_with_semaphore(
@ -113,7 +105,7 @@ async def _fetch_listings_with_semaphore(
# we do 10 queries each with an increment in price range so we send more queries but each # we do 10 queries each with an increment in price range so we send more queries but each
# has a smaller chance of returning more than 1.5k results # has a smaller chance of returning more than 1.5k results
number_of_steps = 10 number_of_steps = 1
price_step = parameters.max_price // number_of_steps price_step = parameters.max_price // number_of_steps
for step in range(number_of_steps): for step in range(number_of_steps):
@ -157,14 +149,6 @@ async def _fetch_listings_with_semaphore(
return result return result
async def _fetch_detail_with_semaphore(
semaphore: asyncio.Semaphore, listing_id: int
) -> dict[str, Any]:
async with semaphore:
d = await detail_query(listing_id)
return d
async def dump_listings_to_fs(listings: list[Listing]) -> None: async def dump_listings_to_fs(listings: list[Listing]) -> None:
for listing in tqdm(listings, desc="Dumping listings to FS"): for listing in tqdm(listings, desc="Dumping listings to FS"):
listing.dump_listing() listing.dump_listing()

View file

@ -33,7 +33,11 @@ class ListingProcessor:
for step in self.process_steps: for step in self.process_steps:
if await step.needs_processing(listing_id): if await step.needs_processing(listing_id):
async with self.semaphore: async with self.semaphore:
listing = await step.process(listing_id) try:
listing = await step.process(listing_id)
except Exception as e:
logger.error(f"Failed to process {listing_id=}: {e}")
return None
return listing return listing
async def listing_exists(self, listing_id: int) -> bool: ... async def listing_exists(self, listing_id: int) -> bool: ...
@ -145,6 +149,7 @@ class FetchImagesStep(Step):
all_floorplans = listing.additional_info.get("property", {}).get( all_floorplans = listing.additional_info.get("property", {}).get(
"floorplans", [] "floorplans", []
) )
client_timeout = aiohttp.ClientTimeout(total=30)
for floorplan in all_floorplans: for floorplan in all_floorplans:
url = floorplan["url"] url = floorplan["url"]
picname = url.split("/")[-1] picname = url.split("/")[-1]
@ -152,7 +157,7 @@ class FetchImagesStep(Step):
if floorplan_path.exists(): if floorplan_path.exists():
continue continue
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get(url) as response: async with session.get(url, timeout=client_timeout) as response:
if response.status == 404: if response.status == 404:
return listing return listing
if response.status != 200: if response.status != 200: