"""DB-backed cache for cost-of-living snapshots. Architecture (Phase 2): lookup_city(slug, sess) → 1. SELECT FROM col_snapshot WHERE city_slug=slug ORDER BY fetched_at DESC LIMIT 1 2. if row and row.expires_at > now → return row, "cache_hit" 3. else fetch via NumbeoScraper, INSERT/UPDATE, return, "scraped" 4. on scrape failure → fall back to baseline.BASELINES[slug], "baseline_fallback" TTL = 1 year (Viktor's choice on 2026-05-21 — Numbeo headline numbers don't move fast enough to need monthly refresh, and the rate-limit risk is real). The Phase-3 CronJob refreshes stale rows nightly in batch so runtime lookups never have to scrape. """ from __future__ import annotations from datetime import UTC, datetime from decimal import Decimal from typing import Final from sqlalchemy import select from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from fire_planner.col.models import CategoryBreakdown, CityCostIndex, ColSource from fire_planner.db import ColSnapshot DEFAULT_TTL_DAYS: Final = 365 def _row_to_index(row: ColSnapshot) -> CityCostIndex: return CityCostIndex( city=row.city_display, city_slug=row.city_slug, country=row.country, total_single_no_rent_gbp=row.total_no_rent_gbp, total_single_with_rent_gbp=row.total_with_rent_gbp, breakdown=CategoryBreakdown( rent_1bed_center=row.rent_1bed_center_gbp, rent_1bed_outside=row.rent_1bed_outside_gbp, # by_category_json optional — not loaded into the Pydantic # model in Phase 2; the simulator only needs the headlines. groceries=Decimal("0"), restaurants=Decimal("0"), transport=Decimal("0"), utilities=Decimal("0"), leisure=Decimal("0"), ), source=ColSource( name=row.source_name, # type: ignore[arg-type] url=row.source_url, snapshot_date=row.snapshot_date, raw_currency=row.raw_currency, gbp_per_unit=row.gbp_per_unit, ), ) async def read_fresh( sess: AsyncSession, city_slug: str, *, now: datetime | None = None, ) -> CityCostIndex | None: """Return the freshest non-expired snapshot, or None. Picks the most-recently-fetched row across all sources for the city (Numbeo + Expatistan etc.) — service-layer reconciliation runs when writing, so the cache stores already-reconciled values. """ now = now or datetime.now(UTC) stmt = ( select(ColSnapshot) .where(ColSnapshot.city_slug == city_slug) .where(ColSnapshot.expires_at > now) .order_by(ColSnapshot.fetched_at.desc()) .limit(1) ) row = (await sess.execute(stmt)).scalar_one_or_none() return _row_to_index(row) if row else None async def upsert( sess: AsyncSession, idx: CityCostIndex, *, ttl_days: int = DEFAULT_TTL_DAYS, now: datetime | None = None, ) -> None: """Insert or update a snapshot. Unique on (city_slug, source_name).""" now = now or datetime.now(UTC) from datetime import timedelta expires = now + timedelta(days=ttl_days) values = { "city_slug": idx.city_slug, "city_display": idx.city, "country": idx.country, "source_name": idx.source.name, "source_url": idx.source.url, "snapshot_date": idx.source.snapshot_date, "fetched_at": now, "expires_at": expires, "total_no_rent_gbp": idx.total_single_no_rent_gbp, "total_with_rent_gbp": idx.total_single_with_rent_gbp, "rent_1bed_center_gbp": idx.breakdown.rent_1bed_center, "rent_1bed_outside_gbp": idx.breakdown.rent_1bed_outside, "raw_currency": idx.source.raw_currency, "gbp_per_unit": idx.source.gbp_per_unit, } dialect_name = sess.bind.dialect.name if sess.bind else "postgresql" if dialect_name == "postgresql": stmt = pg_insert(ColSnapshot).values(**values) update_cols = {k: stmt.excluded[k] for k in values if k not in {"city_slug", "source_name"}} stmt = stmt.on_conflict_do_update( constraint="uq_col_snapshot_city_source", set_=update_cols, ) await sess.execute(stmt) else: # SQLite (tests): emulate upsert manually. existing = await sess.execute( select(ColSnapshot).where( ColSnapshot.city_slug == idx.city_slug, ColSnapshot.source_name == idx.source.name, ) ) row = existing.scalar_one_or_none() if row: for k, v in values.items(): setattr(row, k, v) else: sess.add(ColSnapshot(**values)) await sess.commit() def expires_at_for(ttl_days: int = DEFAULT_TTL_DAYS, now: datetime | None = None) -> datetime: """Public helper: when would a row written `now` expire.""" from datetime import timedelta return (now or datetime.now(UTC)) + timedelta(days=ttl_days) __all__ = ["DEFAULT_TTL_DAYS", "expires_at_for", "read_fresh", "upsert"]