import asyncio
from collections import defaultdict
from dataclasses import dataclass
import datetime
import json
import pathlib
import re
from typing import Any, List, Dict

from models.listing import ListingSite, PriceHistoryItem
from rec import floorplan, routing


@dataclass()
class Listing:
    """A single Rightmove listing backed by JSON files on disk.

    All data for a listing lives in ``data_dir / str(identifier)``; the
    ``path_*`` helpers return the individual files and folders in there.
    """

    identifier: int
    _details_object: dict[str, Any] | None = None
    _listing_object: dict[str, Any] | None = None
    data_dir: pathlib.Path = pathlib.Path("data/rs/")

    # Deliberately unannotated so the dataclass machinery treats it as a
    # plain class attribute instead of an instance field.
    ALL_COLUMNS = [
        "identifier",
        "sqm_ocr",
        "price",
        "price_per_sqm",
        "url",
        "bedrooms",
        "travel_time_fastest",
        "travel_time_second",
        "lease_left",
        "service_charge",
        "development",
        "tenure_type",
        "updated_days",
        "status",
        "last_seen",
        "agency",
        "council_tax_band",
    ]

    @staticmethod
    def get_all_listings(
        listing_paths: list[pathlib.Path],
        seen_in_the_last_n_days: int = 30,
    ) -> List["Listing"]:
        """Load listings from JSON files and keep the recently seen ones.

        Args:
            listing_paths: Paths to JSON files that contain an
                ``"identifier"`` key.
            seen_in_the_last_n_days: Only listings whose ``last_seen`` is
                strictly below this number of days are returned.

        Returns:
            The matching ``Listing`` objects, in input order.
        """
        listings = []
        for listing_path in listing_paths:
            with open(listing_path) as f:
                d = json.load(f)
            # data_dir is the first directory before the listing_path:
            # walk upwards until the identifier no longer appears in the path.
            data_dir = pathlib.Path(listing_path)
            while str(d["identifier"]) in str(data_dir.resolve().absolute()):
                data_dir = data_dir.parent
            listing = Listing(d["identifier"], data_dir=data_dir)
            # Read once; the property re-opens last_seen.json on every access.
            days_since_seen = listing.last_seen
            if (
                days_since_seen is not None
                and days_since_seen < seen_in_the_last_n_days
            ):
                listings.append(listing)
        return listings

    def path_listing(self) -> pathlib.Path:
        """Return the listing's directory, creating it if it is missing."""
        p = self.data_dir / str(self.identifier)
        p.mkdir(parents=True, exist_ok=True)
        return p

    def path_listing_json(self) -> pathlib.Path:
        """Return the path of ``listing.json``."""
        return self.path_listing() / "listing.json"

    def path_detail_json(self) -> pathlib.Path:
        """Return the path of ``detail.json``."""
        return self.path_listing() / "detail.json"

    def path_routing_json(self) -> pathlib.Path:
        """Return the path of ``routing.json`` (the routing cache)."""
        return self.path_listing() / "routing.json"

    def path_floorplan_model_json(self) -> pathlib.Path:
        """Return the path of ``floorplan_model.json``."""
        return self.path_listing() / "floorplan_model.json"

    def path_floorplan_ocr_json(self) -> pathlib.Path:
        """Return the path of ``floorplan_ocr.json``."""
        return self.path_listing() / "floorplan_ocr.json"

    def path_pic_folder(self) -> pathlib.Path:
        """Return the ``pics`` folder path (not created here)."""
        return self.path_listing() / "pics"

    def path_pic_file(self, order, name) -> pathlib.Path:
        """Return ``pics/<order>_<name>``, creating the folder if needed."""
        self.path_pic_folder().mkdir(parents=True, exist_ok=True)
        return self.path_pic_folder() / f"{order}_{name}"

    def path_floorplan_folder(self) -> pathlib.Path:
        """Return the ``floorplans`` folder path (not created here)."""
        return self.path_listing() / "floorplans"

    def path_floorplan_file(self, order, name) -> pathlib.Path:
        """Return ``floorplans/<order>_<name>``, creating the folder if needed."""
        self.path_floorplan_folder().mkdir(parents=True, exist_ok=True)
        return self.path_floorplan_folder() / f"{order}_{name}"

    def path_last_seen_listing(self) -> pathlib.Path:
        """Return the path of ``last_seen.json``."""
        return self.path_listing() / "last_seen.json"

    def path_price_history(self) -> pathlib.Path:
        """Return the path of ``price_history.json``."""
        return self.path_listing() / "price_history.json"

    def dump_listing(self) -> None:
        """Persist the in-memory listing object and record the sighting.

        Writes ``listing.json``, stamps ``last_seen.json`` with the current
        local time and feeds the current price into the price history.

        Raises:
            ValueError: If no listing object was provided.
        """
        if self._listing_object is None:
            raise ValueError("No listing data provided to dump.")
        with open(self.path_listing_json(), "w") as f:
            json.dump(self._listing_object, f)
        with open(self.path_last_seen_listing(), "w") as f:
            dt = datetime.datetime.now().isoformat()
            json.dump(dt, f)
        # some places list pw in price and others pcm
        price = max(
            self._listing_object["price"],
            self._listing_object.get("monthlyRent", 0) or 0,
        )
        self.append_price_history(price)

    def append_price_history(self, price: float) -> None:
        """Record ``price`` in the listing's price history file.

        If the most recent entry has the same price only its ``last_seen``
        timestamp is refreshed; otherwise (empty history or changed price) a
        new entry is appended.
        """
        history_path = self.path_price_history()
        existing_price_history = (
            json.loads(history_path.read_text()) if history_path.exists() else []
        )
        now = datetime.datetime.now().isoformat()

        if existing_price_history and existing_price_history[-1]["price"] == price:
            # if the last price is the same, just update the date
            existing_price_history[-1]["last_seen"] = now
        else:
            # First observation for this listing, or the price has changed.
            existing_price_history.append(
                {
                    "first_seen": now,
                    "last_seen": now,
                    "price": price,
                }
            )

        with open(history_path, "w") as f:
            json.dump(existing_price_history, f, indent=4)

    def list_floorplans(self) -> list[pathlib.Path]:
        """Return every entry found in the floorplans folder."""
        images = list(self.path_floorplan_folder().glob("*"))
        # todo add check if return is image
        return images

    def calculate_sqm_model(self):
        """Run the floorplan model on every floorplan and store the results.

        Overwrites ``floorplan_model.json`` with one record per floorplan.
        """
        objs = []
        for floorplan_path in self.list_floorplans():
            estimated_sqm, model_output, predictions = floorplan.calculate_model(
                floorplan_path
            )
            objs.append(
                {
                    "floorplan_path": str(floorplan_path),
                    "estimated_sqm": estimated_sqm,
                    "model_output": model_output,
                    # cant serialize the predictions itself since its a tensor
                    "no_predictions": len(predictions),
                }
            )
        with open(self.path_floorplan_model_json(), "w") as f:
            json.dump(objs, f)

    @property
    def sqm_model(self, recalculate=True) -> float | None:
        """Largest model-estimated area over all floorplans, or ``None``.

        Because this is a property, attribute access always uses the default
        ``recalculate=True`` and therefore reruns the model every time.

        Returns:
            The maximum ``estimated_sqm`` that is not ``None``, or ``None``
            when there is no usable estimate.
        """
        if not self.path_floorplan_model_json().exists() or recalculate:
            self.calculate_sqm_model()
        with open(self.path_floorplan_model_json()) as f:
            objs = json.load(f)
        # filter out Nones
        sqms = [o["estimated_sqm"] for o in objs if o["estimated_sqm"] is not None]
        if not sqms:
            return None
        return max(sqms)

    async def calculate_sqm_ocr(self, recalculate=True):
        """Run OCR on every floorplan and store the results.

        Args:
            recalculate: When false and a non-empty ``floorplan_ocr.json``
                already exists, nothing is done. Otherwise the file is
                rebuilt from scratch.
        """
        ocr_path = self.path_floorplan_ocr_json()
        if not recalculate and ocr_path.exists():
            with open(ocr_path) as f:
                cached = json.load(f)
            if len(cached) > 0:
                return

        # Always start from an empty list so a recalculation replaces the
        # cached records instead of piling duplicates on top of them.
        objs = []
        for floorplan_path in self.list_floorplans():
            # The OCR call is blocking, so it is pushed to a worker thread.
            estimated_sqm, model_output = await asyncio.to_thread(
                floorplan.calculate_ocr, floorplan_path
            )
            objs.append(
                {
                    "floorplan_path": str(floorplan_path),
                    "estimated_sqm": estimated_sqm,
                    "text": model_output,
                }
            )
        with open(ocr_path, "w") as f:
            json.dump(objs, f)

    async def sqm_ocr(self, recalculate=False) -> float | None:
        """Largest OCR-estimated area over all floorplans, or ``None``.

        Args:
            recalculate: Force the OCR to run again even when cached results
                exist.
        """
        if not self.path_floorplan_ocr_json().exists() or recalculate:
            await self.calculate_sqm_ocr()
        with open(self.path_floorplan_ocr_json()) as f:
            objs = json.load(f)
        sqms = [o["estimated_sqm"] for o in objs if o["estimated_sqm"] is not None]
        if len(sqms) == 0:
            return None
        return max(sqms)

    def calculate_route(
        self, dest_address: str, travel_mode: routing.TravelMode, recalculate=False
    ) -> dict[str, Any]:
        """Return routing data to ``dest_address``, using the on-disk cache.

        Args:
            dest_address: Destination handed to ``routing.transit_route``.
            travel_mode: Travel mode handed to ``routing.transit_route``.
            recalculate: Ignore a cached entry and query the router again.

        Returns:
            On a cache hit, a dict with only the requested cache key.
            Otherwise the full updated cache, which contains that key.

        Raises:
            Exception: If the router returns an empty result.
        """
        routing_cache = self.__get_routing_cache()
        cache_key = self.__routing_cache_key(dest_address, travel_mode)
        if (
            route_cache := routing_cache.get(cache_key)
        ) is not None and not recalculate:
            return {cache_key: route_cache}

        result = routing.transit_route(
            self.latitude,
            self.longitude,
            dest_address,
            travel_mode,
        )
        if not result:
            raise Exception(
                (
                    f"Error calculating route from {self.identifier} "
                    f"to '{dest_address}' by {travel_mode}"
                )
            )
        # The fresh result goes last so it replaces any stale cached entry
        # stored under the same key (relevant when recalculate=True).
        result = {**routing_cache, cache_key: result}
        with open(self.path_routing_json(), "w") as f:
            json.dump(result, f)
        return result

    def travel_time(
        self,
        destination_address: str,
        travel_mode: routing.TravelMode,
    ) -> list[dict[str, Any]]:
        """Return per-route travel summaries for a destination and mode."""
        data = self.calculate_route(destination_address, travel_mode)
        return self.__extract_travel_times(data, destination_address, travel_mode)

    @property
    def url(self):
        """Public Rightmove URL of this listing."""
        return f"https://www.rightmove.co.uk/properties/{self.identifier}"

    @property
    def listingobject(self):
        """Contents of ``listing.json``, re-read on every access."""
        with open(self.path_listing_json()) as f:
            return json.load(f)

    @property
    def detailobject(self) -> dict[str, Any]:
        """Contents of ``detail.json``, cached on the instance after first load.

        Raises:
            ValueError: If the file is missing or has no ``"property"`` value.
        """
        if self._details_object is not None:
            return self._details_object

        detail_path = self.path_detail_json()
        if detail_path.exists():
            # Parse the file once, inside a context manager so the handle is
            # closed deterministically.
            with open(detail_path) as f:
                details = json.load(f)
            if details.get("property") is not None:
                self._details_object = details
                return details
        raise ValueError(f"Detail object for listing {self.identifier} not found.")

    @property
    def price(self) -> float:
        """Value of ``property.price`` in the detail object."""
        return self.detailobject["property"]["price"]

    @property
    def tenure_type(self) -> str:
        """Value of ``property.tenureType`` in the detail object."""
        return self.detailobject["property"]["tenureType"]

    async def price_per_sqm(self) -> float:
        """Price divided by the OCR area; ``-1`` when the area is unknown or 0."""
        sqm_ocr = await self.sqm_ocr()
        if sqm_ocr is None or sqm_ocr == 0:
            return -1
        return self.price / sqm_ocr

    @property
    def bedrooms(self) -> int:
        """Value of ``property.bedrooms`` in the detail object."""
        return self.detailobject["property"]["bedrooms"]

    @property
    def latitude(self) -> float:
        """Value of ``property.latitude`` in the detail object."""
        return self.detailobject["property"]["latitude"]

    @property
    def longitude(self) -> float:
        """Value of ``property.longitude`` in the detail object."""
        return self.detailobject["property"]["longitude"]

    @property
    def leaseLeft(self) -> float | None:
        """First number in the ``lengthOfLease`` tenure entry, if any."""
        ds = self.detailobject["property"].get("tenureInfo", {}).get("content", [])
        for d in ds:
            if d["type"] == "lengthOfLease":
                matches = re.findall(r"(\d+\.?\d*)", d["value"])
                if len(matches):
                    return float(matches[0])
        return None

    @property
    def updateDaysAgo(self) -> int:
        """Whole days since ``property.updateDate`` (a millisecond timestamp)."""
        ts = self.detailobject["property"]["updateDate"] / 1000
        now = datetime.datetime.now()
        ds = datetime.datetime.fromtimestamp(ts)
        return (now - ds).days

    @property
    def last_seen(self) -> int:
        """Whole days since the timestamp stored in ``last_seen.json``."""
        with open(self.path_last_seen_listing(), "r") as f:
            datetime_str = json.load(f)
        dt = datetime.datetime.fromisoformat(datetime_str)
        return (datetime.datetime.now() - dt).days

    @property
    def serviceCharge(self) -> float | None:
        """First number in the ``annualServiceCharge`` tenure entry, if any."""
        ds = self.detailobject["property"].get("tenureInfo", {}).get("content", [])
        for d in ds:
            if d["type"] == "annualServiceCharge":
                matches = re.findall(r"([\d,.]+)", d["value"])
                if len(matches):
                    # remove separators (e.g. 6,395.76)
                    match = matches[0].replace(",", "")
                    return float(match)
        return None

    @property
    def development(self) -> bool:
        """Value of ``property.development`` (aka new home); ``False`` on error."""
        # Deliberately best-effort: any failure to read the flag means False.
        try:
            return self.detailobject["property"]["development"]
        except Exception:
            return False

    @property
    def isRemoved(self) -> bool:
        """True when ``property.visible`` is falsy."""
        return not self.detailobject["property"]["visible"]

    @property
    def status(self) -> str:
        """``"removed"`` for removed listings, else ``property.status``."""
        if self.isRemoved:
            return "removed"
        status = self.detailobject["property"]["status"]
        return status

    @property
    def agency(self) -> str:
        """Value of ``property.branch.brandName`` in the detail object."""
        return self.detailobject["property"]["branch"]["brandName"]

    @property
    def councilTaxBand(self) -> str:
        """Value of the first ``property.councilTaxInfo.content`` entry."""
        return self.detailobject["property"]["councilTaxInfo"]["content"][0]["value"]

    @property
    def photoThumbnail(self) -> str | None:
        """URL of the first photo, or ``None`` when there are no photos."""
        # options are: 'url', 'thumbnailUrl', 'maxSizeUrl'
        photos = self.detailobject["property"]["photos"]
        if len(photos) > 0:
            return photos[0]["url"]
        return None

    @property
    def letDateAvailable(self) -> datetime.datetime | None:
        """Parsed ``property.letDateAvailable``.

        ``"Now"`` maps to the current local time, ``dd/mm/YYYY`` strings are
        parsed, and a missing value or any other format yields ``None``.
        """
        let_date_available: str | None = self.detailobject["property"][
            "letDateAvailable"
        ]
        # Seems null for all assets?
        if let_date_available is None:
            return None
        if let_date_available == "Now":
            return datetime.datetime.now()
        try:
            return datetime.datetime.strptime(let_date_available, "%d/%m/%Y")
        except ValueError:
            # If the date format is not as expected, return None
            return None

    @property
    def priceHistory(self) -> list[PriceHistoryItem]:
        """Entries of ``price_history.json`` as ``PriceHistoryItem`` objects."""
        if not self.path_price_history().exists():
            return []
        with open(self.path_price_history(), "r") as f:
            data = json.load(f)
        return [
            PriceHistoryItem(
                first_seen=datetime.datetime.fromisoformat(item["first_seen"]),
                last_seen=datetime.datetime.fromisoformat(item["last_seen"]),
                price=item["price"],
            )
            for item in data
        ]

    @property
    def longtitude(self) -> float:
        """Misspelled alias of ``longitude``, kept for existing callers."""
        return self.detailobject["property"]["longitude"]

    @property
    def listing_site(self) -> ListingSite:
        """Site this listing comes from."""
        return ListingSite.RIGHTMOVE  # this class supports only right move

    async def dict_nicely(self):
        """Return a flat dict describing the listing.

        Travel times are taken from the routes already present in the routing
        cache; a destination with fewer than two routes is left out of
        ``travel_time_second`` (and out of ``travel_time_fastest`` when it has
        no routes at all).
        """
        travel_time_fastest = {}
        travel_time_second = {}

        if self.path_routing_json().exists():
            with open(self.path_routing_json(), "r") as f:
                travel_times = json.load(f)
            for destination_mode in travel_times:
                destination_mode_clean = destination_mode.replace(" ", "_").replace(
                    ",", "_"
                )
                destination, travel_mode = self.__from_routing_cache_key(
                    destination_mode
                )
                # Look the routes up once and guard against short results.
                routes = self.travel_time(destination, travel_mode)
                if len(routes) > 0:
                    travel_time_fastest[destination_mode_clean] = routes[0][
                        "duration"
                    ]
                if len(routes) > 1:
                    travel_time_second[destination_mode_clean] = routes[1][
                        "duration"
                    ]

        # Read once so the check and the formatting use the same value.
        let_date = self.letDateAvailable

        return {
            "identifier": self.identifier,
            "sqm_ocr": await self.sqm_ocr(),
            "price": self.price,
            "price_per_sqm": await self.price_per_sqm(),
            "url": self.url,
            "bedrooms": self.bedrooms,
            "travel_time_fastest": ":".join(
                sorted(
                    f"{dest} in {duration // 60}min"
                    for dest, duration in travel_time_fastest.items()
                )
            ),
            "travel_time_second": ":".join(
                sorted(
                    f"{dest} in {duration // 60}min"
                    for dest, duration in travel_time_second.items()
                )
            ),
            "lease_left": self.leaseLeft,
            "service_charge": self.serviceCharge,
            "development": self.development,
            "tenure_type": self.tenure_type,
            "updated_days": self.updateDaysAgo,
            "status": self.status,
            "last_seen": self.last_seen,
            "agency": self.agency,
            "council_tax_band": self.councilTaxBand,
            "photo_thumbnail": self.photoThumbnail,
            "let_date_available": (
                let_date.strftime("%d/%m/%Y") if let_date else "Ask agent"
            ),
            "price_history": self.priceHistory,
        }

    def __routing_cache_key(
        self,
        dest_address: str,
        travel_mode: routing.TravelMode,
    ) -> str:
        """Build the routing-cache key for a destination and travel mode."""
        return f"{dest_address} by {travel_mode}"

    def __from_routing_cache_key(
        self,
        cache_key: str,
    ) -> tuple[str, routing.TravelMode]:
        """Split a routing-cache key back into destination and travel mode.

        Raises:
            ValueError: If the key does not look like ``"<dest> by <mode>"``.
        """
        # The first group is greedy, so the split happens at the last " by ".
        match = re.match(r"(.+) by (.+)", cache_key)
        if not match:
            raise ValueError(f"Invalid cache key: {cache_key}")
        return match.group(1), routing.TravelMode[match.group(2)]

    def __extract_travel_times(
        self,
        routing_data: dict[str, Any],
        destination_address: str,
        travel_mode: routing.TravelMode,
        limit: int = 2,
    ) -> list[dict[str, Any]]:
        """Summarise the routes stored for a destination and travel mode.

        Args:
            routing_data: Mapping from cache key to router response; the
                response must contain a ``"routes"`` list.
            destination_address: Destination used to build the cache key.
            travel_mode: Travel mode used to build the cache key.
            limit: Maximum number of route summaries to return.

        Returns:
            Up to ``limit`` summaries, in the order the routes are stored.
        """
        res = []
        cache_key = self.__routing_cache_key(destination_address, travel_mode)
        for route in routing_data[cache_key]["routes"]:
            distance = route["distanceMeters"]
            # Durations are strings with a trailing "s", e.g. "1234s".
            duration = int(route["duration"].strip("s"))
            duration_static = int(route["staticDuration"].strip("s"))
            steps = route["legs"][0]["steps"]

            initial_walk_duration = 0
            used_transit = False
            duration_per_transit = defaultdict(int)
            distance_per_transit = defaultdict(int)
            number_of_transit_stops = 0
            for step in steps:
                step_mode = step["travelMode"]
                step_duration = int(step["staticDuration"].strip("s"))
                if not used_transit and step_mode == "WALK":
                    # Walking before the first non-walk step is tracked apart.
                    initial_walk_duration += step_duration
                else:
                    used_transit = True
                    duration_per_transit[step_mode] += step_duration
                    distance_per_transit[step_mode] += step.get("distanceMeters", 0)
                    if step_mode == "TRANSIT":
                        number_of_transit_stops += 1

            res.append(
                {
                    "duration": duration,
                    "distance": distance,
                    "duration_static": duration_static,
                    "initial_walk_duration": initial_walk_duration,
                    "duration_per_transit": dict(duration_per_transit),
                    "distance_per_transit": dict(distance_per_transit),
                    "number_of_transit_stops": number_of_transit_stops,
                }
            )
        return res[:limit]

    def __get_routing_cache(self) -> dict[str, Any]:
        """Load the routing cache, creating an empty cache file if missing."""
        try:
            # Mode "x" fails if the file already exists, so creation and the
            # existence check happen in one step.
            with open(self.path_routing_json(), "x") as f:
                json.dump({}, f)
            return {}
        except FileExistsError:
            pass
        with open(self.path_routing_json(), "r") as f:
            return json.load(f)