allow caching routing by destination and travel mode; also export all travel methods to csv column

This commit is contained in:
Viktor Barzin 2025-05-20 21:58:08 +00:00
parent f2118c9bc4
commit 10ae25e0d3
No known key found for this signature in database
GPG key ID: 4056458DBDBF8863
5 changed files with 190 additions and 84 deletions

View file

@ -3,29 +3,26 @@ from tqdm import tqdm
from rec import routing from rec import routing
def calculate_route( async def calculate_route(
listing_paths: list[str], listing_paths: list[str],
destination_address: str, destination_address: str,
travel_mode: routing.TravelMode, travel_mode: routing.TravelMode,
): ) -> None:
listings = Listing.get_all_listings(listing_paths) listings = Listing.get_all_listings(listing_paths)
# reduce listings to everything within 7 miles # reduce listings to everything within 7 miles
filtered_listings = [] filtered_listings = []
for listing in listings: for listing in listings:
print(f'Processing {listing.identifier}')
if listing.isRemoved: if listing.isRemoved:
print(f"Removed-Skip: Skipping {listing.identifier} " print(f"Removed-Skip: Skipping {listing.identifier} "
"is already removed.") "is already removed.")
continue continue
if listing.path_routing_json().exists(): sqm_ocr = await listing.sqm_ocr()
print(f"Path-Skip: Skipping {listing.identifier} as path routing " if (sqm_ocr is None or sqm_ocr < 30 or sqm_ocr > 200):
"already exists")
continue
if (listing.sqm_ocr is None or listing.sqm_ocr < 30
or listing.sqm_ocr > 200):
print((f"Floorplan-Skip: Skipping {listing.identifier} as " print((f"Floorplan-Skip: Skipping {listing.identifier} as "
f"sqm_ocr is {listing.sqm_ocr}")) f"sqm_ocr is {sqm_ocr}"))
continue continue
filtered_listings.append(listing) filtered_listings.append(listing)
@ -38,7 +35,49 @@ def calculate_route(
travel_mode, travel_mode,
recalculate=False, recalculate=False,
) )
traveltime = listing.travel_time[0] traveltime = listing.travel_time(destination_address, travel_mode)[0]
duration_minutes = traveltime["duration"] / 60.0 duration_minutes = traveltime["duration"] / 60.0
tqdm.write(f"{listing.identifier} {duration_minutes}") tqdm.write(f"{listing.identifier} {duration_minutes}")
# async def geocode_address(
# address: str,
# geocoding_cache: pathlib.Path,
# ) -> tuple[int, int]:
# cache = get_geocoding_cache(geocoding_cache)
# cached_results = cache.get(address)
# if cached_results is None:
# # resolve
# async with aiohttp.ClientSession() as session:
# async with session.get(
# ("https://maps.googleapis.com/maps/api/geocode/json"
# f"?address={address}"
# f"&key={API_KEY_ENVIRONMENT_VARIABLE}")) as response:
# if response.status != 200:
# raise Exception(
# f"Error {response.status} from geocoding API")
# cached_results = await response.json()
# with open(geocoding_cache, 'w') as f:
# json.dump({
# **{
# address: cached_results,
# },
# **cache
# }, f)
# # API format
# lat = cached_results["results"][0]["geometry"]["location"]["lat"]
# lng = cached_results["results"][0]["geometry"]["location"]["lng"]
# cache[address] = (lat, lng)
# with open(geocoding_cache, 'w') as f:
# json.dump(cache, f)
# return lat, lng
# def get_geocoding_cache(geocoding_cache: pathlib.Path) -> dict[str, Any]:
# try:
# with open(geocoding_cache, 'x') as f:
# json.dump({}, f)
# return {}
# except FileExistsError:
# pass # File already exists
# with open(geocoding_cache, 'r') as f:
# return json.load(f)

View file

