Add self-hosted routing clients and distance calculator

RoutingConfig loads OSRM/OTP URLs from env vars. OSRM client uses the
/table endpoint for batch NxM distance matrices (walk/cycle). OTP client
uses GraphQL API for transit routes. POI distance calculator orchestrates
both, skipping already-computed distances and reporting progress.
This commit is contained in:
Viktor Barzin 2026-02-08 13:14:37 +00:00
parent 8a31e5449c
commit da0a56895d
No known key found for this signature in database
GPG key ID: 0EB088298288D958
4 changed files with 538 additions and 0 deletions

52
config/routing_config.py Normal file
View file

@ -0,0 +1,52 @@
"""Routing engine configuration with environment variable loading."""
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Self
@dataclass(frozen=True)
class RoutingConfig:
"""Configuration for self-hosted routing engines (OSRM + OTP).
Attributes:
osrm_foot_url: URL for OSRM walking profile instance.
osrm_bicycle_url: URL for OSRM cycling profile instance.
otp_url: URL for OpenTripPlanner instance.
osrm_batch_size: Number of origins per OSRM /table request.
otp_max_concurrent: Max concurrent OTP requests.
"""
osrm_foot_url: str = "http://osrm-foot:5000"
osrm_bicycle_url: str = "http://osrm-bicycle:5000"
otp_url: str = "http://otp:8080"
osrm_batch_size: int = 50
otp_max_concurrent: int = 10
@classmethod
def from_env(cls) -> Self:
"""Load configuration from environment variables.
Environment variables:
OSRM_FOOT_URL: OSRM walking instance URL (default: http://osrm-foot:5000)
OSRM_BICYCLE_URL: OSRM cycling instance URL (default: http://osrm-bicycle:5000)
OTP_URL: OpenTripPlanner URL (default: http://otp:8080)
OSRM_BATCH_SIZE: Origins per /table request (default: 50)
OTP_MAX_CONCURRENT: Max concurrent OTP requests (default: 10)
"""
return cls(
osrm_foot_url=os.environ.get("OSRM_FOOT_URL", "http://osrm-foot:5000"),
osrm_bicycle_url=os.environ.get("OSRM_BICYCLE_URL", "http://osrm-bicycle:5000"),
otp_url=os.environ.get("OTP_URL", "http://otp:8080"),
osrm_batch_size=int(os.environ.get("OSRM_BATCH_SIZE", "50")),
otp_max_concurrent=int(os.environ.get("OTP_MAX_CONCURRENT", "10")),
)
def get_osrm_url(self, profile: str) -> str:
"""Get the OSRM URL for a given profile."""
if profile == "foot":
return self.osrm_foot_url
elif profile == "bicycle":
return self.osrm_bicycle_url
raise ValueError(f"Unknown OSRM profile: {profile}")

143
rec/osrm_client.py Normal file
View file

