"""OSRM HTTP client for walk and cycle routing. Uses the OSRM /table endpoint for efficient batch distance/duration matrix calculations, and /route for single-pair fallback. """ import logging from dataclasses import dataclass import aiohttp from config.routing_config import RoutingConfig logger = logging.getLogger("uvicorn.error") @dataclass(frozen=True) class OSRMResult: """Result of an OSRM route calculation.""" duration_seconds: int distance_meters: int async def osrm_table( origins: list[tuple[float, float]], destinations: list[tuple[float, float]], profile: str, config: RoutingConfig, session: aiohttp.ClientSession | None = None, ) -> list[list[OSRMResult | None]]: """Compute an NxM duration/distance matrix using OSRM /table endpoint. Args: origins: List of (longitude, latitude) pairs for sources. destinations: List of (longitude, latitude) pairs for destinations. profile: OSRM profile ("foot" or "bicycle"). config: Routing configuration. session: Optional aiohttp session for connection reuse. Returns: NxM matrix where result[i][j] is the route from origins[i] to destinations[j], or None if no route was found. """ base_url = config.get_osrm_url(profile) # Build coordinates string: origins first, then destinations all_coords = origins + destinations coords_str = ";".join(f"{lng},{lat}" for lng, lat in all_coords) # Source/destination indices source_indices = ";".join(str(i) for i in range(len(origins))) dest_indices = ";".join(str(i) for i in range(len(origins), len(all_coords))) url = ( f"{base_url}/table/v1/{profile}/{coords_str}" f"?sources={source_indices}" f"&destinations={dest_indices}" f"&annotations=duration,distance" ) should_close = session is None if session is None: session = aiohttp.ClientSession() try: async with session.get(url) as resp: if resp.status != 200: logger.error(f"OSRM /table returned {resp.status}: {await resp.text()}") return [[None] * len(destinations) for _ in origins] data = await resp.json() if data.get("code") != "Ok": logger.error(f"OSRM /table error: {data.get('message', data.get('code'))}") return [[None] * len(destinations) for _ in origins] durations = data["durations"] distances = data["distances"] results: list[list[OSRMResult | None]] = [] for i in range(len(origins)): row: list[OSRMResult | None] = [] for j in range(len(destinations)): dur = durations[i][j] dist = distances[i][j] if dur is None or dist is None: row.append(None) else: row.append(OSRMResult( duration_seconds=int(dur), distance_meters=int(dist), )) results.append(row) return results finally: if should_close: await session.close() async def osrm_route( origin: tuple[float, float], destination: tuple[float, float], profile: str, config: RoutingConfig, session: aiohttp.ClientSession | None = None, ) -> OSRMResult | None: """Compute a single route using OSRM /route endpoint. Args: origin: (longitude, latitude) of the source. destination: (longitude, latitude) of the destination. profile: OSRM profile ("foot" or "bicycle"). config: Routing configuration. session: Optional aiohttp session. Returns: OSRMResult or None if no route was found. """ base_url = config.get_osrm_url(profile) coords_str = f"{origin[0]},{origin[1]};{destination[0]},{destination[1]}" url = f"{base_url}/route/v1/{profile}/{coords_str}?overview=false" should_close = session is None if session is None: session = aiohttp.ClientSession() try: async with session.get(url) as resp: if resp.status != 200: return None data = await resp.json() if data.get("code") != "Ok" or not data.get("routes"): return None route = data["routes"][0] return OSRMResult( duration_seconds=int(route["duration"]), distance_meters=int(route["distance"]), ) finally: if should_close: await session.close()