import asyncio
from collections import defaultdict
from dataclasses import dataclass
import datetime
import json
import pathlib
import re
from typing import Any, List, Dict

from rec import floorplan, routing


@dataclass()
class Listing:
    """A property listing whose data lives in JSON files on disk.

    All files of a listing are stored in ``data_dir / str(identifier)``.
    The various ``path_*`` helpers return the locations of those files and
    the properties read their values from ``detail.json`` (via
    ``detailobject``) or from the other per-listing files.
    """

    identifier: int
    # Lazily loaded content of detail.json (see ``detailobject``).
    _cached: Dict | None = None
    data_dir: pathlib.Path = pathlib.Path("data/rs/")

    # Same names and order as the leading keys returned by ``dict_nicely``.
    ALL_COLUMNS = [
        "identifier",
        "sqm_ocr",
        "price",
        "price_per_sqm",
        "url",
        "bedrooms",
        "travel_time_fastest",
        "travel_time_second",
        "lease_left",
        "service_charge",
        "development",
        "tenure_type",
        "updated_days",
        "status",
        "last_seen",
        "agency",
        "council_tax_band",
    ]

    @staticmethod
    def get_all_listings(
        listing_paths: list[str],
        seen_in_the_last_n_days: int = 30,
    ) -> List["Listing"]:
        """Build listings from JSON files, keeping only recently seen ones.

        Args:
            listing_paths: Paths of JSON files that contain an
                ``"identifier"`` key.
            seen_in_the_last_n_days: A listing is kept only when its
                ``last_seen`` age in days is strictly below this value.

        Returns:
            The listings that have a ``last_seen`` record newer than the
            threshold.
        """
        listings = []
        for listing_path in listing_paths:
            with open(listing_path) as f:
                d = json.load(f)

            # data_dir is the first directory above listing_path whose
            # resolved path no longer contains the identifier.
            identifier = str(d["identifier"])
            data_dir = pathlib.Path(listing_path)
            while identifier in str(data_dir.resolve().absolute()):
                parent = data_dir.parent
                if parent == data_dir:
                    # Reached a fixed point (e.g. "." or the filesystem
                    # root); stop instead of looping forever.
                    break
                data_dir = parent

            listing = Listing(d["identifier"], data_dir=data_dir)
            last_seen = listing.last_seen
            if last_seen is not None and last_seen < seen_in_the_last_n_days:
                listings.append(listing)
        return listings

    def path_listing(self) -> pathlib.Path:
        """Return the listing's directory, creating it if necessary."""
        p = self.data_dir / str(self.identifier)
        p.mkdir(parents=True, exist_ok=True)
        return p

    def path_listing_json(self) -> pathlib.Path:
        """Return the path of ``listing.json``."""
        return self.path_listing() / "listing.json"

    def path_detail_json(self) -> pathlib.Path:
        """Return the path of ``detail.json``."""
        return self.path_listing() / "detail.json"

    def path_routing_json(self) -> pathlib.Path:
        """Return the path of ``routing.json`` (the routing cache)."""
        return self.path_listing() / "routing.json"

    def path_floorplan_model_json(self) -> pathlib.Path:
        """Return the path of ``floorplan_model.json``."""
        return self.path_listing() / "floorplan_model.json"

    def path_floorplan_ocr_json(self) -> pathlib.Path:
        """Return the path of ``floorplan_ocr.json``."""
        return self.path_listing() / "floorplan_ocr.json"

    def path_pic_folder(self) -> pathlib.Path:
        """Return the path of the ``pics`` folder (not created here)."""
        return self.path_listing() / "pics"

    def path_pic_file(self, order, name) -> pathlib.Path:
        """Return ``pics/<order>_<name>``, creating the folder if needed."""
        self.path_pic_folder().mkdir(parents=True, exist_ok=True)
        return self.path_pic_folder() / f"{order}_{name}"

    def path_floorplan_folder(self) -> pathlib.Path:
        """Return the path of the ``floorplans`` folder (not created here)."""
        return self.path_listing() / "floorplans"

    def path_floorplan_file(self, order, name) -> pathlib.Path:
        """Return ``floorplans/<order>_<name>``, creating the folder if needed."""
        self.path_floorplan_folder().mkdir(parents=True, exist_ok=True)
        return self.path_floorplan_folder() / f"{order}_{name}"

    def path_last_seen_listing(self) -> pathlib.Path:
        """Return the path of ``last_seen.json``."""
        return self.path_listing() / "last_seen.json"

    def path_price_history(self) -> pathlib.Path:
        """Return the path of ``price_history.json``."""
        return self.path_listing() / "price_history.json"

    def dump_listing(self, d: dict):
        """Persist the listing dict, refresh ``last_seen`` and the price history.

        Args:
            d: Listing data; must contain ``"price"`` and may contain
                ``"monthlyRent"``.
        """
        with open(self.path_listing_json(), "w") as f:
            json.dump(d, f)
        with open(self.path_last_seen_listing(), "w") as f:
            json.dump(datetime.datetime.now().isoformat(), f)
        # some places list pw in price and others pcm; keep the larger one.
        # A missing or null monthlyRent counts as 0.
        price = max(d["price"], d.get("monthlyRent") or 0)
        self.append_price_history(price)

    def append_price_history(self, price: float) -> None:
        """Record ``price`` in the listing's price history file.

        If the most recent entry has the same price only its ``last_seen``
        timestamp is refreshed; otherwise (including when the history is
        still empty) a new entry is appended.
        """
        history_path = self.path_price_history()
        history = (
            json.loads(history_path.read_text()) if history_path.exists() else []
        )
        now = datetime.datetime.now().isoformat()

        if history and history[-1]["price"] == price:
            # if the last price is the same, just update the date
            history[-1]["last_seen"] = now
        else:
            history.append(
                {
                    "first_seen": now,
                    "last_seen": now,
                    "price": price,
                }
            )

        with open(history_path, "w") as f:
            json.dump(history, f, indent=4)

    def list_floorplans(self):
        """Return every entry found in the floorplan folder."""
        # todo add check if return is image
        return list(self.path_floorplan_folder().glob("*"))

    def calculate_sqm_model(self):
        """Run ``floorplan.calculate_model`` on each floorplan and store the results."""
        objs = []
        for floorplan_path in self.list_floorplans():
            estimated_sqm, model_output, predictions = floorplan.calculate_model(
                floorplan_path
            )
            objs.append(
                {
                    "floorplan_path": str(floorplan_path),
                    "estimated_sqm": estimated_sqm,
                    "model_output": model_output,
                    # cant serialize the predictions itself since its a tensor
                    "no_predictions": len(predictions),
                }
            )
        with open(self.path_floorplan_model_json(), "w") as f:
            json.dump(objs, f)

    @property
    def sqm_model(self, recalculate=True) -> float | None:
        """Largest model-estimated area over all floorplans.

        As this is a property, ``recalculate`` always takes its default, so
        the estimates are recomputed on every access.

        Returns:
            The maximum non-null ``estimated_sqm``, or ``None`` when there
            is no estimate.
        """
        if recalculate or not self.path_floorplan_model_json().exists():
            self.calculate_sqm_model()
        with open(self.path_floorplan_model_json()) as f:
            objs = json.load(f)
        # filter out Nones
        estimates = [
            o["estimated_sqm"] for o in objs if o["estimated_sqm"] is not None
        ]
        return max(estimates, default=None)

    async def calculate_sqm_ocr(self, recalculate=True):
        """Run ``floorplan.calculate_ocr`` on each floorplan and store the results.

        Args:
            recalculate: When false and a non-empty result file already
                exists, nothing is computed.
        """
        ocr_path = self.path_floorplan_ocr_json()
        if not recalculate and ocr_path.exists():
            with open(ocr_path) as f:
                if len(json.load(f)) > 0:
                    return

        # Start from scratch so a recalculation replaces the stored entries
        # instead of appending duplicates to them.
        objs = []
        for floorplan_path in self.list_floorplans():
            estimated_sqm, model_output = await asyncio.to_thread(
                floorplan.calculate_ocr, floorplan_path
            )
            objs.append(
                {
                    "floorplan_path": str(floorplan_path),
                    "estimated_sqm": estimated_sqm,
                    "text": model_output,
                }
            )
        with open(ocr_path, "w") as f:
            json.dump(objs, f)

    async def sqm_ocr(self, recalculate=False) -> float | None:
        """Largest OCR-estimated area over all floorplans.

        Args:
            recalculate: Force the OCR results to be recomputed.

        Returns:
            The maximum non-null ``estimated_sqm``, or ``None`` when there
            is no estimate.
        """
        if not self.path_floorplan_ocr_json().exists() or recalculate:
            await self.calculate_sqm_ocr()
        with open(self.path_floorplan_ocr_json()) as f:
            objs = json.load(f)
        sqms = [o["estimated_sqm"] for o in objs if o["estimated_sqm"] is not None]
        if len(sqms) == 0:
            return None
        return max(sqms)

    def calculate_route(
        self, dest_address: str, travel_mode: routing.TravelMode, recalculate=False
    ) -> dict[str, Any]:
        """Return routing data to ``dest_address``, using the on-disk cache.

        Args:
            dest_address: Destination passed to ``routing.transit_route``.
            travel_mode: Travel mode passed to ``routing.transit_route``.
            recalculate: Ignore a cached entry and query the route again.

        Returns:
            On a cache hit a dict with only the requested cache key;
            otherwise the whole updated cache (which includes that key).

        Raises:
            Exception: If ``routing.transit_route`` returns a falsy result.
        """
        routing_cache = self.__get_routing_cache()
        cache_key = self.__routing_cache_key(dest_address, travel_mode)
        if (
            route_cache := routing_cache.get(cache_key)
        ) is not None and not recalculate:
            return {cache_key: route_cache}

        result = routing.transit_route(
            self.latitude,
            self.longitude,
            dest_address,
            travel_mode,
        )
        if not result:
            raise Exception(
                (
                    f"Error calculating route from {self.identifier} "
                    f"to '{dest_address}' by {travel_mode}"
                )
            )
        # The fresh result goes last so it replaces any stale cached entry.
        result = {**routing_cache, cache_key: result}
        with open(self.path_routing_json(), "w") as f:
            json.dump(result, f)
        return result

    def travel_time(
        self,
        destination_address: str,
        travel_mode: routing.TravelMode,
    ) -> list[dict[str, Any]]:
        """Return summaries of the first routes to ``destination_address``."""
        data = self.calculate_route(destination_address, travel_mode)
        return self.__extract_travel_times(data, destination_address, travel_mode)

    @property
    def url(self):
        """Rightmove URL built from the identifier."""
        return f"https://www.rightmove.co.uk/properties/{self.identifier}"

    @property
    def listingobject(self):
        """Content of ``listing.json``, read from disk on every access."""
        # Not tied to ``_cached``: that attribute holds detail.json.
        with open(self.path_listing_json()) as f:
            return json.load(f)

    @property
    def detailobject(self) -> dict[str, Any]:
        """Content of ``detail.json``, loaded once and kept in ``_cached``."""
        if self._cached is None:
            with open(self.path_detail_json()) as f:
                self._cached = json.load(f)
        return self._cached  # type: ignore

    @property
    def price(self) -> float:
        """Value of ``property.price`` in the detail data."""
        return self.detailobject["property"]["price"]

    @property
    def tenure_type(self) -> str:
        """Value of ``property.tenureType`` in the detail data."""
        return self.detailobject["property"]["tenureType"]

    async def price_per_sqm(self) -> float:
        """Price divided by the OCR area, or ``-1`` when the area is unknown or 0."""
        sqm_ocr = await self.sqm_ocr()
        if sqm_ocr is None or sqm_ocr == 0:
            return -1
        return self.price / sqm_ocr

    @property
    def bedrooms(self) -> int:
        """Value of ``property.bedrooms`` in the detail data."""
        return self.detailobject["property"]["bedrooms"]

    @property
    def latitude(self) -> float:
        """Value of ``property.latitude`` in the detail data."""
        return self.detailobject["property"]["latitude"]

    @property
    def longitude(self) -> float:
        """Value of ``property.longitude`` in the detail data."""
        return self.detailobject["property"]["longitude"]

    @property
    def leaseLeft(self) -> float | None:
        """First number found in the ``lengthOfLease`` tenure entry, if any."""
        ds = self.detailobject["property"].get("tenureInfo", {}).get("content", [])
        for d in ds:
            if d["type"] == "lengthOfLease":
                matches = re.findall(r"(\d+\.?\d*)", d["value"])
                if len(matches):
                    return float(matches[0])
        return None

    @property
    def updateDaysAgo(self) -> int:
        """Whole days since ``property.updateDate`` (a millisecond timestamp)."""
        ts = self.detailobject["property"]["updateDate"] / 1000
        now = datetime.datetime.now()
        ds = datetime.datetime.fromtimestamp(ts)
        return (now - ds).days

    @property
    def last_seen(self) -> int | None:
        """Whole days since the timestamp in ``last_seen.json``.

        Returns ``None`` when that file does not exist.
        """
        last_seen_path = self.path_last_seen_listing()
        if not last_seen_path.exists():
            return None
        with open(last_seen_path, "r") as f:
            datetime_str = json.load(f)
        dt = datetime.datetime.fromisoformat(datetime_str)
        return (datetime.datetime.now() - dt).days

    @property
    def serviceCharge(self) -> float | None:
        """First number found in the ``annualServiceCharge`` tenure entry, if any."""
        ds = self.detailobject["property"].get("tenureInfo", {}).get("content", [])
        for d in ds:
            if d["type"] == "annualServiceCharge":
                # Require a leading digit so stray "." or "," characters in
                # the text are never mistaken for a number.
                matches = re.findall(r"(\d[\d,]*(?:\.\d+)?)", d["value"])
                if len(matches):
                    # remove separators (e.g. 6,395.76)
                    match = matches[0].replace(",", "")
                    return float(match)
        return None

    @property
    def development(self) -> bool:
        """Value of ``property.development``; ``False`` if it cannot be read."""
        # aka new home
        try:
            return self.detailobject["property"]["development"]
        except Exception:
            return False

    @property
    def isRemoved(self) -> bool:
        """True when ``property.visible`` is falsy."""
        return not self.detailobject["property"]["visible"]

    @property
    def status(self) -> str:
        """``"removed"`` for removed listings, else ``property.status``."""
        if self.isRemoved:
            return "removed"
        return self.detailobject["property"]["status"]

    @property
    def agency(self) -> str:
        """Value of ``property.branch.brandName`` in the detail data."""
        return self.detailobject["property"]["branch"]["brandName"]

    @property
    def councilTaxBand(self) -> str:
        """Value of the first ``councilTaxInfo`` content entry."""
        return self.detailobject["property"]["councilTaxInfo"]["content"][0]["value"]

    @property
    def photoThumbnail(self) -> str | None:
        """URL of the first photo, or ``None`` when there are no photos."""
        # options are: 'url', 'thumbnailUrl', 'maxSizeUrl'
        photos = self.detailobject["property"]["photos"]
        if len(photos) > 0:
            return photos[0]["url"]
        return None

    @property
    def letDateAvailable(self) -> datetime.datetime | None:
        """Parsed ``property.letDateAvailable``.

        Returns the current time for ``"Now"``, the parsed date for
        ``dd/mm/YYYY`` strings and ``None`` when the value is missing, null
        or in another format.
        """
        # Seems null for all assets?
        let_date_available: str | None = self.detailobject["property"].get(
            "letDateAvailable"
        )
        if let_date_available is None:
            return None
        if let_date_available == "Now":
            return datetime.datetime.now()
        try:
            return datetime.datetime.strptime(let_date_available, "%d/%m/%Y")
        except ValueError:
            # If the date format is not as expected, return None
            return None

    @property
    def priceHistory(self) -> list[dict[str, Any]]:
        """Content of the price history file, or ``[]`` when it does not exist."""
        if not self.path_price_history().exists():
            return []
        with open(self.path_price_history(), "r") as f:
            return json.load(f)

    async def dict_nicely(self):
        """Return a flat dict summarising the listing.

        Travel times are taken from the routes stored in ``routing.json``;
        a destination contributes to ``travel_time_second`` only when a
        second route is available.
        """
        travel_time_fastest = {}
        travel_time_second = {}
        if self.path_routing_json().exists():
            with open(self.path_routing_json(), "r") as f:
                travel_times = json.load(f)
            for destination_mode in travel_times:
                destination_mode_clean = destination_mode.replace(" ", "_").replace(
                    ",", "_"
                )
                destination, travel_mode = self.__from_routing_cache_key(
                    destination_mode
                )
                # Compute the routes once and guard against short results.
                routes = self.travel_time(destination, travel_mode)
                if len(routes) > 0:
                    travel_time_fastest[destination_mode_clean] = routes[0][
                        "duration"
                    ]
                if len(routes) > 1:
                    travel_time_second[destination_mode_clean] = routes[1][
                        "duration"
                    ]

        let_date = self.letDateAvailable
        return {
            "identifier": self.identifier,
            "sqm_ocr": await self.sqm_ocr(),
            "price": self.price,
            "price_per_sqm": await self.price_per_sqm(),
            "url": self.url,
            "bedrooms": self.bedrooms,
            "travel_time_fastest": ":".join(
                sorted(
                    f"{dest} in {duration // 60}min"
                    for dest, duration in travel_time_fastest.items()
                )
            ),
            "travel_time_second": ":".join(
                sorted(
                    f"{dest} in {duration // 60}min"
                    for dest, duration in travel_time_second.items()
                )
            ),
            "lease_left": self.leaseLeft,
            "service_charge": self.serviceCharge,
            "development": self.development,
            "tenure_type": self.tenure_type,
            "updated_days": self.updateDaysAgo,
            "status": self.status,
            "last_seen": self.last_seen,
            "agency": self.agency,
            "council_tax_band": self.councilTaxBand,
            "photo_thumbnail": self.photoThumbnail,
            "let_date_available": (
                let_date.strftime("%d/%m/%Y") if let_date else "Ask agent"
            ),
            "price_history": self.priceHistory,
        }

    def __routing_cache_key(
        self,
        dest_address: str,
        travel_mode: routing.TravelMode,
    ) -> str:
        """Build the routing cache key for a destination and travel mode."""
        return f"{dest_address} by {travel_mode}"

    def __from_routing_cache_key(
        self,
        cache_key: str,
    ) -> tuple[str, routing.TravelMode]:
        """Split a routing cache key back into destination and travel mode.

        Raises:
            ValueError: If ``cache_key`` does not look like ``"<dest> by <mode>"``.
        """
        # The first group is greedy, so the split happens at the last " by ".
        # NOTE(review): assumes the formatted travel mode is a valid
        # ``routing.TravelMode`` member name - confirm against rec.routing.
        match = re.match(r"(.+) by (.+)", cache_key)
        if not match:
            raise ValueError(f"Invalid cache key: {cache_key}")
        return match.group(1), routing.TravelMode[match.group(2)]

    def __extract_travel_times(
        self,
        routing_data: dict[str, Any],
        destination_address: str,
        travel_mode: routing.TravelMode,
        limit: int = 2,
    ) -> list[dict[str, Any]]:
        """Summarise the routes stored for one destination and travel mode.

        Args:
            routing_data: Mapping of cache keys to routing results, each
                with a ``"routes"`` list.
            destination_address: Destination used to build the cache key.
            travel_mode: Travel mode used to build the cache key.
            limit: Maximum number of route summaries to return.

        Returns:
            One summary dict per route, in the stored order, at most
            ``limit`` of them.
        """
        res = []
        cache_key = self.__routing_cache_key(destination_address, travel_mode)
        for route in routing_data[cache_key]["routes"]:
            distance = route["distanceMeters"]
            # Durations are strings with a trailing "s" (e.g. "1234s").
            duration = int(route["duration"].strip("s"))
            duration_static = int(route["staticDuration"].strip("s"))
            steps = route["legs"][0]["steps"]

            initial_walk_duration = 0
            used_transit = False
            duration_per_transit = defaultdict(int)
            distance_per_transit = defaultdict(int)
            number_of_transit_stops = 0
            for step in steps:
                mode = step["travelMode"]
                step_duration = int(step["staticDuration"].strip("s"))
                if not used_transit and mode == "WALK":
                    # Walking before the first non-walk step is counted
                    # separately from the per-mode totals.
                    initial_walk_duration += step_duration
                    continue
                used_transit = True
                duration_per_transit[mode] += step_duration
                distance_per_transit[mode] += step.get("distanceMeters", 0)
                if mode == "TRANSIT":
                    number_of_transit_stops += 1

            res.append(
                {
                    "duration": duration,
                    "distance": distance,
                    "duration_static": duration_static,
                    "initial_walk_duration": initial_walk_duration,
                    "duration_per_transit": dict(duration_per_transit),
                    "distance_per_transit": dict(distance_per_transit),
                    "number_of_transit_stops": number_of_transit_stops,
                }
            )
        return res[:limit]

    def __get_routing_cache(self) -> dict[str, Any]:
        """Load the routing cache, creating an empty cache file if none exists."""
        try:
            # Mode "x" fails if the file already exists, so an existing
            # cache is never truncated.
            with open(self.path_routing_json(), "x") as f:
                json.dump({}, f)
            return {}
        except FileExistsError:
            pass
        with open(self.path_routing_json(), "r") as f:
            return json.load(f)