wrongmove/crawler/data_access.py

513 lines
17 KiB
Python
Raw Normal View History

2025-05-17 22:58:35 +00:00
import asyncio
from collections import defaultdict
from dataclasses import dataclass
import json
import pathlib
2025-05-18 12:27:26 +00:00
from typing import Any, List, Dict
from rec import floorplan, routing
import re
2024-08-11 19:36:25 +01:00
import datetime
@dataclass()
2024-03-25 20:48:48 +00:00
class Listing:
identifier: int
2025-05-18 12:27:26 +00:00
_cached: Dict | None = None
data_dir: pathlib.Path = pathlib.Path("data/rs/")
2025-05-17 20:13:28 +00:00
ALL_COLUMNS = [
"identifier",
"sqm_ocr",
"price",
"price_per_sqm",
"url",
"bedrooms",
"travel_time_fastest",
"travel_time_second",
"lease_left",
"service_charge",
"development",
"tenure_type",
"updated_days",
"status",
"last_seen",
2025-05-18 17:41:50 +00:00
"agency",
2025-05-18 18:02:19 +00:00
"council_tax_band",
2025-05-17 20:13:28 +00:00
]
2024-03-25 20:48:48 +00:00
@staticmethod
2025-05-17 20:13:28 +00:00
def get_all_listings(
listing_paths: list[str],
seen_in_the_last_n_days: int = 30,
) -> List["Listing"]:
identifiers = []
for listing_path in listing_paths:
with open(listing_path) as f:
d = json.load(f)
# data_dir is the first directory before the listing_path
data_dir = pathlib.Path(listing_path)
2025-05-31 23:50:43 +00:00
while str(d["identifier"]) in str(data_dir.resolve().absolute()):
data_dir = data_dir.parent
2025-05-17 20:13:28 +00:00
listing = Listing(d["identifier"], data_dir=data_dir)
2025-05-31 23:50:43 +00:00
if (
listing.last_seen is not None
and listing.last_seen < seen_in_the_last_n_days
):
2025-05-17 20:13:28 +00:00
identifiers.append(listing)
2024-03-25 20:48:48 +00:00
return identifiers
def path_listing(self) -> pathlib.Path:
p = self.data_dir / str(self.identifier)
p.mkdir(parents=True, exist_ok=True)
return p
2024-03-25 20:48:48 +00:00
def path_listing_json(self) -> pathlib.Path:
2024-03-25 20:48:48 +00:00
return self.path_listing() / "listing.json"
def path_detail_json(self) -> pathlib.Path:
2024-03-25 20:48:48 +00:00
return self.path_listing() / "detail.json"
def path_routing_json(self) -> pathlib.Path:
2024-03-25 20:48:48 +00:00
return self.path_listing() / "routing.json"
def path_floorplan_model_json(self) -> pathlib.Path:
2024-03-25 20:48:48 +00:00
return self.path_listing() / "floorplan_model.json"
def path_floorplan_ocr_json(self) -> pathlib.Path:
2024-03-25 20:48:48 +00:00
return self.path_listing() / "floorplan_ocr.json"
def path_pic_folder(self) -> pathlib.Path:
2024-03-25 20:48:48 +00:00
return self.path_listing() / "pics"
def path_pic_file(self, order, name) -> pathlib.Path:
self.path_pic_folder().mkdir(parents=True, exist_ok=True)
2024-03-25 20:48:48 +00:00
return self.path_pic_folder() / f"{order}_{name}"
def path_floorplan_folder(self) -> pathlib.Path:
2024-03-25 20:48:48 +00:00
return self.path_listing() / "floorplans"
def path_floorplan_file(self, order, name) -> pathlib.Path:
self.path_floorplan_folder().mkdir(parents=True, exist_ok=True)
2024-03-25 20:48:48 +00:00
return self.path_floorplan_folder() / f"{order}_{name}"
2025-05-07 21:25:40 +00:00
2025-01-26 21:39:51 +00:00
def path_last_seen_listing(self) -> pathlib.Path:
return self.path_listing() / "last_seen.json"
2025-05-07 21:25:40 +00:00
def path_price_history(self) -> pathlib.Path:
return self.path_listing() / "price_history.json"
2025-01-26 21:39:51 +00:00
def dump_listing(self, d: dict):
with open(self.path_listing_json(), "w") as f:
json.dump(d, f)
with open(self.path_last_seen_listing(), "w") as f:
dt = datetime.datetime.now().isoformat()
json.dump(dt, f)
2024-03-25 20:48:48 +00:00
# some places list pw in price and others pcm
price = max(d["price"], d.get("monthlyRent", 0))
self.append_price_history(price)
def append_price_history(self, price: float) -> None:
"""Append the price history to the listing's price history file."""
existing_price_history = (
json.loads(self.path_price_history().read_text())
if self.path_price_history().exists()
else []
)
now = datetime.datetime.now().isoformat()
# if the last price is the same, just update the date
if len(existing_price_history) > 0:
last_price = existing_price_history[-1]["price"]
if last_price == price:
existing_price_history[-1]["last_seen"] = now
else:
existing_price_history.append(
{
"first_seen": now,
"last_seen": now,
"price": price,
}
)
with open(self.path_price_history(), "w") as f:
json.dump(existing_price_history, f, indent=4)
def list_floorplans(self):
2024-03-25 20:48:48 +00:00
images = list(self.path_floorplan_folder().glob("*"))
# todo add check if return is image
return images
2024-03-25 20:48:48 +00:00
def calculate_sqm_model(self):
objs = []
for floorplan_path in self.list_floorplans():
2025-05-31 23:50:43 +00:00
estimated_sqm, model_output, predictions = floorplan.calculate_model(
floorplan_path
)
objs.append(
{
"floorplan_path": str(floorplan_path),
"estimated_sqm": estimated_sqm,
"model_output": model_output,
"no_predictions": len(
predictions
), # cant serialize the predictions itself since its a tensor
}
)
2024-03-25 20:48:48 +00:00
with open(self.path_floorplan_model_json(), "w") as f:
json.dump(objs, f)
2024-03-25 20:48:48 +00:00
@property
def sqm_model(self, recalculate=True) -> float:
if not self.path_floorplan_model_json().exists() or recalculate:
self.calculate_sqm_model()
2024-03-25 20:48:48 +00:00
with open(self.path_floorplan_json()) as f:
objs = json.load(f)
2024-03-25 20:48:48 +00:00
2025-05-31 23:50:43 +00:00
max_sqm = max(
[o["estimated_sqm"] for o in objs if o is None]
) # filter out Nones
return max_sqm
2024-03-25 20:48:48 +00:00
2025-05-17 22:58:35 +00:00
async def calculate_sqm_ocr(self, recalculate=True):
objs = []
if self.path_floorplan_ocr_json().exists():
with open(self.path_floorplan_ocr_json()) as f:
objs = json.load(f)
if not recalculate and len(objs) > 0:
return
2024-03-25 20:48:48 +00:00
for floorplan_path in self.list_floorplans():
2025-05-17 22:58:35 +00:00
estimated_sqm, model_output = await asyncio.to_thread(
2025-05-31 23:50:43 +00:00
floorplan.calculate_ocr, floorplan_path
)
objs.append(
{
"floorplan_path": str(floorplan_path),
"estimated_sqm": estimated_sqm,
"text": model_output,
}
)
2024-03-25 20:48:48 +00:00
with open(self.path_floorplan_ocr_json(), "w") as f:
json.dump(objs, f)
2024-03-25 20:48:48 +00:00
async def sqm_ocr(self, recalculate=False) -> float | None:
if not self.path_floorplan_ocr_json().exists() or recalculate:
await self.calculate_sqm_ocr()
2024-03-25 20:48:48 +00:00
with open(self.path_floorplan_ocr_json()) as f:
objs = json.load(f)
2024-03-25 20:48:48 +00:00
2025-05-31 23:50:43 +00:00
sqms = [o["estimated_sqm"] for o in objs if o["estimated_sqm"] is not None]
if len(sqms) == 0:
return None
max_sqm = max(sqms)
return max_sqm
2024-03-25 20:48:48 +00:00
2025-05-31 23:50:43 +00:00
def calculate_route(
    self, dest_address: str, travel_mode: routing.TravelMode, recalculate=False
) -> dict[str, Any]:
    """Return routing data from this listing to `dest_address`.

    Args:
        dest_address: Destination passed on to `routing.transit_route`.
        travel_mode: Travel mode passed on to `routing.transit_route`.
        recalculate: When true, a cached entry is ignored and replaced.

    Returns:
        On a cache hit, a dict holding only this route under its cache key.
        Otherwise the whole updated routing cache, which is also written to
        routing.json.

    Raises:
        RuntimeError: If `routing.transit_route` returns a falsy result.
    """
    routing_cache = self.__get_routing_cache()
    cache_key = self.__routing_cache_key(dest_address, travel_mode)
    cached_route = routing_cache.get(cache_key)
    if cached_route is not None and not recalculate:
        return {cache_key: cached_route}

    result = routing.transit_route(
        self.latitude,
        self.longitude,
        dest_address,
        travel_mode,
    )
    if not result:
        raise RuntimeError(
            (
                f"Error calculating route from {self.identifier} "
                f"to '{dest_address}' by {travel_mode}"
            )
        )
    # The fresh result goes last so that it replaces a stale entry stored
    # under the same key when recalculating.
    updated_cache = {**routing_cache, cache_key: result}
    with open(self.path_routing_json(), "w") as f:
        json.dump(updated_cache, f)
    return updated_cache
