Make task processing a bit better. Still doing 1 query to check whether a listing needs processing; will fix later.

This commit is contained in:
Viktor Barzin 2025-07-27 20:09:41 +00:00
parent 87efe0694c
commit d1cef99c5a
No known key found for this signature in database
GPG key ID: 4056458DBDBF8863
2 changed files with 39 additions and 9 deletions

View file

@ -54,6 +54,14 @@ class Step:
class FetchListingDetailsStep(Step): class FetchListingDetailsStep(Step):
async def needs_processing(self, listing_id: int) -> bool:
    """Return True when no stored listing is found for ``listing_id``.

    The id is looked up through ``self.listing_repository.get_listings``;
    the step is reported as needing to run only when that lookup comes
    back empty.
    """
    existing_listings = await self.listing_repository.get_listings(
        only_ids=[listing_id]
    )
    # Compare the *length* to 0: comparing the result itself to 0 is never
    # true for a list, which made this method unconditionally return False.
    return len(existing_listings) == 0
async def process(self, listing_id: int) -> Listing: async def process(self, listing_id: int) -> Listing:
logger.debug(f"Fetching details for {listing_id=}") logger.debug(f"Fetching details for {listing_id=}")
existing_listings = await self.listing_repository.get_listings( existing_listings = await self.listing_repository.get_listings(
@ -115,6 +123,15 @@ class FetchListingDetailsStep(Step):
class FetchImagesStep(Step): class FetchImagesStep(Step):
async def needs_processing(self, listing_id: int) -> bool:
    """Report whether the listing with ``listing_id`` still lacks floorplan images.

    Returns False when the repository has no listing for the id; otherwise
    returns True exactly when the first matching listing has an empty
    ``floorplan_image_paths``.
    """
    found = await self.listing_repository.get_listings(only_ids=[listing_id])
    if len(found) > 0:
        first_match = found[0]
        return len(first_match.floorplan_image_paths) == 0
    # if listing doesn't exist, we can't process it
    return False
async def process(self, listing_id: int) -> Listing: async def process(self, listing_id: int) -> Listing:
logger.debug(f"Fetching images for {listing_id=}") logger.debug(f"Fetching images for {listing_id=}")
existing_listings = await self.listing_repository.get_listings( existing_listings = await self.listing_repository.get_listings(

View file

@ -39,13 +39,15 @@ async def dump_listings_full(
repository = ListingRepository(engine) repository = ListingRepository(engine)
task.update_state(state="Identifying missing listings", meta={"progress": 0}) task.update_state(state="Identifying missing listings", meta={"progress": 0})
missing_ids = await get_missing_listing_ids(parameters, repository) ids_to_process = await get_ids_to_process(
logger.info(f"Found {len(missing_ids)} missing listings") parameters=parameters, repository=repository, task=task
)
logger.info(f"Found {len(ids_to_process)} listings to process")
listing_processor = ListingProcessor(repository) listing_processor = ListingProcessor(repository)
logger.info(f"Starting processing {len(missing_ids)} new listings") logger.info(f"Starting processing {len(ids_to_process)} listings")
return await dump_listings_and_monitor( return await dump_listings_and_monitor(
task=task, listing_processor=listing_processor, missing_ids=missing_ids task=task, listing_processor=listing_processor, missing_ids=ids_to_process
) )
@ -63,7 +65,7 @@ async def dump_listings_and_monitor(
while (progress := sum(task_progress.values())) < len(missing_ids): while (progress := sum(task_progress.values())) < len(missing_ids):
progress_ratio = round(progress / len(missing_ids), 2) progress_ratio = round(progress / len(missing_ids), 2)
logger.error( logger.error(
f"Task progress: {task_progress}% ({progress} out of {len(missing_ids)})" f"Task progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})"
) )
task.update_state( task.update_state(
state=f"Progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})", state=f"Progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})",
@ -96,15 +98,20 @@ def setup_periodic_tasks(sender, **kwargs):
) )
async def get_missing_listing_ids( async def get_ids_to_process(
*,
parameters: QueryParameters, parameters: QueryParameters,
repository: ListingRepository, repository: ListingRepository,
task: Task,
) -> set[int]: ) -> set[int]:
semaphore = asyncio.Semaphore(5) # if too high, rightmove drops connections semaphore = asyncio.Semaphore(5) # if too high, rightmove drops connections
districts = await get_valid_districts_to_scrape(parameters.district_names) districts = await get_valid_districts_to_scrape(parameters.district_names)
task.update_state(state="Fetching listings to scrape", meta={"progress": 0})
json_responses: list[list[dict[str, Any]]] = await asyncio.gather( json_responses: list[list[dict[str, Any]]] = await asyncio.gather(
*[ *[
_fetch_listings_with_semaphore(semaphore, parameters, district) _fetch_listings_with_semaphore(
task=task, semaphore=semaphore, parameters=parameters, district=district
)
for district in districts.keys() for district in districts.keys()
], ],
) )
@ -123,8 +130,8 @@ async def get_missing_listing_ids(
# if listing is already in db, do not fetch details again # if listing is already in db, do not fetch details again
all_listing_ids = {l.id for l in await repository.get_listings()} all_listing_ids = {l.id for l in await repository.get_listings()}
missing_ids = all_listing_ids - identifiers all_ids = all_listing_ids.union(identifiers)
return missing_ids return all_ids
async def get_valid_districts_to_scrape( async def get_valid_districts_to_scrape(
@ -142,6 +149,8 @@ async def get_valid_districts_to_scrape(
async def _fetch_listings_with_semaphore( async def _fetch_listings_with_semaphore(
*,
task: Task,
semaphore: asyncio.Semaphore, semaphore: asyncio.Semaphore,
parameters: QueryParameters, parameters: QueryParameters,
district: str, district: str,
@ -156,6 +165,10 @@ async def _fetch_listings_with_semaphore(
price_step = parameters.max_price // number_of_steps price_step = parameters.max_price // number_of_steps
for step in range(number_of_steps): for step in range(number_of_steps):
task.update_state(
state=f"Fetching listings ({step} out of {number_of_steps})",
meta={"progress": step / number_of_steps},
)
min_price = step * price_step min_price = step * price_step
max_price = (step + 1) * price_step max_price = (step + 1) * price_step
logger.debug( logger.debug(