wrongmove/rec/osrm_client.py

144 lines
4.4 KiB
Python
Raw Normal View History

"""OSRM HTTP client for walk and cycle routing.
Uses the OSRM /table endpoint for efficient batch distance/duration matrix
calculations, and /route for single-pair fallback.
"""
import logging
from dataclasses import dataclass
import aiohttp
from config.routing_config import RoutingConfig
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class OSRMResult:
    """Immutable duration/distance pair for one origin-to-destination route."""

    # Travel time for the route, as a whole number of seconds.
    duration_seconds: int
    # Length of the route, as a whole number of metres.
    distance_meters: int
async def osrm_table(
    origins: list[tuple[float, float]],
    destinations: list[tuple[float, float]],
    profile: str,
    config: RoutingConfig,
    session: aiohttp.ClientSession | None = None,
) -> list[list[OSRMResult | None]]:
    """Compute an NxM duration/distance matrix using the OSRM /table endpoint.

    Args:
        origins: List of (longitude, latitude) pairs for sources.
        destinations: List of (longitude, latitude) pairs for destinations.
        profile: OSRM profile ("foot" or "bicycle").
        config: Routing configuration; ``config.get_osrm_url(profile)``
            supplies the base URL of the OSRM server.
        session: Optional aiohttp session for connection reuse. When omitted,
            a temporary session is created and closed before returning.

    Returns:
        NxM matrix where result[i][j] is the route from origins[i] to
        destinations[j], or None if no route was found. If OSRM answers with
        a non-200 status or a code other than "Ok", the error is logged and
        every cell is None. When ``origins`` or ``destinations`` is empty no
        request is made and the (cell-less) matrix is returned directly.
    """
    # With either side empty the matrix has no cells whatever the server
    # says, so skip the round-trip (and the empty sources=/destinations=
    # query it would send).
    if not origins or not destinations:
        return [[] for _ in origins]

    base_url = config.get_osrm_url(profile)

    # Coordinates are sent as one list: origins first, then destinations.
    all_coords = origins + destinations
    coords_str = ";".join(f"{lng},{lat}" for lng, lat in all_coords)

    # Indices into that list selecting which entries act as sources and
    # which as destinations.
    n_origins = len(origins)
    n_destinations = len(destinations)
    source_indices = ";".join(str(i) for i in range(n_origins))
    dest_indices = ";".join(str(i) for i in range(n_origins, len(all_coords)))

    url = (
        f"{base_url}/table/v1/{profile}/{coords_str}"
        f"?sources={source_indices}"
        f"&destinations={dest_indices}"
        f"&annotations=duration,distance"
    )

    # Only close the session if it was created here; a caller-supplied
    # session stays open for reuse.
    should_close = session is None
    if session is None:
        session = aiohttp.ClientSession()

    try:
        async with session.get(url) as resp:
            if resp.status != 200:
                logger.error(
                    "OSRM /table returned %s: %s", resp.status, await resp.text()
                )
                return _osrm_unroutable_matrix(n_origins, n_destinations)

            data = await resp.json()
            if data.get("code") != "Ok":
                logger.error(
                    "OSRM /table error: %s",
                    data.get("message", data.get("code")),
                )
                return _osrm_unroutable_matrix(n_origins, n_destinations)

            durations = data["durations"]
            distances = data["distances"]
            # Index by the requested sizes (not by the response's own shape)
            # so a short or malformed response raises instead of silently
            # yielding a matrix of the wrong dimensions.
            return [
                [
                    _osrm_table_cell(durations[i][j], distances[i][j])
                    for j in range(n_destinations)
                ]
                for i in range(n_origins)
            ]
    finally:
        if should_close:
            await session.close()


def _osrm_unroutable_matrix(
    n_origins: int, n_destinations: int
) -> list[list[OSRMResult | None]]:
    """Build an n_origins x n_destinations matrix in which every cell is None."""
    return [[None] * n_destinations for _ in range(n_origins)]


def _osrm_table_cell(duration, distance) -> OSRMResult | None:
    """Convert one duration/distance pair from the table into an OSRMResult.

    Returns None when either value is missing (None) for the pair.
    """
    if duration is None or distance is None:
        return None
    return OSRMResult(
        duration_seconds=int(duration),
        distance_meters=int(distance),
    )
async def osrm_route(
    origin: tuple[float, float],
    destination: tuple[float, float],
    profile: str,
    config: RoutingConfig,
    session: aiohttp.ClientSession | None = None,
) -> OSRMResult | None:
    """Compute a single route using the OSRM /route endpoint.

    Args:
        origin: (longitude, latitude) of the source.
        destination: (longitude, latitude) of the destination.
        profile: OSRM profile ("foot" or "bicycle").
        config: Routing configuration; ``config.get_osrm_url(profile)``
            supplies the base URL of the OSRM server.
        session: Optional aiohttp session. When omitted, a temporary session
            is created and closed before returning.

    Returns:
        OSRMResult built from the first route in the response, or None if the
        response status is not 200, the OSRM code is not "Ok", or the
        response contains no routes. The first two cases are logged.
    """
    base_url = config.get_osrm_url(profile)
    coords_str = f"{origin[0]},{origin[1]};{destination[0]},{destination[1]}"
    url = f"{base_url}/route/v1/{profile}/{coords_str}?overview=false"

    # Only close the session if it was created here; a caller-supplied
    # session stays open for reuse.
    should_close = session is None
    if session is None:
        session = aiohttp.ClientSession()

    try:
        async with session.get(url) as resp:
            if resp.status != 200:
                # Logged so a server/config failure is distinguishable from a
                # plain "no route" in the logs; the return value is the same.
                # NOTE(review): warning (not error) because an unroutable
                # pair presumably also arrives as a non-200 — confirm level.
                logger.warning(
                    "OSRM /route returned %s: %s", resp.status, await resp.text()
                )
                return None

            data = await resp.json()
            if data.get("code") != "Ok":
                logger.warning(
                    "OSRM /route error: %s",
                    data.get("message", data.get("code")),
                )
                return None

            routes = data.get("routes")
            if not routes:
                return None

            # Only the first route in the response is used.
            route = routes[0]
            return OSRMResult(
                duration_seconds=int(route["duration"]),
                distance_meters=int(route["distance"]),
            )
    finally:
        if should_close:
            await session.close()