2024-03-25 20:48:48 +00:00
def travel_time(
    self,
    destination_address: str,
    travel_mode: routing.TravelMode,
) -> list[dict[str, Any]]:
    """Return per-route travel summaries for a trip to `destination_address`.

    The routing data comes from `calculate_route` and is condensed by
    `__extract_travel_times` with its default limit.
    """
    routing_data = self.calculate_route(destination_address, travel_mode)
    summaries = self.__extract_travel_times(
        routing_data, destination_address, travel_mode
    )
    return summaries
2024-03-25 20:48:48 +00:00
@property
def url(self):
2024-03-25 20:48:48 +00:00
return f"https://www.rightmove.co.uk/properties/{self.identifier}"
2025-02-16 03:02:21 +00:00
@property
def listingobject(self):
if self._cached is None:
with open(self.path_listing_json()) as f:
return json.load(f)
2025-05-07 21:25:40 +00:00
@property
2025-05-18 12:27:26 +00:00
def detailobject(self) -> dict[str, Any]:
if self._cached is None:
with open(self.path_detail_json()) as f:
self._cached = json.load(f)
2025-05-18 12:27:26 +00:00
return self._cached # type: ignore
2024-03-25 20:48:48 +00:00
@property
def price(self) -> float:
2024-03-25 20:48:48 +00:00
return self.detailobject["property"]["price"]
2025-05-07 21:25:40 +00:00
@property
def tenure_type(self) -> str:
return self.detailobject["property"]["tenureType"]
2024-03-25 20:48:48 +00:00
async def price_per_sqm(self) -> float:
sqm_ocr = await self.sqm_ocr()
if sqm_ocr is None or sqm_ocr == 0:
2025-05-18 12:27:26 +00:00
return -1
return self.price / sqm_ocr
2024-03-25 20:48:48 +00:00
@property
def bedrooms(self) -> int:
2024-03-25 20:48:48 +00:00
return self.detailobject["property"]["bedrooms"]
@property
def latitude(self) -> float:
2024-03-25 20:48:48 +00:00
return self.detailobject["property"]["latitude"]
@property
def longitude(self) -> float:
2024-03-25 20:48:48 +00:00
return self.detailobject["property"]["longitude"]
@property
2025-05-18 12:27:26 +00:00
def leaseLeft(self) -> float | None:
2025-05-31 23:50:43 +00:00
ds = self.detailobject["property"].get("tenureInfo", {}).get("content", [])
for d in ds:
2024-03-25 20:48:48 +00:00
if d["type"] == "lengthOfLease":
matches = re.findall(r"(\d+\.?\d*)", d["value"])
if len(matches):
return float(matches[0])
return None
2025-05-07 21:25:40 +00:00
2024-08-11 19:36:25 +01:00
@property
def updateDaysAgo(self) -> int:
ts = self.detailobject["property"]["updateDate"] / 1000
now = datetime.datetime.now()
ds = datetime.datetime.fromtimestamp(ts)
return (now - ds).days
2024-03-25 20:48:48 +00:00
2025-01-26 21:39:51 +00:00
@property
def last_seen(self) -> int:
2025-05-31 23:50:43 +00:00
with open(self.path_last_seen_listing(), "r") as f:
2025-01-26 21:39:51 +00:00
datetime_str = json.load(f)
2025-02-14 21:21:50 +00:00
dt = datetime.datetime.fromisoformat(datetime_str)
return (datetime.datetime.now() - dt).days
2025-01-26 21:39:51 +00:00
@property
2025-05-18 12:27:26 +00:00
def serviceCharge(self) -> float | None:
2025-05-31 23:50:43 +00:00
ds = self.detailobject["property"].get("tenureInfo", {}).get("content", [])
for d in ds:
if d["type"] == "annualServiceCharge":
matches = re.findall(r"([\d,.]+)", d["value"])
if len(matches):
# remove separators (e.g. 6,395.76)
match = matches[0].replace(",", "")
return float(match)
return None
@property
def development(self) -> bool:
# aka new home
try:
return self.detailobject["property"]["development"]
2025-05-18 12:27:26 +00:00
except Exception:
return False
2025-05-07 21:25:40 +00:00
2024-09-22 11:31:32 +01:00
@property
def isRemoved(self) -> bool:
2025-02-16 04:44:42 +00:00
return not self.detailobject["property"]["visible"]
2025-05-07 21:25:40 +00:00
2024-09-22 11:31:32 +01:00
@property
def status(self) -> str:
if self.isRemoved:
2025-05-31 23:50:43 +00:00
return "removed"
2024-09-22 11:31:32 +01:00
status = self.detailobject["property"]["status"]
return status
2024-03-25 20:48:48 +00:00
2025-05-18 17:41:50 +00:00
@property
def agency(self) -> str:
2025-05-31 23:50:43 +00:00
return self.detailobject["property"]["branch"]["brandName"]
2025-05-18 17:41:50 +00:00
2025-05-18 18:02:19 +00:00
@property
def councilTaxBand(self) -> str:
2025-05-31 23:50:43 +00:00
return self.detailobject["property"]["councilTaxInfo"]["content"][0]["value"]
2025-05-18 18:02:19 +00:00
@property
def photoThumbnail(self) -> str | None:
# options are: 'url', 'thumbnailUrl', 'maxSizeUrl'
2025-05-31 23:50:43 +00:00
photos = self.detailobject["property"]["photos"]
if len(photos) > 0:
2025-05-31 23:50:43 +00:00
return photos[0]["url"]
return None
2025-06-01 00:51:44 +00:00
@property
def letDateAvailable(self) -> datetime.datetime | None:
# options are: 'url', 'thumbnailUrl', 'maxSizeUrl'
let_date_available: str | None = self.detailobject["property"][
"letDateAvailable"
] # Seems null for all assets?
if let_date_available is None:
return None
if let_date_available == "Now":
return datetime.datetime.now()
try:
return datetime.datetime.strptime(let_date_available, "%d/%m/%Y")
except ValueError:
# If the date format is not as expected, return None
return None
@property
def priceHistory(self) -> list[dict[str, Any]]:
if not self.path_price_history().exists():
return []
with open(self.path_price_history(), "r") as f:
return json.load(f)
async def dict_nicely(self):
travel_time_fastest = {}
travel_time_second = {}
if self.path_routing_json().exists():
with open(self.path_routing_json(), "r") as f:
travel_times = json.load(f)
for destination_mode in travel_times.keys():
2025-05-31 23:50:43 +00:00
destination_mode_clean = destination_mode.replace(" ", "_").replace(
",", "_"
)
destination, travel_mode = self.__from_routing_cache_key(
2025-05-31 23:50:43 +00:00
destination_mode
)
travel_time_fastest[destination_mode_clean] = self.travel_time(
2025-05-31 23:50:43 +00:00
destination, travel_mode
)[0]["duration"]
travel_time_second[destination_mode_clean] = self.travel_time(
2025-05-31 23:50:43 +00:00
destination, travel_mode
)[1]["duration"]
return {
2025-05-31 23:50:43 +00:00
"identifier": self.identifier,
"sqm_ocr": await self.sqm_ocr(),
"price": self.price,
"price_per_sqm": await self.price_per_sqm(),
"url": self.url,
"bedrooms": self.bedrooms,
"travel_time_fastest": ":".join(
sorted(
f"{dest} in {travel_mode//60}min"
for dest, travel_mode in travel_time_fastest.items()
)
),
"travel_time_second": ":".join(
sorted(
f"{dest} in {travel_mode//60}min"
for dest, travel_mode in travel_time_second.items()
)
),
"lease_left": self.leaseLeft,
"service_charge": self.serviceCharge,
"development": self.development,
"tenure_type": self.tenure_type,
"updated_days": self.updateDaysAgo,
"status": self.status,
"last_seen": self.last_seen,
"agency": self.agency,
"council_tax_band": self.councilTaxBand,
"photo_thumbnail": self.photoThumbnail,
2025-06-01 00:51:44 +00:00
"let_date_available": (
self.letDateAvailable.strftime("%d/%m/%Y")
if self.letDateAvailable
else "Ask agent"
),
"price_history": self.priceHistory,
}
def __routing_cache_key(
    self,
    dest_address: str,
    travel_mode: routing.TravelMode,
) -> str:
    """Build the "<address> by <mode>" key under which a route is cached."""
    return "{} by {}".format(dest_address, travel_mode)
