fire-planner/fire_planner/col/service.py
Viktor Barzin e72fd22a17 col: simulator auto-adjusts spending to local prices via Numbeo+Expatistan
The Monte Carlo used to compare jurisdictions at a flat London-equivalent
spend, which silently overstated the cost-of-living for any move to a
cheaper region. Now every cross-jurisdiction simulation auto-scales
spending_gbp by the real Numbeo/Expatistan ratio between the user's
baseline city and the target city.
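
As a rough illustration of the scaling described above, the adjustment amounts to a single multiplication by the target/baseline ratio. The helper name `scale_spending` and every figure below are invented for the example; they are not real Numbeo/Expatistan data and not the simulator's own code.

```python
from decimal import Decimal


def scale_spending(
    spending_gbp: Decimal, baseline_total: Decimal, target_total: Decimal
) -> Decimal:
    """Scale a spend priced in the baseline city to target-city prices."""
    if baseline_total <= 0:
        raise ValueError("baseline_total must be positive")
    return spending_gbp * (target_total / baseline_total)


# Hypothetical monthly totals: baseline city 3000 GBP, target city 1500 GBP.
# The ratio is 0.5, so a 40000 GBP annual spend is halved.
adjusted = scale_spending(Decimal("40000"), Decimal("3000"), Decimal("1500"))
print(adjusted)
```

A ratio below 1 lowers the simulated spend and a ratio above 1 raises it.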

Architecture:
- fire_planner/col/baseline.py — 22 cities with headline Numbeo data
  (source URLs + snapshot dates embedded) — fallback when scraper fails
- col/numbeo.py + col/expatistan.py — httpx async scrapers, regex-parsed,
  polite 1.1s rate-limit, EUR/USD anchored
- col/cache.py — PG-backed cache (col_snapshot table, 1-year TTL)
- col/service.py — sync compute_col_ratio() for the simulator; async
  lookup_city_cached() with source reconciliation for the refresh CronJob
- alembic 0005 — col_snapshot table, UNIQUE(city_slug, source_name)
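
The 1-year TTL mentioned for the cache can be pictured as a simple date comparison. This is only a sketch: the name `is_fresh`, the reading of "1 year" as 365 days, and measuring age from the snapshot date are all assumptions, and the real col/cache.py may differ.

```python
from datetime import date, timedelta

# Assumption: "1-year TTL" is read as 365 days.
TTL = timedelta(days=365)


def is_fresh(snapshot_date: date, today: date, ttl: timedelta = TTL) -> bool:
    """True while the snapshot is strictly younger than the TTL."""
    return today - snapshot_date < ttl


today = date(2026, 5, 22)
print(is_fresh(date(2025, 9, 1), today))  # snapshot under a year old
print(is_fresh(date(2024, 1, 1), today))  # snapshot over a year old
```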

Simulator wiring:
- SimulateRequest gains col_auto_adjust=True (default), col_baseline_city,
  col_target_city. Defaults pick the jurisdiction's representative city.
- _resolve_col_adjustment scales spending_gbp before path-building.
- SimulateResult surfaces col_multiplier_applied + col_adjusted_spending_gbp.
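
The wiring above might look roughly like the sketch below. It is not the actual `_resolve_col_adjustment`: the function signature, the `Adjustment` container, the London default for the baseline city, and the toy totals are all assumptions made for illustration.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Callable, Optional

# Stand-in for JURISDICTION_REPRESENTATIVE_CITY; "nomad" is deliberately absent.
REPRESENTATIVE_CITY = {"uk": "london", "bulgaria": "sofia"}


@dataclass
class Adjustment:
    """Hypothetical container for the two values surfaced on the result."""
    multiplier: Decimal
    adjusted_spending_gbp: Decimal


def resolve_col_adjustment(
    spending_gbp: Decimal,
    jurisdiction: str,
    ratio_fn: Callable[[str, str], Decimal],
    *,
    col_auto_adjust: bool = True,
    col_baseline_city: Optional[str] = None,
    col_target_city: Optional[str] = None,
) -> Adjustment:
    """Return the multiplier and scaled spend, or an identity adjustment."""
    # Assumption: the baseline city defaults to London when not supplied.
    baseline = col_baseline_city or "london"
    target = col_target_city or REPRESENTATIVE_CITY.get(jurisdiction)
    # Skip when disabled or when the jurisdiction has no representative city.
    if not col_auto_adjust or target is None:
        return Adjustment(Decimal("1"), spending_gbp)
    multiplier = ratio_fn(baseline, target)
    return Adjustment(multiplier, spending_gbp * multiplier)


# Toy ratio function with invented totals, standing in for compute_col_ratio.
TOTALS = {"london": Decimal("3000"), "sofia": Decimal("1200")}


def toy_ratio(baseline: str, target: str) -> Decimal:
    return TOTALS[target] / TOTALS[baseline]


result = resolve_col_adjustment(Decimal("40000"), "bulgaria", toy_ratio)
nomad = resolve_col_adjustment(Decimal("40000"), "nomad", toy_ratio)
```

Passing the ratio function in as a parameter keeps the sketch testable without the real baseline table.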

CLIs:
- python -m fire_planner col-seed — loads BASELINES into col_snapshot
  (post-migration seed step)
- python -m fire_planner col-refresh-stale --within-days 7 — used by the
  weekly fire-planner-col-refresh CronJob

268 tests pass. Mypy strict + ruff clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-22 14:14:57 +00:00

170 lines
6.5 KiB
Python

"""COL service — lookup + ratio computation + async cache+scrape orchestration.

Sync path (Phase 1, used by the simulator's `_resolve_col_adjustment`):
    compute_col_ratio(baseline, target) → in-process BASELINES lookup.
    Fast, no DB roundtrip, no I/O.

Async path (Phase 2, used by the refresh CronJob and on-demand fetch):
    lookup_city_cached(sess, slug) → cache → scrape → upsert.
    Reconciles Numbeo (primary) + Expatistan (secondary) into a single
    CityCostIndex per city. Cache TTL 1 year.

The simulator deliberately stays on the sync path: it needs sub-ms
latency per request and doesn't tolerate transient scraper failures.
The async path keeps the cache fresh in the background.
"""

from __future__ import annotations

import logging
from decimal import Decimal

from sqlalchemy.ext.asyncio import AsyncSession

from fire_planner.col import cache as col_cache
from fire_planner.col.baseline import BASELINES
from fire_planner.col.expatistan import ExpatistanFetchError, ExpatistanScraper
from fire_planner.col.models import CityCostIndex
from fire_planner.col.numbeo import NumbeoFetchError, NumbeoScraper

log = logging.getLogger(__name__)

# Each jurisdiction has a single canonical city we anchor on, picked
# to match where most users would live (capital or main expat hub):
# Cyprus → Limassol (the largest expat city), not Nicosia (capital);
# UAE → Dubai (the expat economy), not Abu Dhabi.
JURISDICTION_REPRESENTATIVE_CITY: dict[str, str] = {
    "uk": "london",
    "cyprus": "limassol",
    "bulgaria": "sofia",
    "uae": "dubai",
    "malaysia": "kuala-lumpur",
    "thailand": "bangkok",
    # "nomad" is intentionally absent: nomad mode is COL-invariant
    # because the user is on the road. The caller should skip auto-adjust
    # when jurisdiction='nomad' and provide a manual spending_gbp.
}


def lookup_city(city_slug: str) -> CityCostIndex:
    """Return the baseline CityCostIndex for `city_slug` from the in-process
    BASELINES table (no cache or network access).

    The slug is normalised (trimmed, lower-cased, spaces to hyphens) before
    the lookup. Raises `KeyError` for unknown cities; the caller decides
    whether to handle it or surface it to the user.
    """
    normalised = city_slug.strip().lower().replace(" ", "-")
    try:
        return BASELINES[normalised]
    except KeyError as e:
        raise KeyError(
            f"No COL baseline for city {city_slug!r}; available: "
            f"{sorted(BASELINES)}"
        ) from e


def compute_col_ratio(baseline_city: str, target_city: str) -> Decimal:
    """Ratio `target_total / baseline_total`: the multiplier to apply
    to a spending figure denominated in `baseline_city` to convert it
    to local prices in `target_city`.

    Identity case (same city) returns exactly `Decimal("1")`.

    Both anchors use the "single person, total with rent" headline.
    Rent is the largest single category and varies most across cities,
    so excluding it would understate the actual spread.
    """
    if baseline_city == target_city:
        return Decimal("1")
    baseline = lookup_city(baseline_city)
    target = lookup_city(target_city)
    return target.total_monthly_gbp / baseline.total_monthly_gbp


def representative_city_for(jurisdiction: str) -> str | None:
    """Return the canonical city for a jurisdiction, or None for 'nomad'
    / unknown jurisdictions where auto-adjust should be skipped."""
    return JURISDICTION_REPRESENTATIVE_CITY.get(jurisdiction)


# Source-precedence weight when reconciling multiple snapshots: higher
# beats lower. Numbeo has the largest contributor base; Expatistan is
# a fast-decay cross-check; baseline is the hand-curated fallback.
_SOURCE_WEIGHT: dict[str, int] = {"numbeo": 3, "expatistan": 2, "baseline": 1}


def reconcile_sources(rows: list[CityCostIndex]) -> CityCostIndex | None:
    """Pick the canonical CityCostIndex from multiple per-source rows.

    Today's policy: pick the row with the highest source weight. When
    weights tie, prefer the most-recent `snapshot_date`. The chosen row
    is then cross-checked against the alternates' headline totals; when
    an alternate diverges from it by more than 25%, a warning is logged
    so we can audit Numbeo/Expatistan drift over time.
    """
    if not rows:
        return None
    # Highest weight first; ties broken by the newest snapshot_date.
    sorted_rows = sorted(
        rows,
        key=lambda r: (_SOURCE_WEIGHT.get(r.source.name, 0), r.source.snapshot_date),
        reverse=True,
    )
    chosen = sorted_rows[0]
    if len(sorted_rows) > 1:
        primary_total = chosen.total_single_with_rent_gbp
        for alt in sorted_rows[1:]:
            # Relative divergence of the alternate from the chosen headline total.
            divergence = abs(alt.total_single_with_rent_gbp - primary_total) / primary_total
            if divergence > Decimal("0.25"):
                log.warning(
                    "col reconcile %s: %s=%s diverges by %s%% from %s=%s",
                    chosen.city_slug,
                    alt.source.name,
                    alt.total_single_with_rent_gbp,
                    int(divergence * 100),
                    chosen.source.name,
                    primary_total,
                )
    return chosen


async def lookup_city_cached(
    sess: AsyncSession,
    city_slug: str,
    *,
    country: str = "",
) -> CityCostIndex:
    """Cache → scrape → fallback. Async; used by refresh CronJob and any
    future on-demand fetch path.

    Returns a CityCostIndex regardless of failure modes: falls back to
    baseline.BASELINES on scraper failure rather than raising. The only
    way this raises is if the city has no baseline AND every scraper
    fails (KeyError).
    """
    cached = await col_cache.read_fresh(sess, city_slug)
    if cached is not None:
        return cached

    # Cache miss or expired: try live sources.
    fetched: list[CityCostIndex] = []
    try:
        async with NumbeoScraper() as scraper:
            fetched.append(await scraper.fetch(city_slug, country=country))
    except NumbeoFetchError as e:
        log.warning("numbeo fetch failed for %s: %s", city_slug, e)
    try:
        async with ExpatistanScraper() as scraper:
            fetched.append(await scraper.fetch(city_slug, country=country))
    except ExpatistanFetchError as e:
        log.warning("expatistan fetch failed for %s: %s", city_slug, e)

    chosen = reconcile_sources(fetched)
    if chosen is not None:
        for row in fetched:
            await col_cache.upsert(sess, row)
        return chosen

    # Both scrapers failed: fall back to in-process baseline.
    if city_slug in BASELINES:
        baseline = BASELINES[city_slug]
        await col_cache.upsert(sess, baseline)
        return baseline
    raise KeyError(
        f"COL lookup failed for {city_slug!r}: cache empty, scrapers failed, "
        f"no baseline"
    )
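
The precedence rule in `reconcile_sources` (highest source weight first, newest snapshot as tie-breaker) can be tried in isolation with the sketch below. `Row` is a hypothetical flattened stand-in for `CityCostIndex`, the dates are invented, and `max` is used here in place of the original's sort; for this key it selects the same row.

```python
from dataclasses import dataclass
from datetime import date
from typing import Optional

SOURCE_WEIGHT = {"numbeo": 3, "expatistan": 2, "baseline": 1}


@dataclass(frozen=True)
class Row:
    """Hypothetical stand-in for CityCostIndex with only the fields needed."""
    source_name: str
    snapshot_date: date


def pick(rows: list[Row]) -> Optional[Row]:
    """Highest source weight wins; ties go to the newest snapshot."""
    if not rows:
        return None
    return max(
        rows,
        key=lambda r: (SOURCE_WEIGHT.get(r.source_name, 0), r.snapshot_date),
    )


older_numbeo = Row("numbeo", date(2025, 1, 1))
newer_expatistan = Row("expatistan", date(2026, 1, 1))
newer_numbeo = Row("numbeo", date(2026, 3, 1))

# Weight beats recency: the older Numbeo row is still preferred.
print(pick([newer_expatistan, older_numbeo]).source_name)
```

Because the weight comes first in the sort key, a fresher Expatistan snapshot never displaces a Numbeo one; recency only matters between rows from the same source tier.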