from __future__ import annotations

from abc import ABC, abstractmethod
import asyncio
from collections.abc import Callable
from datetime import datetime
import logging
import multiprocessing
from pathlib import Path
from urllib.parse import urlparse

import aiohttp

from models.listing import FurnishType, Listing, ListingSite, RentListing
from rec import floorplan
from rec.query import detail_query
from repositories.listing_repository import ListingRepository

logger = logging.getLogger("uvicorn.error")

# Limit OCR threads to 25% of available cores to avoid starving other work.
MAX_OCR_WORKERS = max(1, multiprocessing.cpu_count() // 4)


def _parse_furnish_type(raw: str | None) -> FurnishType:
    """Normalise the raw furnish-type string from the API into a FurnishType enum.

    Args:
        raw: Furnish-type text as received, or None when absent.

    Returns:
        ``FurnishType.UNKNOWN`` for None or an unrecognised value,
        ``FurnishType.ASK_LANDLORD`` when the text contains "landlord"
        (case-insensitive), otherwise the enum member whose value equals the
        lower-cased text.
    """
    if raw is None:
        return FurnishType.UNKNOWN
    lowered = raw.lower()
    if "landlord" in lowered:
        return FurnishType.ASK_LANDLORD
    try:
        return FurnishType(lowered)
    except ValueError:
        return FurnishType.UNKNOWN


def _parse_available_from(raw: str | None) -> datetime | None:
    """Parse the available-from date string into a datetime, or None.

    Args:
        raw: Either the literal "now" (any casing), a ``dd/mm/YYYY`` date, or
            None.

    Returns:
        The current local time for "now", the parsed (naive) datetime for a
        ``dd/mm/YYYY`` string, and None for None or any unparseable value.
    """
    if raw is None:
        return None
    if raw.lower() == "now":
        return datetime.now()
    try:
        return datetime.strptime(raw, "%d/%m/%Y")
    except ValueError:
        return None


class ListingProcessor:
    """Run the ordered processing steps for a single listing.

    Each step is asked whether it still has work to do for the listing and,
    if so, is executed while holding a shared semaphore that caps the number
    of step executions running at once on this processor (20).
    """

    semaphore: asyncio.Semaphore
    process_steps: list[Step]
    listing_repository: ListingRepository

    # Map step class names to short names for progress reporting
    STEP_NAMES: dict[str, str] = {
        "FetchListingDetailsStep": "details",
        "FetchImagesStep": "images",
        "DetectFloorplanStep": "ocr",
    }

    def __init__(self, listing_repository: ListingRepository):
        """Create the processor and its steps, all sharing one repository."""
        self.semaphore = asyncio.Semaphore(20)
        self.listing_repository = listing_repository
        # Register new processing steps here
        # Order is important
        self.process_steps = [
            FetchListingDetailsStep(listing_repository),
            FetchImagesStep(listing_repository),
            DetectFloorplanStep(listing_repository),
        ]

    async def process_listing(
        self,
        listing_id: int,
        on_step_complete: Callable[[str], None] | None = None,
    ) -> Listing | None:
        """Mark the listing as seen and run every step that still needs to run.

        Args:
            listing_id: Identifier of the listing to process.
            on_step_complete: Optional callback invoked after each step that
                ran successfully, with the step's short name from
                ``STEP_NAMES`` (or its class name when no short name exists).

        Returns:
            The listing returned by the last step that ran. None when a step
            failed, and also None when no step needed processing.
        """
        await self.listing_repository.mark_seen(listing_id)

        listing = None
        for step in self.process_steps:
            if not await step.needs_processing(listing_id):
                continue
            async with self.semaphore:
                step_class_name = step.__class__.__name__
                try:
                    listing = await step.process(listing_id)
                    logger.debug(f"[{listing_id}] {step_class_name} completed")
                    # The callback runs inside the try block, so an exception
                    # it raises is handled like a step failure.
                    if on_step_complete:
                        short_name = self.STEP_NAMES.get(
                            step_class_name, step_class_name
                        )
                        on_step_complete(short_name)
                except Exception as e:
                    # logger.exception keeps the traceback, which a plain
                    # error message would discard.
                    logger.exception(f"[{listing_id}] {step_class_name} failed: {e}")
                    # Later steps build on earlier ones; stop at first failure.
                    return None
        return listing


class Step(ABC):
    """Base class for a single stage of listing processing.

    Subclasses must implement ``process``. ``needs_processing`` may be
    overridden; by default a step always reports that it needs to run.
    """

    listing_repository: ListingRepository

    def __init__(self, listing_repository: ListingRepository):
        """Store the repository the step reads listings from and writes to."""
        self.listing_repository = listing_repository

    @abstractmethod
    async def process(self, listing_id: int) -> Listing:
        """Perform this step's work for ``listing_id`` and return the listing."""
        ...

    async def needs_processing(self, listing_id: int) -> bool:
        """Return True when ``process`` should be run for ``listing_id``."""
        return True


class FetchListingDetailsStep(Step):
    """Fetch a listing's details and store it as a new ``RentListing``."""

    async def needs_processing(self, listing_id: int) -> bool:
        """Return True only when the listing is not in the repository yet."""
        existing_listings = await self.listing_repository.get_listings(
            only_ids=[listing_id]
        )
        return not existing_listings

    async def process(self, listing_id: int) -> Listing:
        """Query the listing details and upsert the resulting listing.

        If the listing is already stored it is returned unchanged without a
        new query.

        Args:
            listing_id: Identifier of the listing to fetch.

        Returns:
            The stored listing, or the newly created one.

        Raises:
            KeyError, IndexError: When the detail payload lacks one of the
                fields indexed below (for example the council tax entry).
        """
        logger.debug(f"[{listing_id}] Fetching property details from API")
        logger.info(f"[{listing_id}] Fetching details...")

        existing_listings = await self.listing_repository.get_listings(
            only_ids=[listing_id]
        )
        if existing_listings:
            # listing exists, do not refresh
            logger.debug(f"[{listing_id}] Already exists, skipping refresh")
            return existing_listings[0]

        listing_details = await detail_query(listing_id)
        property_details = listing_details["property"]

        furnish_type = _parse_furnish_type(
            property_details.get("letFurnishType", "unknown")
        )
        available_from = _parse_available_from(property_details["letDateAvailable"])
        photos = property_details["photos"]
        now = datetime.now()

        listing = RentListing(
            # TODO: should pick based on price?
            id=listing_id,
            price=property_details["price"],
            number_of_bedrooms=property_details["bedrooms"],
            square_meters=None,  # populated later
            agency=property_details["branch"]["brandName"],
            council_tax_band=property_details["councilTaxInfo"]["content"][0][
                "value"
            ],
            longitude=property_details["longitude"],
            latitude=property_details["latitude"],
            price_history_json="{}",  # TODO: should upsert from existing
            listing_site=ListingSite.RIGHTMOVE,
            last_seen=now,
            photo_thumbnail=photos[0]["thumbnailUrl"] if len(photos) > 0 else None,
            furnish_type=furnish_type,
            available_from=available_from,
            additional_info=listing_details,
        )
        await self.listing_repository.upsert_listings([listing])

        logger.info(
            f"[{listing_id}] Details fetched: £{listing.price}, "
            f"{listing.number_of_bedrooms}BR, {listing.agency}"
        )
        logger.debug(f"[{listing_id}] Details fetch complete")
        # TODO: dump to filesystem
        return listing


class FetchImagesStep(Step):
    """Download a stored listing's floorplan images to local disk."""

    async def needs_processing(self, listing_id: int) -> bool:
        """Return True when the listing exists and has no floorplan paths."""
        existing_listings = await self.listing_repository.get_listings(
            only_ids=[listing_id]
        )
        if not existing_listings:
            return False  # if listing doesn't exist, we can't process it
        listing = existing_listings[0]
        return len(listing.floorplan_image_paths) == 0

    async def process(self, listing_id: int) -> Listing:
        """Download every floorplan of the listing and record the file paths.

        Files are written to ``data/rs/<listing id>/floorplans/<file name>``,
        where the file name is the last path segment of the floorplan URL.
        A file that is already on disk is not downloaded again, but its path
        is still recorded on the listing. A floorplan answering 404 is
        skipped so that the remaining ones are still fetched and persisted.

        Args:
            listing_id: Identifier of a listing already in the repository.

        Returns:
            The listing, with ``floorplan_image_paths`` updated.

        Raises:
            ValueError: When the listing is not in the repository.
            Exception: When a download answers with a status other than 200
                or 404.
        """
        logger.debug(f"[{listing_id}] Fetching floorplan images")
        existing_listings = await self.listing_repository.get_listings(
            only_ids=[listing_id]
        )
        if not existing_listings:
            raise ValueError(f"Listing {listing_id} not found")
        listing = existing_listings[0]

        all_floorplans = listing.additional_info.get("property", {}).get(
            "floorplans", []
        )
        if not all_floorplans:
            logger.debug(f"[{listing_id}] No floorplans available")
            return listing

        floorplan_dir = Path("data/rs/") / str(listing.id) / "floorplans"
        downloaded = 0
        client_timeout = aiohttp.ClientTimeout(total=30)

        async with aiohttp.ClientSession() as session:
            for floorplan_obj in all_floorplans:
                url = floorplan_obj["url"]
                picname = Path(urlparse(url).path).name
                if not picname:
                    # Without a file name the target would be the floorplans
                    # directory itself.
                    logger.warning(
                        f"[{listing_id}] Floorplan URL has no file name: {url}"
                    )
                    continue

                floorplan_path = floorplan_dir / picname
                path_str = str(floorplan_path)

                if not floorplan_path.exists():
                    async with session.get(url, timeout=client_timeout) as response:
                        if response.status == 404:
                            logger.debug(
                                f"[{listing_id}] Floorplan not found (404): {url}"
                            )
                            continue
                        if response.status != 200:
                            raise Exception(f"Error for {url}: {response.status}")
                        floorplan_path.parent.mkdir(parents=True, exist_ok=True)
                        with open(floorplan_path, "wb") as f:
                            f.write(await response.read())
                    downloaded += 1

                # Record files left over from an earlier, interrupted run too;
                # otherwise they would never reach the listing.
                if path_str not in listing.floorplan_image_paths:
                    listing.floorplan_image_paths.append(path_str)

        await self.listing_repository.upsert_listings([listing])
        logger.info(f"[{listing_id}] Downloaded {downloaded} floorplan images")
        logger.debug(f"[{listing_id}] Image fetch complete")
        return listing


class DetectFloorplanStep(Step):
    """Estimate a listing's square meters by running OCR on its floorplans."""

    ocr_semaphore: asyncio.Semaphore

    def __init__(self, listing_repository: ListingRepository):
        """Set up the step with a semaphore of ``MAX_OCR_WORKERS`` permits."""
        super().__init__(listing_repository)
        self.ocr_semaphore = asyncio.Semaphore(MAX_OCR_WORKERS)

    async def needs_processing(self, listing_id: int) -> bool:
        """Return True when the listing exists and has no square meters set."""
        listings = await self.listing_repository.get_listings(only_ids=[listing_id])
        if not listings:
            return False
        return listings[0].square_meters is None

    async def process(self, listing_id: int) -> Listing:
        """Run OCR over each floorplan image and store the largest estimate.

        ``square_meters`` is set to 0 when the listing has no floorplan images
        or when no image yields an estimate, so ``needs_processing`` reports
        False afterwards and the OCR is not attempted again.

        Args:
            listing_id: Identifier of a listing already in the repository.

        Returns:
            The listing with ``square_meters`` populated.

        Raises:
            ValueError: When the listing is not in the repository.
        """
        logger.debug(f"[{listing_id}] Running OCR on floorplans")
        listings = await self.listing_repository.get_listings(only_ids=[listing_id])
        if not listings:
            raise ValueError(f"Listing {listing_id} does not exist")
        listing = listings[0]

        if len(listing.floorplan_image_paths) == 0:
            logger.debug(f"[{listing_id}] No floorplan images to process")
            listing.square_meters = 0
            await self.listing_repository.upsert_listings([listing])
            return listing

        sqms = []
        for floorplan_path in listing.floorplan_image_paths:
            # The OCR call runs in a worker thread; the semaphore bounds how
            # many run at the same time.
            async with self.ocr_semaphore:
                estimated_sqm, _ = await asyncio.to_thread(
                    floorplan.calculate_ocr, floorplan_path
                )
            if estimated_sqm is not None:
                sqms.append(estimated_sqm)

        max_sqm = max(sqms, default=0)  # try once, if we fail, keep as 0
        listing.square_meters = max_sqm
        await self.listing_repository.upsert_listings([listing])

        if max_sqm > 0:
            logger.info(f"[{listing_id}] OCR detected {max_sqm} sqm")
        else:
            logger.debug(f"[{listing_id}] OCR: no square meters detected")
        logger.debug(f"[{listing_id}] OCR complete")
        return listing