wrongmove/rec/otp_client.py

121 lines
3 KiB
Python
Raw Normal View History

"""OpenTripPlanner 2.x GraphQL client for transit routing.
Uses OTP's GTFS GraphQL API to compute transit routes between points.
Since OTP has no matrix endpoint, individual requests are made with
concurrency controlled via asyncio.Semaphore.
"""
import logging
from dataclasses import dataclass
import aiohttp
from config.routing_config import RoutingConfig
from rec.utils import nextMonday
# Module logger. It reuses the logger named "uvicorn.error" rather than a
# module-specific one (presumably so messages show up in the uvicorn server
# log -- confirm against the deployment's logging configuration).
logger = logging.getLogger("uvicorn.error")
# GraphQL document sent to OTP for every routing request.
#
# Variables: origin/destination coordinates ($fromLat, $fromLon, $toLat,
# $toLon) and the departure timestamp ($dateTime).
# The query asks for a single itinerary (numItineraries: 1) that may combine
# the TRANSIT and WALK modes, and selects the itinerary's `duration` and
# `walkDistance` plus `mode`, `duration` and `distance` for each leg.
#
# NOTE(review): verify against the deployed OTP GraphQL schema that the
# `plan` field really accepts a `dateTime` argument of scalar type
# `DateTime`; some OTP 2.x schema versions appear to expose separate
# `date` / `time` arguments on `plan` instead -- TODO confirm.
_PLAN_QUERY = """
query Plan($fromLat: Float!, $fromLon: Float!, $toLat: Float!, $toLon: Float!, $dateTime: DateTime!) {
plan(
from: {lat: $fromLat, lon: $fromLon}
to: {lat: $toLat, lon: $toLon}
dateTime: $dateTime
transportModes: [{mode: TRANSIT}, {mode: WALK}]
numItineraries: 1
) {
itineraries {
duration
walkDistance
legs {
mode
duration
distance
}
}
}
}
"""
@dataclass(frozen=True)
class OTPResult:
    """Immutable summary of one transit route computed by OTP.

    Instances are frozen: fields cannot be reassigned after construction,
    and two results with equal fields compare (and hash) equal.
    """

    # Total travel time of the itinerary, in seconds.
    duration_seconds: int
    # Total travelled distance of the itinerary, in meters.
    distance_meters: int
def _parse_plan_response(data: dict) -> OTPResult | None:
    """Extract the first itinerary from a decoded OTP GraphQL response.

    Args:
        data: The JSON-decoded response body.

    Returns:
        An OTPResult built from the first itinerary, or None when the
        response carries no plan or no itineraries.
    """
    # GraphQL servers may send ``"data": null`` alongside ``"errors"``; a
    # ``.get("data", {})`` default would not apply in that case and the
    # chained ``.get`` would raise AttributeError, so coalesce explicitly.
    plan = (data.get("data") or {}).get("plan")
    if not plan:
        errors = data.get("errors")
        if errors:
            logger.warning("OTP GraphQL errors: %s", errors)
        return None

    itineraries = plan.get("itineraries") or []
    if not itineraries:
        return None

    best = itineraries[0]
    # Treat a null ``legs`` list or a null leg ``distance`` as "nothing to
    # add" instead of failing inside the sum.
    total_distance = sum(
        leg.get("distance") or 0 for leg in best.get("legs") or []
    )
    return OTPResult(
        duration_seconds=int(best["duration"]),
        distance_meters=int(total_distance),
    )


async def otp_transit_route(
    origin_lat: float,
    origin_lon: float,
    dest_lat: float,
    dest_lon: float,
    config: RoutingConfig,
    session: aiohttp.ClientSession | None = None,
) -> OTPResult | None:
    """Compute a transit route using OTP 2.x GraphQL API.

    The departure time is the ISO-formatted value returned by
    ``nextMonday()`` (documented as next Monday 9AM) so that results are
    consistent between calls.

    Exceptions raised by aiohttp while sending the request or decoding the
    response are not caught here and propagate to the caller.

    Args:
        origin_lat: Origin latitude.
        origin_lon: Origin longitude.
        dest_lat: Destination latitude.
        dest_lon: Destination longitude.
        config: Routing configuration; ``config.otp_url`` is the OTP base URL.
        session: Optional aiohttp session to reuse. When omitted, a
            temporary session is created and closed before returning.

    Returns:
        OTPResult, or None if OTP answered with a non-200 status, reported
        no plan, or found no itinerary.
    """
    url = f"{config.otp_url}/otp/gtfs/v1"
    payload = {
        "query": _PLAN_QUERY,
        "variables": {
            "fromLat": origin_lat,
            "fromLon": origin_lon,
            "toLat": dest_lat,
            "toLon": dest_lon,
            "dateTime": nextMonday().isoformat(),
        },
    }

    # Only close the session if this call created it; a caller-supplied
    # session stays open for the caller to reuse.
    owns_session = session is None
    if session is None:
        session = aiohttp.ClientSession()
    try:
        async with session.post(url, json=payload) as resp:
            if resp.status != 200:
                logger.error(
                    "OTP returned %s: %s", resp.status, await resp.text()
                )
                return None
            data = await resp.json()
    finally:
        if owns_session:
            await session.close()

    return _parse_plan_response(data)