From 91a0436f7fd1cdf35ffd3a65552da49042a69982 Mon Sep 17 00:00:00 2001
From: Viktor Barzin
Date: Sun, 27 Jul 2025 18:33:39 +0000
Subject: [PATCH] migrate listing processing to a pipeline: each listing runs
 through the processing steps in parallel, with progress reported back to the
 task to track status

---
 crawler/3_dump_images.py       |   2 +-
 crawler/api/app.py             |   2 +
 crawler/listing_processor.py   | 184 +++++++++++++++++++++++++++++++++
 crawler/main.py                |   1 +
 crawler/notifications.py       |   2 +-
 crawler/tasks/listing_tasks.py | 182 +++++++++++++++++++++++++++-----
 6 files changed, 347 insertions(+), 26 deletions(-)
 create mode 100644 crawler/listing_processor.py

diff --git a/crawler/3_dump_images.py b/crawler/3_dump_images.py
index f54f24f..1c8f3f5 100644
--- a/crawler/3_dump_images.py
+++ b/crawler/3_dump_images.py
@@ -27,7 +27,7 @@ async def dump_images(
 @retry(wait=wait_random(min=1, max=2), stop=stop_after_attempt(3))
 async def dump_images_for_listing(listing: Listing, base_path: Path) -> Listing | None:
-    all_floorplans = listing.additional_info["property"]["floorplans"]
+    all_floorplans = listing.additional_info.get("property", {}).get("floorplans", [])
     for floorplan in all_floorplans:
         url = floorplan["url"]
         picname = url.split("/")[-1]
diff --git a/crawler/api/app.py b/crawler/api/app.py
index fb48a4a..6f6cffd 100644
--- a/crawler/api/app.py
+++ b/crawler/api/app.py
@@ -91,6 +91,8 @@ async def refresh_listings(
     await send_notification(
         f"{user.email} refreshing listings with query parameters {query_parameters.model_dump_json()}"
     )
+    # await listing_tasks.async_dump_listings_task(query_parameters.model_dump_json())  # use for local debugging - runs the task synchronously
+    # return {}
     # TODO: rate limit
     expiry_time = datetime.now() + timedelta(minutes=10)
     task = listing_tasks.dump_listings_task.apply_async(
diff --git a/crawler/listing_processor.py b/crawler/listing_processor.py
new file mode 100644
index 0000000..6955050
--- /dev/null
+++ b/crawler/listing_processor.py
@@ -0,0 +1,184 @@
+from __future__ import annotations
+from abc import ABC, abstractmethod
+import asyncio
+from datetime import datetime
+import logging
+import multiprocessing
+from pathlib import Path
+import aiohttp
+from models.listing import FurnishType, Listing, ListingSite, RentListing
+from rec import floorplan
+from rec.query import detail_query
+from repositories.listing_repository import ListingRepository
+
+logger = logging.getLogger("uvicorn.error")
+
+
+class ListingProcessor:
+    semaphore: asyncio.Semaphore
+    process_steps: list[Step]
+
+    def __init__(self, listing_repository: ListingRepository):
+        # Cap concurrent step executions across all listings
+        self.semaphore = asyncio.Semaphore(20)
+        # Register new processing steps here
+        # Order is important
+        self.process_steps = [
+            FetchListingDetailsStep(listing_repository),
+            FetchImagesStep(listing_repository),
+            DetectFloorplanStep(listing_repository),
+        ]
+
+    async def process_listing(self, listing_id: int) -> Listing | None:
+        listing = None
+        for step in self.process_steps:
+            if await step.needs_processing(listing_id):
+                async with self.semaphore:
+                    listing = await step.process(listing_id)
+        return listing
+
+    async def listing_exists(self, listing_id: int) -> bool: ...  # TODO: implement (currently unused)
+
+
+class Step(ABC):
+    listing_repository: ListingRepository
+
+    def __init__(self, listing_repository: ListingRepository):
+        self.listing_repository = listing_repository
+
+    @abstractmethod
+    async def process(self, listing_id: int) -> Listing: ...
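+    # Step contract: process() does the work for one listing and persists the
+    # result via the repository; needs_processing() reports whether the step
+    # still has work to do, so an already-processed listing can be skipped.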
+
+    # Default: a step always runs. Override (as DetectFloorplanStep does) to
+    # skip listings whose output already exists.
+    async def needs_processing(self, listing_id: int) -> bool:
+        return True
+
+
+class FetchListingDetailsStep(Step):
+    async def process(self, listing_id: int) -> Listing:
+        logger.debug(f"Fetching details for {listing_id=}")
+        existing_listings = await self.listing_repository.get_listings(
+            only_ids=[listing_id]
+        )
+        now = datetime.now()
+        if len(existing_listings) > 0:
+            # listing exists, do not refresh
+            return existing_listings[0]
+        listing_details = await detail_query(listing_id)
+
+        furnish_type_str = listing_details["property"].get("letFurnishType", "unknown")
+        if furnish_type_str is None:
+            furnish_type_str = "unknown"
+        elif "landlord" in furnish_type_str.lower():
+            furnish_type_str = "ask landlord"
+        else:
+            furnish_type_str = furnish_type_str.lower()
+        furnish_type = FurnishType(furnish_type_str)
+
+        available_from: datetime | None = None
+        available_from_str: str | None = listing_details["property"]["letDateAvailable"]
+        if available_from_str is None:
+            available_from = None
+        elif available_from_str.lower() == "now":
+            available_from = datetime.now()
+        else:
+            try:
+                available_from = datetime.strptime(available_from_str, "%d/%m/%Y")
+            except ValueError:
+                # If the date format is not as expected, leave it unset
+                available_from = None
+
+        photos = listing_details["property"]["photos"]
+        # listing = Listing(
+        listing = RentListing(  # TODO: should pick based on price?
+            id=listing_id,
+            price=listing_details["property"]["price"],
+            number_of_bedrooms=listing_details["property"]["bedrooms"],
+            square_meters=None,  # populated later
+            agency=listing_details["property"]["branch"]["brandName"],
+            council_tax_band=listing_details["property"]["councilTaxInfo"]["content"][
+                0
+            ]["value"],
+            longtitude=listing_details["property"]["longitude"],
+            latitude=listing_details["property"]["latitude"],
+            price_history_json="{}",  # TODO: should upsert from existing
+            listing_site=ListingSite.RIGHTMOVE,
+            last_seen=now,
+            photo_thumbnail=photos[0]["thumbnailUrl"] if len(photos) > 0 else None,
+            furnish_type=furnish_type,
+            available_from=available_from,
+            additional_info=listing_details,
+        )
+        await self.listing_repository.upsert_listings([listing])
+        logger.debug(f"Completed fetching details for {listing_id=}")
+        # TODO: dump to filesystem
+        return listing
+
+
+class FetchImagesStep(Step):
+    async def process(self, listing_id: int) -> Listing:
+        logger.debug(f"Fetching images for {listing_id=}")
+        existing_listings = await self.listing_repository.get_listings(
+            only_ids=[listing_id]
+        )
+        if len(existing_listings) == 0:
+            raise ValueError(f"Listing {listing_id} not found")
+        listing = existing_listings[0]
+
+        base_path = Path("data/rs/")
+        all_floorplans = listing.additional_info.get("property", {}).get(
+            "floorplans", []
+        )
+        # Reuse a single HTTP session for all floorplan downloads
+        async with aiohttp.ClientSession() as session:
+            for floorplan in all_floorplans:
+                url = floorplan["url"]
+                picname = url.split("/")[-1]
+                floorplan_path = Path(base_path, str(listing.id), "floorplans", picname)
+                if floorplan_path.exists():
+                    continue
+                async with session.get(url) as response:
+                    if response.status == 404:
+                        # skip missing images rather than aborting the whole step
+                        continue
+                    if response.status != 200:
+                        raise Exception(f"Error for {url}: {response.status}")
+                    floorplan_path.parent.mkdir(parents=True, exist_ok=True)
+                    with open(floorplan_path, "wb") as f:
+                        f.write(await response.read())
+                listing.floorplan_image_paths.append(str(floorplan_path))
+        await self.listing_repository.upsert_listings([listing])
+        logger.debug(f"Completed fetching images for {listing_id=}")
+        return listing
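+
+
+# Illustrative only - not registered in the pipeline. A minimal sketch of the
+# extension contract: subclass Step, implement process(), optionally override
+# needs_processing(), and add the step to ListingProcessor.process_steps in
+# the position it should run. The step name and behaviour here are hypothetical.
+class TouchLastSeenStep(Step):
+    async def needs_processing(self, listing_id: int) -> bool:
+        # only run for listings that already exist in the repository
+        listings = await self.listing_repository.get_listings(only_ids=[listing_id])
+        return len(listings) > 0
+
+    async def process(self, listing_id: int) -> Listing:
+        listings = await self.listing_repository.get_listings(only_ids=[listing_id])
+        if len(listings) == 0:
+            raise ValueError(f"Listing {listing_id} does not exist")
+        listing = listings[0]
+        listing.last_seen = datetime.now()  # hypothetical: refresh last-seen timestamp
+        await self.listing_repository.upsert_listings([listing])
+        return listing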
+
+
+class DetectFloorplanStep(Step):
+    ocr_semaphore: asyncio.Semaphore
+
+    def __init__(self, listing_repository: ListingRepository):
+        super().__init__(listing_repository)
+        # max(1, ...) so small machines still get one OCR worker;
+        # Semaphore(0) would block forever
+        self.ocr_semaphore = asyncio.Semaphore(max(1, multiprocessing.cpu_count() // 4))
+
+    async def needs_processing(self, listing_id: int) -> bool:
+        listings = await self.listing_repository.get_listings(only_ids=[listing_id])
+        if len(listings) == 0:
+            return False
+        return listings[0].square_meters is None
+
+    async def process(self, listing_id: int) -> Listing:
+        logger.debug(f"Running floorplan detection for {listing_id=}")
+        listings = await self.listing_repository.get_listings(only_ids=[listing_id])
+        if len(listings) == 0:
+            raise ValueError(f"Listing {listing_id} does not exist")
+        listing = listings[0]
+        sqms = []
+        for floorplan_path in listing.floorplan_image_paths:
+            async with self.ocr_semaphore:
+                estimated_sqm, _ = await asyncio.to_thread(
+                    floorplan.calculate_ocr, floorplan_path
+                )
+            if estimated_sqm is not None:
+                sqms.append(estimated_sqm)
+        max_sqm = max(sqms, default=0)  # try once; if OCR fails, keep it as 0
+        listing.square_meters = max_sqm
+        await self.listing_repository.upsert_listings([listing])
+        logger.debug(f"Completed running floorplan detection for {listing_id=}")
+        return listing
diff --git a/crawler/main.py b/crawler/main.py
index 1135278..fbe10ea 100644
--- a/crawler/main.py
+++ b/crawler/main.py
@@ -6,6 +6,7 @@ import pathlib
 import click
 import importlib
+import listing_processor
 from models.listing import FurnishType, ListingType, QueryParameters
 from rec.districts import get_districts
 from data_access import Listing
diff --git a/crawler/notifications.py b/crawler/notifications.py
index 735fd7e..50c7ee3 100644
--- a/crawler/notifications.py
+++ b/crawler/notifications.py
@@ -17,7 +17,7 @@ class Slack(Surface):
 @lru_cache(maxsize=None)
 def get_notifier(surfaces: list[Surface] | None = None) -> apprise.Apprise:
-    surfaces = surfaces or [Slack()]
+    surfaces = surfaces or list[Surface]([Slack()])
     obj = apprise.Apprise()
     for surface in surfaces:
         if conn := surface.connection_string():
diff --git a/crawler/tasks/listing_tasks.py b/crawler/tasks/listing_tasks.py
index f31a0dd..a64b5bf 100644
--- a/crawler/tasks/listing_tasks.py
+++ b/crawler/tasks/listing_tasks.py
@@ -1,16 +1,17 @@
 import asyncio
 import importlib
+import itertools
 import logging
-from pathlib import Path
 from typing import Any
-from celery import Celery, Task
+from celery import Task
 from celery_app import app
-from models.listing import FurnishType, Listing, ListingType, QueryParameters
+from listing_processor import ListingProcessor
+from models.listing import Listing, ListingType, QueryParameters
+from rec.districts import get_districts
+from rec.query import listing_query
 from repositories.listing_repository import ListingRepository
 from database import engine
-from tasks.task_state import TaskStatus

-dump_listings_module = importlib.import_module("1_dump_listings")
 dump_images_module = importlib.import_module("3_dump_images")
 detect_floorplan_module = importlib.import_module("4_detect_floorplan")
@@ -20,28 +21,61 @@ logger = logging.getLogger("uvicorn.error")
 @app.task(bind=True, pydantic=True)
 def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]:
     parsed_parameters = QueryParameters.model_validate_json(parameters_json)
-    asyncio.run(dump_listings_full(self, parsed_parameters))
-    return {"progress": 1}
+    self.update_state(state="Starting...", meta={"progress": 0})
+    asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters))
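+    # dump_listings_full reports per-listing progress via task.update_state
+    # while it runs, so by this point every listing has been processed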
+    return {"progress": 1}


-async def dump_listings_full(self: Task, parameters: QueryParameters) -> list[Listing]:
+# Local debugging helper: runs the task logic synchronously, without a Celery
+# worker (see the commented-out call in api/app.py)
+async def async_dump_listings_task(parameters_json: str) -> dict[str, Any]:
+    parsed_parameters = QueryParameters.model_validate_json(parameters_json)
+    await dump_listings_full(task=Task(), parameters=parsed_parameters)
+    return {"progress": 1}
+
+
+async def dump_listings_full(
+    *, task: Task, parameters: QueryParameters
+) -> list[Listing]:
     """Fetches all listings, images as well as detects floorplans"""
-    self.update_state(state="FETCHING_LISTINGS", meta={"progress": 0.1})
     repository = ListingRepository(engine)
-    new_listings = await dump_listings_module.dump_listings(parameters, repository)
-    self.update_state(state="FETCHING_FLOORPLANS", meta={"progress": 0.3})
-    logger.debug(f"Upserted {len(new_listings)} new listings")
-    logger.debug("Starting to fetch floorplans")
-    await dump_images_module.dump_images(repository)
-    self.update_state(state="RUNNING_OCR_ON_FLOORPLANS", meta={"progress": 0.6})
-    logger.debug("Completed fetching floorplans")
-    logger.debug("Starting floorplan detection")
-    await detect_floorplan_module.detect_floorplan(repository)
-    logger.debug("Completed floorplan detection")
-    # refresh listings
-    listings = await repository.get_listings(parameters)  # this can be better
-    new_listings = [l for l in listings if l.id in new_listings]
-    return new_listings
+
+    missing_ids = await get_missing_listing_ids(parameters, repository)
+    logger.info(f"Found {len(missing_ids)} missing listings")
+
+    listing_processor = ListingProcessor(repository)
+    logger.info(f"Starting processing {len(missing_ids)} new listings")
+    return await dump_listings_and_monitor(
+        task=task, listing_processor=listing_processor, missing_ids=missing_ids
+    )
+
+
+async def dump_listings_and_monitor(
+    *, task: Task, listing_processor: ListingProcessor, missing_ids: set[int]
+) -> list[Listing]:
+    task_progress = {missing_id: 0 for missing_id in missing_ids}
+
+    async def process(missing_id: int) -> Listing | None:
+        listing = await listing_processor.process_listing(missing_id)
+        task_progress[missing_id] = 1
+        return listing
+
+    async def monitor() -> None:
+        while (progress := sum(task_progress.values())) < len(missing_ids):
+            progress_ratio = progress / len(missing_ids)
+            logger.info(
+                f"Task progress: {progress_ratio * 100:.0f}% ({progress} out of {len(missing_ids)})"
+            )
+            task.update_state(
+                state=f"Progress: {progress_ratio * 100:.0f}% ({progress} out of {len(missing_ids)})",
+                meta={"progress": progress_ratio},
+            )
+            await asyncio.sleep(1)
+
+    processed_listings = await asyncio.gather(
+        *(process(missing_id) for missing_id in missing_ids), monitor()
+    )
+    # monitor() returns None, so the filter below also drops its gather result
+    filtered_listings = [l for l in processed_listings if l is not None]
+
+    return filtered_listings


 @app.on_after_finalize.connect
@@ -57,5 +91,105 @@ def setup_periodic_tasks(sender, **kwargs):
             max_price=4000,
         ).model_dump_json()
     ),
-    name='Daily dump of interesting rent listings',
+    name="Daily dump of interesting rent listings",
 )
+
+
+async def get_missing_listing_ids(
+    parameters: QueryParameters,
+    repository: ListingRepository,
+) -> set[int]:
+    semaphore = asyncio.Semaphore(5)  # if too high, rightmove drops connections
+    districts = await get_valid_districts_to_scrape(parameters.district_names)
+    json_responses: list[list[dict[str, Any]]] = await asyncio.gather(
+        *[
+            _fetch_listings_with_semaphore(semaphore, parameters, district)
+            for district in districts.keys()
+        ],
+    )
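+    # each district query returns a list of page responses; flatten them
+    # before extracting listing identifiers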
+    json_responses_flat = list(itertools.chain.from_iterable(json_responses))
+    logger.debug(f"Total responses fetched {len(json_responses_flat)}")
+
+    identifiers: set[int] = set()
+    for response_json in json_responses_flat:
+        if response_json == {}:
+            continue
+        if response_json["totalAvailableResults"] == 0:
+            continue
+        for prop in response_json["properties"]:
+            identifier = prop["identifier"]
+            identifiers.add(identifier)
+
+    # listings already in the db do not need their details fetched again, so
+    # only return the ids we have not seen before
+    all_listing_ids = {l.id for l in await repository.get_listings()}
+    missing_ids = identifiers - all_listing_ids
+    return missing_ids
+
+
+async def get_valid_districts_to_scrape(
+    district_names: set[str] | None,
+) -> dict[str, str]:
+    if district_names:
+        districts = {
+            district: locid
+            for district, locid in get_districts().items()
+            if district in district_names
+        }
+    else:
+        districts = get_districts()
+    return districts
+
+
+async def _fetch_listings_with_semaphore(
+    semaphore: asyncio.Semaphore,
+    parameters: QueryParameters,
+    district: str,
+) -> list[dict[str, Any]]:
+    result: list[dict[str, Any]] = []
+    # Split the price range into number_of_steps bands to dodge rightmove's
+    # ~1500-result cap per search: one query over the whole range gets capped,
+    # while several narrower queries are each less likely to exceed the cap
+    # (e.g. with max_price=4000 and number_of_steps=4, the bands are 0-1000,
+    # 1000-2000, 2000-3000 and 3000-4000). Currently a single band; raise
+    # number_of_steps if searches start hitting the cap.
+    number_of_steps = 1
+    price_step = parameters.max_price // number_of_steps
+
+    for step in range(number_of_steps):
+        min_price = step * price_step
+        max_price = (step + 1) * price_step
+        logger.debug(
+            f"Step {step} of {number_of_steps} with {min_price=} and {max_price=}"
+        )
+
+        for num_bedrooms in range(parameters.min_bedrooms, parameters.max_bedrooms + 1):
+            for page_id in range(
+                1,
+                3,  # seems like all searches stop at 1500 entries (page_id * page_size)
+            ):
+                logger.debug(f"Processing {page_id=} for {district=}")
+
+                async with semaphore:
+                    try:
+                        listing_query_result = await listing_query(
+                            page=page_id,
+                            channel=parameters.listing_type,
+                            # query one bedroom count at a time so each search
+                            # stays well under the result cap
+                            min_bedrooms=num_bedrooms,
+                            max_bedrooms=num_bedrooms,
+                            radius=parameters.radius,
+                            min_price=min_price,
+                            max_price=max_price,
+                            district=district,
+                            page_size=parameters.page_size,
+                            max_days_since_added=parameters.max_days_since_added,
+                            furnish_types=parameters.furnish_types or [],
+                        )
+                    except Exception as e:
+                        if "GENERIC_ERROR" in str(e):  # too big page id
+                            logger.debug(f"Max page id for {district=}: {page_id-1}")
+                            break
+                        raise
+                    result.append(listing_query_result)
+    return result
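+
+# Usage sketch (illustrative; mirrors the apply_async call in api/app.py):
+# clients enqueue the task and poll the Celery result backend for the custom
+# progress states set above, e.g.
+#
+#   result = dump_listings_task.apply_async(args=(parameters_json,))
+#   result.state  # e.g. "Progress: 40% (4 out of 10)"
+#   result.info   # e.g. {"progress": 0.4}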