* adding ruff auto check for pull requests as well as fixing all ruff errors * More ruff fixes: forgot half of the ruff checks Forgot to do a git add all :D --------- Co-authored-by: Kadir <git@k8n.dev>
107 lines
3.7 KiB
Python
107 lines
3.7 KiB
Python
from models.listing import DestinationMode, Route, RouteLegStep
|
|
from repositories.listing_repository import ListingRepository
|
|
from tqdm.asyncio import tqdm
|
|
from rec import routing
|
|
from models import Listing
|
|
|
|
|
|
async def calculate_route(
|
|
repository: ListingRepository,
|
|
destination_address: str,
|
|
travel_mode: routing.TravelMode,
|
|
limit: int | None = None,
|
|
) -> None:
|
|
listings = await repository.get_listings()
|
|
|
|
if limit is not None:
|
|
listings = listings[:limit]
|
|
|
|
destimation_mode = DestinationMode(destination_address, travel_mode)
|
|
updated_listings = await tqdm.gather(
|
|
*[update_routing_info(listing, destimation_mode) for listing in listings],
|
|
total=len(listings),
|
|
desc="Updating routing info",
|
|
)
|
|
await repository.upsert_listings(
|
|
[listing for listing in updated_listings if listing is not None]
|
|
)
|
|
|
|
|
|
async def update_routing_info(
|
|
listing: Listing, destination_mode: DestinationMode
|
|
) -> Listing | None:
|
|
if listing.routing_info.get(destination_mode) is not None:
|
|
# already calculated, do not recompute to save API calls
|
|
return None
|
|
|
|
routes_data = routing.transit_route(
|
|
listing.latitude,
|
|
listing.longitude,
|
|
destination_mode.destination_address,
|
|
destination_mode.travel_mode,
|
|
)
|
|
|
|
route_data = routes_data["routes"][0]
|
|
routes = []
|
|
for route_data in routes_data["routes"]:
|
|
duration_s = int(route_data["duration"].split("s")[0])
|
|
route = Route(
|
|
legs=[
|
|
RouteLegStep(
|
|
distance_meters=step_data["distanceMeters"],
|
|
duration_s=int(step_data["staticDuration"].split("s")[0]),
|
|
travel_mode=routing.TravelMode(step_data["travelMode"]),
|
|
)
|
|
for step_data in route_data["legs"][0]["steps"]
|
|
],
|
|
distance_meters=route_data["distanceMeters"],
|
|
duration_s=duration_s,
|
|
)
|
|
routes.append(route)
|
|
listing.routing_info_json = listing.serialize_routing_info(
|
|
{**listing.routing_info, **{destination_mode: routes}}
|
|
)
|
|
return listing
|
|
|
|
|
|
# async def geocode_address(
|
|
# address: str,
|
|
# geocoding_cache: pathlib.Path,
|
|
# ) -> tuple[int, int]:
|
|
# cache = get_geocoding_cache(geocoding_cache)
|
|
# cached_results = cache.get(address)
|
|
# if cached_results is None:
|
|
# # resolve
|
|
# async with aiohttp.ClientSession() as session:
|
|
# async with session.get(
|
|
# ("https://maps.googleapis.com/maps/api/geocode/json"
|
|
# f"?address={address}"
|
|
# f"&key={API_KEY_ENVIRONMENT_VARIABLE}")) as response:
|
|
# if response.status != 200:
|
|
# raise Exception(
|
|
# f"Error {response.status} from geocoding API")
|
|
# cached_results = await response.json()
|
|
# with open(geocoding_cache, 'w') as f:
|
|
# json.dump({
|
|
# **{
|
|
# address: cached_results,
|
|
# },
|
|
# **cache
|
|
# }, f)
|
|
# # API format
|
|
# lat = cached_results["results"][0]["geometry"]["location"]["lat"]
|
|
# lng = cached_results["results"][0]["geometry"]["location"]["lng"]
|
|
# cache[address] = (lat, lng)
|
|
# with open(geocoding_cache, 'w') as f:
|
|
# json.dump(cache, f)
|
|
# return lat, lng
|
|
|
|
# def get_geocoding_cache(geocoding_cache: pathlib.Path) -> dict[str, Any]:
|
|
# try:
|
|
# with open(geocoding_cache, 'x') as f:
|
|
# json.dump({}, f)
|
|
# return {}
|
|
# except FileExistsError:
|
|
# pass # File already exists
|
|
# with open(geocoding_cache, 'r') as f:
|
|
# return json.load(f)
|