"""COL service — lookup + ratio computation + async cache+scrape orchestration. Sync path (Phase 1 — used by simulator's `_resolve_col_adjustment`): compute_col_ratio(baseline, target) → in-process BASELINES lookup. Fast, no DB roundtrip, no I/O. Async path (Phase 2 — used by refresh CronJob and on-demand fetch): lookup_city_cached(slug, sess) → cache → scrape → upsert. Reconciles Numbeo (primary) + Expatistan (secondary) into a single CityCostIndex per city. Cache TTL 1 year. The simulator deliberately stays on the sync path: it needs sub-ms latency per request and doesn't tolerate transient scraper failures. The async path keeps the cache fresh in the background. """ from __future__ import annotations import logging from decimal import Decimal from sqlalchemy.ext.asyncio import AsyncSession from fire_planner.col import cache as col_cache from fire_planner.col.baseline import BASELINES from fire_planner.col.expatistan import ExpatistanFetchError, ExpatistanScraper from fire_planner.col.models import CityCostIndex from fire_planner.col.numbeo import NumbeoFetchError, NumbeoScraper log = logging.getLogger(__name__) # Each jurisdiction has a single canonical city we anchor on. Picked # to match where most users would live (capital or main expat hub) — # Cyprus → Limassol (the largest expat city), not Nicosia (capital); # UAE → Dubai (the expat economy), not Abu Dhabi. JURISDICTION_REPRESENTATIVE_CITY: dict[str, str] = { "uk": "london", "cyprus": "limassol", "bulgaria": "sofia", "uae": "dubai", "malaysia": "kuala-lumpur", "thailand": "bangkok", # "nomad" is intentionally absent — nomad mode is COL-invariant # because the user is on the road. The caller should skip auto-adjust # when jurisdiction='nomad' and provide a manual spending_gbp. } def lookup_city(city_slug: str) -> CityCostIndex: """Return the cached CityCostIndex for `city_slug`. Raises `KeyError` for unknown cities — the caller decides whether to fall back to baseline or raise to the user. """ normalised = city_slug.strip().lower().replace(" ", "-") try: return BASELINES[normalised] except KeyError as e: raise KeyError( f"No COL baseline for city {city_slug!r}; available: " f"{sorted(BASELINES)}" ) from e def compute_col_ratio(baseline_city: str, target_city: str) -> Decimal: """Ratio `target_total / baseline_total` — the multiplier to apply to a spending figure denominated in `baseline_city` to convert it to local prices in `target_city`. Identity case (same city) returns exactly `Decimal("1")`. Both anchors use the "single person, total with rent" headline — rent is the largest single category and varies most across cities, so excluding it would understate the actual spread. """ if baseline_city == target_city: return Decimal("1") baseline = lookup_city(baseline_city) target = lookup_city(target_city) return target.total_monthly_gbp / baseline.total_monthly_gbp def representative_city_for(jurisdiction: str) -> str | None: """Return the canonical city for a jurisdiction, or None for 'nomad' / unknown jurisdictions where auto-adjust should be skipped.""" return JURISDICTION_REPRESENTATIVE_CITY.get(jurisdiction) # Source-precedence weight when reconciling multiple snapshots — higher # beats lower. Numbeo has the largest contributor base; Expatistan is # a fast-decay cross-check; baseline is the hand-curated fallback. _SOURCE_WEIGHT: dict[str, int] = {"numbeo": 3, "expatistan": 2, "baseline": 1} def reconcile_sources(rows: list[CityCostIndex]) -> CityCostIndex | None: """Pick the canonical CityCostIndex from multiple per-source rows. Today's policy: pick the row with the highest source weight. When weights tie, prefer the most-recent `snapshot_date`. The simulator is cross-checked against the alternates' headline numbers — when they diverge >25%, the cache layer logs a warning so we can audit Numbeo/Expatistan drift over time. """ if not rows: return None sorted_rows = sorted( rows, key=lambda r: (_SOURCE_WEIGHT.get(r.source.name, 0), r.source.snapshot_date), reverse=True, ) chosen = sorted_rows[0] if len(sorted_rows) > 1: primary_total = chosen.total_single_with_rent_gbp for alt in sorted_rows[1:]: divergence = abs(alt.total_single_with_rent_gbp - primary_total) / primary_total if divergence > Decimal("0.25"): log.warning( "col reconcile %s: %s=%s diverges >%s%% from %s=%s", chosen.city_slug, alt.source.name, alt.total_single_with_rent_gbp, int(divergence * 100), chosen.source.name, primary_total, ) return chosen async def lookup_city_cached( sess: AsyncSession, city_slug: str, *, country: str = "", ) -> CityCostIndex: """Cache → scrape → fallback. Async; used by refresh CronJob and any future on-demand fetch path. Returns a CityCostIndex regardless of failure modes — falls back to baseline.BASELINES on scraper failure rather than raising. The only way this raises is if the city has no baseline AND every scraper fails (KeyError). """ cached = await col_cache.read_fresh(sess, city_slug) if cached is not None: return cached # Cache miss or expired — try live sources. fetched: list[CityCostIndex] = [] try: async with NumbeoScraper() as scraper: fetched.append(await scraper.fetch(city_slug, country=country)) except NumbeoFetchError as e: log.warning("numbeo fetch failed for %s: %s", city_slug, e) try: async with ExpatistanScraper() as scraper: fetched.append(await scraper.fetch(city_slug, country=country)) except ExpatistanFetchError as e: log.warning("expatistan fetch failed for %s: %s", city_slug, e) chosen = reconcile_sources(fetched) if chosen is not None: for row in fetched: await col_cache.upsert(sess, row) return chosen # Both scrapers failed — fall back to in-process baseline. if city_slug in BASELINES: baseline = BASELINES[city_slug] await col_cache.upsert(sess, baseline) return baseline raise KeyError( f"COL lookup failed for {city_slug!r}: cache empty, scrapers failed, " f"no baseline" )