- Fix silent log loss: replace hardcoded "uvicorn.error" logger with __name__ in osrm_client, otp_client, poi_distance_calculator, and poi_tasks (uvicorn logger has no handlers in Celery worker, so all errors were silently dropped) - Add Celery retry: autoretry_for=(Exception,), max_retries=3, retry_backoff - Add top-level exception handling in task with full traceback logging - Fix upsert_distances: replace session.merge() (PK-based) with proper dialect-aware INSERT ON DUPLICATE KEY UPDATE / ON CONFLICT DO UPDATE - Filter out listings with null/zero coordinates before routing - Raise OSError when all routing engines fail with 0 results computed, distinguishing "nothing to compute" from "all engines unreachable"
120 lines
3 KiB
Python
120 lines
3 KiB
Python
"""OpenTripPlanner 2.x GraphQL client for transit routing.
|
|
|
|
Uses OTP's GTFS GraphQL API to compute transit routes between points.
|
|
Since OTP has no matrix endpoint, individual requests are made with
|
|
concurrency controlled via asyncio.Semaphore.
|
|
"""
|
|
import logging
|
|
from dataclasses import dataclass
|
|
|
|
import aiohttp
|
|
|
|
from config.routing_config import RoutingConfig
|
|
from rec.utils import nextMonday
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# OTP 2.x GraphQL query for transit plan
|
|
_PLAN_QUERY = """
|
|
query Plan($fromLat: Float!, $fromLon: Float!, $toLat: Float!, $toLon: Float!, $dateTime: DateTime!) {
|
|
plan(
|
|
from: {lat: $fromLat, lon: $fromLon}
|
|
to: {lat: $toLat, lon: $toLon}
|
|
dateTime: $dateTime
|
|
transportModes: [{mode: TRANSIT}, {mode: WALK}]
|
|
numItineraries: 1
|
|
) {
|
|
itineraries {
|
|
duration
|
|
walkDistance
|
|
legs {
|
|
mode
|
|
duration
|
|
distance
|
|
}
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
@dataclass(frozen=True)
class OTPResult:
    """Immutable outcome of a single OTP transit route calculation.

    Attributes:
        duration_seconds: Duration of the itinerary as an integer; the
            field name indicates the unit is seconds.
        distance_meters: Distance of the itinerary as an integer; the
            field name indicates the unit is meters.
    """

    duration_seconds: int
    distance_meters: int
|
|
|
|
|
|
async def otp_transit_route(
    origin_lat: float,
    origin_lon: float,
    dest_lat: float,
    dest_lon: float,
    config: RoutingConfig,
    session: aiohttp.ClientSession | None = None,
) -> OTPResult | None:
    """Compute a transit route using the OTP 2.x GraphQL API.

    Posts ``_PLAN_QUERY`` to ``<config.otp_url>/otp/gtfs/v1`` and converts the
    first returned itinerary into an ``OTPResult``.  The departure time is
    whatever ``nextMonday()`` returns (intended to be next Monday 9AM, so
    that results are consistent between runs).

    Args:
        origin_lat: Origin latitude.
        origin_lon: Origin longitude.
        dest_lat: Destination latitude.
        dest_lon: Destination longitude.
        config: Routing configuration; only ``otp_url`` is read here.
        session: Optional aiohttp session to reuse.  When omitted, a
            temporary session is created and closed before returning.

    Returns:
        OTPResult, or None if OTP answered with a non-200 status, reported
        no plan / no itineraries, or returned an itinerary without a
        duration.

    Raises:
        Whatever ``session.post`` / ``resp.json`` raise (connection and
        decoding errors are not swallowed, so callers can tell an
        unreachable engine from "no route found").
    """
    # Strip a trailing slash so a base URL such as "http://otp:8080/" does
    # not yield "//otp/gtfs/v1".
    base_url = str(config.otp_url).rstrip("/")
    url = f"{base_url}/otp/gtfs/v1"

    payload = {
        "query": _PLAN_QUERY,
        "variables": {
            "fromLat": origin_lat,
            "fromLon": origin_lon,
            "toLat": dest_lat,
            "toLon": dest_lon,
            "dateTime": nextMonday().isoformat(),
        },
    }

    # Only close the session if we created it; a caller-supplied session is
    # the caller's to manage.
    owns_session = session is None
    if owns_session:
        session = aiohttp.ClientSession()

    try:
        async with session.post(url, json=payload) as resp:
            if resp.status != 200:
                logger.error(
                    "OTP returned %s: %s", resp.status, await resp.text()
                )
                return None
            data = await resp.json()
        return _parse_plan_response(data)
    finally:
        if owns_session:
            await session.close()


def _parse_plan_response(data: dict) -> OTPResult | None:
    """Turn a decoded OTP GraphQL response into an OTPResult, or None."""
    # GraphQL allows `"data": null` next to `errors`; `.get("data", {})`
    # would then return None and the chained `.get` would raise.
    plan = (data.get("data") or {}).get("plan")
    if not plan:
        errors = data.get("errors")
        if errors:
            logger.warning("OTP GraphQL errors: %s", errors)
        return None

    itineraries = plan.get("itineraries") or []
    if not itineraries:
        return None

    # The query asks for a single itinerary, so the first one is the answer.
    best = itineraries[0]

    duration = best.get("duration")
    if duration is None:
        logger.warning("OTP itinerary has no duration; treating as no route")
        return None

    # Fields may be present but null, hence `or` instead of a .get default.
    total_distance = sum(
        (leg.get("distance") or 0) for leg in (best.get("legs") or [])
    )

    return OTPResult(
        duration_seconds=int(duration),
        distance_meters=int(total_distance),
    )
|