wrongmove/crawler/listing_processor.py

201 lines
7.9 KiB
Python

from __future__ import annotations
from abc import abstractmethod
import asyncio
from datetime import datetime
import logging
import multiprocessing
from pathlib import Path
import aiohttp
from models.listing import FurnishType, Listing, ListingSite, RentListing
from rec import floorplan
from rec.query import detail_query
from repositories.listing_repository import ListingRepository
logger = logging.getLogger("uvicorn.error")
class ListingProcessor:
    """Run a listing through an ordered pipeline of processing steps.

    Each step is asked whether it still needs to run for the listing; the
    steps that do are executed one after another, each while holding a
    shared semaphore that caps the number of steps running concurrently.
    """

    semaphore: asyncio.Semaphore
    process_steps: list[Step]

    def __init__(self, listing_repository: ListingRepository):
        # At most 20 step executions may be in flight at the same time.
        self.semaphore = asyncio.Semaphore(20)
        # Register new processing steps here.
        # Order is important: steps run in exactly this sequence.
        step_types = (
            FetchListingDetailsStep,
            FetchImagesStep,
            DetectFloorplanStep,
        )
        self.process_steps = [
            step_type(listing_repository) for step_type in step_types
        ]

    async def process_listing(self, listing_id: int) -> Listing | None:
        """Run every step that reports it needs processing for ``listing_id``.

        Returns the value produced by the last step that actually ran, or
        ``None`` when no step needed to run.
        """
        latest = None
        for step in self.process_steps:
            should_run = await step.needs_processing(listing_id)
            if not should_run:
                continue
            # The semaphore is held only while the step itself executes.
            async with self.semaphore:
                latest = await step.process(listing_id)
        return latest

    async def listing_exists(self, listing_id: int) -> bool:
        """Unimplemented stub: has no body and currently returns ``None``."""
class Step:
    """Base class for a single stage of the listing-processing pipeline.

    NOTE: the methods are decorated with ``@abstractmethod``, but this class
    does not use an ABC metaclass, so the decorator is not enforced when the
    class is instantiated.
    """

    listing_repository: ListingRepository

    def __init__(self, listing_repository: ListingRepository):
        # Every step reads and writes listings through this repository.
        self.listing_repository = listing_repository

    @abstractmethod
    async def process(self, listing_id: int) -> Listing:
        """Execute this step for ``listing_id``.

        The base implementation has no body and returns ``None``; subclasses
        are expected to override it.
        """

    @abstractmethod
    async def needs_processing(self, listing_id: int) -> bool:
        """Tell whether this step still has to run for ``listing_id``.

        The base implementation always answers ``True``.
        """
        return True
class FetchListingDetailsStep(Step):
    """Fetch a listing's details via ``detail_query`` and store it as a new listing."""

    async def needs_processing(self, listing_id: int) -> bool:
        """Return ``True`` when no listing with ``listing_id`` is stored yet."""
        existing_listings = await self.listing_repository.get_listings(
            only_ids=[listing_id]
        )
        # Compare the *number* of results: a list never equals 0, so comparing
        # the list itself would make this step permanently skipped.
        return len(existing_listings) == 0

    async def process(self, listing_id: int) -> Listing:
        """Create and persist the listing for ``listing_id`` if it is not stored yet.

        Returns the already-stored listing unchanged when one exists;
        otherwise queries the details, builds a ``RentListing`` from them,
        upserts it through the repository and returns it.

        Raises:
            KeyError / IndexError / TypeError: if the queried details lack one
                of the fields read below (no fallback is applied for them).
            ValueError: if the furnish type is not a valid ``FurnishType`` value.
        """
        logger.debug(f"Fetching details for {listing_id=}")
        existing_listings = await self.listing_repository.get_listings(
            only_ids=[listing_id]
        )
        if len(existing_listings) > 0:
            # listing exists, do not refresh
            return existing_listings[0]

        # Captured before the remote query so it marks when processing began.
        now = datetime.now()
        listing_details = await detail_query(listing_id)
        property_details = listing_details["property"]

        furnish_type = self._parse_furnish_type(
            property_details.get("letFurnishType", "unknown")
        )
        available_from = self._parse_available_from(
            property_details["letDateAvailable"]
        )
        photos = property_details["photos"]

        listing = RentListing(  # TODO: should pick based on price?
            id=listing_id,
            price=property_details["price"],
            number_of_bedrooms=property_details["bedrooms"],
            square_meters=None,  # populated later
            agency=property_details["branch"]["brandName"],
            council_tax_band=property_details["councilTaxInfo"]["content"][0][
                "value"
            ],
            # "longtitude" (sic) is the keyword this constructor is called with.
            longtitude=property_details["longitude"],
            latitude=property_details["latitude"],
            price_history_json="{}",  # TODO: should upsert from existing
            listing_site=ListingSite.RIGHTMOVE,
            last_seen=now,
            photo_thumbnail=photos[0]["thumbnailUrl"] if len(photos) > 0 else None,
            furnish_type=furnish_type,
            available_from=available_from,
            additional_info=listing_details,
        )
        await self.listing_repository.upsert_listings([listing])
        logger.debug(f"Completed fetching details for {listing_id=}")
        # TODO: dump to filesystem
        return listing

    @staticmethod
    def _parse_furnish_type(raw: str | None) -> FurnishType:
        """Normalise the raw furnish-type string into a ``FurnishType``."""
        if raw is None:
            normalized = "unknown"
        elif "landlord" in raw.lower():
            # Any wording that mentions the landlord maps to one value.
            normalized = "ask landlord"
        else:
            normalized = raw.lower()
        return FurnishType(normalized)

    @staticmethod
    def _parse_available_from(raw: str | None) -> datetime | None:
        """Parse the availability field: ``None``, ``"now"`` or ``dd/mm/YYYY``."""
        if raw is None:
            return None
        if raw.lower() == "now":
            return datetime.now()
        try:
            return datetime.strptime(raw, "%d/%m/%Y")
        except ValueError:
            # If the date format is not as expected, treat it as unknown.
            return None
