wrongmove/crawler/data_access.py

341 lines
10 KiB
Python

import asyncio
from dataclasses import dataclass
import json
import pathlib
from typing import Any, List, Dict
from rec import floorplan, routing
import re
import datetime
@dataclass()
class Listing:
identifier: int
_cached: Dict | None = None
data_dir: pathlib.Path = pathlib.Path("data/rs/")
ALL_COLUMNS = [
"identifier",
"sqm_ocr",
"price",
"price_per_sqm",
"url",
"bedrooms",
"travel_time_fastest",
"travel_time_second",
"lease_left",
"service_charge",
"development",
"tenure_type",
"updated_days",
"status",
"last_seen",
"agency",
"council_tax_band",
]
@staticmethod
def get_all_listings(
listing_paths: list[str],
seen_in_the_last_n_days: int = 30,
) -> List["Listing"]:
identifiers = []
for listing_path in listing_paths:
with open(listing_path) as f:
d = json.load(f)
# data_dir is the first directory before the listing_path
data_dir = pathlib.Path(listing_path)
while str(d['identifier']) in str(data_dir.resolve().absolute()):
data_dir = data_dir.parent
listing = Listing(d["identifier"], data_dir=data_dir)
if (listing.last_seen is not None
and listing.last_seen < seen_in_the_last_n_days):
identifiers.append(listing)
return identifiers
def path_listing(self) -> pathlib.Path:
p = self.data_dir / str(self.identifier)
p.mkdir(parents=True, exist_ok=True)
return p
def path_listing_json(self) -> pathlib.Path:
return self.path_listing() / "listing.json"
def path_detail_json(self) -> pathlib.Path:
return self.path_listing() / "detail.json"
def path_routing_json(self) -> pathlib.Path:
return self.path_listing() / "routing.json"
def path_floorplan_model_json(self) -> pathlib.Path:
return self.path_listing() / "floorplan_model.json"
def path_floorplan_ocr_json(self) -> pathlib.Path:
return self.path_listing() / "floorplan_ocr.json"
def path_pic_folder(self) -> pathlib.Path:
return self.path_listing() / "pics"
def path_pic_file(self, order, name) -> pathlib.Path:
self.path_pic_folder().mkdir(parents=True, exist_ok=True)
return self.path_pic_folder() / f"{order}_{name}"
def path_floorplan_folder(self) -> pathlib.Path:
return self.path_listing() / "floorplans"
def path_floorplan_file(self, order, name) -> pathlib.Path:
self.path_floorplan_folder().mkdir(parents=True, exist_ok=True)
return self.path_floorplan_folder() / f"{order}_{name}"
def path_last_seen_listing(self) -> pathlib.Path:
return self.path_listing() / "last_seen.json"
def dump_listing(self, d: dict):
with open(self.path_listing_json(), "w") as f:
json.dump(d, f)
with open(self.path_last_seen_listing(), "w") as f:
dt = datetime.datetime.now().isoformat()
json.dump(dt, f)
def list_floorplans(self):
images = list(self.path_floorplan_folder().glob("*"))
# todo add check if return is image
return images
def calculate_sqm_model(self):
objs = []
for floorplan_path in self.list_floorplans():
estimated_sqm, model_output, predictions = floorplan.calculate_model(
floorplan_path)
objs.append({
"floorplan_path": str(floorplan_path),
"estimated_sqm": estimated_sqm,
"model_output": model_output,
"no_predictions": len(
predictions
), # cant serialize the predictions itself since its a tensor
})
with open(self.path_floorplan_model_json(), "w") as f:
json.dump(objs, f)
@property
def sqm_model(self, recalculate=True):
if not self.path_floorplan_model_json().exists() or recalculate:
self.calculate_sqm_model()
with open(self.path_floorplan_json()) as f:
objs = json.load(f)
max_sqm = max([o["estimated_sqm"] for o in objs
if o is None]) # filter out Nones
return max_sqm
async def calculate_sqm_ocr(self, recalculate=True):
if not recalculate and self.path_floorplan_ocr_json().exists():
return
objs = []
for floorplan_path in self.list_floorplans():
estimated_sqm, model_output = await asyncio.to_thread(
floorplan.calculate_ocr, floorplan_path)
objs.append({
"floorplan_path": str(floorplan_path),
"estimated_sqm": estimated_sqm,
"text": model_output,
})
with open(self.path_floorplan_ocr_json(), "w") as f:
json.dump(objs, f)
@property
def sqm_ocr(self, recalculate=False):
if not self.path_floorplan_ocr_json().exists() or recalculate:
self.calculate_sqm_ocr()
with open(self.path_floorplan_ocr_json()) as f:
objs = json.load(f)
sqms = [
o["estimated_sqm"] for o in objs if o["estimated_sqm"] is not None
]
if len(sqms) == 0:
return None
max_sqm = max(sqms)
return max_sqm
def calculate_route(self,
dest_address: str,
travel_mode: routing.TravelMode,
recalculate=False):
if self.path_routing_json().exists() and not recalculate:
return
result = routing.transit_route(
self.latitude,
self.longitude,
dest_address,
travel_mode,
)
with open(self.path_routing_json(), "w") as f:
json.dump(result, f)
@property
def travel_time(self) -> List:
if not self.path_routing_json().exists():
return []
with open(self.path_routing_json()) as f:
d = json.load(f)
return routing.extract_time(d)
@property
def url(self):
return f"https://www.rightmove.co.uk/properties/{self.identifier}"
@property
def listingobject(self):
if self._cached is None:
with open(self.path_listing_json()) as f:
return json.load(f)
@property
def detailobject(self) -> dict[str, Any]:
if self._cached is None:
with open(self.path_detail_json()) as f:
self._cached = json.load(f)
return self._cached # type: ignore
@property
def price(self) -> float:
return self.detailobject["property"]["price"]
@property
def tenure_type(self) -> str:
return self.detailobject["property"]["tenureType"]
@property
def price_per_sqm(self) -> float:
if self.sqm_ocr is None or self.sqm_ocr == 0:
return -1
return self.price / self.sqm_ocr
@property
def bedrooms(self) -> int:
return self.detailobject["property"]["bedrooms"]
@property
def latitude(self) -> float:
return self.detailobject["property"]["latitude"]
@property
def longitude(self) -> float:
return self.detailobject["property"]["longitude"]
@property
def leaseLeft(self) -> float | None:
ds = self.detailobject["property"].get("tenureInfo",
{}).get("content", [])
for d in ds:
if d["type"] == "lengthOfLease":
matches = re.findall(r"(\d+\.?\d*)", d["value"])
if len(matches):
return float(matches[0])
return None
@property
def updateDaysAgo(self) -> int:
ts = self.detailobject["property"]["updateDate"] / 1000
now = datetime.datetime.now()
ds = datetime.datetime.fromtimestamp(ts)
return (now - ds).days
@property
def last_seen(self) -> int | None:
if not self.path_last_seen_listing().exists():
return None
with open(self.path_last_seen_listing(), 'r') as f:
datetime_str = json.load(f)
dt = datetime.datetime.fromisoformat(datetime_str)
return (datetime.datetime.now() - dt).days
@property
def serviceCharge(self) -> float | None:
ds = self.detailobject["property"].get("tenureInfo",
{}).get("content", [])
for d in ds:
if d["type"] == "annualServiceCharge":
matches = re.findall(r"([\d,.]+)", d["value"])
if len(matches):
# remove separators (e.g. 6,395.76)
match = matches[0].replace(",", "")
return float(match)
return None
@property
def development(self) -> bool:
# aka new home
try:
return self.detailobject["property"]["development"]
except Exception:
return False
@property
def isRemoved(self) -> bool:
return not self.detailobject["property"]["visible"]
@property
def status(self) -> str:
if self.isRemoved:
return 'removed'
status = self.detailobject["property"]["status"]
return status
@property
def agency(self) -> str:
return self.detailobject['property']["branch"]["brandName"]
@property
def councilTaxBand(self) -> str:
return self.detailobject['property']["councilTaxInfo"]["content"][0][
"value"]
def dict_nicely(self):
return {
"identifier":
self.identifier,
"sqm_ocr":
self.sqm_ocr,
"price":
self.price,
"price_per_sqm":
self.price_per_sqm,
"url":
self.url,
"bedrooms":
self.bedrooms,
"travel_time_fastest":
None if len(self.travel_time) == 0 else self.travel_time[0],
"travel_time_second":
None if len(self.travel_time) < 2 else self.travel_time[1],
"lease_left":
self.leaseLeft,
"service_charge":
self.serviceCharge,
"development":
self.development,
"tenure_type":
self.tenure_type,
"updated_days":
self.updateDaysAgo,
"status":
self.status,
"last_seen":
self.last_seen,
"agency":
self.agency,
"council_tax_band":
self.councilTaxBand,
}