"""OpenTripPlanner 2.x GraphQL client for transit routing. Uses OTP's GTFS GraphQL API to compute transit routes between points. Since OTP has no matrix endpoint, individual requests are made with concurrency controlled via asyncio.Semaphore. """ import logging from dataclasses import dataclass import aiohttp from config.routing_config import RoutingConfig from rec.utils import nextMonday logger = logging.getLogger(__name__) # OTP 2.x GraphQL query for transit plan _PLAN_QUERY = """ query Plan($fromLat: Float!, $fromLon: Float!, $toLat: Float!, $toLon: Float!, $dateTime: DateTime!) { plan( from: {lat: $fromLat, lon: $fromLon} to: {lat: $toLat, lon: $toLon} dateTime: $dateTime transportModes: [{mode: TRANSIT}, {mode: WALK}] numItineraries: 1 ) { itineraries { duration walkDistance legs { mode duration distance } } } } """ @dataclass(frozen=True) class OTPResult: """Result of an OTP transit route calculation.""" duration_seconds: int distance_meters: int async def otp_transit_route( origin_lat: float, origin_lon: float, dest_lat: float, dest_lon: float, config: RoutingConfig, session: aiohttp.ClientSession | None = None, ) -> OTPResult | None: """Compute a transit route using OTP 2.x GraphQL API. Uses next Monday 9AM as departure time for consistent results. Args: origin_lat: Origin latitude. origin_lon: Origin longitude. dest_lat: Destination latitude. dest_lon: Destination longitude. config: Routing configuration. session: Optional aiohttp session. Returns: OTPResult or None if no transit route was found. """ url = f"{config.otp_url}/otp/gtfs/v1" departure_time = nextMonday().isoformat() payload = { "query": _PLAN_QUERY, "variables": { "fromLat": origin_lat, "fromLon": origin_lon, "toLat": dest_lat, "toLon": dest_lon, "dateTime": departure_time, }, } should_close = session is None if session is None: session = aiohttp.ClientSession() try: async with session.post(url, json=payload) as resp: if resp.status != 200: logger.error(f"OTP returned {resp.status}: {await resp.text()}") return None data = await resp.json() plan = data.get("data", {}).get("plan") if not plan: errors = data.get("errors") if errors: logger.warning(f"OTP GraphQL errors: {errors}") return None itineraries = plan.get("itineraries", []) if not itineraries: return None best = itineraries[0] total_distance = sum( leg.get("distance", 0) for leg in best.get("legs", []) ) return OTPResult( duration_seconds=int(best["duration"]), distance_meters=int(total_distance), ) finally: if should_close: await session.close()