class FetchImagesStep(Step):
    """Download a listing's floorplan images to disk and record their paths."""

    async def needs_processing(self, listing_id: int) -> bool:
        """Return ``True`` when the stored listing has no floorplan paths yet."""
        existing_listings = await self.listing_repository.get_listings(
            only_ids=[listing_id]
        )
        if len(existing_listings) == 0:
            return False  # if listing doesn't exist, we can't process it
        return len(existing_listings[0].floorplan_image_paths) == 0

    async def process(self, listing_id: int) -> Listing:
        """Fetch every floorplan image of the listing and persist the file paths.

        Images are written to ``data/rs/<listing id>/floorplans/<file name>``.
        Files already present on disk are not downloaded again, but their
        paths are still recorded on the listing. A floorplan URL answering
        404 is skipped so the remaining images are still fetched and saved.

        Raises:
            ValueError: if no listing with ``listing_id`` is stored.
            Exception: if a floorplan URL answers with a status other than
                200 or 404.
        """
        logger.debug(f"Fetching images for {listing_id=}")
        existing_listings = await self.listing_repository.get_listings(
            only_ids=[listing_id]
        )
        if len(existing_listings) == 0:
            raise ValueError(f"Listing {listing_id} not found")
        listing = existing_listings[0]

        base_path = Path("data/rs/")
        all_floorplans = listing.additional_info.get("property", {}).get(
            "floorplans", []
        )
        # One HTTP session is shared by all downloads of this listing instead
        # of opening a new session (and connection pool) per image.
        async with aiohttp.ClientSession() as session:
            for floorplan_info in all_floorplans:
                url = floorplan_info["url"]
                picname = url.split("/")[-1]
                floorplan_path = Path(
                    base_path, str(listing.id), "floorplans", picname
                )
                if not floorplan_path.exists():
                    async with session.get(url) as response:
                        if response.status == 404:
                            # A missing image must not discard the others.
                            logger.debug(f"Floorplan not found (404): {url}")
                            continue
                        if response.status != 200:
                            raise Exception(f"Error for {url}: {response.status}")
                        image_bytes = await response.read()
                    floorplan_path.parent.mkdir(parents=True, exist_ok=True)
                    floorplan_path.write_bytes(image_bytes)
                # Record the path even when the file was already on disk;
                # otherwise the listing would never learn about it and this
                # step would be re-run forever.
                path_str = str(floorplan_path)
                if path_str not in listing.floorplan_image_paths:
                    listing.floorplan_image_paths.append(path_str)

        await self.listing_repository.upsert_listings([listing])
        logger.debug(f"Completed fetching images for {listing_id=}")
        return listing
class DetectFloorplanStep(Step):
    """Estimate a listing's area by running OCR over its floorplan images."""

    ocr_semaphore: asyncio.Semaphore

    def __init__(self, listing_repository: ListingRepository):
        super().__init__(listing_repository)
        # Allow a quarter of the CPUs to run OCR at once, but never fewer than
        # one: a semaphore created with 0 permits can never be acquired, which
        # would hang this step forever on machines with fewer than 4 CPUs.
        self.ocr_semaphore = asyncio.Semaphore(
            max(1, multiprocessing.cpu_count() // 4)
        )

    async def needs_processing(self, listing_id: int) -> bool:
        """Return ``True`` when the stored listing has no ``square_meters`` yet."""
        listings = await self.listing_repository.get_listings(only_ids=[listing_id])
        if len(listings) == 0:
            return False
        return listings[0].square_meters is None

    async def process(self, listing_id: int) -> Listing:
        """Run OCR on each floorplan image and store the largest estimate.

        ``floorplan.calculate_ocr`` is run in a worker thread for every path
        in ``listing.floorplan_image_paths``; the first element of its result
        is taken as the estimated area. The maximum non-``None`` estimate is
        saved as ``square_meters``, or 0 when there is none.

        Raises:
            ValueError: if no listing with ``listing_id`` is stored.
        """
        logger.debug(f"Running floorplan detection for {listing_id=}")
        listings = await self.listing_repository.get_listings(only_ids=[listing_id])
        if len(listings) == 0:
            raise ValueError(f"Listing {listing_id} does not exist")
        listing = listings[0]

        sqms = []
        for floorplan_path in listing.floorplan_image_paths:
            # Hold the semaphore only around the OCR call itself.
            async with self.ocr_semaphore:
                estimated_sqm, _ = await asyncio.to_thread(
                    floorplan.calculate_ocr, floorplan_path
                )
            if estimated_sqm is not None:
                sqms.append(estimated_sqm)

        # Try once; if nothing was detected keep 0, so that square_meters is
        # no longer None and this step is not repeated for the listing.
        listing.square_meters = max(sqms, default=0)
        await self.listing_repository.upsert_listings([listing])
        logger.debug(f"Completed running floorplan detection for {listing_id=}")
        return listing