From 10ae25e0d37eb3bf3ff0363bcde3c561b08ee559 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Tue, 20 May 2025 21:58:08 +0000 Subject: [PATCH] allow caching routing by destination and travel mode; also export all travel methods to csv column --- crawler/5_routing.py | 61 +++++++++++++--- crawler/csv_exporter.py | 5 +- crawler/data_access.py | 150 +++++++++++++++++++++++++++++++++------- crawler/main.py | 18 +++-- crawler/rec/routing.py | 40 ----------- 5 files changed, 190 insertions(+), 84 deletions(-) diff --git a/crawler/5_routing.py b/crawler/5_routing.py index f9b9404..6226a53 100644 --- a/crawler/5_routing.py +++ b/crawler/5_routing.py @@ -3,29 +3,26 @@ from tqdm import tqdm from rec import routing -def calculate_route( +async def calculate_route( listing_paths: list[str], destination_address: str, travel_mode: routing.TravelMode, -): +) -> None: listings = Listing.get_all_listings(listing_paths) # reduce listings to everything within 7 miles filtered_listings = [] for listing in listings: + print(f'Processing {listing.identifier}') if listing.isRemoved: print(f"Removed-Skip: Skipping {listing.identifier} " "is already removed.") continue - if listing.path_routing_json().exists(): - print(f"Path-Skip: Skipping {listing.identifier} as path routing " - "already exists") - continue - if (listing.sqm_ocr is None or listing.sqm_ocr < 30 - or listing.sqm_ocr > 200): + sqm_ocr = await listing.sqm_ocr() + if (sqm_ocr is None or sqm_ocr < 30 or sqm_ocr > 200): print((f"Floorplan-Skip: Skipping {listing.identifier} as " - f"sqm_ocr is {listing.sqm_ocr}")) + f"sqm_ocr is {sqm_ocr}")) continue filtered_listings.append(listing) @@ -38,7 +35,49 @@ def calculate_route( travel_mode, recalculate=False, ) - traveltime = listing.travel_time[0] + traveltime = listing.travel_time(destination_address, travel_mode)[0] duration_minutes = traveltime["duration"] / 60.0 - tqdm.write(f"{listing.identifier} {duration_minutes}") + + +# async def geocode_address( +# address: str, +# geocoding_cache: pathlib.Path, +# ) -> tuple[int, int]: +# cache = get_geocoding_cache(geocoding_cache) +# cached_results = cache.get(address) +# if cached_results is None: +# # resolve +# async with aiohttp.ClientSession() as session: +# async with session.get( +# ("https://maps.googleapis.com/maps/api/geocode/json" +# f"?address={address}" +# f"&key={API_KEY_ENVIRONMENT_VARIABLE}")) as response: +# if response.status != 200: +# raise Exception( +# f"Error {response.status} from geocoding API") +# cached_results = await response.json() +# with open(geocoding_cache, 'w') as f: +# json.dump({ +# **{ +# address: cached_results, +# }, +# **cache +# }, f) +# # API format +# lat = cached_results["results"][0]["geometry"]["location"]["lat"] +# lng = cached_results["results"][0]["geometry"]["location"]["lng"] +# cache[address] = (lat, lng) +# with open(geocoding_cache, 'w') as f: +# json.dump(cache, f) +# return lat, lng + +# def get_geocoding_cache(geocoding_cache: pathlib.Path) -> dict[str, Any]: +# try: +# with open(geocoding_cache, 'x') as f: +# json.dump({}, f) +# return {} +# except FileExistsError: +# pass # File already exists +# with open(geocoding_cache, 'r') as f: +# return json.load(f) diff --git a/crawler/csv_exporter.py b/crawler/csv_exporter.py index 1c6d023..57130be 100644 --- a/crawler/csv_exporter.py +++ b/crawler/csv_exporter.py @@ -1,14 +1,15 @@ +import asyncio from pathlib import Path from data_access import Listing import pandas as pd -def export_to_csv( +async def export_to_csv( listings: list[Listing], output_file: Path, columns: list[str], ) -> None: - ds = [listing.dict_nicely() for listing in listings] + ds = await asyncio.gather(*[listing.dict_nicely() for listing in listings]) df = pd.DataFrame(ds) # read decisions on file decisions_path = 'data/decisions.json' diff --git a/crawler/data_access.py b/crawler/data_access.py index 7afe0c9..6de400c 100644 --- a/crawler/data_access.py +++ b/crawler/data_access.py @@ -1,4 +1,5 @@ import asyncio +from collections import defaultdict from dataclasses import dataclass import json import pathlib @@ -106,8 +107,8 @@ class Listing: def calculate_sqm_model(self): objs = [] for floorplan_path in self.list_floorplans(): - estimated_sqm, model_output, predictions = floorplan.calculate_model( - floorplan_path) + estimated_sqm, model_output, predictions = ( + floorplan.calculate_model(floorplan_path)) objs.append({ "floorplan_path": str(floorplan_path), "estimated_sqm": estimated_sqm, @@ -149,10 +150,9 @@ class Listing: with open(self.path_floorplan_ocr_json(), "w") as f: json.dump(objs, f) - @property - def sqm_ocr(self, recalculate=False): + async def sqm_ocr(self, recalculate=False): if not self.path_floorplan_ocr_json().exists() or recalculate: - self.calculate_sqm_ocr() + await self.calculate_sqm_ocr() with open(self.path_floorplan_ocr_json()) as f: objs = json.load(f) @@ -168,9 +168,12 @@ class Listing: def calculate_route(self, dest_address: str, travel_mode: routing.TravelMode, - recalculate=False): - if self.path_routing_json().exists() and not recalculate: - return + recalculate=False) -> dict[str, Any]: + routing_cache = self.__get_routing_cache() + cache_key = self.__routing_cache_key(dest_address, travel_mode) + if (route_cache := + routing_cache.get(cache_key)) is not None and not recalculate: + return {cache_key: route_cache} result = routing.transit_route( self.latitude, @@ -178,17 +181,22 @@ class Listing: dest_address, travel_mode, ) + if not result: + raise Exception((f"Error calculating route from {self.identifier} " + f"to '{dest_address}' by {travel_mode}")) + result = {**{cache_key: result}, **routing_cache} with open(self.path_routing_json(), "w") as f: json.dump(result, f) + return result - @property - def travel_time(self) -> List: - if not self.path_routing_json().exists(): - return [] - with open(self.path_routing_json()) as f: - d = json.load(f) - - return routing.extract_time(d) + def travel_time( + self, + destination_address: str, + travel_mode: routing.TravelMode, + ) -> list[dict[str, Any]]: + data = self.calculate_route(destination_address, travel_mode) + return self.__extract_travel_times(data, destination_address, + travel_mode) @property def url(self): @@ -215,11 +223,11 @@ class Listing: def tenure_type(self) -> str: return self.detailobject["property"]["tenureType"] - @property - def price_per_sqm(self) -> float: - if self.sqm_ocr is None or self.sqm_ocr == 0: + async def price_per_sqm(self) -> float: + sqm_ocr = await self.sqm_ocr() + if sqm_ocr is None or sqm_ocr == 0: return -1 - return self.price / self.sqm_ocr + return self.price / sqm_ocr @property def bedrooms(self) -> int: @@ -302,24 +310,44 @@ class Listing: return self.detailobject['property']["councilTaxInfo"]["content"][0][ "value"] - def dict_nicely(self): + async def dict_nicely(self): + travel_time_fastest = {} + travel_time_second = {} + if self.path_routing_json().exists(): + with open(self.path_routing_json(), "r") as f: + travel_times = json.load(f) + for destination_mode in travel_times.keys(): + destination_mode_clean = destination_mode.replace(" ", + "_").replace( + ",", "_") + destination, travel_mode = self.__from_routing_cache_key( + destination_mode) + travel_time_fastest[destination_mode_clean] = self.travel_time( + destination, travel_mode)[0]['duration'] + travel_time_second[destination_mode_clean] = self.travel_time( + destination, travel_mode)[1]['duration'] + return { "identifier": self.identifier, "sqm_ocr": - self.sqm_ocr, + await self.sqm_ocr(), "price": self.price, "price_per_sqm": - self.price_per_sqm, + await self.price_per_sqm(), "url": self.url, "bedrooms": self.bedrooms, "travel_time_fastest": - None if len(self.travel_time) == 0 else self.travel_time[0], + ":".join( + sorted(f'{dest} in {travel_mode//60}min' + for dest, travel_mode in travel_time_fastest.items())), "travel_time_second": - None if len(self.travel_time) < 2 else self.travel_time[1], + ":".join( + sorted(f'{dest} in {travel_mode//60}min' + for dest, travel_mode in travel_time_second.items())), "lease_left": self.leaseLeft, "service_charge": @@ -339,3 +367,75 @@ class Listing: "council_tax_band": self.councilTaxBand, } + + def __routing_cache_key( + self, + dest_address: str, + travel_mode: routing.TravelMode, + ) -> str: + return f"{dest_address} by {travel_mode}" + + def __from_routing_cache_key( + self, + cache_key: str, + ) -> tuple[str, routing.TravelMode]: + match = re.match(r"(.+) by (.+)", cache_key) + if not match: + raise ValueError(f"Invalid cache key: {cache_key}") + return match.group(1), routing.TravelMode[match.group(2)] + + def __extract_travel_times( + self, + routing_data: dict[str, Any], + destination_address: str, + travel_mode: routing.TravelMode, + limit: int = 2, + ) -> list[dict[str, Any]]: + res = [] + cache_key = self.__routing_cache_key(destination_address, travel_mode) + for route in routing_data[cache_key]["routes"]: + distance = route["distanceMeters"] + duration = int(route["duration"].strip("s")) + duration_static = int(route["staticDuration"].strip("s")) + + steps = route["legs"][0]["steps"] + initial_walk_duration = 0 + used_transit = False + duration_per_transit = defaultdict(lambda: 0) + distance_per_transit = defaultdict(lambda: 0) + number_of_transit_stops = 0 + + for step in steps: + if not used_transit and step["travelMode"] == "WALK": + initial_walk_duration += int( + step["staticDuration"].strip("s")) + else: + used_transit = True + duration_per_transit[step["travelMode"]] += int( + step["staticDuration"].strip("s")) + distance_per_transit[step["travelMode"]] += step.get( + "distanceMeters", 0) + if step["travelMode"] == "TRANSIT": + number_of_transit_stops += 1 + + res.append({ + "duration": duration, + "distance": distance, + "duration_static": duration_static, + "initial_walk_duration": initial_walk_duration, + "duration_per_transit": dict(duration_per_transit), + "distance_per_transit": dict(distance_per_transit), + "number_of_transit_stops": number_of_transit_stops, + }) + + return res[:limit] + + def __get_routing_cache(self) -> dict[str, Any]: + try: + with open(self.path_routing_json(), 'x') as f: + json.dump({}, f) + return {} + except FileExistsError: + pass + with open(self.path_routing_json(), 'r') as f: + return json.load(f) diff --git a/crawler/main.py b/crawler/main.py index c82a00e..7a24d83 100644 --- a/crawler/main.py +++ b/crawler/main.py @@ -1,4 +1,5 @@ import asyncio +import json import os import pathlib import click @@ -185,11 +186,14 @@ def routing(ctx: click.core.Context, destination_address: str, raise click.exceptions.MissingParameter( f'{API_KEY_ENVIRONMENT_VARIABLE} environment variable is not set. ' 'Please set it to your API key for the routing service.') - routing_module.calculate_route( - listing_paths, - destination_address, - TravelMode[travel_mode], - ) + + asyncio.run( + routing_module.calculate_route( + listing_paths, + destination_address, + # destination_address_coordinates, + TravelMode[travel_mode], + )) @cli.command() @@ -223,7 +227,9 @@ def export_csv(ctx: click.core.Context, output_file: str, columns: tuple[str]): output_file_path = pathlib.Path(output_file) listing_paths = sorted(list(pathlib.Path(data_dir).glob("*/listing.json"))) listings = Listing.get_all_listings([str(path) for path in listing_paths]) - csv_exporter.export_to_csv(listings, output_file_path, list(columns)) + asyncio.run( + csv_exporter.export_to_csv(listings, output_file_path, + list(columns)), ) if __name__ == '__main__': diff --git a/crawler/rec/routing.py b/crawler/rec/routing.py index ca8e117..bafb8e3 100644 --- a/crawler/rec/routing.py +++ b/crawler/rec/routing.py @@ -3,7 +3,6 @@ import os from typing import Any import requests from rec.utils import nextMonday -from collections import defaultdict url = "https://routes.googleapis.com/directions/v2:computeRoutes" API_KEY_ENVIRONMENT_VARIABLE = "ROUTING_API_KEY" @@ -72,42 +71,3 @@ def transit_route( return r.json() raise Exception(r.json()) - - -def extract_time(d, limit: int = 2): - res = [] - for route in d["routes"]: - distance = route["distanceMeters"] - duration = int(route["duration"].strip("s")) - duration_static = int(route["staticDuration"].strip("s")) - - steps = route["legs"][0]["steps"] - initial_walk_duration = 0 - used_transit = False - duration_per_transit = defaultdict(lambda: 0) - distance_per_transit = defaultdict(lambda: 0) - number_of_transit_stops = 0 - - for step in steps: - if used_transit == False and step["travelMode"] == "WALK": - initial_walk_duration += int(step["staticDuration"].strip("s")) - else: - used_transit = True - duration_per_transit[step["travelMode"]] += int( - step["staticDuration"].strip("s")) - distance_per_transit[step["travelMode"]] += step.get( - "distanceMeters", 0) - if step["travelMode"] == "TRANSIT": - number_of_transit_stops += 1 - - res.append({ - "duration": duration, - "distance": distance, - "duration_static": duration_static, - "initial_walk_duration": initial_walk_duration, - "duration_per_transit": dict(duration_per_transit), - "distance_per_transit": dict(distance_per_transit), - "number_of_transit_stops": number_of_transit_stops, - }) - - return res[:limit]