@ -1,14 +1,15 @@
import asyncio
from pathlib import Path from pathlib import Path
from data_access import Listing from data_access import Listing
import pandas as pd import pandas as pd
def export_to_csv( async def export_to_csv(
listings: list[Listing], listings: list[Listing],
output_file: Path, output_file: Path,
columns: list[str], columns: list[str],
) -> None: ) -> None:
ds = [listing.dict_nicely() for listing in listings] ds = await asyncio.gather(*[listing.dict_nicely() for listing in listings])
df = pd.DataFrame(ds) df = pd.DataFrame(ds)
# read decisions on file # read decisions on file
decisions_path = 'data/decisions.json' decisions_path = 'data/decisions.json'

View file

@ -1,4 +1,5 @@
import asyncio import asyncio
from collections import defaultdict
from dataclasses import dataclass from dataclasses import dataclass
import json import json
import pathlib import pathlib
@ -106,8 +107,8 @@ class Listing:
def calculate_sqm_model(self): def calculate_sqm_model(self):
objs = [] objs = []
for floorplan_path in self.list_floorplans(): for floorplan_path in self.list_floorplans():
estimated_sqm, model_output, predictions = floorplan.calculate_model( estimated_sqm, model_output, predictions = (
floorplan_path) floorplan.calculate_model(floorplan_path))
objs.append({ objs.append({
"floorplan_path": str(floorplan_path), "floorplan_path": str(floorplan_path),
"estimated_sqm": estimated_sqm, "estimated_sqm": estimated_sqm,
@ -149,10 +150,9 @@ class Listing:
with open(self.path_floorplan_ocr_json(), "w") as f: with open(self.path_floorplan_ocr_json(), "w") as f:
json.dump(objs, f) json.dump(objs, f)
@property async def sqm_ocr(self, recalculate=False):
def sqm_ocr(self, recalculate=False):
if not self.path_floorplan_ocr_json().exists() or recalculate: if not self.path_floorplan_ocr_json().exists() or recalculate:
self.calculate_sqm_ocr() await self.calculate_sqm_ocr()
with open(self.path_floorplan_ocr_json()) as f: with open(self.path_floorplan_ocr_json()) as f:
objs = json.load(f) objs = json.load(f)
@ -168,9 +168,12 @@ class Listing:
def calculate_route(self, def calculate_route(self,
dest_address: str, dest_address: str,
travel_mode: routing.TravelMode, travel_mode: routing.TravelMode,
recalculate=False): recalculate=False) -> dict[str, Any]:
if self.path_routing_json().exists() and not recalculate: routing_cache = self.__get_routing_cache()
return cache_key = self.__routing_cache_key(dest_address, travel_mode)
if (route_cache :=
routing_cache.get(cache_key)) is not None and not recalculate:
return {cache_key: route_cache}
result = routing.transit_route( result = routing.transit_route(
self.latitude, self.latitude,
@ -178,17 +181,22 @@ class Listing:
dest_address, dest_address,
travel_mode, travel_mode,
) )
if not result:
raise Exception((f"Error calculating route from {self.identifier} "
f"to '{dest_address}' by {travel_mode}"))
result = {**{cache_key: result}, **routing_cache}
with open(self.path_routing_json(), "w") as f: with open(self.path_routing_json(), "w") as f:
json.dump(result, f) json.dump(result, f)
return result
@property def travel_time(
def travel_time(self) -> List: self,
if not self.path_routing_json().exists(): destination_address: str,
return [] travel_mode: routing.TravelMode,
with open(self.path_routing_json()) as f: ) -> list[dict[str, Any]]:
d = json.load(f) data = self.calculate_route(destination_address, travel_mode)
return self.__extract_travel_times(data, destination_address,
return routing.extract_time(d) travel_mode)
@property @property
def url(self): def url(self):
@ -215,11 +223,11 @@ class Listing:
def tenure_type(self) -> str: def tenure_type(self) -> str:
return self.detailobject["property"]["tenureType"] return self.detailobject["property"]["tenureType"]
@property async def price_per_sqm(self) -> float:
def price_per_sqm(self) -> float: sqm_ocr = await self.sqm_ocr()
if self.sqm_ocr is None or self.sqm_ocr == 0: if sqm_ocr is None or sqm_ocr == 0:
return -1 return -1
return self.price / self.sqm_ocr return self.price / sqm_ocr
@property @property
def bedrooms(self) -> int: def bedrooms(self) -> int:
@ -302,24 +310,44 @@ class Listing:
return self.detailobject['property']["councilTaxInfo"]["content"][0][ return self.detailobject['property']["councilTaxInfo"]["content"][0][
"value"] "value"]
def dict_nicely(self): async def dict_nicely(self):
travel_time_fastest = {}
travel_time_second = {}
if self.path_routing_json().exists():
with open(self.path_routing_json(), "r") as f:
travel_times = json.load(f)
for destination_mode in travel_times.keys():
destination_mode_clean = destination_mode.replace(" ",
"_").replace(
",", "_")
destination, travel_mode = self.__from_routing_cache_key(
destination_mode)
travel_time_fastest[destination_mode_clean] = self.travel_time(
destination, travel_mode)[0]['duration']
travel_time_second[destination_mode_clean] = self.travel_time(
destination, travel_mode)[1]['duration']
return { return {
"identifier": "identifier":
self.identifier, self.identifier,
"sqm_ocr": "sqm_ocr":
self.sqm_ocr, await self.sqm_ocr(),
"price": "price":
self.price, self.price,
"price_per_sqm": "price_per_sqm":
self.price_per_sqm, await self.price_per_sqm(),
"url": "url":
self.url, self.url,
"bedrooms": "bedrooms":
self.bedrooms, self.bedrooms,
"travel_time_fastest": "travel_time_fastest":
None if len(self.travel_time) == 0 else self.travel_time[0], ":".join(
sorted(f'{dest} in {travel_mode//60}min'
for dest, travel_mode in travel_time_fastest.items())),
"travel_time_second": "travel_time_second":
None if len(self.travel_time) < 2 else self.travel_time[1], ":".join(
sorted(f'{dest} in {travel_mode//60}min'
for dest, travel_mode in travel_time_second.items())),
"lease_left": "lease_left":
self.leaseLeft, self.leaseLeft,
"service_charge": "service_charge":
@ -339,3 +367,75 @@ class Listing:
"council_tax_band": "council_tax_band":
self.councilTaxBand, self.councilTaxBand,
} }
def __routing_cache_key(
self,
dest_address: str,
travel_mode: routing.TravelMode,
) -> str:
return f"{dest_address} by {travel_mode}"
def __from_routing_cache_key(
self,
cache_key: str,
) -> tuple[str, routing.TravelMode]:
match = re.match(r"(.+) by (.+)", cache_key)
if not match:
raise ValueError(f"Invalid cache key: {cache_key}")
return match.group(1), routing.TravelMode[match.group(2)]
def __extract_travel_times(
self,
routing_data: dict[str, Any],
destination_address: str,
travel_mode: routing.TravelMode,
limit: int = 2,
) -> list[dict[str, Any]]:
res = []
cache_key = self.__routing_cache_key(destination_address, travel_mode)
for route in routing_data[cache_key]["routes"]:
distance = route["distanceMeters"]
duration = int(route["duration"].strip("s"))
duration_static = int(route["staticDuration"].strip("s"))
steps = route["legs"][0]["steps"]
initial_walk_duration = 0
used_transit = False
duration_per_transit = defaultdict(lambda: 0)
distance_per_transit = defaultdict(lambda: 0)
number_of_transit_stops = 0
for step in steps:
if not used_transit and step["travelMode"] == "WALK":
initial_walk_duration += int(
step["staticDuration"].strip("s"))
else:
used_transit = True
duration_per_transit[step["travelMode"]] += int(
step["staticDuration"].strip("s"))
distance_per_transit[step["travelMode"]] += step.get(
"distanceMeters", 0)
if step["travelMode"] == "TRANSIT":
number_of_transit_stops += 1
res.append({
"duration": duration,
"distance": distance,
"duration_static": duration_static,
"initial_walk_duration": initial_walk_duration,
"duration_per_transit": dict(duration_per_transit),
"distance_per_transit": dict(distance_per_transit),
"number_of_transit_stops": number_of_transit_stops,
})
return res[:limit]
def __get_routing_cache(self) -> dict[str, Any]:
try:
with open(self.path_routing_json(), 'x') as f:
json.dump({}, f)
return {}
except FileExistsError:
pass
with open(self.path_routing_json(), 'r') as f:
return json.load(f)

View file

@ -1,4 +1,5 @@
import asyncio import asyncio
import json
import os import os
import pathlib import pathlib
import click import click
@ -185,11 +186,14 @@ def routing(ctx: click.core.Context, destination_address: str,
raise click.exceptions.MissingParameter( raise click.exceptions.MissingParameter(
f'{API_KEY_ENVIRONMENT_VARIABLE} environment variable is not set. ' f'{API_KEY_ENVIRONMENT_VARIABLE} environment variable is not set. '
'Please set it to your API key for the routing service.') 'Please set it to your API key for the routing service.')
routing_module.calculate_route(
listing_paths, asyncio.run(
destination_address, routing_module.calculate_route(
TravelMode[travel_mode], listing_paths,
) destination_address,
# destination_address_coordinates,
TravelMode[travel_mode],
))
@cli.command() @cli.command()
@ -223,7 +227,9 @@ def export_csv(ctx: click.core.Context, output_file: str, columns: tuple[str]):
output_file_path = pathlib.Path(output_file) output_file_path = pathlib.Path(output_file)
listing_paths = sorted(list(pathlib.Path(data_dir).glob("*/listing.json"))) listing_paths = sorted(list(pathlib.Path(data_dir).glob("*/listing.json")))
listings = Listing.get_all_listings([str(path) for path in listing_paths]) listings = Listing.get_all_listings([str(path) for path in listing_paths])
csv_exporter.export_to_csv(listings, output_file_path, list(columns)) asyncio.run(
csv_exporter.export_to_csv(listings, output_file_path,
list(columns)), )
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -3,7 +3,6 @@ import os
from typing import Any from typing import Any
import requests import requests
from rec.utils import nextMonday from rec.utils import nextMonday
from collections import defaultdict
url = "https://routes.googleapis.com/directions/v2:computeRoutes" url = "https://routes.googleapis.com/directions/v2:computeRoutes"
API_KEY_ENVIRONMENT_VARIABLE = "ROUTING_API_KEY" API_KEY_ENVIRONMENT_VARIABLE = "ROUTING_API_KEY"
@ -72,42 +71,3 @@ def transit_route(
return r.json() return r.json()
raise Exception(r.json()) raise Exception(r.json())
def extract_time(d, limit: int = 2):
res = []
for route in d["routes"]:
distance = route["distanceMeters"]
duration = int(route["duration"].strip("s"))
duration_static = int(route["staticDuration"].strip("s"))
steps = route["legs"][0]["steps"]
initial_walk_duration = 0
used_transit = False
duration_per_transit = defaultdict(lambda: 0)
distance_per_transit = defaultdict(lambda: 0)
number_of_transit_stops = 0
for step in steps:
if used_transit == False and step["travelMode"] == "WALK":
initial_walk_duration += int(step["staticDuration"].strip("s"))
else:
used_transit = True
duration_per_transit[step["travelMode"]] += int(
step["staticDuration"].strip("s"))
distance_per_transit[step["travelMode"]] += step.get(
"distanceMeters", 0)
if step["travelMode"] == "TRANSIT":
number_of_transit_stops += 1
res.append({
"duration": duration,
"distance": distance,
"duration_static": duration_static,
"initial_walk_duration": initial_walk_duration,
"duration_per_transit": dict(duration_per_transit),
"distance_per_transit": dict(distance_per_transit),
"number_of_transit_stops": number_of_transit_stops,
})
return res[:limit]