diff --git a/config/routing_config.py b/config/routing_config.py new file mode 100644 index 0000000..79a00e6 --- /dev/null +++ b/config/routing_config.py @@ -0,0 +1,52 @@ +"""Routing engine configuration with environment variable loading.""" +from __future__ import annotations + +import os +from dataclasses import dataclass +from typing import Self + + +@dataclass(frozen=True) +class RoutingConfig: + """Configuration for self-hosted routing engines (OSRM + OTP). + + Attributes: + osrm_foot_url: URL for OSRM walking profile instance. + osrm_bicycle_url: URL for OSRM cycling profile instance. + otp_url: URL for OpenTripPlanner instance. + osrm_batch_size: Number of origins per OSRM /table request. + otp_max_concurrent: Max concurrent OTP requests. + """ + + osrm_foot_url: str = "http://osrm-foot:5000" + osrm_bicycle_url: str = "http://osrm-bicycle:5000" + otp_url: str = "http://otp:8080" + osrm_batch_size: int = 50 + otp_max_concurrent: int = 10 + + @classmethod + def from_env(cls) -> Self: + """Load configuration from environment variables. + + Environment variables: + OSRM_FOOT_URL: OSRM walking instance URL (default: http://osrm-foot:5000) + OSRM_BICYCLE_URL: OSRM cycling instance URL (default: http://osrm-bicycle:5000) + OTP_URL: OpenTripPlanner URL (default: http://otp:8080) + OSRM_BATCH_SIZE: Origins per /table request (default: 50) + OTP_MAX_CONCURRENT: Max concurrent OTP requests (default: 10) + """ + return cls( + osrm_foot_url=os.environ.get("OSRM_FOOT_URL", "http://osrm-foot:5000"), + osrm_bicycle_url=os.environ.get("OSRM_BICYCLE_URL", "http://osrm-bicycle:5000"), + otp_url=os.environ.get("OTP_URL", "http://otp:8080"), + osrm_batch_size=int(os.environ.get("OSRM_BATCH_SIZE", "50")), + otp_max_concurrent=int(os.environ.get("OTP_MAX_CONCURRENT", "10")), + ) + + def get_osrm_url(self, profile: str) -> str: + """Get the OSRM URL for a given profile.""" + if profile == "foot": + return self.osrm_foot_url + elif profile == "bicycle": + return self.osrm_bicycle_url + raise ValueError(f"Unknown OSRM profile: {profile}") diff --git a/rec/osrm_client.py b/rec/osrm_client.py new file mode 100644 index 0000000..14359b9 --- /dev/null +++ b/rec/osrm_client.py @@ -0,0 +1,143 @@ +"""OSRM HTTP client for walk and cycle routing. + +Uses the OSRM /table endpoint for efficient batch distance/duration matrix +calculations, and /route for single-pair fallback. +""" +import logging +from dataclasses import dataclass + +import aiohttp + +from config.routing_config import RoutingConfig + +logger = logging.getLogger("uvicorn.error") + + +@dataclass(frozen=True) +class OSRMResult: + """Result of an OSRM route calculation.""" + duration_seconds: int + distance_meters: int + + +async def osrm_table( + origins: list[tuple[float, float]], + destinations: list[tuple[float, float]], + profile: str, + config: RoutingConfig, + session: aiohttp.ClientSession | None = None, +) -> list[list[OSRMResult | None]]: + """Compute an NxM duration/distance matrix using OSRM /table endpoint. + + Args: + origins: List of (longitude, latitude) pairs for sources. + destinations: List of (longitude, latitude) pairs for destinations. + profile: OSRM profile ("foot" or "bicycle"). + config: Routing configuration. + session: Optional aiohttp session for connection reuse. + + Returns: + NxM matrix where result[i][j] is the route from origins[i] to destinations[j], + or None if no route was found. + """ + base_url = config.get_osrm_url(profile) + + # Build coordinates string: origins first, then destinations + all_coords = origins + destinations + coords_str = ";".join(f"{lng},{lat}" for lng, lat in all_coords) + + # Source/destination indices + source_indices = ",".join(str(i) for i in range(len(origins))) + dest_indices = ",".join(str(i) for i in range(len(origins), len(all_coords))) + + url = ( + f"{base_url}/table/v1/{profile}/{coords_str}" + f"?sources={source_indices}" + f"&destinations={dest_indices}" + f"&annotations=duration,distance" + ) + + should_close = session is None + if session is None: + session = aiohttp.ClientSession() + + try: + async with session.get(url) as resp: + if resp.status != 200: + logger.error(f"OSRM /table returned {resp.status}: {await resp.text()}") + return [[None] * len(destinations) for _ in origins] + + data = await resp.json() + + if data.get("code") != "Ok": + logger.error(f"OSRM /table error: {data.get('message', data.get('code'))}") + return [[None] * len(destinations) for _ in origins] + + durations = data["durations"] + distances = data["distances"] + + results: list[list[OSRMResult | None]] = [] + for i in range(len(origins)): + row: list[OSRMResult | None] = [] + for j in range(len(destinations)): + dur = durations[i][j] + dist = distances[i][j] + if dur is None or dist is None: + row.append(None) + else: + row.append(OSRMResult( + duration_seconds=int(dur), + distance_meters=int(dist), + )) + results.append(row) + + return results + finally: + if should_close: + await session.close() + + +async def osrm_route( + origin: tuple[float, float], + destination: tuple[float, float], + profile: str, + config: RoutingConfig, + session: aiohttp.ClientSession | None = None, +) -> OSRMResult | None: + """Compute a single route using OSRM /route endpoint. + + Args: + origin: (longitude, latitude) of the source. + destination: (longitude, latitude) of the destination. + profile: OSRM profile ("foot" or "bicycle"). + config: Routing configuration. + session: Optional aiohttp session. + + Returns: + OSRMResult or None if no route was found. + """ + base_url = config.get_osrm_url(profile) + coords_str = f"{origin[0]},{origin[1]};{destination[0]},{destination[1]}" + url = f"{base_url}/route/v1/{profile}/{coords_str}?overview=false" + + should_close = session is None + if session is None: + session = aiohttp.ClientSession() + + try: + async with session.get(url) as resp: + if resp.status != 200: + return None + data = await resp.json() + + if data.get("code") != "Ok" or not data.get("routes"): + return None + + route = data["routes"][0] + return OSRMResult( + duration_seconds=int(route["duration"]), + distance_meters=int(route["distance"]), + ) + finally: + if should_close: + await session.close() diff --git a/rec/otp_client.py b/rec/otp_client.py new file mode 100644 index 0000000..1b9c165 --- /dev/null +++ b/rec/otp_client.py @@ -0,0 +1,120 @@ +"""OpenTripPlanner 2.x GraphQL client for transit routing. + +Uses OTP's GTFS GraphQL API to compute transit routes between points. +Since OTP has no matrix endpoint, individual requests are made with +concurrency controlled via asyncio.Semaphore. +""" +import logging +from dataclasses import dataclass + +import aiohttp + +from config.routing_config import RoutingConfig +from rec.utils import nextMonday + +logger = logging.getLogger("uvicorn.error") + +# OTP 2.x GraphQL query for transit plan +_PLAN_QUERY = """ +query Plan($fromLat: Float!, $fromLon: Float!, $toLat: Float!, $toLon: Float!, $dateTime: DateTime!) { + plan( + from: {lat: $fromLat, lon: $fromLon} + to: {lat: $toLat, lon: $toLon} + dateTime: $dateTime + transportModes: [{mode: TRANSIT}, {mode: WALK}] + numItineraries: 1 + ) { + itineraries { + duration + walkDistance + legs { + mode + duration + distance + } + } + } +} +""" + + +@dataclass(frozen=True) +class OTPResult: + """Result of an OTP transit route calculation.""" + duration_seconds: int + distance_meters: int + + +async def otp_transit_route( + origin_lat: float, + origin_lon: float, + dest_lat: float, + dest_lon: float, + config: RoutingConfig, + session: aiohttp.ClientSession | None = None, +) -> OTPResult | None: + """Compute a transit route using OTP 2.x GraphQL API. + + Uses next Monday 9AM as departure time for consistent results. + + Args: + origin_lat: Origin latitude. + origin_lon: Origin longitude. + dest_lat: Destination latitude. + dest_lon: Destination longitude. + config: Routing configuration. + session: Optional aiohttp session. + + Returns: + OTPResult or None if no transit route was found. + """ + url = f"{config.otp_url}/otp/gtfs/v1" + + departure_time = nextMonday().isoformat() + + payload = { + "query": _PLAN_QUERY, + "variables": { + "fromLat": origin_lat, + "fromLon": origin_lon, + "toLat": dest_lat, + "toLon": dest_lon, + "dateTime": departure_time, + }, + } + + should_close = session is None + if session is None: + session = aiohttp.ClientSession() + + try: + async with session.post(url, json=payload) as resp: + if resp.status != 200: + logger.error(f"OTP returned {resp.status}: {await resp.text()}") + return None + + data = await resp.json() + + plan = data.get("data", {}).get("plan") + if not plan: + errors = data.get("errors") + if errors: + logger.warning(f"OTP GraphQL errors: {errors}") + return None + + itineraries = plan.get("itineraries", []) + if not itineraries: + return None + + best = itineraries[0] + total_distance = sum( + leg.get("distance", 0) for leg in best.get("legs", []) + ) + + return OTPResult( + duration_seconds=int(best["duration"]), + distance_meters=int(total_distance), + ) + finally: + if should_close: + await session.close() diff --git a/services/poi_distance_calculator.py b/services/poi_distance_calculator.py new file mode 100644 index 0000000..6cfc669 --- /dev/null +++ b/services/poi_distance_calculator.py @@ -0,0 +1,223 @@ +"""POI distance calculator - orchestrates OSRM and OTP for batch distance computation.""" +import asyncio +import logging +from datetime import datetime +from typing import Callable + +import aiohttp + +from config.routing_config import RoutingConfig +from models.listing import BuyListing, ListingType, RentListing +from models.poi import PointOfInterest +from models.poi_distance import POIDistance +from rec.osrm_client import osrm_table +from rec.otp_client import otp_transit_route +from repositories.listing_repository import ListingRepository +from repositories.poi_repository import POIRepository + +logger = logging.getLogger("uvicorn.error") + +# Map travel mode names to OSRM profiles +_OSRM_PROFILES = { + "WALK": "foot", + "BICYCLE": "bicycle", +} + + +async def calculate_poi_distances( + listing_repo: ListingRepository, + poi_repo: POIRepository, + poi: PointOfInterest, + travel_modes: list[str], + listing_type: ListingType, + listing_ids: list[int] | None = None, + config: RoutingConfig | None = None, + on_progress: Callable[[int, int, str], None] | None = None, +) -> int: + """Calculate distances from listings to a POI for given travel modes. + + Args: + listing_repo: Repository for listing access. + poi_repo: Repository for POI and distance storage. + poi: The point of interest to calculate distances to. + travel_modes: List of travel modes (WALK, BICYCLE, TRANSIT). + listing_type: BUY or RENT. + listing_ids: Optional subset of listing IDs. If None, uses all listings. + config: Routing engine configuration. + on_progress: Callback(completed, total, message) for progress updates. + + Returns: + Total number of distances computed. + """ + if config is None: + config = RoutingConfig.from_env() + + # Load listings with coordinates + model = RentListing if listing_type == ListingType.RENT else BuyListing + listings = await listing_repo.get_listings( + only_ids=listing_ids, + query_parameters=None, + ) + if not listings: + logger.info("No listings found for distance calculation") + return 0 + + total_computed = 0 + total_modes = len(travel_modes) + + for mode_idx, mode in enumerate(travel_modes): + mode_upper = mode.upper() + + # Skip listings that already have computed distances + existing = poi_repo.get_existing_distance_keys( + poi.id, mode_upper, listing_type # type: ignore[arg-type] + ) + pending_listings = [l for l in listings if l.id not in existing] + + if not pending_listings: + logger.info(f"All listings already computed for {mode_upper}") + if on_progress: + on_progress( + total_computed, len(listings) * total_modes, + f"Skipped {mode_upper} (already computed)" + ) + continue + + logger.info( + f"Computing {mode_upper} distances for {len(pending_listings)} listings " + f"(skipped {len(existing)} already computed)" + ) + + if mode_upper in _OSRM_PROFILES: + computed = await _compute_osrm( + pending_listings, poi, mode_upper, listing_type, + config, poi_repo, on_progress, + total_computed, len(listings) * total_modes, + ) + elif mode_upper == "TRANSIT": + computed = await _compute_transit( + pending_listings, poi, listing_type, + config, poi_repo, on_progress, + total_computed, len(listings) * total_modes, + ) + else: + logger.warning(f"Unknown travel mode: {mode_upper}") + continue + + total_computed += computed + + return total_computed + + +async def _compute_osrm( + listings: list, + poi: PointOfInterest, + mode: str, + listing_type: ListingType, + config: RoutingConfig, + poi_repo: POIRepository, + on_progress: Callable[[int, int, str], None] | None, + progress_offset: int, + progress_total: int, +) -> int: + """Compute distances using OSRM /table API in batches.""" + profile = _OSRM_PROFILES[mode] + destination = [(poi.longitude, poi.latitude)] + batch_size = config.osrm_batch_size + computed = 0 + + async with aiohttp.ClientSession() as session: + for batch_start in range(0, len(listings), batch_size): + batch = listings[batch_start:batch_start + batch_size] + origins = [(l.longitude, l.latitude) for l in batch] + + results = await osrm_table( + origins, destination, profile, config, session + ) + + distances_to_save: list[POIDistance] = [] + for i, listing in enumerate(batch): + result = results[i][0] if results[i] else None + if result is not None: + distances_to_save.append(POIDistance( + listing_id=listing.id, + listing_type=listing_type, + poi_id=poi.id, # type: ignore[arg-type] + travel_mode=mode, + duration_seconds=result.duration_seconds, + distance_meters=result.distance_meters, + computed_at=datetime.utcnow(), + )) # type: ignore[call-arg] + + if distances_to_save: + poi_repo.upsert_distances(distances_to_save) + computed += len(distances_to_save) + + if on_progress: + on_progress( + progress_offset + computed, progress_total, + f"{mode}: {computed}/{len(listings)}" + ) + + return computed + + +async def _compute_transit( + listings: list, + poi: PointOfInterest, + listing_type: ListingType, + config: RoutingConfig, + poi_repo: POIRepository, + on_progress: Callable[[int, int, str], None] | None, + progress_offset: int, + progress_total: int, +) -> int: + """Compute transit distances using OTP with concurrency control.""" + semaphore = asyncio.Semaphore(config.otp_max_concurrent) + computed = 0 + batch_results: list[POIDistance] = [] + save_interval = 50 # Save every N results + + async with aiohttp.ClientSession() as session: + async def compute_one(listing: object) -> POIDistance | None: + async with semaphore: + result = await otp_transit_route( + listing.latitude, listing.longitude, # type: ignore[union-attr] + poi.latitude, poi.longitude, + config, session, + ) + if result is None: + return None + return POIDistance( + listing_id=listing.id, # type: ignore[union-attr] + listing_type=listing_type, + poi_id=poi.id, # type: ignore[arg-type] + travel_mode="TRANSIT", + duration_seconds=result.duration_seconds, + distance_meters=result.distance_meters, + computed_at=datetime.utcnow(), + ) # type: ignore[call-arg] + + tasks = [compute_one(listing) for listing in listings] + for coro in asyncio.as_completed(tasks): + result = await coro + if result is not None: + batch_results.append(result) + computed += 1 + + # Periodically save results + if len(batch_results) >= save_interval: + poi_repo.upsert_distances(batch_results) + batch_results = [] + + if on_progress: + on_progress( + progress_offset + computed, progress_total, + f"TRANSIT: {computed}/{len(listings)}" + ) + + # Save remaining results + if batch_results: + poi_repo.upsert_distances(batch_results) + + return computed