from __future__ import annotations

from abc import abstractmethod
import asyncio
from datetime import datetime
import logging
import multiprocessing
from pathlib import Path

import aiohttp

from models.listing import FurnishType, Listing, ListingSite, RentListing
from rec import floorplan
from rec.query import detail_query
from repositories.listing_repository import ListingRepository

logger = logging.getLogger("uvicorn.error")


class ListingProcessor:
    """Run every registered processing step, in order, for a listing."""

    semaphore: asyncio.Semaphore
    process_steps: list[Step]
    listing_repository: ListingRepository

    def __init__(self, listing_repository: ListingRepository):
        # Caps how many step executions may run concurrently on this processor.
        self.semaphore = asyncio.Semaphore(20)
        self.listing_repository = listing_repository
        # Register new processing steps here
        # Order is important
        self.process_steps = [
            FetchListingDetailsStep(listing_repository),
            FetchImagesStep(listing_repository),
            DetectFloorplanStep(listing_repository),
        ]

    async def process_listing(self, listing_id: int) -> Listing | None:
        """Mark the listing as seen, then run each step that still needs to run.

        Returns the listing produced by the last step that ran. Returns
        ``None`` when no step needed to run, or as soon as any step raises
        (the failure is logged and the remaining steps are skipped).
        """
        await self.listing_repository.mark_seen(listing_id)
        listing = None
        for step in self.process_steps:
            if not await step.needs_processing(listing_id):
                continue
            async with self.semaphore:
                try:
                    listing = await step.process(listing_id)
                except Exception as e:
                    # Pipeline boundary: one bad listing must not propagate.
                    logger.error(f"Failed to process {listing_id=}: {e}")
                    return None
        return listing

    async def listing_exists(self, listing_id: int) -> bool: ...


class Step:
    """Base class for a single processing step.

    NOTE(review): the methods are decorated with ``@abstractmethod`` but this
    class does not derive from ``abc.ABC``, so nothing is enforced at
    instantiation time. Left as-is so that subclasses defined elsewhere which
    rely on the default ``needs_processing`` keep working.
    """

    listing_repository: ListingRepository

    def __init__(self, listing_repository: ListingRepository):
        self.listing_repository = listing_repository

    @abstractmethod
    async def process(self, listing_id: int) -> Listing: ...

    @abstractmethod
    async def needs_processing(self, listing_id: int) -> bool:
        return True


def _parse_furnish_type(raw: str | None) -> FurnishType:
    """Normalise the raw ``letFurnishType`` value into a ``FurnishType``."""
    if raw is None:
        normalised = "unknown"
    elif "landlord" in raw.lower():
        normalised = "ask landlord"
    else:
        normalised = raw.lower()
    return FurnishType(normalised)


def _parse_available_from(raw: str | None) -> datetime | None:
    """Parse the raw ``letDateAvailable`` value; ``None`` when absent or unparseable."""
    if raw is None:
        return None
    if raw.lower() == "now":
        return datetime.now()
    try:
        return datetime.strptime(raw, "%d/%m/%Y")
    except ValueError:
        # If the date format is not as expected, fall back to None
        return None


class FetchListingDetailsStep(Step):
    """Fetch listing details via ``detail_query`` and store a ``RentListing``."""

    async def needs_processing(self, listing_id: int) -> bool:
        """Return True only when the listing is not in the repository yet."""
        existing_listings = await self.listing_repository.get_listings(
            only_ids=[listing_id]
        )
        # Fixed: the original compared the list itself to 0, which is never
        # true, so this step was never scheduled for new listings.
        return len(existing_listings) == 0

    async def process(self, listing_id: int) -> Listing:
        """Return the stored listing if present, else fetch, build and upsert it."""
        logger.debug(f"Fetching details for {listing_id=}")
        existing_listings = await self.listing_repository.get_listings(
            only_ids=[listing_id]
        )
        if len(existing_listings) > 0:
            # listing exists, do not refresh
            return existing_listings[0]

        now = datetime.now()
        listing_details = await detail_query(listing_id)
        property_details = listing_details["property"]

        furnish_type = _parse_furnish_type(
            property_details.get("letFurnishType", "unknown")
        )
        available_from = _parse_available_from(property_details["letDateAvailable"])
        photos = property_details["photos"]

        listing = RentListing(  # TODO: should pick based on price?
            id=listing_id,
            price=property_details["price"],
            number_of_bedrooms=property_details["bedrooms"],
            square_meters=None,  # populated later
            agency=property_details["branch"]["brandName"],
            council_tax_band=property_details["councilTaxInfo"]["content"][0][
                "value"
            ],
            longitude=property_details["longitude"],
            latitude=property_details["latitude"],
            price_history_json="{}",  # TODO: should upsert from existing
            listing_site=ListingSite.RIGHTMOVE,
            last_seen=now,
            photo_thumbnail=photos[0]["thumbnailUrl"] if len(photos) > 0 else None,
            furnish_type=furnish_type,
            available_from=available_from,
            additional_info=listing_details,
        )
        await self.listing_repository.upsert_listings([listing])
        logger.debug(f"Completed fetching details for {listing_id=}")
        # TODO: dump to filesystem
        return listing