@ -0,0 +1,143 @@
"""OSRM HTTP client for walk and cycle routing.
Uses the OSRM /table endpoint for efficient batch distance/duration matrix
calculations, and /route for single-pair fallback.
"""
import logging
from dataclasses import dataclass
import aiohttp
from config.routing_config import RoutingConfig
logger = logging.getLogger("uvicorn.error")
@dataclass(frozen=True)
class OSRMResult:
"""Result of an OSRM route calculation."""
duration_seconds: int
distance_meters: int
async def osrm_table(
origins: list[tuple[float, float]],
destinations: list[tuple[float, float]],
profile: str,
config: RoutingConfig,
session: aiohttp.ClientSession | None = None,
) -> list[list[OSRMResult | None]]:
"""Compute an NxM duration/distance matrix using OSRM /table endpoint.
Args:
origins: List of (longitude, latitude) pairs for sources.
destinations: List of (longitude, latitude) pairs for destinations.
profile: OSRM profile ("foot" or "bicycle").
config: Routing configuration.
session: Optional aiohttp session for connection reuse.
Returns:
NxM matrix where result[i][j] is the route from origins[i] to destinations[j],
or None if no route was found.
"""
base_url = config.get_osrm_url(profile)
# Build coordinates string: origins first, then destinations
all_coords = origins + destinations
coords_str = ";".join(f"{lng},{lat}" for lng, lat in all_coords)
# Source/destination indices
source_indices = ",".join(str(i) for i in range(len(origins)))
dest_indices = ",".join(str(i) for i in range(len(origins), len(all_coords)))
url = (
f"{base_url}/table/v1/{profile}/{coords_str}"
f"?sources={source_indices}"
f"&destinations={dest_indices}"
f"&annotations=duration,distance"
)
should_close = session is None
if session is None:
session = aiohttp.ClientSession()
try:
async with session.get(url) as resp:
if resp.status != 200:
logger.error(f"OSRM /table returned {resp.status}: {await resp.text()}")
return [[None] * len(destinations) for _ in origins]
data = await resp.json()
if data.get("code") != "Ok":
logger.error(f"OSRM /table error: {data.get('message', data.get('code'))}")
return [[None] * len(destinations) for _ in origins]
durations = data["durations"]
distances = data["distances"]
results: list[list[OSRMResult | None]] = []
for i in range(len(origins)):
row: list[OSRMResult | None] = []
for j in range(len(destinations)):
dur = durations[i][j]
dist = distances[i][j]
if dur is None or dist is None:
row.append(None)
else:
row.append(OSRMResult(
duration_seconds=int(dur),
distance_meters=int(dist),
))
results.append(row)
return results
finally:
if should_close:
await session.close()
async def osrm_route(
origin: tuple[float, float],
destination: tuple[float, float],
profile: str,
config: RoutingConfig,
session: aiohttp.ClientSession | None = None,
) -> OSRMResult | None:
"""Compute a single route using OSRM /route endpoint.
Args:
origin: (longitude, latitude) of the source.
destination: (longitude, latitude) of the destination.
profile: OSRM profile ("foot" or "bicycle").
config: Routing configuration.
session: Optional aiohttp session.
Returns:
OSRMResult or None if no route was found.
"""
base_url = config.get_osrm_url(profile)
coords_str = f"{origin[0]},{origin[1]};{destination[0]},{destination[1]}"
url = f"{base_url}/route/v1/{profile}/{coords_str}?overview=false"
should_close = session is None
if session is None:
session = aiohttp.ClientSession()
try:
async with session.get(url) as resp:
if resp.status != 200:
return None
data = await resp.json()
if data.get("code") != "Ok" or not data.get("routes"):
return None
route = data["routes"][0]
return OSRMResult(
duration_seconds=int(route["duration"]),
distance_meters=int(route["distance"]),
)
finally:
if should_close:
await session.close()

120
rec/otp_client.py Normal file
View file

