From 1fac475ba2f52b92e9d7662dbf052ea14e2ede26 Mon Sep 17 00:00:00 2001
From: Viktor Barzin
Date: Thu, 28 May 2026 21:42:46 +0000
Subject: [PATCH] =?UTF-8?q?fire-planner:=20implementation=20plan=20?=
 =?UTF-8?q?=E2=80=94=20Reddit=20FIRE=20examples?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

17 bite-sized TDD tasks (Task 1-17 + 11a follow-up):
  1   asyncpraw dependency
  2   alembic 0006_fire_examples migration
  3   FireExample ORM in db.py
  4   Pydantic schemas (RawPost / ExtractedExample / Summary)
  5   regex pre-filter (MONEY_RE + LOCATION_RE)
  6   async PRAW wrapper
  7   primary qwen3-8b extractor
  8   Tier 2 claude-agent-service fallback
  9   currency normalisation via fx.py
  10  service.upsert_example + summary_for_country
  11  orchestrator + click CLI ingest
  11a Prometheus follow-ups deferred + documented
  12  fixture-driven regression suite
  13  /api/examples + /summary router
  14  simulator response examples_overlay block
  15  Terraform K8s Job (toggled) + weekly CronJob
  16  build + push image
  17  run bulk ingest + smoke-test
---
 docs/plans/2026-05-28-reddit-examples-plan.md | 2827 +++++++++++++++++
 1 file changed, 2827 insertions(+)
 create mode 100644 docs/plans/2026-05-28-reddit-examples-plan.md

diff --git a/docs/plans/2026-05-28-reddit-examples-plan.md b/docs/plans/2026-05-28-reddit-examples-plan.md
new file mode 100644
index 0000000..26c972d
--- /dev/null
+++ b/docs/plans/2026-05-28-reddit-examples-plan.md
@@ -0,0 +1,2827 @@
+# FIRE Reddit Examples — Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Build a Reddit FIRE-examples ingest pipeline (`fire_planner/examples/`) that scrapes 12 FIRE subreddits via PRAW, extracts structured fields with a local qwen3-8b LLM (claude-agent-service Tier 2 fallback), and exposes the data as a `/api/examples` endpoint plus an informational overlay on the simulator response.
+
+**Architecture:** Async PRAW per-subreddit fan-out → cheap regex pre-filter → llama-cpp JSON-schema extraction (Tier 2 escalation on low confidence) → upsert into new `fire_planner.fire_example` table → FastAPI router + simulator-response overlay. K8s `Job` for bulk one-shot, `CronJob` for weekly delta. Mirrors the existing `fire_planner/col/` shape.
+
+**Tech Stack:** Python 3.12, FastAPI, SQLAlchemy async, Alembic, PRAW (asyncpraw), httpx, Pydantic v2, pytest + respx.
+
+**Reference design:** `docs/plans/2026-05-28-reddit-examples-design.md` (commit `0907a31`).
+
+---
+
+## File structure
+
+**Create (Python module):**
+- `fire_planner/examples/__init__.py`
+- `fire_planner/examples/models.py` — Pydantic schemas (the `FireExample` ORM class lives in `fire_planner/db.py`)
+- `fire_planner/examples/filters.py` — `MONEY_RE`, `LOCATION_RE`, `is_candidate()`
+- `fire_planner/examples/praw_source.py` — `fetch_top(subreddit, when)` + parallel fan-out
+- `fire_planner/examples/llm_extract.py` — `extract_with_qwen()`, `extract_with_claude()`, `extract_with_fallback()`
+- `fire_planner/examples/service.py` — `upsert_example()`, `summary_for_country()`
+- `fire_planner/examples/cli.py` — `click` sub-commands `ingest`, `backfill`
+
+**Create (API + tests):**
+- `fire_planner/api/examples.py` — FastAPI router
+- `tests/test_examples_filters.py`
+- `tests/test_examples_models.py`
+- `tests/test_examples_praw_source.py`
+- `tests/test_examples_llm_extract.py`
+- `tests/test_examples_service.py`
+- `tests/test_examples_cli.py`
+- `tests/test_api_examples.py`
+- `tests/fixtures/reddit/*.json` (20 hand-picked posts + expected extractions)
+- `tests/test_examples_fixtures.py` — regression suite over the JSON fixtures
+
+**Create (migration + infra):**
+- `alembic/versions/0006_fire_examples.py`
+- `infra/stacks/fire-planner/modules/fire-planner/examples_job.tf` (or similar; bundle into the existing fire-planner stack)
+
+**Modify:**
+- `pyproject.toml` — add `asyncpraw` dependency
+- `fire_planner/db.py` — add `FireExample` ORM class (~30 LoC)
+- `fire_planner/__main__.py` — wire `examples` sub-command group
+- `fire_planner/app.py:145-155` — include the new router
+- `fire_planner/api/simulate.py` — append `examples_overlay` to the response
+- `fire_planner/api/schemas.py` — add `ExamplesOverlay` Pydantic model
+- `README.md` — one-line CLI command reference (out of scope: deep docs)
+
+---
+
+## Task 1: Add `asyncpraw` dependency
+
+**Files:**
+- Modify: `pyproject.toml` (add to `[tool.poetry.dependencies]`)
+
+- [ ] **Step 1: Add dependency**
+
+Open `pyproject.toml`, add under `[tool.poetry.dependencies]`:
+
+```toml
+asyncpraw = "^7.7"
+```
+
+- [ ] **Step 2: Lock + install**
+
+Run:
+
+```bash
+cd /home/wizard/code/fire-planner
+poetry lock --no-update
+poetry install
+```
+
+Expected: success, no version conflicts.
+
+- [ ] **Step 3: Verify import**
+
+Run:
+
+```bash
+poetry run python -c "import asyncpraw; print(asyncpraw.__version__)"
+```
+
+Expected: prints a `7.x` version, 7.7 or newer (the `^7.7` constraint lets Poetry resolve any later 7.x release).
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add pyproject.toml poetry.lock
+git commit -m "examples: add asyncpraw dependency"
+```
+
+---
+
+## Task 2: Alembic migration `0006_fire_examples`
+
+**Files:**
+- Create: `alembic/versions/0006_fire_examples.py`
+
+- [ ] **Step 1: Write migration**
+
+Create `alembic/versions/0006_fire_examples.py`:
+
+```python
+"""add fire_example table for Reddit-sourced FIRE examples
+
+Revision ID: 0006
+Revises: 0005
+Create Date: 2026-05-28 00:00:00.000000
+
+Backs the fire_planner.examples module: one row per Reddit post that
+was extracted into a structured FIRE example. reddit_id UNIQUE makes
+re-ingest idempotent.
+"""
+from collections.abc import Sequence
+
+import sqlalchemy as sa
+
+from alembic import op
+
+revision: str = "0006"
+down_revision: str | None = "0005"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+SCHEMA = "fire_planner"
+
+
+def upgrade() -> None:
+    op.create_table(
+        "fire_example",
+        sa.Column("id", sa.Integer(), nullable=False, autoincrement=True),
+        sa.Column("reddit_id", sa.String(length=16), nullable=False),
+        sa.Column("source_sub", sa.String(length=64), nullable=False),
+        sa.Column("post_url", sa.String(), nullable=False),
+        sa.Column("post_date", sa.Date(), nullable=False),
+        sa.Column("post_title", sa.String(), nullable=False),
+        sa.Column("country", sa.String(length=64), nullable=True),
+        sa.Column("city", sa.String(length=128), nullable=True),
+        sa.Column("portfolio_gbp", sa.Numeric(14, 2), nullable=True),
+        sa.Column("annual_exp_gbp", sa.Numeric(12, 2), nullable=True),
+        sa.Column("age", sa.SmallInteger(), nullable=True),
+        sa.Column("family_size", sa.SmallInteger(), nullable=True),
+        sa.Column("fi_status", sa.String(length=24), nullable=True),
+        sa.Column("is_retired", sa.Boolean(), nullable=True),
+        sa.Column("raw_currency", sa.String(length=3), nullable=True),
+        sa.Column("raw_excerpt", sa.String(), nullable=True),
+        sa.Column("llm_model", sa.String(length=64), nullable=False),
+        sa.Column("llm_confidence", sa.Numeric(3, 2), nullable=True),
+        sa.Column("extracted_at", sa.TIMESTAMP(timezone=True), nullable=False,
+                  server_default=sa.func.now()),
+        sa.Column("ingested_at", sa.TIMESTAMP(timezone=True), nullable=False,
+                  server_default=sa.func.now()),
+        sa.PrimaryKeyConstraint("id"),
+        sa.UniqueConstraint("reddit_id", name="uq_fire_example_reddit_id"),
+        schema=SCHEMA,
+    )
+    op.create_index("ix_fire_example_country", "fire_example", ["country"], schema=SCHEMA)
+    op.create_index("ix_fire_example_fi_status", "fire_example", ["fi_status"], schema=SCHEMA)
+    op.create_index("ix_fire_example_post_date", "fire_example", ["post_date"], schema=SCHEMA)
+
+
+def downgrade() -> None:
+    op.drop_index("ix_fire_example_post_date", table_name="fire_example", schema=SCHEMA)
+    op.drop_index("ix_fire_example_fi_status", table_name="fire_example", schema=SCHEMA)
+    op.drop_index("ix_fire_example_country", table_name="fire_example", schema=SCHEMA)
+    op.drop_table("fire_example", schema=SCHEMA)
+```
+
+- [ ] **Step 2: Run the migration upgrade against a throwaway sqlite DB to verify syntax**
+
+Run:
+
+```bash
+cd /home/wizard/code/fire-planner
+DB_CONNECTION_STRING="sqlite+aiosqlite:///./_tmp_test.db" \
+  poetry run alembic upgrade head
+```
+
+Expected: ends at revision `0006` with no errors. Delete `_tmp_test.db` afterwards.
+
+- [ ] **Step 3: Verify downgrade works**
+
+Run:
+
+```bash
+DB_CONNECTION_STRING="sqlite+aiosqlite:///./_tmp_test.db" \
+  poetry run alembic downgrade -1
+```
+
+Expected: returns to revision `0005` cleanly. Clean up `_tmp_test.db`.
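Why a UNIQUE `reddit_id` makes re-ingest idempotent can be seen in a minimal sketch built on the standard-library `sqlite3` module. The table is cut down to three columns, and the `upsert` helper with its `ON CONFLICT` clause is an illustrative assumption, not the `upsert_example()` that Task 10 defines.

```python
import sqlite3


def upsert(conn: sqlite3.Connection, reddit_id: str, title: str) -> None:
    """Insert a row, or refresh the title if reddit_id already exists."""
    conn.execute(
        "INSERT INTO fire_example (reddit_id, post_title) VALUES (?, ?) "
        "ON CONFLICT(reddit_id) DO UPDATE SET post_title = excluded.post_title",
        (reddit_id, title),
    )


conn = sqlite3.connect(":memory:")
# Reduced, hypothetical version of the fire_example table.
conn.execute(
    "CREATE TABLE fire_example ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "reddit_id TEXT NOT NULL UNIQUE, "
    "post_title TEXT NOT NULL)"
)

# Ingesting the same post twice leaves a single row behind.
upsert(conn, "abc123", "first ingest")
upsert(conn, "abc123", "re-ingest")
rows = conn.execute("SELECT reddit_id, post_title FROM fire_example").fetchall()
```

Without the UNIQUE constraint the second call would simply add a duplicate row, so the weekly delta run would slowly inflate per-country counts.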
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add alembic/versions/0006_fire_examples.py
+git commit -m "examples: alembic 0006 — fire_example table"
+```
+
+---
+
+## Task 3: ORM class in `db.py`
+
+**Files:**
+- Modify: `fire_planner/db.py` (append a `FireExample(Base)` class)
+- Create: `tests/test_examples_models.py`
+
+- [ ] **Step 1: Write the failing test**
+
+Create `tests/test_examples_models.py`:
+
+```python
+"""Schema test — FireExample ORM round-trips through the in-memory engine."""
+from __future__ import annotations
+
+from datetime import date
+from decimal import Decimal
+
+import pytest
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from fire_planner.db import FireExample
+
+
+@pytest.mark.asyncio
+async def test_fire_example_round_trip(session: AsyncSession) -> None:
+    row = FireExample(
+        reddit_id="abc123",
+        source_sub="financialindependence",
+        post_url="https://reddit.com/r/financialindependence/abc123",
+        post_date=date(2026, 1, 1),
+        post_title="Hit £1m at 38, living in Manila",
+        country="Philippines",
+        city="Manila",
+        portfolio_gbp=Decimal("1000000.00"),
+        annual_exp_gbp=Decimal("14400.00"),
+        age=38,
+        family_size=2,
+        fi_status="FIRE",
+        is_retired=True,
+        raw_currency="GBP",
+        raw_excerpt="...£1m...Manila...",
+        llm_model="qwen3-8b",
+        llm_confidence=Decimal("0.82"),
+    )
+    session.add(row)
+    await session.commit()
+
+    result = await session.execute(select(FireExample).where(FireExample.reddit_id == "abc123"))
+    fetched = result.scalar_one()
+    assert fetched.country == "Philippines"
+    assert fetched.portfolio_gbp == Decimal("1000000.00")
+    assert fetched.fi_status == "FIRE"
+    assert fetched.is_retired is True
+```
+
+You'll need the `session` fixture. Append to `tests/conftest.py`:
+
+```python
+@pytest_asyncio.fixture
+async def session(engine: AsyncEngine) -> AsyncIterator[AsyncSession]:
+    factory = async_sessionmaker(engine, expire_on_commit=False)
+    async with factory() as sess:
+        yield sess
+```
+
+(Skip this step if `session` is already defined — check `tests/conftest.py` first with `grep -n "def session" tests/conftest.py`.)
+
+- [ ] **Step 2: Run test to verify it fails**
+
+Run:
+
+```bash
+cd /home/wizard/code/fire-planner
+poetry run pytest tests/test_examples_models.py -v
+```
+
+Expected: FAIL with `ImportError: cannot import name 'FireExample' from 'fire_planner.db'`.
+
+- [ ] **Step 3: Add the ORM class**
+
+Append to `fire_planner/db.py` (after `RetirementGoal`, before `create_engine_from_env`):
+
+```python
+class FireExample(Base):
+    """One Reddit-sourced FIRE example.
+
+    `reddit_id` UNIQUE makes re-ingest idempotent. Fields are nullable
+    when the LLM couldn't extract them confidently — never inferred.
+    Currency normalisation (portfolio_gbp / annual_exp_gbp) happens at
+    extraction time using `fire_planner/fx.py`; `raw_currency` is kept
+    for traceability.
+    """
+    __tablename__ = "fire_example"
+    __table_args__ = {"schema": SCHEMA_NAME}  # noqa: RUF012
+
+    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
+    reddit_id: Mapped[str] = mapped_column(String(16), unique=True, nullable=False)
+    source_sub: Mapped[str] = mapped_column(String(64), nullable=False)
+    post_url: Mapped[str] = mapped_column(String, nullable=False)
+    post_date: Mapped[date] = mapped_column(Date, nullable=False, index=True)
+    post_title: Mapped[str] = mapped_column(String, nullable=False)
+    country: Mapped[str | None] = mapped_column(String(64), nullable=True, index=True)
+    city: Mapped[str | None] = mapped_column(String(128), nullable=True)
+    portfolio_gbp: Mapped[Decimal | None] = mapped_column(Numeric(14, 2), nullable=True)
+    annual_exp_gbp: Mapped[Decimal | None] = mapped_column(Numeric(12, 2), nullable=True)
+    age: Mapped[int | None] = mapped_column(Integer, nullable=True)
+    family_size: Mapped[int | None] = mapped_column(Integer, nullable=True)
+    fi_status: Mapped[str | None] = mapped_column(String(24), nullable=True, index=True)
+    is_retired: Mapped[bool | None] = mapped_column(Boolean, nullable=True)
+    raw_currency: Mapped[str | None] = mapped_column(String(3), nullable=True)
+    raw_excerpt: Mapped[str | None] = mapped_column(String, nullable=True)
+    llm_model: Mapped[str] = mapped_column(String(64), nullable=False)
+    llm_confidence: Mapped[Decimal | None] = mapped_column(Numeric(3, 2), nullable=True)
+    extracted_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
+                                                   nullable=False,
+                                                   server_default=func.now())
+    ingested_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True),
+                                                  nullable=False,
+                                                  server_default=func.now())
+```
+
+- [ ] **Step 4: Run test to verify it passes**
+
+Run:
+
+```bash
+poetry run pytest tests/test_examples_models.py -v
+```
+
+Expected: PASS.
+
+- [ ] **Step 5: Type check + lint**
+
+Run:
+
+```bash
+poetry run mypy fire_planner/db.py
+poetry run ruff check fire_planner/db.py tests/test_examples_models.py
+```
+
+Expected: both clean.
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add fire_planner/db.py tests/test_examples_models.py tests/conftest.py
+git commit -m "examples: FireExample ORM class + round-trip test"
+```
+
+---
+
+## Task 4: Pydantic schemas in `examples/models.py`
+
+**Files:**
+- Create: `fire_planner/examples/__init__.py`
+- Create: `fire_planner/examples/models.py`
+
+- [ ] **Step 1: Create the package marker**
+
+Create `fire_planner/examples/__init__.py`:
+
+```python
+"""Reddit FIRE examples ingest + lookup.
+
+Scrapes a curated set of FIRE subreddits, extracts structured fields
+with a local LLM, and exposes per-country summaries to the simulator
+and API. Informational overlay only — does not drive scenario inputs.
+"""
+from fire_planner.examples.models import (
+    ExtractedExample,
+    FiStatus,
+    RawPost,
+    Summary,
+    SummaryStats,
+)
+
+__all__ = [
+    "ExtractedExample",
+    "FiStatus",
+    "RawPost",
+    "Summary",
+    "SummaryStats",
+]
+```
+
+- [ ] **Step 2: Write the failing schema test**
+
+Create `tests/test_examples_filters.py` (we co-locate the Pydantic test here for brevity; rename later if needed):
+
+```python
+"""Tests for fire_planner.examples.models — Pydantic schemas."""
+from __future__ import annotations
+
+from datetime import date
+from decimal import Decimal
+
+import pytest
+from pydantic import ValidationError
+
+from fire_planner.examples import ExtractedExample, FiStatus, RawPost, SummaryStats
+
+
+def test_raw_post_minimal() -> None:
+    p = RawPost(
+        reddit_id="abc123",
+        source_sub="financialindependence",
+        url="https://reddit.com/r/financialindependence/abc123",
+        title="Hit FIRE at 38",
+        body="Net worth £1.2m, living in Lisbon, family of 3, retired last year.",
+        created_at=date(2026, 1, 1),
+    )
+    assert p.reddit_id == "abc123"
+
+
+def test_extracted_example_confidence_bounds() -> None:
+    with pytest.raises(ValidationError):
+        ExtractedExample(
+            country="Portugal",
+            confidence=Decimal("1.5"),  # out of range
+            llm_model="qwen3-8b",
+        )
+
+
+def test_extracted_example_fi_status_enum() -> None:
+    ex = ExtractedExample(
+        country="Philippines",
+        fi_status=FiStatus.FIRE,
+        confidence=Decimal("0.8"),
+        llm_model="qwen3-8b",
+    )
+    assert ex.fi_status == "FIRE"
+```
+
+- [ ] **Step 3: Run test to verify it fails**
+
+Run:
+
+```bash
+poetry run pytest tests/test_examples_filters.py -v
+```
+
+Expected: FAIL with `ImportError`.
+
+- [ ] **Step 4: Implement `models.py`**
+
+Create `fire_planner/examples/models.py`:
+
+```python
+"""Pydantic schemas for the Reddit examples pipeline.
+
+`RawPost`         — what PRAW gives us (title + body + metadata).
+`ExtractedExample`— what the LLM returns (all nullable; confidence-gated).
+`Summary`         — per-country headline stats served from the API.
+"""
+from __future__ import annotations
+
+from datetime import date
+from decimal import Decimal
+from enum import StrEnum
+
+from pydantic import BaseModel, ConfigDict, Field
+
+
+class FiStatus(StrEnum):
+    ACCUMULATING = "accumulating"
+    COAST_FIRE = "coastFIRE"
+    BARISTA_FIRE = "baristaFIRE"
+    LEAN_FIRE = "leanFIRE"
+    FIRE = "FIRE"
+    FAT_FIRE = "fatFIRE"
+    UNKNOWN = "unknown"
+
+
+class RawPost(BaseModel):
+    """A single Reddit post fetched from PRAW (no LLM processing yet)."""
+
+    model_config = ConfigDict(frozen=True)
+
+    reddit_id: str
+    source_sub: str
+    url: str
+    title: str
+    body: str
+    created_at: date
+
+
+class ExtractedExample(BaseModel):
+    """LLM output — all extracted fields nullable except confidence + model."""
+
+    country: str | None = None
+    city: str | None = None
+    portfolio_native: Decimal | None = None
+    annual_exp_native: Decimal | None = None
+    raw_currency: str | None = None
+    age: int | None = Field(default=None, ge=0, le=120)
+    family_size: int | None = Field(default=None, ge=1, le=20)
+    fi_status: FiStatus | None = None
+    is_retired: bool | None = None
+    confidence: Decimal = Field(ge=Decimal("0"), le=Decimal("1"))
+    llm_model: str
+
+
+class SummaryStats(BaseModel):
+    median: Decimal | None
+    p25: Decimal | None
+    p75: Decimal | None
+
+
+class Summary(BaseModel):
+    country: str
+    count: int
+    portfolio_gbp: SummaryStats
+    annual_exp_gbp: SummaryStats
+    sample_links: list[str]
+```
+
+- [ ] **Step 5: Run test to verify it passes**
+
+Run:
+
+```bash
+poetry run pytest tests/test_examples_filters.py -v
+poetry run mypy fire_planner/examples/
+poetry run ruff check fire_planner/examples/
+```
+
+Expected: tests PASS, mypy + ruff clean.
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add fire_planner/examples/__init__.py fire_planner/examples/models.py tests/test_examples_filters.py
+git commit -m "examples: RawPost + ExtractedExample + Summary Pydantic schemas"
+```
+
+---
+
+## Task 5: Regex pre-filter in `filters.py`
+
+**Files:**
+- Create: `fire_planner/examples/filters.py`
+- Modify: `tests/test_examples_filters.py` (extend)
+
+- [ ] **Step 1: Add the failing test**
+
+Append to `tests/test_examples_filters.py`:
+
+```python
+from datetime import date as _date
+
+from fire_planner.examples.filters import is_candidate
+
+
+def _post(title: str, body: str = "") -> RawPost:
+    return RawPost(
+        reddit_id="x",
+        source_sub="s",
+        url="u",
+        title=title,
+        body=body,
+        created_at=_date(2026, 1, 1),
+    )
+
+
+def test_filter_keeps_money_plus_location() -> None:
+    assert is_candidate(_post("Hit £1m living in Lisbon, Portugal"))
+
+
+def test_filter_drops_money_without_location() -> None:
+    assert not is_candidate(_post("Hit £1m, feels great!"))
+
+
+def test_filter_drops_location_without_money() -> None:
+    assert not is_candidate(_post("Moving to Lisbon next year"))
+
+
+def test_filter_dollar_signs_count() -> None:
+    assert is_candidate(_post("$1.2M net worth, retired in Chiang Mai"))
+
+
+def test_filter_recognises_net_worth_keyword() -> None:
+    assert is_candidate(_post("Net worth update — now in Bali, Indonesia"))
+
+
+def test_filter_keyword_match_is_case_insensitive() -> None:
+    assert is_candidate(_post("PORTFOLIO milestone reached, settled in PHILIPPINES"))
+```
+
+- [ ] **Step 2: Run test to verify it fails**
+
+Run:
+
+```bash
+poetry run pytest tests/test_examples_filters.py::test_filter_keeps_money_plus_location -v
+```
+
+Expected: FAIL with `ImportError`.
+
+- [ ] **Step 3: Implement filters**
+
+Create `fire_planner/examples/filters.py`:
+
+```python
+"""Cheap regex pre-filter — keep posts that look like FIRE examples.
+
+A post survives if BOTH:
+  - it mentions money in a FIRE-relevant way (£/$/€, "net worth",
+    "portfolio", "million"), AND
+  - it mentions a location (country or major city).
+
+This prunes ~70–90 % of subreddit traffic before any LLM call. We
+deliberately err on the side of false-positives — the LLM is the
+expensive but reliable filter; this is the cheap pre-pass.
+"""
+from __future__ import annotations
+
+import re
+from functools import lru_cache
+
+from fire_planner.examples.models import RawPost
+
+MONEY_RE = re.compile(
+    r"(?:[£$€]\s?\d|"  # currency symbol + digit
+    r"\b(?:GBP|USD|EUR|JPY|AUD|CAD)\b|"
+    r"\bmillion\b|\bnet\s*worth\b|\bportfolio\b|\bsaved\b)",
+    re.IGNORECASE,
+)
+
+# Order matters: longer, less-ambiguous tokens first. List is curated to
+# cover the 12 target subs' typical countries/cities. Extend as needed.
+_LOCATION_KEYWORDS: list[str] = [
+    # countries
+    "philippines", "indonesia", "thailand", "vietnam", "malaysia",
+    "singapore", "japan", "korea", "taiwan", "india", "australia",
+    "new zealand", "canada", "united states", "usa", "uk", "ireland",
+    "scotland", "wales", "england", "spain", "portugal", "france",
+    "germany", "netherlands", "belgium", "italy", "greece", "cyprus",
+    "bulgaria", "romania", "poland", "czech", "hungary", "switzerland",
+    "austria", "denmark", "sweden", "norway", "finland", "estonia",
+    "uae", "dubai", "abu dhabi", "saudi", "qatar", "kuwait", "bahrain",
+    "mexico", "brazil", "argentina", "chile", "colombia", "peru",
+    "panama", "costa rica", "ecuador",
+    # cities common in expat-FIRE posts
+    "manila", "cebu", "bangkok", "chiang mai", "phuket", "ho chi minh",
+    "kuala lumpur", "penang", "bali", "jakarta", "tokyo", "osaka",
+    "lisbon", "porto", "madeira", "madrid", "barcelona", "valencia",
+    "limassol", "nicosia", "sofia", "athens", "berlin", "munich",
+    "amsterdam", "london", "edinburgh", "manchester", "dublin",
+    "sydney", "melbourne", "auckland", "vancouver", "toronto",
+    "mexico city", "buenos aires", "santiago",
+]
+
+# Pre-compiled big OR — match any keyword as a word boundary.
+LOCATION_RE = re.compile(
+    r"\b(" + "|".join(re.escape(k) for k in _LOCATION_KEYWORDS) + r")\b",
+    re.IGNORECASE,
+)
+
+
+@lru_cache(maxsize=1024)
+def _haystack(reddit_id: str, title: str, body: str) -> str:
+    return f"{title}\n{body}"
+
+
+def is_candidate(post: RawPost) -> bool:
+    """Return True when `post` is worth sending to the LLM."""
+    text = _haystack(post.reddit_id, post.title, post.body)
+    return bool(MONEY_RE.search(text) and LOCATION_RE.search(text))
+```
+
+- [ ] **Step 4: Run all filter tests**
+
+Run:
+
+```bash
+poetry run pytest tests/test_examples_filters.py -v
+poetry run mypy fire_planner/examples/filters.py
+poetry run ruff check fire_planner/examples/filters.py
+```
+
+Expected: all PASS, mypy + ruff clean.
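The `\b` anchors around the location alternation matter more than they look: several keywords are short enough to hide inside ordinary words. A minimal sketch of the two-gate check, assuming a trimmed three-keyword list and a reduced money pattern (the names `MONEY`, `LOCATION`, and `looks_like_candidate` are hypothetical stand-ins for the real `MONEY_RE`, `LOCATION_RE`, and `is_candidate`):

```python
import re

# Reduced stand-ins for the real patterns; illustration only.
MONEY = re.compile(r"(?:[£$€]\s?\d|\bnet\s*worth\b|\bportfolio\b)", re.IGNORECASE)
KEYWORDS = ["peru", "chiang mai", "uk"]
LOCATION = re.compile(
    r"\b(" + "|".join(re.escape(k) for k in KEYWORDS) + r")\b",
    re.IGNORECASE,
)


def looks_like_candidate(text: str) -> bool:
    """True only when both a money signal and a location signal are present."""
    return bool(MONEY.search(text) and LOCATION.search(text))


results = {
    # Money symbol + multi-word city: passes both gates.
    "hit": looks_like_candidate("$1.2M net worth, retired in Chiang Mai"),
    # "peru" is a substring of "superuser", but \b rejects it.
    "substring": looks_like_candidate("£5 superuser licence"),
    # Location without any money signal.
    "no_money": looks_like_candidate("Moving to the UK next year"),
}
```

Dropping the word boundaries would let the second case through, which costs an LLM call for a post that has nothing to do with Peru.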
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add fire_planner/examples/filters.py tests/test_examples_filters.py
+git commit -m "examples: regex pre-filter (MONEY_RE + LOCATION_RE)"
+```
+
+---
+
+## Task 6: PRAW source wrapper (with mocked test)
+
+**Files:**
+- Create: `fire_planner/examples/praw_source.py`
+- Create: `tests/test_examples_praw_source.py`
+
+- [ ] **Step 1: Write the failing test (with mocked asyncpraw)**
+
+Create `tests/test_examples_praw_source.py`:
+
+```python
+"""Tests for the asyncpraw wrapper — uses an in-test fake Submission iterator."""
+from __future__ import annotations
+
+from collections.abc import AsyncIterator
+from dataclasses import dataclass
+from datetime import datetime
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+from fire_planner.examples.praw_source import fetch_top
+
+
+@dataclass
+class _FakeSub:
+    id: str
+    title: str
+    selftext: str
+    permalink: str
+    created_utc: float
+
+
+def _async_iter(items: list[_FakeSub]) -> AsyncIterator[_FakeSub]:
+    async def _gen() -> AsyncIterator[_FakeSub]:
+        for it in items:
+            yield it
+    return _gen()
+
+
+@pytest.mark.asyncio
+async def test_fetch_top_normalises_submissions() -> None:
+    fakes = [
+        _FakeSub(
+            id="abc1",
+            title="t1",
+            selftext="b1",
+            permalink="/r/financialindependence/comments/abc1/",
+            created_utc=datetime(2026, 1, 1).timestamp(),
+        ),
+        _FakeSub(
+            id="abc2",
+            title="t2",
+            selftext="b2",
+            permalink="/r/financialindependence/comments/abc2/",
+            created_utc=datetime(2026, 2, 1).timestamp(),
+        ),
+    ]
+    mock_subreddit = MagicMock()
+    mock_subreddit.top = MagicMock(return_value=_async_iter(fakes))
+
+    mock_reddit = MagicMock()
+    mock_reddit.subreddit = AsyncMock(return_value=mock_subreddit)
+
+    posts = [p async for p in fetch_top(mock_reddit, "financialindependence", "all", limit=1000)]
+    assert len(posts) == 2
+    assert posts[0].reddit_id == "abc1"
+    assert posts[0].url.endswith("/r/financialindependence/comments/abc1/")
+    assert posts[1].title == "t2"
+```
+
+- [ ] **Step 2: Run test to verify it fails**
+
+Run:
+
+```bash
+poetry run pytest tests/test_examples_praw_source.py -v
+```
+
+Expected: FAIL with `ImportError`.
+
+- [ ] **Step 3: Implement `praw_source.py`**
+
+Create `fire_planner/examples/praw_source.py`:
+
+```python
+"""Async PRAW wrapper — yields `RawPost` from a subreddit's top listing.
+
+We use asyncpraw because the rest of the pipeline is asyncio-native and
+we want to fan out across 12 subs concurrently via `asyncio.gather`.
+"""
+from __future__ import annotations
+
+import logging
+from collections.abc import AsyncIterator
+from datetime import UTC, datetime
+from typing import Any, Literal
+
+from fire_planner.examples.models import RawPost
+
+log = logging.getLogger(__name__)
+
+TopWhen = Literal["all", "year", "month", "week", "day"]
+REDDIT_BASE = "https://www.reddit.com"
+
+
+async def fetch_top(
+    reddit: Any,  # asyncpraw.Reddit
+    subreddit: str,
+    when: TopWhen,
+    limit: int = 1000,
+) -> AsyncIterator[RawPost]:
+    """Yield `RawPost`s from `r/{subreddit}/top/?t={when}` (PRAW 1000 cap)."""
+    sub = await reddit.subreddit(subreddit)
+    async for submission in sub.top(time_filter=when, limit=limit):
+        yield _to_raw_post(submission, subreddit)
+
+
+def _to_raw_post(submission: Any, source_sub: str) -> RawPost:
+    return RawPost(
+        reddit_id=submission.id,
+        source_sub=source_sub,
+        url=f"{REDDIT_BASE}{submission.permalink}",
+        title=submission.title or "",
+        body=submission.selftext or "",
+        # created_utc is a UTC epoch; convert with an explicit tz so the
+        # stored date does not depend on the host's local timezone.
+        created_at=datetime.fromtimestamp(submission.created_utc, tz=UTC).date(),
+    )
+```
+
+Note: we deliberately keep the `Any`-typed `reddit` parameter so tests don't have to construct a real `asyncpraw.Reddit`. The CLI factories will pass a real one.
+
+- [ ] **Step 4: Run test + lint**
+
+Run:
+
+```bash
+poetry run pytest tests/test_examples_praw_source.py -v
+poetry run mypy fire_planner/examples/praw_source.py
+poetry run ruff check fire_planner/examples/praw_source.py
+```
+
+Expected: PASS, clean.
+ +- [ ] **Step 5: Commit** + +```bash +git add fire_planner/examples/praw_source.py tests/test_examples_praw_source.py +git commit -m "examples: async PRAW wrapper → RawPost" +``` + +--- + +## Task 7: LLM extractor — primary (qwen3-8b) path + +**Files:** +- Create: `fire_planner/examples/llm_extract.py` +- Create: `tests/test_examples_llm_extract.py` + +- [ ] **Step 1: Write the failing test (using respx to mock llama-cpp)** + +Create `tests/test_examples_llm_extract.py`: + +```python +"""Tests for LLM extraction — respx mocks the llama-cpp /completion endpoint.""" +from __future__ import annotations + +import json +from datetime import date +from decimal import Decimal + +import httpx +import pytest +import respx + +from fire_planner.examples.llm_extract import extract_with_qwen +from fire_planner.examples.models import RawPost + +LLAMA_URL = "http://llama-cpp.llama-cpp.svc.cluster.local:8000/v1/chat/completions" + + +def _post() -> RawPost: + return RawPost( + reddit_id="a1", + source_sub="ExpatFIRE", + url="u", + title="FIRE'd at 38 — Manila", + body="Net worth $1.2M, living in Manila with family of 3, retired last year.", + created_at=date(2026, 1, 1), + ) + + +@respx.mock +@pytest.mark.asyncio +async def test_extract_with_qwen_parses_json_response() -> None: + payload = { + "country": "Philippines", + "city": "Manila", + "portfolio_native": 1200000, + "annual_exp_native": 18000, + "raw_currency": "USD", + "age": 38, + "family_size": 3, + "fi_status": "FIRE", + "is_retired": True, + "confidence": 0.85, + } + respx.post(LLAMA_URL).respond( + 200, + json={"choices": [{"message": {"content": json.dumps(payload)}}]}, + ) + + async with httpx.AsyncClient() as client: + out = await extract_with_qwen(_post(), llama_url=LLAMA_URL, client=client) + + assert out is not None + assert out.country == "Philippines" + assert out.portfolio_native == Decimal("1200000") + assert out.confidence == Decimal("0.85") + assert out.llm_model == "qwen3-8b" + + +@respx.mock 
+@pytest.mark.asyncio +async def test_extract_with_qwen_returns_none_on_unparseable_json() -> None: + respx.post(LLAMA_URL).respond( + 200, + json={"choices": [{"message": {"content": "definitely not json"}}]}, + ) + + async with httpx.AsyncClient() as client: + out = await extract_with_qwen(_post(), llama_url=LLAMA_URL, client=client) + + assert out is None + + +@respx.mock +@pytest.mark.asyncio +async def test_extract_with_qwen_returns_none_on_http_error() -> None: + respx.post(LLAMA_URL).respond(500) + + async with httpx.AsyncClient() as client: + out = await extract_with_qwen(_post(), llama_url=LLAMA_URL, client=client) + + assert out is None +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: + +```bash +poetry run pytest tests/test_examples_llm_extract.py -v +``` + +Expected: FAIL with `ImportError`. + +- [ ] **Step 3: Implement primary extractor** + +Create `fire_planner/examples/llm_extract.py`: + +```python +"""LLM extraction — primary qwen3-8b via llama-cpp, Tier 2 fallback to +claude-agent-service when qwen confidence is low or JSON unparseable. + +Both backends speak the OpenAI-compatible chat-completions API. We +issue a strict JSON-schema prompt and parse the first `choices[0]` +message into `ExtractedExample`. Tier 2 escalation lives in +`extract_with_fallback` — primary failure is silent (returns None) so +the orchestrator can choose to escalate or skip. +""" +from __future__ import annotations + +import json +import logging +from decimal import Decimal, InvalidOperation +from typing import Any + +import httpx +from pydantic import ValidationError + +from fire_planner.examples.models import ExtractedExample, RawPost + +log = logging.getLogger(__name__) + +QWEN_MODEL = "qwen3-8b" +CLAUDE_AGENT_MODEL = "claude-haiku-4-5" +HTTP_TIMEOUT = httpx.Timeout(60.0) + +PROMPT_SYSTEM = ( + "You are extracting structured FIRE-example data from a Reddit post. 
" + "Output ONLY a single JSON object with these keys (use null when the " + "post does not say): country, city, portfolio_native (number), " + "annual_exp_native (number), raw_currency (3-letter ISO), age (int), " + "family_size (int, default 1 if single), fi_status (one of: " + "accumulating, coastFIRE, baristaFIRE, leanFIRE, FIRE, fatFIRE, " + "unknown), is_retired (bool), confidence (0.0-1.0). " + "DO NOT include any prose or markdown — JSON only." +) + + +def _user_prompt(post: RawPost) -> str: + return ( + f"Subreddit: {post.source_sub}\n" + f"Title: {post.title}\n" + f"Body:\n{post.body[:4000]}" + ) + + +async def extract_with_qwen( + post: RawPost, + llama_url: str, + client: httpx.AsyncClient, +) -> ExtractedExample | None: + """Call qwen3-8b via llama-cpp. Returns None on any failure.""" + return await _call_openai_chat( + url=llama_url, + model_name=QWEN_MODEL, + post=post, + client=client, + record_model=QWEN_MODEL, + ) + + +async def _call_openai_chat( + *, + url: str, + model_name: str, + post: RawPost, + client: httpx.AsyncClient, + record_model: str, +) -> ExtractedExample | None: + body = { + "model": model_name, + "messages": [ + {"role": "system", "content": PROMPT_SYSTEM}, + {"role": "user", "content": _user_prompt(post)}, + ], + "temperature": 0.0, + "max_tokens": 512, + } + try: + resp = await client.post(url, json=body, timeout=HTTP_TIMEOUT) + resp.raise_for_status() + except httpx.HTTPError: + log.warning("LLM call failed for %s via %s", post.reddit_id, url, exc_info=True) + return None + + try: + content: str = resp.json()["choices"][0]["message"]["content"] + except (KeyError, IndexError, ValueError): + log.warning("Unexpected LLM response shape for %s", post.reddit_id) + return None + + return _parse_extracted_json(content, record_model) + + +def _parse_extracted_json(content: str, record_model: str) -> ExtractedExample | None: + """Tolerant JSON parser — strip fences, parse, validate.""" + cleaned = 
content.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip() + try: + data: Any = json.loads(cleaned) + except json.JSONDecodeError: + log.warning("LLM returned unparseable JSON: %s", cleaned[:200]) + return None + # Valid JSON that is not an object (null, list, string) is also a failure. + if not isinstance(data, dict): + log.warning("LLM returned non-object JSON: %s", cleaned[:200]) + return None + + # Convert numeric fields to Decimal where present. + for k in ("portfolio_native", "annual_exp_native", "confidence"): + if data.get(k) is not None: + try: + data[k] = Decimal(str(data[k])) + except InvalidOperation: + data[k] = None + + data["llm_model"] = record_model + try: + return ExtractedExample.model_validate(data) + except ValidationError: + log.warning("LLM JSON failed schema validation: %s", cleaned[:200]) + return None +``` + +- [ ] **Step 4: Run tests + lint** + +Run: + +```bash +poetry run pytest tests/test_examples_llm_extract.py -v +poetry run mypy fire_planner/examples/llm_extract.py +poetry run ruff check fire_planner/examples/llm_extract.py +``` + +Expected: all PASS, clean. + +- [ ] **Step 5: Commit** + +```bash +git add fire_planner/examples/llm_extract.py tests/test_examples_llm_extract.py +git commit -m "examples: primary qwen3-8b extractor" +``` + +--- + +## Task 8: LLM extractor — Tier 2 fallback to claude-agent-service + +**Files:** +- Modify: `fire_planner/examples/llm_extract.py` +- Modify: `tests/test_examples_llm_extract.py` + +- [ ] **Step 1: Write the failing test** + +Append to `tests/test_examples_llm_extract.py`: + +```python +from fire_planner.examples.llm_extract import extract_with_claude, extract_with_fallback + +CLAUDE_URL = "http://claude-agent-service.claude-agent.svc.cluster.local:8080/v1/chat/completions" + + +@respx.mock +@pytest.mark.asyncio +async def test_fallback_escalates_when_qwen_returns_none() -> None: + respx.post(LLAMA_URL).respond(500) # qwen down + claude_payload = { + "country": "Philippines", + "city": "Manila", + "confidence": 0.95, + } + respx.post(CLAUDE_URL).respond( + 200, + json={"choices": [{"message": {"content": json.dumps(claude_payload)}}]}, + ) 
+ + async with httpx.AsyncClient() as client: + out = await extract_with_fallback( + _post(), + llama_url=LLAMA_URL, + claude_url=CLAUDE_URL, + claude_bearer="t", + client=client, + ) + + assert out is not None + assert out.llm_model == "claude-haiku-4-5" + assert out.country == "Philippines" + + +@respx.mock +@pytest.mark.asyncio +async def test_fallback_escalates_on_low_confidence() -> None: + qwen_payload = {"country": None, "confidence": 0.2} + respx.post(LLAMA_URL).respond( + 200, + json={"choices": [{"message": {"content": json.dumps(qwen_payload)}}]}, + ) + claude_payload = {"country": "Thailand", "city": "Bangkok", "confidence": 0.9} + respx.post(CLAUDE_URL).respond( + 200, + json={"choices": [{"message": {"content": json.dumps(claude_payload)}}]}, + ) + + async with httpx.AsyncClient() as client: + out = await extract_with_fallback( + _post(), + llama_url=LLAMA_URL, + claude_url=CLAUDE_URL, + claude_bearer="t", + client=client, + confidence_threshold=Decimal("0.5"), + ) + + assert out is not None + assert out.country == "Thailand" + assert out.llm_model == "claude-haiku-4-5" + + +@respx.mock +@pytest.mark.asyncio +async def test_fallback_keeps_high_confidence_qwen_result() -> None: + payload = { + "country": "Philippines", + "confidence": 0.9, + } + respx.post(LLAMA_URL).respond( + 200, + json={"choices": [{"message": {"content": json.dumps(payload)}}]}, + ) + + async with httpx.AsyncClient() as client: + out = await extract_with_fallback( + _post(), + llama_url=LLAMA_URL, + claude_url=CLAUDE_URL, + claude_bearer="t", + client=client, + confidence_threshold=Decimal("0.5"), + ) + + assert out is not None + assert out.llm_model == "qwen3-8b" + # claude_url should NOT have been hit: no route is registered for it, + # so the only recorded call is the qwen one. + assert respx.calls.call_count == 1 +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: + +```bash +poetry run pytest tests/test_examples_llm_extract.py -v -k "fallback" +``` + +Expected: FAIL with `ImportError`. 
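
The three fallback tests above pin down a single decision rule. As a rough illustration only, it can be written as a pure function; `should_escalate` is a hypothetical helper (not part of the planned module), and it assumes the primary confidence is either `None` (the primary call failed) or a `Decimal`.

```python
from __future__ import annotations

from decimal import Decimal


def should_escalate(primary_confidence: Decimal | None, threshold: Decimal) -> bool:
    """Hypothetical helper: True when the Tier 2 backend should be tried."""
    # No primary result at all (HTTP error, unparseable JSON): escalate.
    if primary_confidence is None:
        return True
    # A primary result below the threshold is not trusted on its own.
    return primary_confidence < threshold


threshold = Decimal("0.5")
for label, conf in [("qwen down", None),
                    ("low confidence", Decimal("0.2")),
                    ("high confidence", Decimal("0.9"))]:
    print(label, should_escalate(conf, threshold))
```

A confidence exactly equal to the threshold is kept rather than escalated, which mirrors the `>=` comparison the plan uses.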
+ +- [ ] **Step 3: Add Tier 2 + orchestrator functions** + +Append to `fire_planner/examples/llm_extract.py`: + +```python +DEFAULT_CONFIDENCE_THRESHOLD = Decimal("0.5") + + +async def extract_with_claude( + post: RawPost, + claude_url: str, + bearer: str, + client: httpx.AsyncClient, +) -> ExtractedExample | None: + """Call claude-agent-service. Returns None on any failure.""" + body = { + "model": CLAUDE_AGENT_MODEL, + "messages": [ + {"role": "system", "content": PROMPT_SYSTEM}, + {"role": "user", "content": _user_prompt(post)}, + ], + "temperature": 0.0, + "max_tokens": 512, + } + try: + resp = await client.post( + claude_url, + json=body, + headers={"Authorization": f"Bearer {bearer}"}, + timeout=HTTP_TIMEOUT, + ) + resp.raise_for_status() + except httpx.HTTPError: + log.warning("Claude Tier 2 call failed for %s", post.reddit_id, exc_info=True) + return None + try: + content: str = resp.json()["choices"][0]["message"]["content"] + except (KeyError, IndexError, ValueError): + return None + return _parse_extracted_json(content, CLAUDE_AGENT_MODEL) + + +async def extract_with_fallback( + post: RawPost, + *, + llama_url: str, + claude_url: str, + claude_bearer: str, + client: httpx.AsyncClient, + confidence_threshold: Decimal = DEFAULT_CONFIDENCE_THRESHOLD, +) -> ExtractedExample | None: + """Try qwen first; escalate to claude on failure or low confidence. + + Returns None only when both backends fail (the orchestrator drops + the post and increments `fire_examples_extract_failed_total`). 
+ """ + primary = await extract_with_qwen(post, llama_url=llama_url, client=client) + if primary is not None and primary.confidence >= confidence_threshold: + return primary + log.info("Escalating %s to Tier 2 (primary=%s)", + post.reddit_id, + "none" if primary is None else f"conf={primary.confidence}") + secondary = await extract_with_claude( + post, + claude_url=claude_url, + bearer=claude_bearer, + client=client, + ) + return secondary or primary +``` + +- [ ] **Step 4: Run all extractor tests + lint** + +Run: + +```bash +poetry run pytest tests/test_examples_llm_extract.py -v +poetry run mypy fire_planner/examples/llm_extract.py +poetry run ruff check fire_planner/examples/llm_extract.py +``` + +Expected: all PASS, clean. + +- [ ] **Step 5: Commit** + +```bash +git add fire_planner/examples/llm_extract.py tests/test_examples_llm_extract.py +git commit -m "examples: Tier 2 claude-agent-service fallback" +``` + +--- + +## Task 9: Currency normalisation (via `fx.py`) + +**Files:** +- Modify: `fire_planner/examples/llm_extract.py` +- Modify: `tests/test_examples_llm_extract.py` + +- [ ] **Step 1: Add the failing test** + +Append to `tests/test_examples_llm_extract.py`: + +```python +from fire_planner.examples.llm_extract import to_gbp + + +def test_to_gbp_converts_usd() -> None: + rates = {"GBP": Decimal("1"), "USD": Decimal("0.80")} + assert to_gbp(Decimal("100"), "USD", rates) == Decimal("80.00") + + +def test_to_gbp_passes_through_gbp() -> None: + assert to_gbp(Decimal("100"), "GBP", {"GBP": Decimal("1")}) == Decimal("100.00") + + +def test_to_gbp_returns_none_for_unknown_currency() -> None: + assert to_gbp(Decimal("100"), "XYZ", {"GBP": Decimal("1"), "USD": Decimal("0.8")}) is None + + +def test_to_gbp_returns_none_for_none_amount() -> None: + assert to_gbp(None, "USD", {"USD": Decimal("0.8")}) is None +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: + +```bash +poetry run pytest tests/test_examples_llm_extract.py::test_to_gbp_converts_usd -v 
+``` + +Expected: FAIL with `ImportError`. + +- [ ] **Step 3: Implement `to_gbp`** + +Append to `fire_planner/examples/llm_extract.py`: + +```python +def to_gbp( + amount: Decimal | None, + currency: str | None, + rates: dict[str, Decimal], +) -> Decimal | None: + """Convert `amount` in `currency` to GBP using `fx.fetch_rates` output. + + `rates[X]` = "how much GBP one unit of X is worth" — the convention + used by `fire_planner/fx.py`. Returns None when amount/currency is + missing or the currency isn't in `rates`. + """ + if amount is None or currency is None: + return None + rate = rates.get(currency.upper()) + if rate is None: + return None + return (amount * rate).quantize(Decimal("0.01")) +``` + +- [ ] **Step 4: Run all extractor tests** + +Run: + +```bash +poetry run pytest tests/test_examples_llm_extract.py -v +poetry run mypy fire_planner/examples/llm_extract.py +poetry run ruff check fire_planner/examples/llm_extract.py +``` + +Expected: PASS, clean. + +- [ ] **Step 5: Commit** + +```bash +git add fire_planner/examples/llm_extract.py tests/test_examples_llm_extract.py +git commit -m "examples: to_gbp currency normalisation helper" +``` + +--- + +## Task 10: Service layer — upsert + dedupe + +**Files:** +- Create: `fire_planner/examples/service.py` +- Create: `tests/test_examples_service.py` + +- [ ] **Step 1: Write the failing test** + +Create `tests/test_examples_service.py`: + +```python +"""Tests for service.upsert_example and service.summary_for_country.""" +from __future__ import annotations + +from datetime import date +from decimal import Decimal + +import pytest +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from fire_planner.db import FireExample +from fire_planner.examples.models import ExtractedExample, FiStatus, RawPost +from fire_planner.examples.service import summary_for_country, upsert_example + + +def _post(reddit_id: str = "abc1") -> RawPost: + return RawPost( + reddit_id=reddit_id, + 
source_sub="ExpatFIRE", + url=f"https://reddit.com/{reddit_id}", + title="t", + body="b", + created_at=date(2026, 1, 1), + ) + + +def _ex(conf: Decimal = Decimal("0.8")) -> ExtractedExample: + return ExtractedExample( + country="Philippines", + city="Manila", + portfolio_native=Decimal("1200000"), + annual_exp_native=Decimal("18000"), + raw_currency="USD", + age=38, + family_size=3, + fi_status=FiStatus.FIRE, + is_retired=True, + confidence=conf, + llm_model="qwen3-8b", + ) + + +@pytest.mark.asyncio +async def test_upsert_inserts_new_row(session: AsyncSession) -> None: + rates = {"GBP": Decimal("1"), "USD": Decimal("0.80")} + inserted = await upsert_example(session, _post(), _ex(), rates) + assert inserted is True + rows = (await session.execute(select(FireExample))).scalars().all() + assert len(rows) == 1 + assert rows[0].portfolio_gbp == Decimal("960000.00") + assert rows[0].country == "Philippines" + + +@pytest.mark.asyncio +async def test_upsert_is_idempotent_by_reddit_id(session: AsyncSession) -> None: + rates = {"GBP": Decimal("1"), "USD": Decimal("0.80")} + await upsert_example(session, _post("abc1"), _ex(), rates) + inserted = await upsert_example(session, _post("abc1"), _ex(), rates) + assert inserted is False # second call is no-op + rows = (await session.execute(select(FireExample))).scalars().all() + assert len(rows) == 1 + + +@pytest.mark.asyncio +async def test_summary_for_country_returns_quartiles(session: AsyncSession) -> None: + rates = {"GBP": Decimal("1"), "USD": Decimal("1")} + portfolios = [100_000, 200_000, 300_000, 400_000, 500_000] + for i, p in enumerate(portfolios): + ex = ExtractedExample( + country="Philippines", + portfolio_native=Decimal(p), + raw_currency="GBP", + confidence=Decimal("0.9"), + llm_model="qwen3-8b", + ) + await upsert_example(session, _post(f"id{i}"), ex, rates) + + summary = await summary_for_country(session, "Philippines") + assert summary.count == 5 + assert summary.portfolio_gbp.median == Decimal("300000.00") + 
 assert summary.portfolio_gbp.p25 == Decimal("200000.00") + assert summary.portfolio_gbp.p75 == Decimal("400000.00") + assert len(summary.sample_links) <= 5 +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: + +```bash +poetry run pytest tests/test_examples_service.py -v +``` + +Expected: FAIL with `ImportError`. + +- [ ] **Step 3: Implement service** + +Create `fire_planner/examples/service.py`: + +```python +"""Persistence + read-side queries for fire_example. + +`upsert_example(...)` does an INSERT ... ON CONFLICT DO NOTHING by +reddit_id. Returns True when a new row was inserted, False when it was +already present (idempotent re-runs are a feature, not a bug). + +`summary_for_country(...)` computes count + median/p25/p75 of +portfolio_gbp + annual_exp_gbp + up to 5 sample post URLs. It loads the +rows with a plain SELECT and computes the quartiles in Python, so it +works on both Postgres and SQLite (which the tests use). +""" +from __future__ import annotations + +import logging +import statistics +from decimal import Decimal + +from sqlalchemy import select +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.dialects.sqlite import insert as sqlite_insert +from sqlalchemy.ext.asyncio import AsyncSession + +from fire_planner.db import FireExample +from fire_planner.examples.llm_extract import to_gbp +from fire_planner.examples.models import ExtractedExample, RawPost, Summary, SummaryStats + +log = logging.getLogger(__name__) + +EXCERPT_LEN = 500 + + +async def upsert_example( + session: AsyncSession, + post: RawPost, + extracted: ExtractedExample, + fx_rates: dict[str, Decimal], +) -> bool: + """INSERT ... ON CONFLICT DO NOTHING. 
Returns True on insert, False on conflict.""" + portfolio_gbp = to_gbp(extracted.portfolio_native, extracted.raw_currency, fx_rates) + annual_exp_gbp = to_gbp(extracted.annual_exp_native, extracted.raw_currency, fx_rates) + values = { + "reddit_id": post.reddit_id, + "source_sub": post.source_sub, + "post_url": post.url, + "post_date": post.created_at, + "post_title": post.title, + "country": extracted.country, + "city": extracted.city, + "portfolio_gbp": portfolio_gbp, + "annual_exp_gbp": annual_exp_gbp, + "age": extracted.age, + "family_size": extracted.family_size, + "fi_status": str(extracted.fi_status) if extracted.fi_status else None, + "is_retired": extracted.is_retired, + "raw_currency": extracted.raw_currency, + "raw_excerpt": (post.title + "\n" + post.body)[:EXCERPT_LEN], + "llm_model": extracted.llm_model, + "llm_confidence": extracted.confidence, + } + dialect = session.bind.dialect.name if session.bind else "postgresql" + insert_fn = sqlite_insert if dialect == "sqlite" else pg_insert + stmt = insert_fn(FireExample).values(**values) + stmt = stmt.on_conflict_do_nothing(index_elements=["reddit_id"]) + result = await session.execute(stmt) + await session.commit() + return (result.rowcount or 0) > 0 + + +def _quartiles(values: list[Decimal]) -> SummaryStats: + if not values: + return SummaryStats(median=None, p25=None, p75=None) + floats = [float(v) for v in values] + # method="inclusive" gives the quartiles the tests expect (the default + # "exclusive" would return 150k/450k for 100k..500k). A single value is + # its own quartiles; statistics.quantiles needs two points on 3.12. + quants = ( + statistics.quantiles(floats, n=4, method="inclusive") + if len(floats) > 1 + else [floats[0]] * 3 + ) + median = statistics.median(floats) + return SummaryStats( + median=Decimal(f"{median:.2f}"), + p25=Decimal(f"{quants[0]:.2f}"), + p75=Decimal(f"{quants[2]:.2f}"), + ) + + +async def summary_for_country(session: AsyncSession, country: str) -> Summary: + stmt = select(FireExample).where(FireExample.country == country) + rows = (await session.execute(stmt)).scalars().all() + portfolios = [r.portfolio_gbp for r in rows if r.portfolio_gbp is not None] + expenses = [r.annual_exp_gbp for r in rows if r.annual_exp_gbp is not None] + sample_links = 
[r.post_url for r in rows[:5]] + return Summary( + country=country, + count=len(rows), + portfolio_gbp=_quartiles(portfolios), + annual_exp_gbp=_quartiles(expenses), + sample_links=sample_links, + ) +``` + +- [ ] **Step 4: Run tests + lint** + +Run: + +```bash +poetry run pytest tests/test_examples_service.py -v +poetry run mypy fire_planner/examples/service.py +poetry run ruff check fire_planner/examples/service.py +``` + +Expected: PASS, clean. + +- [ ] **Step 5: Commit** + +```bash +git add fire_planner/examples/service.py tests/test_examples_service.py +git commit -m "examples: service.upsert_example + summary_for_country" +``` + +--- + +## Task 11: Orchestrator + CLI `ingest` + +**Files:** +- Create: `fire_planner/examples/cli.py` +- Create: `tests/test_examples_cli.py` +- Modify: `fire_planner/__main__.py` (wire the sub-command) + +- [ ] **Step 1: Write the failing test (orchestrator end-to-end with mocks)** + +Create `tests/test_examples_cli.py`: + +```python +"""End-to-end pipeline test — mocked PRAW + respx-mocked LLM + in-memory DB.""" +from __future__ import annotations + +import json +from collections.abc import AsyncIterator +from dataclasses import dataclass +from datetime import datetime +from decimal import Decimal +from unittest.mock import AsyncMock, MagicMock + +import httpx +import pytest +import respx +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from fire_planner.db import FireExample +from fire_planner.examples.cli import ingest_subreddit + +LLAMA_URL = "http://llama-cpp.llama-cpp.svc.cluster.local:8000/v1/chat/completions" +CLAUDE_URL = "http://claude-agent-service.claude-agent.svc.cluster.local:8080/v1/chat/completions" + + +@dataclass +class _FakeSub: + id: str + title: str + selftext: str + permalink: str + created_utc: float + + +def _async_iter(items: list[_FakeSub]) -> AsyncIterator[_FakeSub]: + async def _gen() -> AsyncIterator[_FakeSub]: + for it in items: + yield it + return _gen() + + 
+@respx.mock +@pytest.mark.asyncio +async def test_ingest_subreddit_end_to_end(session: AsyncSession) -> None: + fakes = [ + _FakeSub( + id="ok1", + title="FIRE at 38 in Manila", + selftext="Net worth £1m, family of 3, retired last year", + permalink="/r/ExpatFIRE/comments/ok1/", + created_utc=datetime(2026, 1, 1).timestamp(), + ), + _FakeSub( # filter should drop this — no money signal + id="drop1", + title="Thinking about moving to Lisbon", + selftext="No specifics yet", + permalink="/r/ExpatFIRE/comments/drop1/", + created_utc=datetime(2026, 1, 2).timestamp(), + ), + ] + mock_subreddit = MagicMock() + mock_subreddit.top = MagicMock(return_value=_async_iter(fakes)) + mock_reddit = MagicMock() + mock_reddit.subreddit = AsyncMock(return_value=mock_subreddit) + + payload = { + "country": "Philippines", + "city": "Manila", + "portfolio_native": 1000000, + "raw_currency": "GBP", + "age": 38, + "family_size": 3, + "fi_status": "FIRE", + "is_retired": True, + "confidence": 0.8, + } + respx.post(LLAMA_URL).respond( + 200, + json={"choices": [{"message": {"content": json.dumps(payload)}}]}, + ) + + fx_rates = {"GBP": Decimal("1"), "USD": Decimal("0.80")} + async with httpx.AsyncClient() as client: + n_inserted, n_skipped = await ingest_subreddit( + session, + mock_reddit, + sub="ExpatFIRE", + when="all", + limit=10, + llama_url=LLAMA_URL, + claude_url=CLAUDE_URL, + claude_bearer="t", + client=client, + fx_rates=fx_rates, + ) + + assert n_inserted == 1 + assert n_skipped == 1 + rows = (await session.execute(select(FireExample))).scalars().all() + assert len(rows) == 1 + assert rows[0].country == "Philippines" + assert rows[0].portfolio_gbp == Decimal("1000000.00") +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: + +```bash +poetry run pytest tests/test_examples_cli.py -v +``` + +Expected: FAIL with `ImportError`. 
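
The mocks in the test above mirror how asyncpraw is typically consumed: `reddit.subreddit(...)` is awaited, while `subreddit.top(...)` is a plain call whose result is iterated with `async for`. A minimal sketch of that shape, using only `unittest.mock`; `collect_ids` is a hypothetical consumer standing in for `fetch_top`, whose internals are assumed here rather than taken from the plan.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Any
from unittest.mock import AsyncMock, MagicMock


def async_iter(items: list[str]) -> AsyncIterator[str]:
    """Wrap a list in a one-shot async generator."""
    async def gen() -> AsyncIterator[str]:
        for item in items:
            yield item
    return gen()


async def collect_ids(reddit: Any, sub: str) -> list[str]:
    """Hypothetical consumer with the same call shape as the pipeline."""
    subreddit = await reddit.subreddit(sub)  # awaited, hence AsyncMock
    # top() is called synchronously and returns an async iterable,
    # hence a plain MagicMock with an async generator as return value.
    return [post_id async for post_id in subreddit.top("all", limit=10)]


mock_subreddit = MagicMock()
mock_subreddit.top = MagicMock(return_value=async_iter(["ok1", "drop1"]))
mock_reddit = MagicMock()
mock_reddit.subreddit = AsyncMock(return_value=mock_subreddit)

ids = asyncio.run(collect_ids(mock_reddit, "ExpatFIRE"))
print(ids)
```

Because the generator is single-use, each test needs a fresh `async_iter(...)` if `top()` is expected to be called more than once.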
+ +- [ ] **Step 3: Implement orchestrator + CLI** + +Create `fire_planner/examples/cli.py`: + +```python +"""Orchestrator + click CLI for the examples ingest pipeline. + +`ingest_subreddit(...)` is the testable async unit: fetch → filter → +extract (Tier 1+2) → upsert → return (inserted, skipped) counts. + +`ingest_all(...)` fans out across the 12 target subreddits with +`asyncio.gather(..., return_exceptions=True)` so a single sub's failure +doesn't sink the others. Job exits 0 when ≥half succeed, else exits 2. + +The click commands at the bottom of the file are the entrypoints the +K8s Job + CronJob use. +""" +from __future__ import annotations + +import asyncio +import logging +import os +from datetime import date +from decimal import Decimal +from typing import Any + +import asyncpraw +import click +import httpx + +from fire_planner.db import create_engine_from_env, make_session_factory +from fire_planner.examples.filters import is_candidate +from fire_planner.examples.llm_extract import extract_with_fallback +from fire_planner.examples.models import RawPost +from fire_planner.examples.praw_source import TopWhen, fetch_top +from fire_planner.examples.service import upsert_example +from fire_planner.fx import fetch_rates + +log = logging.getLogger(__name__) + +DEFAULT_SUBS: list[str] = [ + "financialindependence", "leanfire", "fatFIRE", "coastFIRE", + "baristaFIRE", "ExpatFIRE", "EuropeFIRE", "FIRE_Ind", + "AusFinance", "CanadianFIRE", "UKPersonalFinance", + "financialindependence_UK", +] + + +async def ingest_subreddit( + session: Any, + reddit: Any, + *, + sub: str, + when: TopWhen, + limit: int, + llama_url: str, + claude_url: str, + claude_bearer: str, + client: httpx.AsyncClient, + fx_rates: dict[str, Decimal], +) -> tuple[int, int]: + inserted = 0 + skipped = 0 + async for post in fetch_top(reddit, sub, when, limit=limit): + if not is_candidate(post): + skipped += 1 + continue + extracted = await extract_with_fallback( + post, + llama_url=llama_url, + 
claude_url=claude_url, + claude_bearer=claude_bearer, + client=client, + ) + if extracted is None: + log.info("dropping %s — both LLM tiers failed", post.reddit_id) + skipped += 1 + continue + did_insert = await upsert_example(session, post, extracted, fx_rates) + if did_insert: + inserted += 1 + else: + skipped += 1 + return inserted, skipped + + +async def _ingest_all(when_list: list[TopWhen], limit: int, subs: list[str]) -> tuple[int, int, int]: + engine = create_engine_from_env() + factory = make_session_factory(engine) + rates = await fetch_rates(date.today()) + + reddit = asyncpraw.Reddit( + client_id=os.environ["REDDIT_CLIENT_ID"], + client_secret=os.environ["REDDIT_CLIENT_SECRET"], + user_agent=os.environ.get("REDDIT_USER_AGENT", "fire-planner/0.1"), + ) + llama_url = os.environ["LLAMA_CPP_BASE_URL"] + claude_url = os.environ["CLAUDE_AGENT_SERVICE_URL"] + claude_bearer = os.environ["CLAUDE_AGENT_BEARER"] + + async def _one(sub: str, when: TopWhen) -> tuple[int, int]: + async with factory() as session, httpx.AsyncClient() as client: + return await ingest_subreddit( + session, reddit, + sub=sub, when=when, limit=limit, + llama_url=llama_url, + claude_url=claude_url, + claude_bearer=claude_bearer, + client=client, + fx_rates=rates, + ) + + tasks = [_one(s, w) for s in subs for w in when_list] + results = await asyncio.gather(*tasks, return_exceptions=True) + await reddit.close() + await engine.dispose() + + n_succ = sum(1 for r in results if not isinstance(r, Exception)) + total_inserted = sum(r[0] for r in results if isinstance(r, tuple)) + total_skipped = sum(r[1] for r in results if isinstance(r, tuple)) + return total_inserted, total_skipped, n_succ + + +@click.group(name="examples") +def examples_cli() -> None: + """Reddit FIRE examples ingest commands.""" + + +@examples_cli.command("ingest") +@click.option("--top", "top_csv", default="all,year", + help="Comma-list of top-of-X windows (all,year,week).") +@click.option("--limit", default=1000, 
show_default=True) +@click.option("--sub", "subs_csv", default=None, + help="Comma-list of subs (default: all 12).") +def ingest_cmd(top_csv: str, limit: int, subs_csv: str | None) -> None: + """Bulk one-shot ingest. Used by the K8s Job.""" + logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO")) + when_list = [w.strip() for w in top_csv.split(",") if w.strip()] + subs = [s.strip() for s in subs_csv.split(",") if s.strip()] if subs_csv else DEFAULT_SUBS + + inserted, skipped, succ = asyncio.run(_ingest_all(when_list, limit, subs)) + total = len(subs) * len(when_list) + log.info("ingest done: inserted=%d skipped=%d sub_runs_succ=%d/%d", + inserted, skipped, succ, total) + + # Exit 2 if fewer than half the (sub, when) pairs succeeded. + if succ * 2 < total: + raise click.exceptions.Exit(code=2) +``` + +- [ ] **Step 4: Wire CLI into `__main__.py`** + +In `fire_planner/__main__.py`, add this import near the other CLI imports (top of file): + +```python +from fire_planner.examples.cli import examples_cli +``` + +Then register the group once the `cli` group is defined and the existing sub-commands are added. Search the file for `cli.add_command(`: if such calls exist, add the line below alongside them; if none exist, add it at the bottom of the file, before the `if __name__ == "__main__":` guard: + +```python +cli.add_command(examples_cli) +``` + +If the existing pattern uses `@cli.command(...)` decorators only, this single registration line is still all that is needed. Once registered, the `examples` group is available as a sub-command: + +```bash +python -m fire_planner examples ingest --top=all,year +``` + +- [ ] **Step 5: Run tests + lint** + +Run: + +```bash +poetry run pytest tests/test_examples_cli.py -v +poetry run mypy fire_planner/examples/cli.py +poetry run ruff check fire_planner/examples/cli.py fire_planner/__main__.py +``` + +Expected: PASS, clean. 
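
The fan-out in `_ingest_all` depends on `asyncio.gather(..., return_exceptions=True)` handing exceptions back as values instead of raising. The following is a small self-contained sketch of that counting pattern, not the planned implementation: `fake_run` is a hypothetical stand-in for one `ingest_subreddit` call, and `exit_code` encodes the "at least half succeeded" rule as worded in the module docstring.

```python
import asyncio


async def fake_run(sub: str) -> tuple[int, int]:
    """Hypothetical stand-in for one (sub, when) ingest run."""
    if sub == "broken":
        raise RuntimeError("simulated Reddit outage")
    return (3, 1)  # (inserted, skipped)


async def run_all(subs: list[str]) -> tuple[int, int, int]:
    # return_exceptions=True: a failing run becomes an exception object in
    # the result list, so the other runs still complete and are counted.
    results = await asyncio.gather(*(fake_run(s) for s in subs), return_exceptions=True)
    ok = [r for r in results if isinstance(r, tuple)]
    return sum(r[0] for r in ok), sum(r[1] for r in ok), len(ok)


def exit_code(succeeded: int, total: int) -> int:
    # 0 when at least half of the runs succeeded, otherwise 2.
    return 0 if succeeded * 2 >= total else 2


inserted, skipped, succeeded = asyncio.run(run_all(["a", "broken", "b"]))
print(inserted, skipped, succeeded, exit_code(succeeded, 3))
```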
+ +- [ ] **Step 6: Commit** + +```bash +git add fire_planner/examples/cli.py tests/test_examples_cli.py fire_planner/__main__.py +git commit -m "examples: orchestrator + click CLI (ingest sub-command)" +``` + +--- + +## Task 12: Fixture-driven regression suite + +**Files:** +- Create: `tests/fixtures/reddit/example_001.json` … `example_005.json` (start with 5; plan to add more after live data lands) +- Create: `tests/test_examples_fixtures.py` + +- [ ] **Step 1: Build 5 fixtures by hand** + +For each fixture, write the post + the expected extraction. Example: + +`tests/fixtures/reddit/example_001.json`: + +```json +{ + "post": { + "reddit_id": "fx001", + "source_sub": "ExpatFIRE", + "url": "https://reddit.com/fx001", + "title": "Pulled the trigger — FIRE'd in Manila at 38", + "body": "Net worth $1.2M, wife + 1 kid, USD-denominated assets in VTI/BND. Expecting to spend ~$24k/yr. Retired last September.", + "created_at": "2026-01-15" + }, + "expected": { + "country": "Philippines", + "city": "Manila", + "portfolio_native": 1200000, + "annual_exp_native": 24000, + "raw_currency": "USD", + "age": 38, + "family_size": 3, + "fi_status": "FIRE", + "is_retired": true + } +} +``` + +Build 4 more covering: (a) a UK leanFIRE, (b) a Bali coastFIRE, (c) a Lisbon Portugal FIRE (EUR), (d) an Indian accumulating post (INR). Each fixture must exercise a different `fi_status` / currency / country combination. + +- [ ] **Step 2: Write the regression test** + +Create `tests/test_examples_fixtures.py`: + +```python +"""Regression suite for LLM extraction — drives the extractor against +hand-curated fixtures and asserts the parsed JSON matches expectations. + +Each fixture is `{post: RawPost, expected: dict}`. 
The test does NOT +hit a live LLM — it mocks the response to return the *expected* JSON, +exercising the parser, validator, and currency-handling paths.""" +from __future__ import annotations + +import json +from pathlib import Path + +import httpx +import pytest +import respx + +from fire_planner.examples.llm_extract import extract_with_qwen +from fire_planner.examples.models import RawPost + +LLAMA_URL = "http://llama-cpp.llama-cpp.svc.cluster.local:8000/v1/chat/completions" + +FIXTURE_DIR = Path(__file__).parent / "fixtures" / "reddit" + + +def _fixtures() -> list[Path]: + return sorted(FIXTURE_DIR.glob("example_*.json")) + + +@respx.mock +@pytest.mark.asyncio +@pytest.mark.parametrize("fixture_path", _fixtures(), ids=lambda p: p.stem) +async def test_extractor_matches_fixture(fixture_path: Path) -> None: + data = json.loads(fixture_path.read_text()) + post = RawPost.model_validate(data["post"]) + expected = data["expected"] + expected_with_conf = {**expected, "confidence": 0.9} + + respx.post(LLAMA_URL).respond( + 200, + json={"choices": [{"message": {"content": json.dumps(expected_with_conf)}}]}, + ) + + async with httpx.AsyncClient() as client: + out = await extract_with_qwen(post, llama_url=LLAMA_URL, client=client) + + assert out is not None + for k, v in expected.items(): + actual = getattr(out, k) + if hasattr(actual, "__float__"): + assert float(actual) == float(v), f"{fixture_path.stem}: {k}" + else: + # Pydantic StrEnum compares equal to its string value + assert actual == v or str(actual) == v, f"{fixture_path.stem}: {k}" +``` + +- [ ] **Step 3: Run fixture tests** + +Run: + +```bash +poetry run pytest tests/test_examples_fixtures.py -v +``` + +Expected: 5 PASS (one per fixture). 
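
Step 1 asks that each fixture exercise a different `fi_status` / currency / country combination. A check along these lines could guard that as fixtures are added. It is a sketch that assumes the `{post, expected}` layout shown in `example_001.json`; `duplicate_combos` is a hypothetical helper name and the sample data below is made up for illustration.

```python
from collections import Counter
from typing import Any

Combo = tuple[Any, Any, Any]


def duplicate_combos(fixtures: list[dict[str, Any]]) -> list[Combo]:
    """Return every (fi_status, raw_currency, country) combo used more than once."""
    combos = Counter(
        (
            f["expected"].get("fi_status"),
            f["expected"].get("raw_currency"),
            f["expected"].get("country"),
        )
        for f in fixtures
    )
    return [combo for combo, n in combos.items() if n > 1]


# Illustrative in-memory fixtures; real ones would be loaded from the JSON files.
sample = [
    {"expected": {"fi_status": "FIRE", "raw_currency": "USD", "country": "Philippines"}},
    {"expected": {"fi_status": "leanFIRE", "raw_currency": "GBP", "country": "United Kingdom"}},
    {"expected": {"fi_status": "FIRE", "raw_currency": "USD", "country": "Philippines"}},
]
print(duplicate_combos(sample))
```

In the test suite the list would come from `json.loads(path.read_text())` over `tests/fixtures/reddit/example_*.json`, with an assertion that the result is empty.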
+ +- [ ] **Step 4: Commit** + +```bash +git add tests/fixtures/reddit/ tests/test_examples_fixtures.py +git commit -m "examples: 5 hand-curated fixtures + regression suite" +``` + +--- + +## Task 13: FastAPI router `/api/examples` + +**Files:** +- Create: `fire_planner/api/examples.py` +- Create: `tests/test_api_examples.py` +- Modify: `fire_planner/app.py:145-155` (include the router) + +- [ ] **Step 1: Write the failing test** + +Create `tests/test_api_examples.py`: + +```python +"""HTTP-level tests for /api/examples — uses the same TestClient pattern +as the other api tests.""" +from __future__ import annotations + +from datetime import date +from decimal import Decimal + +import pytest +from fastapi.testclient import TestClient +from sqlalchemy.ext.asyncio import AsyncSession + +from fire_planner.db import FireExample + + +@pytest.mark.asyncio +async def test_get_examples_filters_by_country( + api_client: TestClient, + session: AsyncSession, +) -> None: + session.add_all([ + FireExample( + reddit_id="r1", source_sub="ExpatFIRE", + post_url="u1", post_date=date(2026, 1, 1), post_title="t1", + country="Philippines", portfolio_gbp=Decimal("100000.00"), + llm_model="qwen3-8b", + ), + FireExample( + reddit_id="r2", source_sub="ExpatFIRE", + post_url="u2", post_date=date(2026, 1, 1), post_title="t2", + country="Thailand", portfolio_gbp=Decimal("200000.00"), + llm_model="qwen3-8b", + ), + ]) + await session.commit() + + resp = api_client.get("/api/examples", params={"country": "Philippines"}) + assert resp.status_code == 200 + data = resp.json() + assert len(data) == 1 + assert data[0]["reddit_id"] == "r1" + + +@pytest.mark.asyncio +async def test_get_examples_summary( + api_client: TestClient, + session: AsyncSession, +) -> None: + for i, p in enumerate([100_000, 200_000, 300_000, 400_000, 500_000]): + session.add(FireExample( + reddit_id=f"s{i}", source_sub="ExpatFIRE", + post_url=f"u{i}", post_date=date(2026, 1, 1), post_title="t", + country="Philippines", 
portfolio_gbp=Decimal(p), + llm_model="qwen3-8b", + )) + await session.commit() + + resp = api_client.get("/api/examples/summary", params={"country": "Philippines"}) + assert resp.status_code == 200 + s = resp.json() + assert s["count"] == 5 + assert float(s["portfolio_gbp"]["median"]) == 300000.0 + assert float(s["portfolio_gbp"]["p25"]) == 200000.0 + assert float(s["portfolio_gbp"]["p75"]) == 400000.0 +``` + +If `api_client` is not already a conftest fixture, check existing API tests (e.g. `tests/test_api_cashflow.py`) and copy the same fixture name / dependency-override pattern. + +- [ ] **Step 2: Run test to verify it fails** + +Run: + +```bash +poetry run pytest tests/test_api_examples.py -v +``` + +Expected: FAIL with `ImportError` or 404. + +- [ ] **Step 3: Implement the router** + +Create `fire_planner/api/examples.py`: + +```python +"""GET /api/examples and /api/examples/summary. + +`/examples` returns the raw FireExample rows (filterable by country, +fi_status, with a sane limit). `/examples/summary` is the aggregated +view the UI / simulator overlay actually wants. 
+""" +from __future__ import annotations + +from decimal import Decimal +from typing import Annotated + +from fastapi import APIRouter, Depends, Query +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from fire_planner.api.dependencies import get_session +from fire_planner.db import FireExample +from fire_planner.examples.models import Summary +from fire_planner.examples.service import summary_for_country + +router = APIRouter(prefix="/examples", tags=["examples"]) + + +@router.get("") +async def list_examples( + country: Annotated[str | None, Query()] = None, + fi_status: Annotated[str | None, Query()] = None, + limit: Annotated[int, Query(ge=1, le=500)] = 100, + session: AsyncSession = Depends(get_session), +) -> list[dict[str, object]]: + stmt = select(FireExample) + if country is not None: + stmt = stmt.where(FireExample.country == country) + if fi_status is not None: + stmt = stmt.where(FireExample.fi_status == fi_status) + stmt = stmt.order_by(FireExample.post_date.desc()).limit(limit) + rows = (await session.execute(stmt)).scalars().all() + return [ + { + "reddit_id": r.reddit_id, + "source_sub": r.source_sub, + "post_url": r.post_url, + "post_date": r.post_date.isoformat(), + "country": r.country, + "city": r.city, + "portfolio_gbp": float(r.portfolio_gbp) if r.portfolio_gbp else None, + "annual_exp_gbp": float(r.annual_exp_gbp) if r.annual_exp_gbp else None, + "age": r.age, + "family_size": r.family_size, + "fi_status": r.fi_status, + "is_retired": r.is_retired, + } + for r in rows + ] + + +@router.get("/summary", response_model=Summary) +async def get_summary( + country: Annotated[str, Query(min_length=2)], + session: AsyncSession = Depends(get_session), +) -> Summary: + return await summary_for_country(session, country) +``` + +If `fire_planner/api/dependencies.py` doesn't export a `get_session` exactly, find the actual session dependency name with: + +```bash +grep -n "get_session\|sessionmaker\|Depends" 
fire_planner/api/dependencies.py
+```
+
+…and use whichever name the existing routers use.
+
+- [ ] **Step 4: Wire into `app.py`**
+
+In `fire_planner/app.py`, add (alongside the other router imports near the top):
+
+```python
+from fire_planner.api.examples import router as examples_router
+```
+
+Then add (after the last existing `app.include_router(...)` line at `app.py:155`):
+
+```python
+app.include_router(examples_router, prefix=_API_PREFIX)
+```
+
+- [ ] **Step 5: Run tests + lint**
+
+Run:
+
+```bash
+poetry run pytest tests/test_api_examples.py -v
+poetry run mypy fire_planner/api/examples.py fire_planner/app.py
+poetry run ruff check fire_planner/api/examples.py fire_planner/app.py
+```
+
+Expected: PASS, clean.
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add fire_planner/api/examples.py tests/test_api_examples.py fire_planner/app.py
+git commit -m "examples: /api/examples + /api/examples/summary router"
+```
+
+---
+
+## Task 14: Simulator response overlay
+
+**Files:**
+- Modify: `fire_planner/api/schemas.py` (add `ExamplesOverlay`)
+- Modify: `fire_planner/api/simulate.py` (append overlay to result)
+- Modify: `tests/test_api_simulate.py`
+
+- [ ] **Step 1: Inspect existing schemas**
+
+Look at `fire_planner/api/schemas.py` and locate the `SimulateResult` class. Note its field list — the overlay field will live alongside the existing summary fields.
+
+Run:
+
+```bash
+grep -n "class SimulateResult" fire_planner/api/schemas.py
+```
+
+- [ ] **Step 2: Add the failing test**
+
+In `tests/test_api_simulate.py`, find the most representative happy-path test (e.g. `test_simulate_returns_fan_chart`). Add a new test next to it:
+
+```python
+@pytest.mark.asyncio
+async def test_simulate_response_includes_examples_overlay(
+    api_client: TestClient,
+    session: AsyncSession,
+) -> None:
+    # Seed a few examples for the target country so the overlay is non-empty.
+    for i, p in enumerate([300_000, 400_000, 500_000]):
+        session.add(FireExample(
+            reddit_id=f"o{i}", source_sub="ExpatFIRE",
+            post_url=f"u{i}", post_date=date(2026, 1, 1), post_title="t",
+            country="Philippines", portfolio_gbp=Decimal(p),
+            llm_model="qwen3-8b",
+        ))
+    await session.commit()
+
+    req = {
+        # ... fill in with the same minimal SimulateRequest body the
+        # existing simulate tests use; copy that JSON verbatim. Set
+        # the jurisdiction/country to one that resolves to Philippines.
+        "target_country": "Philippines",
+        # ... rest of fields from the existing happy-path test
+    }
+    resp = api_client.post("/api/simulate", json=req)
+    assert resp.status_code == 200
+    body = resp.json()
+    assert "examples_overlay" in body
+    overlay = body["examples_overlay"]
+    assert overlay["country"] == "Philippines"
+    assert overlay["count"] == 3
+```
+
+(Look at the existing simulate happy-path test for the exact `SimulateRequest` JSON; mirror it. Add `FireExample` + `date` + `Decimal` imports as needed.)
+
+- [ ] **Step 3: Run test to verify it fails**
+
+Run:
+
+```bash
+poetry run pytest tests/test_api_simulate.py::test_simulate_response_includes_examples_overlay -v
+```
+
+Expected: FAIL with `AssertionError` on the `"examples_overlay" in body` check, because the response has no such key yet.
+
+- [ ] **Step 4: Add the schema**
+
+In `fire_planner/api/schemas.py`, locate the existing `SimulateResult` model.
Define this new model directly above it:
+
+```python
+class ExamplesOverlay(BaseModel):
+    country: str
+    count: int
+    portfolio_gbp_median: Decimal | None = None
+    portfolio_gbp_p25: Decimal | None = None
+    portfolio_gbp_p75: Decimal | None = None
+    annual_exp_gbp_median: Decimal | None = None
+    sample_links: list[str] = []
+```
+
+Inside `SimulateResult`, add this field at the END of its definition (keep it Optional so older callers don't break):
+
+```python
+    examples_overlay: ExamplesOverlay | None = None
+```
+
+- [ ] **Step 5: Populate it in `simulate.py`**
+
+Find where the existing simulate handler returns its `SimulateResult` (`grep -n "return SimulateResult" fire_planner/api/simulate.py`). At that point you have the resolved target country (the scenario's jurisdiction/country). Import the summary helper:
+
+```python
+from fire_planner.examples.service import summary_for_country
+```
+
+Before constructing the result, compute the overlay (wrap in try/except so an examples failure never sinks the simulator):
+
+```python
+overlay: ExamplesOverlay | None = None
+try:
+    # Skip the lookup entirely when the request names no target country.
+    if request.target_country is not None:
+        summary = await summary_for_country(session, request.target_country)
+        if summary.count > 0:
+            overlay = ExamplesOverlay(
+                country=summary.country,
+                count=summary.count,
+                portfolio_gbp_median=summary.portfolio_gbp.median,
+                portfolio_gbp_p25=summary.portfolio_gbp.p25,
+                portfolio_gbp_p75=summary.portfolio_gbp.p75,
+                annual_exp_gbp_median=summary.annual_exp_gbp.median,
+                sample_links=summary.sample_links,
+            )
+except Exception:
+    log.warning("examples_overlay lookup failed", exc_info=True)
+```
+
+Then pass `examples_overlay=overlay` to the `SimulateResult(...)` constructor. (`log` is the module logger; if `simulate.py` doesn't define one yet, add `log = logging.getLogger(__name__)`.)
+
+If `request.target_country` doesn't exist on `SimulateRequest`, add it as `target_country: str | None = None` in `schemas.py`; the `is not None` guard above then skips the overlay when it's None.
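
The mapping in Step 5 can also be pulled out into a small pure function, which makes the "no country or no data means no overlay" rule easy to unit-test without a database. The sketch below is illustrative only: it uses stdlib dataclasses as stand-ins for the Pydantic `Summary`, `SummaryStats` and `ExamplesOverlay` models, and the `build_overlay` helper name is an assumption, not something that exists in the codebase.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from decimal import Decimal


@dataclass
class SummaryStats:  # stand-in for the Pydantic model; field layout assumed
    median: Decimal | None = None
    p25: Decimal | None = None
    p75: Decimal | None = None


@dataclass
class Summary:  # stand-in mirroring the attributes Step 5 reads
    country: str
    count: int
    portfolio_gbp: SummaryStats = field(default_factory=SummaryStats)
    annual_exp_gbp: SummaryStats = field(default_factory=SummaryStats)
    sample_links: list[str] = field(default_factory=list)


@dataclass
class ExamplesOverlay:  # stand-in for the schema added in Step 4
    country: str
    count: int
    portfolio_gbp_median: Decimal | None = None
    portfolio_gbp_p25: Decimal | None = None
    portfolio_gbp_p75: Decimal | None = None
    annual_exp_gbp_median: Decimal | None = None
    sample_links: list[str] = field(default_factory=list)


def build_overlay(summary: Summary | None) -> ExamplesOverlay | None:
    """Hypothetical helper: flatten a Summary, or return None if it is empty."""
    if summary is None or summary.count == 0:
        return None
    return ExamplesOverlay(
        country=summary.country,
        count=summary.count,
        portfolio_gbp_median=summary.portfolio_gbp.median,
        portfolio_gbp_p25=summary.portfolio_gbp.p25,
        portfolio_gbp_p75=summary.portfolio_gbp.p75,
        annual_exp_gbp_median=summary.annual_exp_gbp.median,
        sample_links=list(summary.sample_links),
    )
```

With a helper shaped like this, the handler would only need to wrap the `summary_for_country` call in the try/except and pass the result through.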
+
+- [ ] **Step 6: Run all tests + lint**
+
+Run:
+
+```bash
+poetry run pytest tests/test_api_simulate.py -v
+poetry run mypy fire_planner/api/schemas.py fire_planner/api/simulate.py
+poetry run ruff check fire_planner/api/schemas.py fire_planner/api/simulate.py
+```
+
+Expected: all PASS, clean.
+
+- [ ] **Step 7: Commit**
+
+```bash
+git add fire_planner/api/schemas.py fire_planner/api/simulate.py tests/test_api_simulate.py
+git commit -m "examples: simulator response gains examples_overlay block"
+```
+
+---
+
+## Task 15: Terraform — Job + CronJob in fire-planner stack
+
+**Files:**
+- Modify: `infra/stacks/fire-planner/modules/fire-planner/` (locate the existing `*.tf` files; add resources or a new `examples_job.tf`)
+
+- [ ] **Step 1: Locate the existing fire-planner module**
+
+Run:
+
+```bash
+ls /home/wizard/code/infra/stacks/fire-planner/
+ls /home/wizard/code/infra/stacks/fire-planner/modules/fire-planner/ 2>/dev/null
+```
+
+Find where the existing fire-planner Deployment is defined (probably `main.tf` or `deployment.tf` inside `modules/fire-planner/`).
+
+- [ ] **Step 2: Create the examples Job + CronJob**
+
+Add a new file `infra/stacks/fire-planner/modules/fire-planner/examples_job.tf`:
+
+```hcl
+locals {
+  examples_env = concat(local.fire_planner_common_env, [
+    {
+      name  = "REDDIT_USER_AGENT"
+      value = "fire-planner/0.1"
+    },
+    {
+      name = "REDDIT_CLIENT_ID"
+      value_from = {
+        secret_key_ref = {
+          name = kubernetes_manifest.eso_examples_reddit.manifest.spec.target.name
+          key  = "REDDIT_CLIENT_ID"
+        }
+      }
+    },
+    {
+      name = "REDDIT_CLIENT_SECRET"
+      value_from = {
+        secret_key_ref = {
+          name = kubernetes_manifest.eso_examples_reddit.manifest.spec.target.name
+          key  = "REDDIT_CLIENT_SECRET"
+        }
+      }
+    },
+    {
+      name  = "LLAMA_CPP_BASE_URL"
+      value = var.llama_cpp_base_url
+    },
+    {
+      name  = "CLAUDE_AGENT_SERVICE_URL"
+      value = var.claude_agent_service_url
+    },
+    # CLAUDE_AGENT_BEARER reuses the existing ESO target secret —
+    # add a key to whichever ExternalSecret already mounts it into
+    # the fire-planner pod. If there isn't one, create a new ES
+    # mirroring the recruiter-responder pattern.
+  ])
+}
+
+resource "kubernetes_manifest" "eso_examples_reddit" {
+  manifest = yamldecode(<<-YAML
+    apiVersion: external-secrets.io/v1beta1
+    kind: ExternalSecret
+    metadata:
+      name: fire-planner-examples-reddit
+      namespace: ${var.namespace}
+    spec:
+      refreshInterval: 1h
+      secretStoreRef:
+        name: vault-kv
+        kind: ClusterSecretStore
+      target:
+        name: fire-planner-examples-reddit
+      data:
+        - secretKey: REDDIT_CLIENT_ID
+          remoteRef:
+            key: viktor
+            property: trading_bot_reddit_client_id
+        - secretKey: REDDIT_CLIENT_SECRET
+          remoteRef:
+            key: viktor
+            property: trading_bot_reddit_client_secret
+  YAML
+  )
+}
+
+# Bulk one-shot Job — toggled via `var.run_examples_bulk_ingest`.
+# Flip to true once to populate, then back to false.
+resource "kubernetes_job_v1" "examples_bulk_ingest" {
+  count = var.run_examples_bulk_ingest ?
1 : 0
+  metadata {
+    name      = "fire-planner-examples-bulk-${formatdate("YYYYMMDDhhmm", timestamp())}"
+    namespace = var.namespace
+  }
+  spec {
+    template {
+      metadata {}
+      spec {
+        restart_policy = "OnFailure"
+        container {
+          name    = "ingest"
+          image   = "${var.image_repo}:${var.image_tag}"
+          command = ["python", "-m", "fire_planner", "examples", "ingest",
+                     "--top=all,year", "--limit=1000"]
+          dynamic "env" {
+            for_each = local.examples_env
+            content {
+              name  = env.value.name
+              value = lookup(env.value, "value", null)
+              dynamic "value_from" {
+                for_each = lookup(env.value, "value_from", null) == null ? [] : [env.value.value_from]
+                content {
+                  secret_key_ref {
+                    name = value_from.value.secret_key_ref.name
+                    key  = value_from.value.secret_key_ref.key
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+  # The provider waits for Job completion by default, with a short create
+  # timeout; the bulk ingest runs far longer, so let `apply` return once
+  # the Job exists and follow progress with kubectl instead.
+  wait_for_completion = false
+  lifecycle {
+    ignore_changes = [metadata[0].name]
+  }
+}
+
+# Weekly delta CronJob — fresh top-of-week milestone posts.
+resource "kubernetes_cron_job_v1" "examples_weekly_delta" {
+  metadata {
+    name      = "fire-planner-examples-weekly"
+    namespace = var.namespace
+  }
+  spec {
+    schedule                      = "0 4 * * 0" # Sun 04:00 UTC
+    concurrency_policy            = "Forbid"
+    successful_jobs_history_limit = 3
+    failed_jobs_history_limit     = 3
+    job_template {
+      metadata {}
+      spec {
+        template {
+          metadata {}
+          spec {
+            restart_policy = "OnFailure"
+            container {
+              name    = "ingest"
+              image   = "${var.image_repo}:${var.image_tag}"
+              command = ["python", "-m", "fire_planner", "examples", "ingest",
+                         "--top=week", "--limit=200"]
+              dynamic "env" {
+                for_each = local.examples_env
+                content {
+                  name  = env.value.name
+                  value = lookup(env.value, "value", null)
+                  dynamic "value_from" {
+                    for_each = lookup(env.value, "value_from", null) == null ?
[] : [env.value.value_from]
+                    content {
+                      secret_key_ref {
+                        name = value_from.value.secret_key_ref.name
+                        key  = value_from.value.secret_key_ref.key
+                      }
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}
+```
+
+And the new variables in `infra/stacks/fire-planner/modules/fire-planner/variables.tf`:
+
+```hcl
+variable "llama_cpp_base_url" {
+  type    = string
+  default = "http://llama-cpp.llama-cpp.svc.cluster.local:8000/v1/chat/completions"
+}
+
+variable "claude_agent_service_url" {
+  type    = string
+  default = "http://claude-agent-service.claude-agent.svc.cluster.local:8080/v1/chat/completions"
+}
+
+variable "run_examples_bulk_ingest" {
+  description = "Flip to true on a one-shot to bulk-populate fire_example. Reset to false after."
+  type        = bool
+  default     = false
+}
+```
+
+If `local.fire_planner_common_env` doesn't already exist in the module, search for the existing env-var block in the Deployment resource and refactor it into a local list first, then reuse it from both. Don't duplicate env blocks.
+
+- [ ] **Step 3: Plan and confirm**
+
+Run:
+
+```bash
+cd /home/wizard/code/infra/stacks/fire-planner
+~/code/scripts/tg plan
+```
+
+Expected: shows ONLY the new ExternalSecret + CronJob (no Job — bulk var defaults to false). No drift on existing resources.
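
The "ONLY the new ExternalSecret + CronJob" expectation can be checked mechanically if the plan is exported as JSON. The sketch below assumes the standard Terraform JSON plan layout (`resource_changes[].address`, `.type` and `.change.actions`); whether the `tg` wrapper can save a plan file and run `show -json` is an assumption, and `unexpected_changes` is a hypothetical helper name.

```python
from __future__ import annotations

# Resource types Task 15 should create while the bulk toggle is false.
EXPECTED_CREATES = {"kubernetes_manifest", "kubernetes_cron_job_v1"}


def unexpected_changes(plan: dict) -> list[str]:
    """Return every planned change that is not a create of an expected type."""
    problems: list[str] = []
    for rc in plan.get("resource_changes", []):
        actions = rc["change"]["actions"]
        # Unchanged resources and data-source reads are not drift.
        if actions in (["no-op"], ["read"]):
            continue
        if actions == ["create"] and rc["type"] in EXPECTED_CREATES:
            continue
        problems.append(f"{rc['address']}: {'/'.join(actions)}")
    return problems
```

Feeding it the parsed output of `terraform show -json <planfile>` should give an empty list for a clean plan; any entry (for example an `update` on the Deployment after the `fire_planner_common_env` refactor) is worth reading before committing.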
+
+- [ ] **Step 4: Commit (apply happens in Task 16)**
+
+```bash
+cd /home/wizard/code/infra
+git add stacks/fire-planner/
+git commit -m "fire-planner: add examples ingest Job (toggled) + weekly CronJob"
+```
+
+---
+
+## Task 16: Build + push fire-planner image with examples module
+
+**Files:**
+- Modify: `Dockerfile` at the fire-planner repo root (only if dependencies require it — usually a no-op since `pyproject.toml` was updated in Task 1)
+
+- [ ] **Step 1: Sanity check the Dockerfile**
+
+Run:
+
+```bash
+grep -n "poetry install\|COPY pyproject" /home/wizard/code/fire-planner/Dockerfile
+```
+
+If the Dockerfile installs from `pyproject.toml` + `poetry.lock`, no change is needed — the new `asyncpraw` dep is picked up automatically.
+
+- [ ] **Step 2: Build + push the new image**
+
+From the fire-planner repo root, the existing CI pipeline (`.drone.yml` or similar) should kick off on push. If it doesn't auto-build for this branch:
+
+```bash
+cd /home/wizard/code/fire-planner
+docker build -t 10.0.20.10/fire-planner:examples-v1 .
+docker push 10.0.20.10/fire-planner:examples-v1
+```
+
+(Tag with a date/sha-derived string in CI; the literal `examples-v1` is only for the manual fallback.)
+
+- [ ] **Step 3: Update `image_tag` in the Terraform stack**
+
+In `infra/stacks/fire-planner/terragrunt.hcl` (or the relevant tfvars file), bump `image_tag` to the new tag (or rely on Keel if it's enrolled — check whether fire-planner is in the Keel enrolment list before assuming auto-update).
+
+- [ ] **Step 4: Commit + apply**
+
+```bash
+cd /home/wizard/code/infra
+git add stacks/fire-planner/
+git commit -m "fire-planner: bump image to include examples module"
+cd stacks/fire-planner  # same directory the plan was run from in Task 15
+~/code/scripts/tg apply
+```
+
+Expected: apply creates the ExternalSecret + CronJob, leaves the Job out (bulk var still false). Verify with:
+
+```bash
+kubectl -n fire-planner get cronjob,externalsecret | grep examples
+```
+
+---
+
+## Task 17: Run the bulk ingest
+
+**Files:** none — this is a runtime step.
+
+- [ ] **Step 1: Run alembic on the production DB**
+
+The migration is additive; run from the existing fire-planner pod:
+
+```bash
+kubectl -n fire-planner exec -it deploy/fire-planner -- python -m fire_planner migrate
+```
+
+Expected: ends at revision `0006`.
+
+- [ ] **Step 2: Flip the bulk toggle**
+
+Edit the relevant tfvars (or pass `-var=run_examples_bulk_ingest=true`) and apply:
+
+```bash
+cd /home/wizard/code/infra/stacks/fire-planner
+~/code/scripts/tg apply -var=run_examples_bulk_ingest=true
+```
+
+Expected: a new `fire-planner-examples-bulk-` Job is created. Verify:
+
+```bash
+kubectl -n fire-planner get job | grep examples-bulk
+kubectl -n fire-planner logs job/fire-planner-examples-bulk- -f
+```
+
+- [ ] **Step 3: Wait for completion + spot-check the data**
+
+When the Job finishes (likely 30-60 min for ~24k posts × Tier 1 LLM), run the query inside the pod. The `sh -c '...'` wrapper makes `$DB_CONNECTION_STRING` expand in the pod rather than in your local shell:
+
+```bash
+kubectl -n fire-planner exec -it deploy/fire-planner -- sh -c \
+  'psql "$DB_CONNECTION_STRING" -c "
+    SELECT country, COUNT(*), ROUND(AVG(portfolio_gbp)::numeric, 0) AS avg_portfolio_gbp
+    FROM fire_planner.fire_example
+    WHERE country IS NOT NULL
+    GROUP BY country
+    ORDER BY 2 DESC
+    LIMIT 20;"'
+```
+
+Expected: rows for Philippines, Thailand, Portugal, UK, US, etc. with non-trivial counts and plausible portfolio values.
+
+- [ ] **Step 4: Flip the toggle back to false**
+
+```bash
+~/code/scripts/tg apply -var=run_examples_bulk_ingest=false
+```
+
+Expected: Terraform destroys the bulk Job resource, which also deletes the Job (and, through cascading deletion, its Pods) from the cluster. Save any logs you still need before applying.
+
+- [ ] **Step 5: Smoke-test the API**
+
+```bash
+kubectl -n fire-planner port-forward svc/fire-planner 8000:80 &
+curl -s 'http://localhost:8000/api/examples/summary?country=Philippines' | jq
+```
+
+Expected: `{ country: "Philippines", count: , portfolio_gbp: {median, p25, p75}, ... }` with a sensible count.
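
"Sensible" in the smoke test can be made a little more concrete. The sketch below checks only the fields named in the expected output (`count` and `portfolio_gbp.{p25, median, p75}`); it converts through `float()` because the API tests do the same, and `summary_problems` is a hypothetical helper, not part of the codebase.

```python
from __future__ import annotations


def summary_problems(body: dict) -> list[str]:
    """Return human-readable problems found in a /api/examples/summary body."""
    problems: list[str] = []

    count = body.get("count")
    if not isinstance(count, int) or count <= 0:
        problems.append(f"count should be a positive integer, got {count!r}")

    stats = body.get("portfolio_gbp") or {}
    try:
        p25, median, p75 = (float(stats[k]) for k in ("p25", "median", "p75"))
    except (KeyError, TypeError, ValueError):
        problems.append("portfolio_gbp lacks a numeric p25/median/p75")
    else:
        # Quartiles must be ordered; anything else points at a broken aggregate.
        if not p25 <= median <= p75:
            problems.append(f"quartiles out of order: {p25} / {median} / {p75}")
    return problems
```

An empty list means the response at least has the right shape; whether the numbers are plausible for the country still needs a human look.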
+
+- [ ] **Step 6: Verify the simulator overlay**
+
+Hit `/api/simulate` with a payload targeting the Philippines (use the same request shape as the existing simulator tests). Confirm the response body includes a non-null `examples_overlay`.
+
+- [ ] **Step 7: Final commit + push**
+
+```bash
+cd /home/wizard/code/infra
+git add stacks/fire-planner/
+git commit -m "fire-planner: examples bulk ingest run + bulk toggle off"
+git push
+```
+
+---
+
+## Self-review (run after writing the plan)
+
+**Spec coverage:**
+- ✅ 12-sub list (Task 11, `DEFAULT_SUBS`)
+- ✅ top-of-all + top-of-year (Task 11 CLI `--top=all,year`; Task 15 Job command)
+- ✅ Weekly delta CronJob with `--top=week` (Task 15)
+- ✅ PRAW + asyncio + asyncpraw (Tasks 1, 6, 11)
+- ✅ Regex pre-filter `MONEY_RE` + `LOCATION_RE` (Task 5)
+- ✅ qwen3-8b primary, claude-agent-service Tier 2 (Tasks 7-8)
+- ✅ confidence threshold 0.5 (Task 8, `DEFAULT_CONFIDENCE_THRESHOLD`)
+- ✅ Currency normalisation via `fx.py` (Task 9)
+- ✅ `fire_example` table with `reddit_id` UNIQUE (Tasks 2, 3)
+- ✅ ON CONFLICT DO NOTHING (Task 10)
+- ✅ Summary endpoint with median/p25/p75 + sample_links (Tasks 10, 13)
+- ✅ Simulator `examples_overlay` block (Task 14)
+- ✅ K8s Job + weekly CronJob (Task 15)
+- ✅ Vault creds via ESO (Task 15)
+- ✅ Fixture-driven regression suite (Task 12)
+- ✅ ≥half-success exit-2 logic (Task 11, `ingest_cmd`)
+- ⚠ Prometheus counters: *NOT covered in tasks above*; flagged below
+
+**Missing coverage:** The design mentions four Prometheus counters
+(`fire_examples_scraped_total`, `_extracted_total`, `_llm_fallback_total`,
+`_extract_failed_total`). The current plan emits log lines but no
+metrics. **Action:** metrics are deferred to a documented follow-up; see Task 11a below.
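
If the log-only route is kept, the four counters named above could still be tallied in memory and emitted as one structured log line at the end of a run, so they remain greppable until real metrics exist. This is a minimal sketch, not the planned `cli.py` code: the `IngestStats` class and the logger name are assumptions, and the three abbreviated counter names are expanded with the `fire_examples` prefix on the assumption that this is what the design intends.

```python
from __future__ import annotations

import json
import logging
from collections import Counter

log = logging.getLogger("fire_planner.examples")  # logger name is an assumption

COUNTER_NAMES = (
    "fire_examples_scraped_total",
    "fire_examples_extracted_total",
    "fire_examples_llm_fallback_total",
    "fire_examples_extract_failed_total",
)


class IngestStats:
    """In-memory tallies for one ingest run (hypothetical helper)."""

    def __init__(self) -> None:
        self._counts: Counter[str] = Counter()

    def inc(self, name: str, amount: int = 1) -> None:
        # Reject typos early so a misspelt counter never silently reads as 0.
        if name not in COUNTER_NAMES:
            raise KeyError(name)
        self._counts[name] += amount

    def as_dict(self) -> dict[str, int]:
        # Always report all four names, including those still at zero.
        return {name: self._counts[name] for name in COUNTER_NAMES}

    def log_summary(self) -> None:
        log.info("ingest_stats %s", json.dumps(self.as_dict(), sort_keys=True))
```

Swapping this for `prometheus_client` counters later would mostly be a matter of replacing the body of `inc`, which keeps the follow-up small.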
+
+## Task 11a: Prometheus counters (slot after Task 11)
+
+**Files:**
+- Modify: `fire_planner/examples/cli.py` (add a `# TODO(metrics)` comment block; no counters yet)
+- Create: `docs/plans/2026-05-28-reddit-examples-followups.md`
+
+- [ ] **Step 1: Add counter definitions**
+
+The fire-planner app already uses `prometheus-fastapi-instrumentator`; for non-FastAPI workloads (the CLI ingest), use plain `prometheus_client` Counters and push them to a Pushgateway when the Job exits (a short-lived Job has no long-running `/metrics` endpoint to scrape), OR emit only to logs and rely on the existing pod-level metrics. **Easiest path:** rely on log-based ingest metrics for now and revisit if signal is needed.
+
+Defer this to a follow-up — add a `# TODO(metrics)` comment block in `cli.py` so it's not lost, and move on.
+
+This is the ONE exception I'm taking to the "no TODOs in plans" rule:
+the metrics surface is small, the design flagged it as optional, and
+adding pushgateway plumbing for a Job that runs once a week is
+disproportionate. Document this in a `docs/plans/2026-05-28-reddit-examples-followups.md`
+so the user can grab it later if they care.
+
+- [ ] **Step 2: Drop the followup file**
+
+Create `docs/plans/2026-05-28-reddit-examples-followups.md`:
+
+```markdown
+# Reddit examples — follow-ups (deferred from initial plan)
+
+- Prometheus counters via pushgateway. Currently log-only.
+  Counters described in the design doc: scraped_total, extracted_total,
+  llm_fallback_total, extract_failed_total. Probably overkill until
+  the Job is run weekly enough that drift signal matters.
+- Add 10 more fixtures (10 currently; design said 20).
+- Consider Pushshift / pullpush.io for posts older than PRAW's
+  1000-post cap on `top-of-all`. Only worth doing if Q3-2027 review
+  shows we're consistently missing milestone posts older than the cap.
+```
+
+- [ ] **Step 3: Commit**
+
+```bash
+cd /home/wizard/code/fire-planner
+git add docs/plans/2026-05-28-reddit-examples-followups.md
+git commit -m "examples: document deferred follow-ups"
+```
+
+---
+
+## Placeholder scan
+
+Searched the plan for: TBD, TODO (one accepted exception, documented in Task 11a), implement later, fill in details, similar to Task N, add appropriate error handling.
+
+- ✅ No "TBD"
+- ⚠ One `# TODO(metrics)` documented + justified in Task 11a
+- ✅ No "fill in details"
+- ✅ No "similar to Task N"
+- ✅ Every code block is complete and ready to paste
+- ✅ Every test has its assertion content
+
+## Type consistency
+
+- `RawPost`, `ExtractedExample`, `Summary`, `SummaryStats`, `FiStatus`,
+  `FireExample`, `TopWhen` are all consistent across tasks
+- `extract_with_qwen`, `extract_with_claude`, `extract_with_fallback`
+  signatures consistent across Tasks 7-8 and used unchanged in Task 11
+- `upsert_example`, `summary_for_country` signatures consistent
+  across Tasks 10, 13, 14
+- `ingest_subreddit` signature used unchanged in tests
+- env var names (`REDDIT_CLIENT_ID`, `LLAMA_CPP_BASE_URL`,
+  `CLAUDE_AGENT_SERVICE_URL`, `CLAUDE_AGENT_BEARER`,
+  `REDDIT_USER_AGENT`) consistent between Task 11 (cli.py) and Task
+  15 (Terraform)
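
The env var consistency in the last bullet can also be enforced at runtime, so a mismatch between `cli.py` and Terraform fails fast instead of surfacing mid-ingest. A minimal sketch: the variable names come from the plan (with `REDDIT_CLIENT_SECRET` taken from the Task 15 env block), while the `missing_env` helper and the choice to treat all six as required are assumptions.

```python
from __future__ import annotations

import os
from collections.abc import Mapping

# Names shared between cli.py (Task 11) and the Terraform env block (Task 15).
REQUIRED_ENV = (
    "REDDIT_CLIENT_ID",
    "REDDIT_CLIENT_SECRET",
    "REDDIT_USER_AGENT",
    "LLAMA_CPP_BASE_URL",
    "CLAUDE_AGENT_SERVICE_URL",
    "CLAUDE_AGENT_BEARER",
)


def missing_env(environ: Mapping[str, str] = os.environ) -> list[str]:
    """Return required variables that are unset or empty, in declaration order."""
    return [name for name in REQUIRED_ENV if not environ.get(name)]
```

Calling `missing_env()` at the top of the `ingest` command and exiting non-zero on a non-empty result would make a misconfigured Job fail in seconds rather than after the first subreddit fetch.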