fire-planner/tests/test_fire_targets_cli_helpers.py

74 lines
2.7 KiB
Python
Raw Normal View History

"""DB helpers behind `recompute-fire-targets` — latest-snapshot net worth split
and COL lookups. Locks the SQL (the WORKPLACE_PENSION filter especially)."""
from __future__ import annotations
from datetime import UTC, date, datetime
from decimal import Decimal
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from fire_planner.__main__ import (
_all_city_slugs,
_current_liquid_and_pension,
_load_col_latest,
)
from fire_planner.db import AccountSnapshot, ColSnapshot
def _acct(ext: str, d: date, atype: str, gbp: str) -> AccountSnapshot:
    """Construct a GBP ``AccountSnapshot`` for seeding a test session.

    ``ext`` fills both ``external_id`` and ``account_id``; ``atype`` fills
    both ``account_name`` and ``account_type``. ``gbp`` is a decimal string
    stored as both the native and the GBP market value.
    """
    value = Decimal(gbp)
    fields = {
        "external_id": ext,
        "snapshot_date": d,
        "account_id": ext,
        "account_name": atype,
        "account_type": atype,
        "currency": "GBP",
        # Currency is GBP, so native and converted values are the same.
        "market_value": value,
        "market_value_gbp": value,
    }
    return AccountSnapshot(**fields)
def _col(slug: str, disp: str, d: date, no_rent: str, rent: str) -> ColSnapshot:
    """Construct a ``ColSnapshot`` for seeding a test session.

    ``disp`` fills both ``city_display`` and ``country``. The source name is
    fixed to ``"baseline"`` and the expiry to 2027-01-01 UTC. ``no_rent`` and
    ``rent`` are decimal strings.
    """
    no_rent_total = Decimal(no_rent)
    fixed_expiry = datetime(2027, 1, 1, tzinfo=UTC)
    fields = {
        "city_slug": slug,
        "city_display": disp,
        "country": disp,
        "source_name": "baseline",
        "snapshot_date": d,
        "expires_at": fixed_expiry,
        "total_no_rent_gbp": no_rent_total,
        # NOTE(review): the with-rent total reuses the no-rent figure and does
        # not add ``rent`` -- confirm this is intended for the fixture.
        "total_with_rent_gbp": no_rent_total,
        "rent_1bed_center_gbp": Decimal(rent),
    }
    return ColSnapshot(**fields)
async def test_liquid_and_pension_use_latest_date_and_split_pension(
    session: AsyncSession,
) -> None:
    """Check the liquid/pension split is taken from the newest snapshot date.

    Seeds one 2026-01-01 row and three 2026-06-20 rows, then expects
    ``_current_liquid_and_pension`` to return GIA + ISA as liquid and the
    WORKPLACE_PENSION value as pension.
    """
    stale_day = date(2026, 1, 1)
    latest_day = date(2026, 6, 20)

    # An older snapshot that must be ignored.
    session.add(_acct("old:isa", stale_day, "ISA", "1.00"))

    # Latest date: two liquid accounts + one locked pension.
    latest_rows = [
        _acct(ext, latest_day, atype, gbp)
        for ext, atype, gbp in (
            ("gia", "GIA", "761000.00"),
            ("isa", "ISA", "231000.00"),
            ("pension", "WORKPLACE_PENSION", "139000.00"),
        )
    ]
    session.add_all(latest_rows)
    await session.commit()

    totals = await _current_liquid_and_pension(session)
    liquid, pension = totals

    assert liquid == pytest.approx(992_000.0)
    assert pension == pytest.approx(139_000.0)
async def test_load_col_latest_picks_most_recent(session: AsyncSession) -> None:
    """Check ``_load_col_latest`` returns the newest row for a slug.

    Seeds two Sofia snapshots on different dates and expects the 2026-05-20
    figures back. Also expects ``None`` for a slug with no rows.
    """
    older = _col("sofia", "Sofia", date(2025, 1, 1), "600", "500")
    newer = _col("sofia", "Sofia", date(2026, 5, 20), "713", "679")
    session.add_all([older, newer])
    await session.commit()

    latest = await _load_col_latest(session, "sofia")
    assert latest is not None
    assert latest.total_no_rent_gbp == Decimal("713")

    missing = await _load_col_latest(session, "atlantis")
    assert missing is None
async def test_all_city_slugs_is_distinct_sorted(session: AsyncSession) -> None:
    """Check ``_all_city_slugs`` returns each slug once, in sorted order.

    Seeds two Sofia rows and one Lisbon row and expects
    ``["lisbon", "sofia"]``.
    """
    seed_rows = (
        ("sofia", "Sofia", date(2026, 5, 1), "713", "679"),
        ("sofia", "Sofia", date(2026, 5, 20), "713", "679"),
        ("lisbon", "Lisbon", date(2026, 5, 1), "900", "1100"),
    )
    session.add_all([_col(*row) for row in seed_rows])
    await session.commit()

    slugs = await _all_city_slugs(session)
    assert slugs == ["lisbon", "sofia"]