@ -0,0 +1,120 @@
"""OpenTripPlanner 2.x GraphQL client for transit routing.
Uses OTP's GTFS GraphQL API to compute transit routes between points.
Since OTP has no matrix endpoint, individual requests are made with
concurrency controlled via asyncio.Semaphore.
"""
import logging
from dataclasses import dataclass
import aiohttp
from config.routing_config import RoutingConfig
from rec.utils import nextMonday
logger = logging.getLogger("uvicorn.error")
# OTP 2.x GraphQL query for transit plan
_PLAN_QUERY = """
query Plan($fromLat: Float!, $fromLon: Float!, $toLat: Float!, $toLon: Float!, $dateTime: DateTime!) {
plan(
from: {lat: $fromLat, lon: $fromLon}
to: {lat: $toLat, lon: $toLon}
dateTime: $dateTime
transportModes: [{mode: TRANSIT}, {mode: WALK}]
numItineraries: 1
) {
itineraries {
duration
walkDistance
legs {
mode
duration
distance
}
}
}
}
"""
@dataclass(frozen=True)
class OTPResult:
"""Result of an OTP transit route calculation."""
duration_seconds: int
distance_meters: int
async def otp_transit_route(
origin_lat: float,
origin_lon: float,
dest_lat: float,
dest_lon: float,
config: RoutingConfig,
session: aiohttp.ClientSession | None = None,
) -> OTPResult | None:
"""Compute a transit route using OTP 2.x GraphQL API.
Uses next Monday 9AM as departure time for consistent results.
Args:
origin_lat: Origin latitude.
origin_lon: Origin longitude.
dest_lat: Destination latitude.
dest_lon: Destination longitude.
config: Routing configuration.
session: Optional aiohttp session.
Returns:
OTPResult or None if no transit route was found.
"""
url = f"{config.otp_url}/otp/gtfs/v1"
departure_time = nextMonday().isoformat()
payload = {
"query": _PLAN_QUERY,
"variables": {
"fromLat": origin_lat,
"fromLon": origin_lon,
"toLat": dest_lat,
"toLon": dest_lon,
"dateTime": departure_time,
},
}
should_close = session is None
if session is None:
session = aiohttp.ClientSession()
try:
async with session.post(url, json=payload) as resp:
if resp.status != 200:
logger.error(f"OTP returned {resp.status}: {await resp.text()}")
return None
data = await resp.json()
plan = data.get("data", {}).get("plan")
if not plan:
errors = data.get("errors")
if errors:
logger.warning(f"OTP GraphQL errors: {errors}")
return None
itineraries = plan.get("itineraries", [])
if not itineraries:
return None
best = itineraries[0]
total_distance = sum(
leg.get("distance", 0) for leg in best.get("legs", [])
)
return OTPResult(
duration_seconds=int(best["duration"]),
distance_meters=int(total_distance),
)
finally:
if should_close:
await session.close()

View file

