"""Celery tasks that scrape listings, fetch their details and report progress."""

import asyncio
import contextlib
import itertools
import logging
from typing import Any

from celery import Task
from celery.schedules import crontab

from celery_app import app
from config.schedule_config import SchedulesConfig
from listing_processor import ListingProcessor
from models.listing import Listing, QueryParameters
from rec.districts import get_districts
from rec.query import listing_query
from repositories.listing_repository import ListingRepository
from database import engine
from services import image_fetcher, floorplan_detector
from utils.redis_lock import redis_lock

logger = logging.getLogger("uvicorn.error")

SCRAPE_LOCK_NAME = "scrape_listings"

# Number of price bands each search is split into (see _fetch_listings_with_semaphore).
_NUMBER_OF_PRICE_BANDS = 10
# Maximum number of concurrent search requests; if too high, rightmove drops connections.
_MAX_CONCURRENT_QUERIES = 5


@app.task(bind=True, pydantic=True)
def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]:
    """Run a full scrape as a Celery task, guarded by the shared scrape lock.

    Args:
        parameters_json: JSON document that validates as ``QueryParameters``.

    Returns:
        ``{"status": "skipped", "reason": "another_job_running"}`` when the lock
        could not be acquired, otherwise ``{"progress": 0}`` once the scrape has
        finished.
    """
    with redis_lock(SCRAPE_LOCK_NAME) as acquired:
        if not acquired:
            logger.warning("Another scrape job is already running, skipping this execution")
            self.update_state(state="SKIPPED", meta={"reason": "Another scrape job is running"})
            return {"status": "skipped", "reason": "another_job_running"}

        logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
        parsed_parameters = QueryParameters.model_validate_json(parameters_json)
        self.update_state(state="Starting...", meta={"progress": 0})
        # The scrape itself is async; drive it to completion on a fresh event loop.
        asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters))
        return {"progress": 0}


async def async_dump_listings_task(parameters_json: str) -> dict[str, Any]:
    """Run a full scrape from an already running event loop.

    Same contract as ``dump_listings_task`` but awaitable; progress updates are
    sent to a freshly constructed ``Task`` instance instead of a bound Celery task.

    Args:
        parameters_json: JSON document that validates as ``QueryParameters``.

    Returns:
        ``{"status": "skipped", "reason": "another_job_running"}`` when the lock
        could not be acquired, otherwise ``{"progress": 0}``.
    """
    with redis_lock(SCRAPE_LOCK_NAME) as acquired:
        if not acquired:
            logger.warning("Another scrape job is already running, skipping this execution")
            return {"status": "skipped", "reason": "another_job_running"}

        logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
        parsed_parameters = QueryParameters.model_validate_json(parameters_json)
        await dump_listings_full(task=Task(), parameters=parsed_parameters)
        return {"progress": 0}


async def dump_listings_full(
    *, task: Task, parameters: QueryParameters
) -> list[Listing]:
    """Collect the listing ids for ``parameters`` and process every one of them.

    Args:
        task: Task used to publish state/progress updates.
        parameters: Search parameters describing what to scrape.

    Returns:
        The listings returned by the processor, with ``None`` results dropped.
    """
    repository = ListingRepository(engine)

    task.update_state(state="Identifying missing listings", meta={"progress": 0})
    ids_to_process = await get_ids_to_process(
        parameters=parameters, repository=repository, task=task
    )
    logger.info(f"Found {len(ids_to_process)} listings to process")

    listing_processor = ListingProcessor(repository)
    logger.info(f"Starting processing {len(ids_to_process)} listings")
    return await dump_listings_and_monitor(
        task=task, listing_processor=listing_processor, missing_ids=ids_to_process
    )


async def dump_listings_and_monitor(
    *, task: Task, listing_processor: ListingProcessor, missing_ids: set[int]
) -> list[Listing]:
    """Process all ``missing_ids`` concurrently while reporting progress once a second.

    Args:
        task: Task used to publish state/progress updates.
        listing_processor: Processor whose ``process_listing`` is awaited per id.
        missing_ids: Identifiers of the listings to process.

    Returns:
        The non-``None`` results of ``process_listing``.

    Raises:
        Whatever ``process_listing`` raises; the progress monitor is stopped first.
    """
    total = len(missing_ids)
    completed = 0

    async def process(listing_id: int) -> Listing | None:
        nonlocal completed
        listing = await listing_processor.process_listing(listing_id)
        completed += 1
        return listing

    async def monitor() -> None:
        while completed < total:
            progress_ratio = round(completed / total, 2)
            # One decimal avoids float noise such as "28.999999999999996%".
            summary = f"{progress_ratio * 100:.1f}% ({completed} out of {total})"
            logger.info(f"Task progress: {summary}")
            task.update_state(
                state=f"Progress: {summary}",
                meta={"progress": progress_ratio},
            )
            await asyncio.sleep(1)

    # The monitor runs beside the workers rather than inside the same gather():
    # if a worker raises, its progress is never recorded and a gathered monitor
    # would keep polling forever.
    monitor_task = asyncio.create_task(monitor())
    try:
        processed_listings = await asyncio.gather(
            *(process(listing_id) for listing_id in missing_ids)
        )
    finally:
        monitor_task.cancel()
        # Swallow only the cancellation; a genuine monitor failure still surfaces.
        with contextlib.suppress(asyncio.CancelledError):
            await monitor_task

    return [listing for listing in processed_listings if listing is not None]