def __from_routing_cache_key(
    self,
    cache_key: str,
) -> tuple[str, routing.TravelMode]:
    """Split a "<address> by <mode>" cache key back into its two parts.

    The mode text is looked up by name in `routing.TravelMode`.

    Raises:
        ValueError: If `cache_key` does not have the expected shape.
    """
    parsed = re.match(r"(.+) by (.+)", cache_key)
    if parsed is None:
        raise ValueError(f"Invalid cache key: {cache_key}")
    address, mode_name = parsed.groups()
    return address, routing.TravelMode[mode_name]
def __extract_travel_times(
    self,
    routing_data: dict[str, Any],
    destination_address: str,
    travel_mode: routing.TravelMode,
    limit: int = 2,
) -> list[dict[str, Any]]:
    """Condense the cached routes for one destination into summary dicts.

    Every route stored under the destination's cache key is summarised from
    the steps of its first leg; the first `limit` summaries are returned.
    """

    def seconds(text):
        # Durations are strings with an "s" suffix, which is stripped before int().
        return int(text.strip("s"))

    cache_key = self.__routing_cache_key(destination_address, travel_mode)
    summaries = []
    for route in routing_data[cache_key]["routes"]:
        distance = route["distanceMeters"]
        duration = seconds(route["duration"])
        duration_static = seconds(route["staticDuration"])
        walk_before_transit = 0
        left_initial_walk = False
        seconds_by_mode = defaultdict(int)
        metres_by_mode = defaultdict(int)
        transit_steps = 0
        for step in route["legs"][0]["steps"]:
            mode = step["travelMode"]
            if mode == "WALK" and not left_initial_walk:
                # Still in the run of WALK steps at the start of the leg.
                walk_before_transit += seconds(step["staticDuration"])
                continue
            left_initial_walk = True
            seconds_by_mode[mode] += seconds(step["staticDuration"])
            metres_by_mode[mode] += step.get("distanceMeters", 0)
            if mode == "TRANSIT":
                transit_steps += 1
        summaries.append(
            {
                "duration": duration,
                "distance": distance,
                "duration_static": duration_static,
                "initial_walk_duration": walk_before_transit,
                "duration_per_transit": dict(seconds_by_mode),
                "distance_per_transit": dict(metres_by_mode),
                "number_of_transit_stops": transit_steps,
            }
        )
    return summaries[:limit]
def __get_routing_cache(self) -> dict[str, Any]:
try:
2025-05-31 23:50:43 +00:00
with open(self.path_routing_json(), "x") as f:
json.dump({}, f)
return {}
except FileExistsError:
pass
2025-05-31 23:50:43 +00:00
with open(self.path_routing_json(), "r") as f:
return json.load(f)