451 lines
15 KiB
Python
451 lines
15 KiB
Python
import asyncio
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass
|
|
import json
|
|
import pathlib
|
|
from typing import Any, List, Dict
|
|
from rec import floorplan, routing
|
|
import re
|
|
import datetime
|
|
|
|
|
|
@dataclass()
|
|
class Listing:
|
|
identifier: int
|
|
_cached: Dict | None = None
|
|
data_dir: pathlib.Path = pathlib.Path("data/rs/")
|
|
ALL_COLUMNS = [
|
|
"identifier",
|
|
"sqm_ocr",
|
|
"price",
|
|
"price_per_sqm",
|
|
"url",
|
|
"bedrooms",
|
|
"travel_time_fastest",
|
|
"travel_time_second",
|
|
"lease_left",
|
|
"service_charge",
|
|
"development",
|
|
"tenure_type",
|
|
"updated_days",
|
|
"status",
|
|
"last_seen",
|
|
"agency",
|
|
"council_tax_band",
|
|
]
|
|
|
|
@staticmethod
|
|
def get_all_listings(
|
|
listing_paths: list[str],
|
|
seen_in_the_last_n_days: int = 30,
|
|
) -> List["Listing"]:
|
|
identifiers = []
|
|
for listing_path in listing_paths:
|
|
with open(listing_path) as f:
|
|
d = json.load(f)
|
|
|
|
# data_dir is the first directory before the listing_path
|
|
data_dir = pathlib.Path(listing_path)
|
|
while str(d['identifier']) in str(data_dir.resolve().absolute()):
|
|
data_dir = data_dir.parent
|
|
listing = Listing(d["identifier"], data_dir=data_dir)
|
|
if (listing.last_seen is not None
|
|
and listing.last_seen < seen_in_the_last_n_days):
|
|
identifiers.append(listing)
|
|
|
|
return identifiers
|
|
|
|
def path_listing(self) -> pathlib.Path:
|
|
p = self.data_dir / str(self.identifier)
|
|
p.mkdir(parents=True, exist_ok=True)
|
|
return p
|
|
|
|
def path_listing_json(self) -> pathlib.Path:
|
|
return self.path_listing() / "listing.json"
|
|
|
|
def path_detail_json(self) -> pathlib.Path:
|
|
return self.path_listing() / "detail.json"
|
|
|
|
def path_routing_json(self) -> pathlib.Path:
|
|
return self.path_listing() / "routing.json"
|
|
|
|
def path_floorplan_model_json(self) -> pathlib.Path:
|
|
return self.path_listing() / "floorplan_model.json"
|
|
|
|
def path_floorplan_ocr_json(self) -> pathlib.Path:
|
|
return self.path_listing() / "floorplan_ocr.json"
|
|
|
|
def path_pic_folder(self) -> pathlib.Path:
|
|
return self.path_listing() / "pics"
|
|
|
|
def path_pic_file(self, order, name) -> pathlib.Path:
|
|
self.path_pic_folder().mkdir(parents=True, exist_ok=True)
|
|
return self.path_pic_folder() / f"{order}_{name}"
|
|
|
|
def path_floorplan_folder(self) -> pathlib.Path:
|
|
return self.path_listing() / "floorplans"
|
|
|
|
def path_floorplan_file(self, order, name) -> pathlib.Path:
|
|
self.path_floorplan_folder().mkdir(parents=True, exist_ok=True)
|
|
return self.path_floorplan_folder() / f"{order}_{name}"
|
|
|
|
def path_last_seen_listing(self) -> pathlib.Path:
|
|
return self.path_listing() / "last_seen.json"
|
|
|
|
def dump_listing(self, d: dict):
|
|
with open(self.path_listing_json(), "w") as f:
|
|
json.dump(d, f)
|
|
with open(self.path_last_seen_listing(), "w") as f:
|
|
dt = datetime.datetime.now().isoformat()
|
|
json.dump(dt, f)
|
|
|
|
def list_floorplans(self):
|
|
images = list(self.path_floorplan_folder().glob("*"))
|
|
# todo add check if return is image
|
|
return images
|
|
|
|
def calculate_sqm_model(self):
|
|
objs = []
|
|
for floorplan_path in self.list_floorplans():
|
|
estimated_sqm, model_output, predictions = (
|
|
floorplan.calculate_model(floorplan_path))
|
|
objs.append({
|
|
"floorplan_path": str(floorplan_path),
|
|
"estimated_sqm": estimated_sqm,
|
|
"model_output": model_output,
|
|
"no_predictions": len(
|
|
predictions
|
|
), # cant serialize the predictions itself since its a tensor
|
|
})
|
|
|
|
with open(self.path_floorplan_model_json(), "w") as f:
|
|
json.dump(objs, f)
|
|
|
|
@property
|
|
def sqm_model(self, recalculate=True):
|
|
if not self.path_floorplan_model_json().exists() or recalculate:
|
|
self.calculate_sqm_model()
|
|
|
|
with open(self.path_floorplan_json()) as f:
|
|
objs = json.load(f)
|
|
|
|
max_sqm = max([o["estimated_sqm"] for o in objs
|
|
if o is None]) # filter out Nones
|
|
return max_sqm
|
|
|
|
async def calculate_sqm_ocr(self, recalculate=True):
|
|
if not recalculate and self.path_floorplan_ocr_json().exists():
|
|
return
|
|
|
|
objs = []
|
|
for floorplan_path in self.list_floorplans():
|
|
estimated_sqm, model_output = await asyncio.to_thread(
|
|
floorplan.calculate_ocr, floorplan_path)
|
|
objs.append({
|
|
"floorplan_path": str(floorplan_path),
|
|
"estimated_sqm": estimated_sqm,
|
|
"text": model_output,
|
|
})
|
|
|
|
with open(self.path_floorplan_ocr_json(), "w") as f:
|
|
json.dump(objs, f)
|
|
|
|
async def sqm_ocr(self, recalculate=False):
|
|
if not self.path_floorplan_ocr_json().exists() or recalculate:
|
|
await self.calculate_sqm_ocr()
|
|
|
|
with open(self.path_floorplan_ocr_json()) as f:
|
|
objs = json.load(f)
|
|
|
|
sqms = [
|
|
o["estimated_sqm"] for o in objs if o["estimated_sqm"] is not None
|
|
]
|
|
if len(sqms) == 0:
|
|
return None
|
|
max_sqm = max(sqms)
|
|
return max_sqm
|
|
|
|
def calculate_route(self,
|
|
dest_address: str,
|
|
travel_mode: routing.TravelMode,
|
|
recalculate=False) -> dict[str, Any]:
|
|
routing_cache = self.__get_routing_cache()
|
|
cache_key = self.__routing_cache_key(dest_address, travel_mode)
|
|
if (route_cache :=
|
|
routing_cache.get(cache_key)) is not None and not recalculate:
|
|
return {cache_key: route_cache}
|
|
|
|
result = routing.transit_route(
|
|
self.latitude,
|
|
self.longitude,
|
|
dest_address,
|
|
travel_mode,
|
|
)
|
|
if not result:
|
|
raise Exception((f"Error calculating route from {self.identifier} "
|
|
f"to '{dest_address}' by {travel_mode}"))
|
|
result = {**{cache_key: result}, **routing_cache}
|
|
with open(self.path_routing_json(), "w") as f:
|
|
json.dump(result, f)
|
|
return result
|
|
|
|
def travel_time(
|
|
self,
|
|
destination_address: str,
|
|
travel_mode: routing.TravelMode,
|
|
) -> list[dict[str, Any]]:
|
|
data = self.calculate_route(destination_address, travel_mode)
|
|
return self.__extract_travel_times(data, destination_address,
|
|
travel_mode)
|
|
|
|
@property
|
|
def url(self):
|
|
return f"https://www.rightmove.co.uk/properties/{self.identifier}"
|
|
|
|
@property
|
|
def listingobject(self):
|
|
if self._cached is None:
|
|
with open(self.path_listing_json()) as f:
|
|
return json.load(f)
|
|
|
|
@property
|
|
def detailobject(self) -> dict[str, Any]:
|
|
if self._cached is None:
|
|
with open(self.path_detail_json()) as f:
|
|
self._cached = json.load(f)
|
|
return self._cached # type: ignore
|
|
|
|
@property
|
|
def price(self) -> float:
|
|
return self.detailobject["property"]["price"]
|
|
|
|
@property
|
|
def tenure_type(self) -> str:
|
|
return self.detailobject["property"]["tenureType"]
|
|
|
|
async def price_per_sqm(self) -> float:
|
|
sqm_ocr = await self.sqm_ocr()
|
|
if sqm_ocr is None or sqm_ocr == 0:
|
|
return -1
|
|
return self.price / sqm_ocr
|
|
|
|
@property
|
|
def bedrooms(self) -> int:
|
|
return self.detailobject["property"]["bedrooms"]
|
|
|
|
@property
|
|
def latitude(self) -> float:
|
|
return self.detailobject["property"]["latitude"]
|
|
|
|
@property
|
|
def longitude(self) -> float:
|
|
return self.detailobject["property"]["longitude"]
|
|
|
|
@property
|
|
def leaseLeft(self) -> float | None:
|
|
ds = self.detailobject["property"].get("tenureInfo",
|
|
{}).get("content", [])
|
|
for d in ds:
|
|
if d["type"] == "lengthOfLease":
|
|
matches = re.findall(r"(\d+\.?\d*)", d["value"])
|
|
if len(matches):
|
|
return float(matches[0])
|
|
return None
|
|
|
|
@property
|
|
def updateDaysAgo(self) -> int:
|
|
ts = self.detailobject["property"]["updateDate"] / 1000
|
|
now = datetime.datetime.now()
|
|
ds = datetime.datetime.fromtimestamp(ts)
|
|
return (now - ds).days
|
|
|
|
@property
|
|
def last_seen(self) -> int | None:
|
|
if not self.path_last_seen_listing().exists():
|
|
return None
|
|
|
|
with open(self.path_last_seen_listing(), 'r') as f:
|
|
datetime_str = json.load(f)
|
|
dt = datetime.datetime.fromisoformat(datetime_str)
|
|
return (datetime.datetime.now() - dt).days
|
|
|
|
@property
|
|
def serviceCharge(self) -> float | None:
|
|
ds = self.detailobject["property"].get("tenureInfo",
|
|
{}).get("content", [])
|
|
for d in ds:
|
|
if d["type"] == "annualServiceCharge":
|
|
matches = re.findall(r"([\d,.]+)", d["value"])
|
|
if len(matches):
|
|
# remove separators (e.g. 6,395.76)
|
|
match = matches[0].replace(",", "")
|
|
return float(match)
|
|
return None
|
|
|
|
@property
|
|
def development(self) -> bool:
|
|
# aka new home
|
|
try:
|
|
return self.detailobject["property"]["development"]
|
|
except Exception:
|
|
return False
|
|
|
|
@property
|
|
def isRemoved(self) -> bool:
|
|
return not self.detailobject["property"]["visible"]
|
|
|
|
@property
|
|
def status(self) -> str:
|
|
if self.isRemoved:
|
|
return 'removed'
|
|
status = self.detailobject["property"]["status"]
|
|
return status
|
|
|
|
@property
|
|
def agency(self) -> str:
|
|
return self.detailobject['property']["branch"]["brandName"]
|
|
|
|
@property
|
|
def councilTaxBand(self) -> str:
|
|
return self.detailobject['property']["councilTaxInfo"]["content"][0][
|
|
"value"]
|
|
|
|
@property
|
|
def photoThumbnail(self) -> str | None:
|
|
# options are: 'url', 'thumbnailUrl', 'maxSizeUrl'
|
|
photos = self.detailobject['property']['photos']
|
|
if len(photos) > 0:
|
|
return photos[0]['url']
|
|
return None
|
|
|
|
async def dict_nicely(self):
|
|
travel_time_fastest = {}
|
|
travel_time_second = {}
|
|
if self.path_routing_json().exists():
|
|
with open(self.path_routing_json(), "r") as f:
|
|
travel_times = json.load(f)
|
|
for destination_mode in travel_times.keys():
|
|
destination_mode_clean = destination_mode.replace(" ",
|
|
"_").replace(
|
|
",", "_")
|
|
destination, travel_mode = self.__from_routing_cache_key(
|
|
destination_mode)
|
|
travel_time_fastest[destination_mode_clean] = self.travel_time(
|
|
destination, travel_mode)[0]['duration']
|
|
travel_time_second[destination_mode_clean] = self.travel_time(
|
|
destination, travel_mode)[1]['duration']
|
|
|
|
return {
|
|
"identifier":
|
|
self.identifier,
|
|
"sqm_ocr":
|
|
await self.sqm_ocr(),
|
|
"price":
|
|
self.price,
|
|
"price_per_sqm":
|
|
await self.price_per_sqm(),
|
|
"url":
|
|
self.url,
|
|
"bedrooms":
|
|
self.bedrooms,
|
|
"travel_time_fastest":
|
|
":".join(
|
|
sorted(f'{dest} in {travel_mode//60}min'
|
|
for dest, travel_mode in travel_time_fastest.items())),
|
|
"travel_time_second":
|
|
":".join(
|
|
sorted(f'{dest} in {travel_mode//60}min'
|
|
for dest, travel_mode in travel_time_second.items())),
|
|
"lease_left":
|
|
self.leaseLeft,
|
|
"service_charge":
|
|
self.serviceCharge,
|
|
"development":
|
|
self.development,
|
|
"tenure_type":
|
|
self.tenure_type,
|
|
"updated_days":
|
|
self.updateDaysAgo,
|
|
"status":
|
|
self.status,
|
|
"last_seen":
|
|
self.last_seen,
|
|
"agency":
|
|
self.agency,
|
|
"council_tax_band":
|
|
self.councilTaxBand,
|
|
"photo_thumbnail":
|
|
self.photoThumbnail,
|
|
}
|
|
|
|
def __routing_cache_key(
|
|
self,
|
|
dest_address: str,
|
|
travel_mode: routing.TravelMode,
|
|
) -> str:
|
|
return f"{dest_address} by {travel_mode}"
|
|
|
|
def __from_routing_cache_key(
|
|
self,
|
|
cache_key: str,
|
|
) -> tuple[str, routing.TravelMode]:
|
|
match = re.match(r"(.+) by (.+)", cache_key)
|
|
if not match:
|
|
raise ValueError(f"Invalid cache key: {cache_key}")
|
|
return match.group(1), routing.TravelMode[match.group(2)]
|
|
|
|
def __extract_travel_times(
|
|
self,
|
|
routing_data: dict[str, Any],
|
|
destination_address: str,
|
|
travel_mode: routing.TravelMode,
|
|
limit: int = 2,
|
|
) -> list[dict[str, Any]]:
|
|
res = []
|
|
cache_key = self.__routing_cache_key(destination_address, travel_mode)
|
|
for route in routing_data[cache_key]["routes"]:
|
|
distance = route["distanceMeters"]
|
|
duration = int(route["duration"].strip("s"))
|
|
duration_static = int(route["staticDuration"].strip("s"))
|
|
|
|
steps = route["legs"][0]["steps"]
|
|
initial_walk_duration = 0
|
|
used_transit = False
|
|
duration_per_transit = defaultdict(lambda: 0)
|
|
distance_per_transit = defaultdict(lambda: 0)
|
|
number_of_transit_stops = 0
|
|
|
|
for step in steps:
|
|
if not used_transit and step["travelMode"] == "WALK":
|
|
initial_walk_duration += int(
|
|
step["staticDuration"].strip("s"))
|
|
else:
|
|
used_transit = True
|
|
duration_per_transit[step["travelMode"]] += int(
|
|
step["staticDuration"].strip("s"))
|
|
distance_per_transit[step["travelMode"]] += step.get(
|
|
"distanceMeters", 0)
|
|
if step["travelMode"] == "TRANSIT":
|
|
number_of_transit_stops += 1
|
|
|
|
res.append({
|
|
"duration": duration,
|
|
"distance": distance,
|
|
"duration_static": duration_static,
|
|
"initial_walk_duration": initial_walk_duration,
|
|
"duration_per_transit": dict(duration_per_transit),
|
|
"distance_per_transit": dict(distance_per_transit),
|
|
"number_of_transit_stops": number_of_transit_stops,
|
|
})
|
|
|
|
return res[:limit]
|
|
|
|
def __get_routing_cache(self) -> dict[str, Any]:
|
|
try:
|
|
with open(self.path_routing_json(), 'x') as f:
|
|
json.dump({}, f)
|
|
return {}
|
|
except FileExistsError:
|
|
pass
|
|
with open(self.path_routing_json(), 'r') as f:
|
|
return json.load(f)
|