@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register periodic tasks from environment configuration.

    If the schedule configuration cannot be loaded (``ValueError``), the error
    is logged and no periodic task is registered.
    """
    try:
        config = SchedulesConfig.from_env()
    except ValueError as e:
        logger.error(f"Failed to load schedule configuration: {e}")
        return

    for schedule in config.get_enabled_schedules():
        logger.info(
            f"Registering periodic task: {schedule.name} at {schedule.hour}:{schedule.minute}"
        )
        sender.add_periodic_task(
            crontab(
                minute=schedule.minute,
                hour=schedule.hour,
                day_of_week=schedule.day_of_week,
            ),
            dump_listings_task.s(schedule.to_query_parameters().model_dump_json()),
            name=schedule.name,
        )


async def get_ids_to_process(
    *,
    parameters: QueryParameters,
    repository: ListingRepository,
    task: Task,
) -> set[int]:
    """Return the ids found by the search together with every id already stored.

    Args:
        parameters: Search parameters; ``district_names`` selects the districts.
        repository: Repository queried for the listings that already exist.
        task: Task used to publish state/progress updates.

    Returns:
        Union of the scraped identifiers and the ids returned by
        ``repository.get_listings()``.
    """
    semaphore = asyncio.Semaphore(_MAX_CONCURRENT_QUERIES)
    districts = await get_valid_districts_to_scrape(parameters.district_names)

    task.update_state(state="Fetching listings to scrape", meta={"progress": 0})
    json_responses: list[list[dict[str, Any]]] = await asyncio.gather(
        *[
            _fetch_listings_with_semaphore(
                task=task, semaphore=semaphore, parameters=parameters, district=district
            )
            for district in districts
        ],
    )
    json_responses_flat = list(itertools.chain.from_iterable(json_responses))
    logger.debug(f"Total listings fetched {len(json_responses_flat)}")

    identifiers: set[int] = set()
    for response_json in json_responses_flat:
        # Skip empty payloads and searches that matched nothing.
        if not response_json or response_json["totalAvailableResults"] == 0:
            continue
        identifiers.update(prop["identifier"] for prop in response_json["properties"])

    # NOTE(review): stored listings are deliberately *included* here (union), so
    # they are handed to the processor again. An earlier comment claimed they
    # were skipped; confirm which behaviour is intended.
    stored_ids = {listing.id for listing in await repository.get_listings()}
    return stored_ids | identifiers


async def get_valid_districts_to_scrape(
    district_names: set[str] | None,
) -> dict[str, str]:
    """Return the known districts, restricted to ``district_names`` when given.

    Args:
        district_names: Names to keep; ``None`` or an empty set keeps everything.

    Returns:
        The matching entries of ``get_districts()``, keyed by district.
    """
    all_districts = get_districts()
    if not district_names:
        return all_districts
    return {
        district: locid
        for district, locid in all_districts.items()
        if district in district_names
    }


def _price_bands(max_price: int, number_of_steps: int) -> list[tuple[int, int]]:
    """Split ``0..max_price`` into at most ``number_of_steps`` contiguous bands.

    The last band always ends exactly at ``max_price`` so the remainder of the
    integer division is not silently dropped. When ``max_price`` is too small
    to split, a single band covering the whole range is returned.
    """
    price_step = max_price // number_of_steps
    if price_step == 0:
        return [(0, max_price)]
    bands = [
        (step * price_step, (step + 1) * price_step)
        for step in range(number_of_steps)
    ]
    bands[-1] = (bands[-1][0], max_price)
    return bands


async def _fetch_listings_with_semaphore(
    *,
    task: Task,
    semaphore: asyncio.Semaphore,
    parameters: QueryParameters,
    district: str,
) -> list[dict[str, Any]]:
    """Run every search query for one district and collect the raw responses.

    Args:
        task: Task used to publish state/progress updates.
        semaphore: Limits how many queries run at the same time.
        parameters: Search parameters (prices, bedrooms, radius, ...).
        district: District to search in.

    Returns:
        One response per successful query, in query order.

    Raises:
        Any exception from ``listing_query`` whose message does not contain
        ``"GENERIC_ERROR"``.
    """
    result = []
    # Split the price in N bands to avoid the 1.5k capping by rightmove:
    # instead of 1 query with price between 1k and 5k that is capped at 1500
    # results, we send several queries with increasing price ranges, each of
    # which has a smaller chance of returning more than 1.5k results.
    # NOTE(review): bands always start at 0; no lower price bound is read here.
    bands = _price_bands(parameters.max_price, _NUMBER_OF_PRICE_BANDS)
    number_of_steps = len(bands)

    for step, (min_price, max_price) in enumerate(bands):
        task.update_state(
            state=f"Fetching listings ({step} out of {number_of_steps})",
            meta={"progress": step / number_of_steps},
        )
        logger.debug(
            f"Step {step} of {number_of_steps} with {min_price=} and {max_price=}"
        )

        # Each bedroom count is queried on its own (min == max), which narrows
        # every query further.
        for num_bedrooms in range(parameters.min_bedrooms, parameters.max_bedrooms + 1):
            # Only pages 1 and 2 are requested; seems like all searches stop at
            # 1500 entries (page_id * page_size).
            for page_id in range(1, 3):
                logger.debug(f"Processing {page_id=} for {district=}")
                async with semaphore:
                    try:
                        listing_query_result = await listing_query(
                            page=page_id,
                            channel=parameters.listing_type,
                            min_bedrooms=num_bedrooms,
                            max_bedrooms=num_bedrooms,
                            radius=parameters.radius,
                            min_price=min_price,
                            max_price=max_price,
                            district=district,
                            page_size=parameters.page_size,
                            max_days_since_added=parameters.max_days_since_added,
                            furnish_types=parameters.furnish_types or [],
                        )
                    except Exception as e:
                        if "GENERIC_ERROR" in str(e):  # Too big page id
                            logger.debug(f"Max page id for {district=}: {page_id-1}")
                            break
                        raise
                result.append(listing_query_result)
    return result