class FetchImagesStep(Step):
    """Download a listing's floorplan images and record their local paths."""

    async def needs_processing(self, listing_id: int) -> bool:
        """Return True when the listing exists and has no floorplan paths recorded."""
        existing_listings = await self.listing_repository.get_listings(
            only_ids=[listing_id]
        )
        if len(existing_listings) == 0:
            return False  # if listing doesn't exist, we can't process it
        listing = existing_listings[0]
        return len(listing.floorplan_image_paths) == 0

    async def process(self, listing_id: int) -> Listing:
        """Download missing floorplan images and upsert the updated listing.

        Files are stored under ``data/rs/<listing id>/floorplans/``. A 404
        response stops further downloads; paths collected so far are still
        persisted.

        Raises:
            ValueError: the listing is not in the repository.
            Exception: a download returned a status other than 200 or 404.
        """
        logger.debug(f"Fetching images for {listing_id=}")
        existing_listings = await self.listing_repository.get_listings(
            only_ids=[listing_id]
        )
        if len(existing_listings) == 0:
            raise ValueError(f"Listing {listing_id} not found")
        listing = existing_listings[0]

        base_path = Path("data/rs/")
        all_floorplans = listing.additional_info.get("property", {}).get(
            "floorplans", []
        )
        client_timeout = aiohttp.ClientTimeout(total=30)

        # One session for all downloads instead of a new one per image.
        async with aiohttp.ClientSession() as session:
            for floorplan_obj in all_floorplans:
                url = floorplan_obj["url"]
                picname = url.split("/")[-1]
                floorplan_path = Path(
                    base_path, str(listing.id), "floorplans", picname
                )
                recorded_path = str(floorplan_path)

                if floorplan_path.exists():
                    # Already on disk (e.g. from an interrupted earlier run):
                    # skip the download but make sure the path is recorded,
                    # otherwise the listing never leaves this step.
                    if recorded_path not in listing.floorplan_image_paths:
                        listing.floorplan_image_paths.append(recorded_path)
                    continue

                async with session.get(url, timeout=client_timeout) as response:
                    if response.status == 404:
                        # Stop downloading, but still persist what we have.
                        break
                    if response.status != 200:
                        raise Exception(f"Error for {url}: {response.status}")
                    # Read the body before touching the filesystem so a failed
                    # read cannot leave an empty file that looks downloaded.
                    content = await response.read()

                floorplan_path.parent.mkdir(parents=True, exist_ok=True)
                floorplan_path.write_bytes(content)
                listing.floorplan_image_paths.append(recorded_path)

        await self.listing_repository.upsert_listings([listing])
        logger.debug(f"Completed fetching images for {listing_id=}")
        return listing


class DetectFloorplanStep(Step):
    """Estimate a listing's floor area by running OCR over its floorplans."""

    ocr_semaphore: asyncio.Semaphore

    def __init__(self, listing_repository: ListingRepository):
        super().__init__(listing_repository)
        # Clamp to at least 1: on hosts with fewer than 4 CPUs the integer
        # division yields 0, and a Semaphore(0) would block forever.
        self.ocr_semaphore = asyncio.Semaphore(
            max(1, multiprocessing.cpu_count() // 4)
        )

    async def needs_processing(self, listing_id: int) -> bool:
        """Return True when the listing exists and has no ``square_meters`` yet."""
        listings = await self.listing_repository.get_listings(only_ids=[listing_id])
        if len(listings) == 0:
            return False
        return listings[0].square_meters is None

    async def process(self, listing_id: int) -> Listing:
        """Run OCR on every recorded floorplan and store the largest estimate.

        Raises:
            ValueError: the listing is not in the repository.
        """
        logger.debug(f"Running floorplan detection for {listing_id=}")
        listings = await self.listing_repository.get_listings(only_ids=[listing_id])
        if len(listings) == 0:
            raise ValueError(f"Listing {listing_id} does not exist")
        listing = listings[0]

        sqms = []
        for floorplan_path in listing.floorplan_image_paths:
            async with self.ocr_semaphore:
                # Run the OCR call in a worker thread, off the event loop.
                estimated_sqm, _ = await asyncio.to_thread(
                    floorplan.calculate_ocr, floorplan_path
                )
            if estimated_sqm is not None:
                sqms.append(estimated_sqm)

        # try once, if we fail, keep as 0
        listing.square_meters = max(sqms, default=0)
        await self.listing_repository.upsert_listings([listing])
        logger.debug(f"Completed running floorplan detection for {listing_id=}")
        return listing