@ -0,0 +1,223 @@
"""POI distance calculator - orchestrates OSRM and OTP for batch distance computation."""
import asyncio
import logging
from datetime import datetime
from typing import Callable
import aiohttp
from config.routing_config import RoutingConfig
from models.listing import BuyListing, ListingType, RentListing
from models.poi import PointOfInterest
from models.poi_distance import POIDistance
from rec.osrm_client import osrm_table
from rec.otp_client import otp_transit_route
from repositories.listing_repository import ListingRepository
from repositories.poi_repository import POIRepository
logger = logging.getLogger("uvicorn.error")
# Map travel mode names to OSRM profiles
_OSRM_PROFILES = {
"WALK": "foot",
"BICYCLE": "bicycle",
}
async def calculate_poi_distances(
listing_repo: ListingRepository,
poi_repo: POIRepository,
poi: PointOfInterest,
travel_modes: list[str],
listing_type: ListingType,
listing_ids: list[int] | None = None,
config: RoutingConfig | None = None,
on_progress: Callable[[int, int, str], None] | None = None,
) -> int:
"""Calculate distances from listings to a POI for given travel modes.
Args:
listing_repo: Repository for listing access.
poi_repo: Repository for POI and distance storage.
poi: The point of interest to calculate distances to.
travel_modes: List of travel modes (WALK, BICYCLE, TRANSIT).
listing_type: BUY or RENT.
listing_ids: Optional subset of listing IDs. If None, uses all listings.
config: Routing engine configuration.
on_progress: Callback(completed, total, message) for progress updates.
Returns:
Total number of distances computed.
"""
if config is None:
config = RoutingConfig.from_env()
# Load listings with coordinates
model = RentListing if listing_type == ListingType.RENT else BuyListing
listings = await listing_repo.get_listings(
only_ids=listing_ids,
query_parameters=None,
)
if not listings:
logger.info("No listings found for distance calculation")
return 0
total_computed = 0
total_modes = len(travel_modes)
for mode_idx, mode in enumerate(travel_modes):
mode_upper = mode.upper()
# Skip listings that already have computed distances
existing = poi_repo.get_existing_distance_keys(
poi.id, mode_upper, listing_type # type: ignore[arg-type]
)
pending_listings = [l for l in listings if l.id not in existing]
if not pending_listings:
logger.info(f"All listings already computed for {mode_upper}")
if on_progress:
on_progress(
total_computed, len(listings) * total_modes,
f"Skipped {mode_upper} (already computed)"
)
continue
logger.info(
f"Computing {mode_upper} distances for {len(pending_listings)} listings "
f"(skipped {len(existing)} already computed)"
)
if mode_upper in _OSRM_PROFILES:
computed = await _compute_osrm(
pending_listings, poi, mode_upper, listing_type,
config, poi_repo, on_progress,
total_computed, len(listings) * total_modes,
)
elif mode_upper == "TRANSIT":
computed = await _compute_transit(
pending_listings, poi, listing_type,
config, poi_repo, on_progress,
total_computed, len(listings) * total_modes,
)
else:
logger.warning(f"Unknown travel mode: {mode_upper}")
continue
total_computed += computed
return total_computed
async def _compute_osrm(
listings: list,
poi: PointOfInterest,
mode: str,
listing_type: ListingType,
config: RoutingConfig,
poi_repo: POIRepository,
on_progress: Callable[[int, int, str], None] | None,
progress_offset: int,
progress_total: int,
) -> int:
"""Compute distances using OSRM /table API in batches."""
profile = _OSRM_PROFILES[mode]
destination = [(poi.longitude, poi.latitude)]
batch_size = config.osrm_batch_size
computed = 0
async with aiohttp.ClientSession() as session:
for batch_start in range(0, len(listings), batch_size):
batch = listings[batch_start:batch_start + batch_size]
origins = [(l.longitude, l.latitude) for l in batch]
results = await osrm_table(
origins, destination, profile, config, session
)
distances_to_save: list[POIDistance] = []
for i, listing in enumerate(batch):
result = results[i][0] if results[i] else None
if result is not None:
distances_to_save.append(POIDistance(
listing_id=listing.id,
listing_type=listing_type,
poi_id=poi.id, # type: ignore[arg-type]
travel_mode=mode,
duration_seconds=result.duration_seconds,
distance_meters=result.distance_meters,
computed_at=datetime.utcnow(),
)) # type: ignore[call-arg]
if distances_to_save:
poi_repo.upsert_distances(distances_to_save)
computed += len(distances_to_save)
if on_progress:
on_progress(
progress_offset + computed, progress_total,
f"{mode}: {computed}/{len(listings)}"
)
return computed
async def _compute_transit(
listings: list,
poi: PointOfInterest,
listing_type: ListingType,
config: RoutingConfig,
poi_repo: POIRepository,
on_progress: Callable[[int, int, str], None] | None,
progress_offset: int,
progress_total: int,
) -> int:
"""Compute transit distances using OTP with concurrency control."""
semaphore = asyncio.Semaphore(config.otp_max_concurrent)
computed = 0
batch_results: list[POIDistance] = []
save_interval = 50 # Save every N results
async with aiohttp.ClientSession() as session:
async def compute_one(listing: object) -> POIDistance | None:
async with semaphore:
result = await otp_transit_route(
listing.latitude, listing.longitude, # type: ignore[union-attr]
poi.latitude, poi.longitude,
config, session,
)
if result is None:
return None
return POIDistance(
listing_id=listing.id, # type: ignore[union-attr]
listing_type=listing_type,
poi_id=poi.id, # type: ignore[arg-type]
travel_mode="TRANSIT",
duration_seconds=result.duration_seconds,
distance_meters=result.distance_meters,
computed_at=datetime.utcnow(),
) # type: ignore[call-arg]
tasks = [compute_one(listing) for listing in listings]
for coro in asyncio.as_completed(tasks):
result = await coro
if result is not None:
batch_results.append(result)
computed += 1
# Periodically save results
if len(batch_results) >= save_interval:
poi_repo.upsert_distances(batch_results)
batch_results = []
if on_progress:
on_progress(
progress_offset + computed, progress_total,
f"TRANSIT: {computed}/{len(listings)}"
)
# Save remaining results
if batch_results:
poi_repo.upsert_distances(batch_results)
return computed