"""Compute routing information for listings and persist the results."""

import logging

from tqdm.asyncio import tqdm

from models import Listing
from models.listing import DestinationMode, Route, RouteLegStep
from rec import routing
from repositories.listing_repository import ListingRepository

logger = logging.getLogger(__name__)


async def calculate_route(
    repository: ListingRepository,
    destination_address: str,
    travel_mode: routing.TravelMode,
    limit: int | None = None,
) -> None:
    """Compute missing routes from every listing to one destination.

    Loads the listings from *repository*, computes routing information for
    each listing that does not yet have an entry for the given
    destination/travel-mode pair, and upserts only the listings that were
    actually updated.

    Args:
        repository: Source of the listings and sink for the updated ones.
        destination_address: Address the routes should lead to.
        travel_mode: Travel mode used for the route computation.
        limit: If given, only the first ``limit`` listings are processed.
    """
    listings = await repository.get_listings()
    if limit is not None:
        listings = listings[:limit]

    destination_mode = DestinationMode(destination_address, travel_mode)
    updated_listings = await tqdm.gather(
        *[update_routing_info(listing, destination_mode) for listing in listings],
        total=len(listings),
        desc="Updating routing info",
    )
    # ``update_routing_info`` yields None for listings it did not change.
    await repository.upsert_listings(
        [listing for listing in updated_listings if listing is not None]
    )


async def update_routing_info(
    listing: Listing, destination_mode: DestinationMode
) -> Listing | None:
    """Attach the routes for *destination_mode* to *listing*.

    Args:
        listing: Listing whose coordinates are the origin of the route.
        destination_mode: Destination address and travel mode to route to.

    Returns:
        The same listing with ``routing_info_json`` updated, or ``None`` when
        nothing changed: either routes for this destination mode are already
        stored, or the routing call returned no routes.
    """
    if listing.routing_info.get(destination_mode) is not None:
        # already calculated, do not recompute to save API calls
        return None

    # NOTE(review): this call is not awaited, so it presumably blocks the
    # event loop for its duration - confirm whether that is intended.
    routes_data = routing.transit_route(
        listing.latitude,
        listing.longtitude,
        destination_mode.destination_address,
        destination_mode.travel_mode,
    )

    # Assumes the response is a plain dict (parsed JSON) - TODO confirm.
    raw_routes = routes_data.get("routes")
    if not raw_routes:
        # A single unroutable listing must not abort the whole batch; skip it
        # without storing anything so a later run can try again.
        logger.warning(
            "No routes returned for listing at (%s, %s) to %r; skipping",
            listing.latitude,
            listing.longtitude,
            destination_mode.destination_address,
        )
        return None

    routes = [_parse_route(route_data) for route_data in raw_routes]
    listing.routing_info_json = listing.serialize_routing_info(
        {**listing.routing_info, destination_mode: routes}
    )
    return listing


def _parse_duration_seconds(raw_duration: str) -> int:
    """Convert a duration string such as ``"123s"`` to whole seconds.

    Fractional values (e.g. ``"3.5s"``) are truncated towards zero.
    """
    return int(float(raw_duration.removesuffix("s")))


def _parse_route(route_data: dict) -> Route:
    """Build a ``Route`` from one entry of the response's ``"routes"`` list."""
    # Only the steps of the first leg are read, as in the original code.
    steps = [
        RouteLegStep(
            distance_meters=step_data["distanceMeters"],
            duration_s=_parse_duration_seconds(step_data["staticDuration"]),
            travel_mode=routing.TravelMode(step_data["travelMode"]),
        )
        for step_data in route_data["legs"][0]["steps"]
    ]
    return Route(
        legs=steps,
        distance_meters=route_data["distanceMeters"],
        duration_s=_parse_duration_seconds(route_data["duration"]),
    )


# NOTE: the geocoding helpers below are disabled (commented-out) code.
#
# async def geocode_address(
#     address: str,
#     geocoding_cache: pathlib.Path,
# ) -> tuple[int, int]:
#     cache = get_geocoding_cache(geocoding_cache)
#     cached_results = cache.get(address)
#     if cached_results is None:
#         # resolve
#         async with aiohttp.ClientSession() as session:
#             async with session.get(
#                 ("https://maps.googleapis.com/maps/api/geocode/json"
#                  f"?address={address}"
#                  f"&key={API_KEY_ENVIRONMENT_VARIABLE}")) as response:
#                 if response.status != 200:
#                     raise Exception(
#                         f"Error {response.status} from geocoding API")
#                 cached_results = await response.json()
#                 with open(geocoding_cache, 'w') as f:
#                     json.dump({
#                         **{
#                             address: cached_results,
#                         },
#                         **cache
#                     }, f)
#     # API format
#     lat = cached_results["results"][0]["geometry"]["location"]["lat"]
#     lng = cached_results["results"][0]["geometry"]["location"]["lng"]
#     cache[address] = (lat, lng)
#     with open(geocoding_cache, 'w') as f:
#         json.dump(cache, f)
#     return lat, lng


# def get_geocoding_cache(geocoding_cache: pathlib.Path) -> dict[str, Any]:
#     try:
#         with open(geocoding_cache, 'x') as f:
#             json.dump({}, f)
#         return {}
#     except FileExistsError:
#         pass  # File already exists
#     with open(geocoding_cache, 'r') as f:
#         return json.load(f)