wrongmove/docs/plans/2026-02-22-debug-cli-plan.md
2026-02-22 15:08:34 +00:00

1214 lines
40 KiB
Markdown

# Debug CLI Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** Add a `debug` command group to the CLI that mirrors all web UI interactions, with switchable direct/HTTP execution modes and user impersonation.
**Architecture:** New `cli/` package with one module per domain (listings, decisions, pois, tasks, districts). A shared `_context.py` handles user identity resolution, HTTP client with self-signed JWT, and output formatting. The `debug` group is registered in `main.py`.
**Tech Stack:** Click (existing), httpx (existing), PyJWT (existing), no new dependencies.
---
### Task 1: Create `cli/` package with shared context
**Files:**
- Create: `cli/__init__.py`
- Create: `cli/_context.py`
**Step 1: Create `cli/__init__.py`**
```python
"""Debug CLI package — mirrors web UI interactions for debugging."""
```
**Step 2: Create `cli/_context.py` with CliContext, user resolution, JWT minting, and output helpers**
```python
"""Shared context for the debug CLI."""
import json
import os
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any
import jwt
from database import engine
from repositories.user_repository import UserRepository
@dataclass
class CliContext:
"""Shared state passed through Click context."""
user_email: str
use_http: bool
json_output: bool
api_base_url: str
def resolve_user_id(email: str) -> int:
"""Get or create a database user by email. Returns the DB user ID."""
user_repo = UserRepository(engine)
db_user = user_repo.get_user_by_email(email)
if db_user is None:
db_user = user_repo.create_user(email)
if db_user.id is None:
raise RuntimeError(f"Failed to resolve user ID for {email}")
return db_user.id
def mint_jwt(email: str) -> str:
"""Create a self-signed JWT for HTTP mode (passkey-style HS256 token)."""
secret = os.getenv("JWT_SECRET", "change-me-in-production")
issuer = os.getenv("JWT_ISSUER", "wrongmove")
algorithm = os.getenv("JWT_ALGORITHM", "HS256")
payload = {
"sub": email,
"email": email,
"name": email,
"iss": issuer,
"iat": datetime.now(timezone.utc),
"exp": datetime.now(timezone.utc) + timedelta(hours=1),
}
return jwt.encode(payload, secret, algorithm=algorithm)
def get_http_headers(email: str) -> dict[str, str]:
"""Build HTTP headers with Authorization bearer token."""
token = mint_jwt(email)
return {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
def output(data: Any, json_mode: bool) -> None:
"""Print data in JSON or human-readable format."""
if json_mode:
print(json.dumps(data, indent=2, default=str))
elif isinstance(data, list):
if not data:
print("No results.")
return
# Print as simple table if list of dicts
if isinstance(data[0], dict):
keys = list(data[0].keys())
print(" ".join(f"{k:<20}" for k in keys))
print(" ".join("-" * 20 for _ in keys))
for row in data:
print(" ".join(f"{str(row.get(k, '')):<20}" for k in keys))
else:
for item in data:
print(f" {item}")
elif isinstance(data, dict):
for k, v in data.items():
print(f" {k}: {v}")
else:
print(data)
def error_output(message: str, json_mode: bool) -> None:
"""Print an error message."""
if json_mode:
print(json.dumps({"error": message}), file=sys.stderr)
else:
print(f"Error: {message}", file=sys.stderr)
```
**Step 3: Commit**
```bash
git add cli/__init__.py cli/_context.py
git commit -m "Add cli/ package with shared debug context"
```
---
### Task 2: Create `cli/districts.py`
This is the simplest subcommand — good to wire up the full pattern first.
**Files:**
- Create: `cli/districts.py`
**Step 1: Create `cli/districts.py`**
```python
"""Debug CLI — district commands."""
import click
import httpx
from cli._context import CliContext, get_http_headers, output, error_output
from services import district_service
@click.group("districts")
def districts_group() -> None:
"""District management commands."""
pass
@districts_group.command("list")
@click.pass_context
def list_districts(ctx: click.Context) -> None:
"""List all available districts."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
try:
if cli_ctx.use_http:
resp = httpx.get(
f"{cli_ctx.api_base_url}/api/get_districts",
headers=get_http_headers(cli_ctx.user_email),
)
resp.raise_for_status()
data = resp.json()
else:
data = district_service.get_all_districts()
if cli_ctx.json_output:
output(data, json_mode=True)
else:
print(f"Available districts ({len(data)}):")
for name in sorted(data.keys()):
print(f" {name}: {data[name]}")
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
```
**Step 2: Commit**
```bash
git add cli/districts.py
git commit -m "Add debug CLI districts subcommand"
```
---
### Task 3: Register `debug` group in `main.py`
**Files:**
- Modify: `main.py` (add imports and debug group at bottom, before `if __name__`)
**Step 1: Add the debug group and register all subcommand groups**
Add to `main.py` after the existing commands and before `if __name__ == "__main__":`:
```python
from cli._context import CliContext
from cli.districts import districts_group
@cli.group("debug")
@click.option("--user-email", "-u", required=True, help="Email of user to impersonate")
@click.option("--http", "use_http", is_flag=True, default=False, help="Use HTTP requests instead of direct service calls")
@click.option("--json", "json_output", is_flag=True, default=False, help="Output in JSON format")
@click.option("--api-url", default="http://localhost:8000", help="API base URL for HTTP mode")
@click.pass_context
def debug(ctx: click.Context, user_email: str, use_http: bool, json_output: bool, api_url: str) -> None:
"""Debug CLI — mirrors web UI interactions with superuser access."""
ctx.ensure_object(dict)
ctx.obj["cli_ctx"] = CliContext(
user_email=user_email,
use_http=use_http,
json_output=json_output,
api_base_url=api_url,
)
debug.add_command(districts_group)
```
**Step 2: Write test for debug group registration**
Create `tests/unit/test_debug_cli.py`:
```python
"""Tests for the debug CLI."""
from unittest.mock import MagicMock, patch
from click.testing import CliRunner
from main import cli
class TestDebugGroup:
"""Tests for the debug command group."""
@patch("main.engine", new_callable=MagicMock)
def test_debug_help_shows_subcommands(self, mock_engine: MagicMock) -> None:
runner = CliRunner()
result = runner.invoke(cli, ["debug", "--help"])
assert result.exit_code == 0
assert "districts" in result.output
@patch("main.engine", new_callable=MagicMock)
def test_debug_requires_user_email(self, mock_engine: MagicMock) -> None:
runner = CliRunner()
result = runner.invoke(cli, ["debug", "districts", "list"])
assert result.exit_code != 0
assert "user-email" in result.output.lower() or "Missing" in result.output
class TestDistrictsCommand:
"""Tests for debug districts subcommand."""
@patch("cli.districts.district_service.get_all_districts")
@patch("main.engine", new_callable=MagicMock)
def test_list_districts_direct(
self, mock_engine: MagicMock, mock_get: MagicMock
) -> None:
mock_get.return_value = {"London": "R1", "Camden": "R2"}
runner = CliRunner()
result = runner.invoke(
cli, ["debug", "-u", "test@example.com", "districts", "list"]
)
assert result.exit_code == 0
assert "London" in result.output
@patch("cli.districts.district_service.get_all_districts")
@patch("main.engine", new_callable=MagicMock)
def test_list_districts_json(
self, mock_engine: MagicMock, mock_get: MagicMock
) -> None:
mock_get.return_value = {"London": "R1"}
runner = CliRunner()
result = runner.invoke(
cli, ["debug", "-u", "test@example.com", "--json", "districts", "list"]
)
assert result.exit_code == 0
import json
data = json.loads(result.output)
assert "London" in data
```
**Step 3: Run tests**
Run: `pytest tests/unit/test_debug_cli.py -v`
Expected: All pass
**Step 4: Commit**
```bash
git add main.py tests/unit/test_debug_cli.py
git commit -m "Register debug group in main.py with districts subcommand"
```
---
### Task 4: Create `cli/decisions.py`
**Files:**
- Create: `cli/decisions.py`
**Step 1: Create `cli/decisions.py`**
```python
"""Debug CLI — decision commands (like/dislike)."""
import click
import httpx
from cli._context import CliContext, get_http_headers, output, error_output, resolve_user_id
from database import engine
from repositories.decision_repository import DecisionRepository
from services import decision_service
@click.group("decisions")
def decisions_group() -> None:
"""Like/dislike decision commands."""
pass
@decisions_group.command("set")
@click.argument("listing_id", type=int)
@click.argument("decision", type=click.Choice(["liked", "disliked"]))
@click.option("--type", "-t", "listing_type", required=True, type=click.Choice(["RENT", "BUY"]))
@click.pass_context
def set_decision(ctx: click.Context, listing_id: int, decision: str, listing_type: str) -> None:
"""Set a like/dislike decision for a listing."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
try:
if cli_ctx.use_http:
resp = httpx.put(
f"{cli_ctx.api_base_url}/api/decisions/{listing_id}",
headers=get_http_headers(cli_ctx.user_email),
json={"decision": decision, "listing_type": listing_type},
)
resp.raise_for_status()
data = resp.json()
else:
user_id = resolve_user_id(cli_ctx.user_email)
repo = DecisionRepository(engine)
result = decision_service.set_decision(repo, user_id, listing_id, listing_type, decision)
data = {
"listing_id": result.listing_id,
"listing_type": result.listing_type,
"decision": result.decision,
"created_at": str(result.created_at),
"updated_at": str(result.updated_at),
}
output(data, cli_ctx.json_output)
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
@decisions_group.command("list")
@click.pass_context
def list_decisions(ctx: click.Context) -> None:
"""List all decisions for the user."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
try:
if cli_ctx.use_http:
resp = httpx.get(
f"{cli_ctx.api_base_url}/api/decisions",
headers=get_http_headers(cli_ctx.user_email),
)
resp.raise_for_status()
data = resp.json()
else:
user_id = resolve_user_id(cli_ctx.user_email)
repo = DecisionRepository(engine)
decisions = decision_service.get_user_decisions(repo, user_id)
data = [
{
"listing_id": d.listing_id,
"listing_type": d.listing_type,
"decision": d.decision,
"created_at": str(d.created_at),
"updated_at": str(d.updated_at),
}
for d in decisions
]
output(data, cli_ctx.json_output)
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
@decisions_group.command("remove")
@click.argument("listing_id", type=int)
@click.option("--type", "-t", "listing_type", required=True, type=click.Choice(["RENT", "BUY"]))
@click.pass_context
def remove_decision(ctx: click.Context, listing_id: int, listing_type: str) -> None:
"""Remove a decision (un-like/un-dislike)."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
try:
if cli_ctx.use_http:
resp = httpx.delete(
f"{cli_ctx.api_base_url}/api/decisions/{listing_id}",
headers=get_http_headers(cli_ctx.user_email),
params={"listing_type": listing_type},
)
resp.raise_for_status()
data = resp.json()
else:
user_id = resolve_user_id(cli_ctx.user_email)
repo = DecisionRepository(engine)
deleted = decision_service.remove_decision(repo, user_id, listing_id, listing_type)
if not deleted:
error_output("Decision not found", cli_ctx.json_output)
return
data = {"success": True}
output(data, cli_ctx.json_output)
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
```
**Step 2: Register in `main.py`**
Add import `from cli.decisions import decisions_group` and `debug.add_command(decisions_group)`.
**Step 3: Add tests to `tests/unit/test_debug_cli.py`**
```python
class TestDecisionsCommand:
"""Tests for debug decisions subcommand."""
@patch("cli.decisions.decision_service.set_decision")
@patch("cli.decisions.resolve_user_id", return_value=1)
@patch("main.engine", new_callable=MagicMock)
def test_set_decision_direct(
self, mock_engine: MagicMock, mock_resolve: MagicMock, mock_set: MagicMock
) -> None:
from datetime import datetime
mock_set.return_value = MagicMock(
listing_id=123, listing_type="RENT", decision="liked",
created_at=datetime(2025, 1, 1), updated_at=datetime(2025, 1, 1),
)
runner = CliRunner()
result = runner.invoke(
cli, ["debug", "-u", "test@example.com", "decisions", "set", "123", "liked", "-t", "RENT"]
)
assert result.exit_code == 0
mock_set.assert_called_once()
@patch("cli.decisions.decision_service.get_user_decisions")
@patch("cli.decisions.resolve_user_id", return_value=1)
@patch("main.engine", new_callable=MagicMock)
def test_list_decisions_direct(
self, mock_engine: MagicMock, mock_resolve: MagicMock, mock_list: MagicMock
) -> None:
mock_list.return_value = []
runner = CliRunner()
result = runner.invoke(
cli, ["debug", "-u", "test@example.com", "decisions", "list"]
)
assert result.exit_code == 0
```
**Step 4: Run tests**
Run: `pytest tests/unit/test_debug_cli.py -v`
Expected: All pass
**Step 5: Commit**
```bash
git add cli/decisions.py main.py tests/unit/test_debug_cli.py
git commit -m "Add debug CLI decisions subcommand (set, list, remove)"
```
---
### Task 5: Create `cli/pois.py`
**Files:**
- Create: `cli/pois.py`
**Step 1: Create `cli/pois.py`**
```python
"""Debug CLI — POI commands."""
import click
import httpx
from cli._context import CliContext, get_http_headers, output, error_output, resolve_user_id
from database import engine
from models.listing import ListingType
from repositories.poi_repository import POIRepository
from services import poi_service, task_service
@click.group("pois")
def pois_group() -> None:
"""Point of Interest management commands."""
pass
@pois_group.command("list")
@click.pass_context
def list_pois(ctx: click.Context) -> None:
"""List all POIs for the user."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
try:
if cli_ctx.use_http:
resp = httpx.get(
f"{cli_ctx.api_base_url}/api/poi",
headers=get_http_headers(cli_ctx.user_email),
)
resp.raise_for_status()
data = resp.json()
else:
user_id = resolve_user_id(cli_ctx.user_email)
repo = POIRepository(engine)
pois = poi_service.get_user_pois(repo, user_id)
data = [
{
"id": p.id,
"name": p.name,
"address": p.address,
"latitude": p.latitude,
"longitude": p.longitude,
"created_at": str(p.created_at),
}
for p in pois
]
output(data, cli_ctx.json_output)
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
@pois_group.command("create")
@click.option("--name", required=True, help="POI name")
@click.option("--address", required=True, help="Address")
@click.option("--lat", required=True, type=float, help="Latitude")
@click.option("--lon", required=True, type=float, help="Longitude")
@click.pass_context
def create_poi(ctx: click.Context, name: str, address: str, lat: float, lon: float) -> None:
"""Create a new POI."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
try:
if cli_ctx.use_http:
resp = httpx.post(
f"{cli_ctx.api_base_url}/api/poi",
headers=get_http_headers(cli_ctx.user_email),
json={"name": name, "address": address, "latitude": lat, "longitude": lon},
)
resp.raise_for_status()
data = resp.json()
else:
user_id = resolve_user_id(cli_ctx.user_email)
repo = POIRepository(engine)
result = poi_service.create_poi(repo, user_id, name, address, lat, lon)
data = {
"id": result.poi.id,
"name": result.poi.name,
"address": result.poi.address,
"latitude": result.poi.latitude,
"longitude": result.poi.longitude,
}
output(data, cli_ctx.json_output)
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
@pois_group.command("update")
@click.argument("poi_id", type=int)
@click.option("--name", default=None, help="New name")
@click.option("--address", default=None, help="New address")
@click.option("--lat", default=None, type=float, help="New latitude")
@click.option("--lon", default=None, type=float, help="New longitude")
@click.pass_context
def update_poi(ctx: click.Context, poi_id: int, name: str | None, address: str | None, lat: float | None, lon: float | None) -> None:
"""Update an existing POI."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
try:
if cli_ctx.use_http:
body: dict = {}
if name is not None:
body["name"] = name
if address is not None:
body["address"] = address
if lat is not None:
body["latitude"] = lat
if lon is not None:
body["longitude"] = lon
resp = httpx.put(
f"{cli_ctx.api_base_url}/api/poi/{poi_id}",
headers=get_http_headers(cli_ctx.user_email),
json=body,
)
resp.raise_for_status()
data = resp.json()
else:
user_id = resolve_user_id(cli_ctx.user_email)
repo = POIRepository(engine)
result = poi_service.update_poi(repo, poi_id, user_id, name, address, lat, lon)
if result is None:
error_output("POI not found", cli_ctx.json_output)
return
data = {
"id": result.poi.id,
"name": result.poi.name,
"address": result.poi.address,
"latitude": result.poi.latitude,
"longitude": result.poi.longitude,
}
output(data, cli_ctx.json_output)
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
@pois_group.command("delete")
@click.argument("poi_id", type=int)
@click.pass_context
def delete_poi(ctx: click.Context, poi_id: int) -> None:
"""Delete a POI."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
try:
if cli_ctx.use_http:
resp = httpx.delete(
f"{cli_ctx.api_base_url}/api/poi/{poi_id}",
headers=get_http_headers(cli_ctx.user_email),
)
resp.raise_for_status()
data = resp.json()
else:
user_id = resolve_user_id(cli_ctx.user_email)
repo = POIRepository(engine)
deleted = poi_service.delete_poi(repo, poi_id, user_id)
if not deleted:
error_output("POI not found", cli_ctx.json_output)
return
data = {"success": True}
output(data, cli_ctx.json_output)
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
@pois_group.command("calculate")
@click.argument("poi_id", type=int)
@click.option("--travel-modes", required=True, help="Comma-separated: WALK,BICYCLE,TRANSIT")
@click.option("--type", "-t", "listing_type", required=True, type=click.Choice(["RENT", "BUY"]))
@click.pass_context
def calculate_distances(ctx: click.Context, poi_id: int, travel_modes: str, listing_type: str) -> None:
"""Trigger distance calculation for a POI."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
modes = [m.strip().upper() for m in travel_modes.split(",")]
lt = ListingType[listing_type]
try:
if cli_ctx.use_http:
resp = httpx.post(
f"{cli_ctx.api_base_url}/api/poi/{poi_id}/calculate",
headers=get_http_headers(cli_ctx.user_email),
json={"travel_modes": modes, "listing_type": listing_type},
)
resp.raise_for_status()
data = resp.json()
else:
result = poi_service.trigger_calculation(
poi_id=poi_id,
travel_modes=modes,
listing_type=lt,
user_email=cli_ctx.user_email,
)
if result.task_id:
task_service.add_task_for_user(cli_ctx.user_email, result.task_id)
data = {"task_id": result.task_id or "", "message": result.message}
output(data, cli_ctx.json_output)
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
@pois_group.command("distances")
@click.argument("listing_id", type=int)
@click.option("--type", "-t", "listing_type", default="RENT", type=click.Choice(["RENT", "BUY"]))
@click.pass_context
def get_distances(ctx: click.Context, listing_id: int, listing_type: str) -> None:
"""Get POI distances for a specific listing."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
lt = ListingType[listing_type]
try:
if cli_ctx.use_http:
resp = httpx.get(
f"{cli_ctx.api_base_url}/api/poi/distances",
headers=get_http_headers(cli_ctx.user_email),
params={"listing_id": listing_id, "listing_type": listing_type},
)
resp.raise_for_status()
data = resp.json()
else:
user_id = resolve_user_id(cli_ctx.user_email)
repo = POIRepository(engine)
distances = poi_service.get_distances_for_listing(repo, listing_id, lt, user_id)
data = [
{
"poi_id": d.poi_id,
"travel_mode": d.travel_mode,
"duration_seconds": d.duration_seconds,
"distance_meters": d.distance_meters,
}
for d in distances
]
output(data, cli_ctx.json_output)
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
```
**Step 2: Register in `main.py`**
Add import `from cli.pois import pois_group` and `debug.add_command(pois_group)`.
**Step 3: Add tests**
```python
class TestPOIsCommand:
"""Tests for debug pois subcommand."""
@patch("cli.pois.poi_service.get_user_pois")
@patch("cli.pois.resolve_user_id", return_value=1)
@patch("main.engine", new_callable=MagicMock)
def test_list_pois_empty(
self, mock_engine: MagicMock, mock_resolve: MagicMock, mock_pois: MagicMock
) -> None:
mock_pois.return_value = []
runner = CliRunner()
result = runner.invoke(
cli, ["debug", "-u", "test@example.com", "pois", "list"]
)
assert result.exit_code == 0
assert "No results" in result.output
```
**Step 4: Run tests and commit**
```bash
pytest tests/unit/test_debug_cli.py -v
git add cli/pois.py main.py tests/unit/test_debug_cli.py
git commit -m "Add debug CLI POI subcommand (list, create, update, delete, calculate, distances)"
```
---
### Task 6: Create `cli/tasks.py`
**Files:**
- Create: `cli/tasks.py`
**Step 1: Create `cli/tasks.py`**
```python
"""Debug CLI — task management commands."""
import click
import httpx
from cli._context import CliContext, get_http_headers, output, error_output
from services import task_service
@click.group("tasks")
def tasks_group() -> None:
"""Background task management commands."""
pass
@tasks_group.command("status")
@click.argument("task_id")
@click.pass_context
def task_status(ctx: click.Context, task_id: str) -> None:
"""Get the status of a background task."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
try:
if cli_ctx.use_http:
resp = httpx.get(
f"{cli_ctx.api_base_url}/api/task_status",
headers=get_http_headers(cli_ctx.user_email),
params={"task_id": task_id},
)
resp.raise_for_status()
data = resp.json()
else:
status = task_service.get_task_status(task_id)
data = {
"task_id": status.task_id,
"status": status.status,
"progress": status.progress,
"processed": status.processed,
"total": status.total,
"message": status.message,
"error": status.error,
}
output(data, cli_ctx.json_output)
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
@tasks_group.command("list")
@click.pass_context
def list_tasks(ctx: click.Context) -> None:
"""List all task IDs for the user."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
try:
if cli_ctx.use_http:
resp = httpx.get(
f"{cli_ctx.api_base_url}/api/tasks_for_user",
headers=get_http_headers(cli_ctx.user_email),
)
resp.raise_for_status()
data = resp.json()
else:
data = task_service.get_user_tasks(cli_ctx.user_email)
output(data, cli_ctx.json_output)
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
@tasks_group.command("cancel")
@click.argument("task_id")
@click.pass_context
def cancel_task(ctx: click.Context, task_id: str) -> None:
"""Cancel a running task."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
try:
if cli_ctx.use_http:
resp = httpx.post(
f"{cli_ctx.api_base_url}/api/cancel_task",
headers=get_http_headers(cli_ctx.user_email),
params={"task_id": task_id},
)
resp.raise_for_status()
data = resp.json()
else:
task_service.cancel_task(task_id, user_email=cli_ctx.user_email)
data = {"success": True, "message": "Task cancelled"}
output(data, cli_ctx.json_output)
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
@tasks_group.command("clear")
@click.pass_context
def clear_tasks(ctx: click.Context) -> None:
"""Clear all tasks for the user."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
try:
if cli_ctx.use_http:
resp = httpx.post(
f"{cli_ctx.api_base_url}/api/clear_all_tasks",
headers=get_http_headers(cli_ctx.user_email),
)
resp.raise_for_status()
data = resp.json()
else:
count = task_service.clear_all_tasks(cli_ctx.user_email)
data = {"success": True, "count": count, "message": f"Cleared {count} tasks"}
output(data, cli_ctx.json_output)
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
```
**Step 2: Register in `main.py`**
Add import `from cli.tasks import tasks_group` and `debug.add_command(tasks_group)`.
**Step 3: Commit**
```bash
git add cli/tasks.py main.py
git commit -m "Add debug CLI tasks subcommand (status, list, cancel, clear)"
```
---
### Task 7: Create `cli/listings.py`
**Files:**
- Create: `cli/listings.py`
**Step 1: Create `cli/listings.py`**
```python
"""Debug CLI — listing commands."""
import asyncio
import json
import click
import httpx
from cli._context import CliContext, get_http_headers, output, error_output, resolve_user_id
from database import engine
from models.listing import ListingType, QueryParameters
from repositories.listing_repository import ListingRepository
from services import listing_service, export_service, task_service
@click.group("listings")
def listings_group() -> None:
"""Listing browsing and management commands."""
pass
@listings_group.command("list")
@click.option("--type", "-t", "listing_type", default="RENT", type=click.Choice(["RENT", "BUY"]))
@click.option("--limit", default=10, type=int, help="Max listings to return")
@click.pass_context
def list_listings(ctx: click.Context, listing_type: str, limit: int) -> None:
"""List listings from the database."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
try:
if cli_ctx.use_http:
resp = httpx.get(
f"{cli_ctx.api_base_url}/api/listing",
headers=get_http_headers(cli_ctx.user_email),
params={"limit": limit},
)
resp.raise_for_status()
data = resp.json()
else:
repository = ListingRepository(engine=engine)
result = asyncio.run(listing_service.get_listings(repository, limit=limit))
data = {
"total_count": result.total_count,
"listings": [
{
"id": l.id,
"price": l.price,
"bedrooms": l.number_of_bedrooms,
"sqm": l.square_meters,
"url": l.url,
}
for l in result.listings
],
}
output(data, cli_ctx.json_output)
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
@listings_group.command("detail")
@click.argument("listing_id", type=int)
@click.option("--type", "-t", "listing_type", default="RENT", type=click.Choice(["RENT", "BUY"]))
@click.pass_context
def listing_detail(ctx: click.Context, listing_id: int, listing_type: str) -> None:
"""Get detailed information for a single listing."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
try:
if cli_ctx.use_http:
resp = httpx.get(
f"{cli_ctx.api_base_url}/api/listing/{listing_id}/detail",
headers=get_http_headers(cli_ctx.user_email),
params={"listing_type": listing_type},
)
resp.raise_for_status()
data = resp.json()
else:
repository = ListingRepository(engine=engine)
lt = ListingType(listing_type)
listings = asyncio.run(repository.get_listings(only_ids=[listing_id], listing_type=lt))
if not listings:
error_output("Listing not found", cli_ctx.json_output)
return
listing = listings[0]
data = {
"id": listing.id,
"price": listing.price,
"bedrooms": listing.number_of_bedrooms,
"sqm": listing.square_meters,
"agency": listing.agency,
"url": listing.url,
"council_tax_band": listing.council_tax_band,
}
output(data, cli_ctx.json_output)
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
@listings_group.command("stream")
@click.option("--type", "-t", "listing_type", default="RENT", type=click.Choice(["RENT", "BUY"]))
@click.option("--batch-size", default=50, type=int)
@click.option("--limit", default=None, type=int)
@click.option("--min-bedrooms", default=1, type=int)
@click.option("--max-bedrooms", default=999, type=int)
@click.option("--min-price", default=0, type=int)
@click.option("--max-price", default=10_000_000, type=int)
@click.pass_context
def stream_listings(
ctx: click.Context,
listing_type: str,
batch_size: int,
limit: int | None,
min_bedrooms: int,
max_bedrooms: int,
min_price: int,
max_price: int,
) -> None:
"""Stream listings as NDJSON (mirrors the streaming API endpoint)."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
lt = ListingType[listing_type]
qp = QueryParameters(
listing_type=lt,
min_bedrooms=min_bedrooms,
max_bedrooms=max_bedrooms,
min_price=min_price,
max_price=max_price,
)
try:
if cli_ctx.use_http:
params: dict = {
"listing_type": listing_type,
"batch_size": batch_size,
"min_bedrooms": min_bedrooms,
"max_bedrooms": max_bedrooms,
"min_price": min_price,
"max_price": max_price,
}
if limit is not None:
params["limit"] = limit
with httpx.stream(
"GET",
f"{cli_ctx.api_base_url}/api/listing_geojson/stream",
headers=get_http_headers(cli_ctx.user_email),
params=params,
) as resp:
resp.raise_for_status()
for line in resp.iter_lines():
if line.strip():
print(line)
else:
result = asyncio.run(
export_service.export_to_geojson(repository=ListingRepository(engine=engine), query_parameters=qp, limit=limit)
)
if cli_ctx.json_output:
output(result.data, json_mode=True)
else:
features = result.data.get("features", []) if result.data else []
print(f"Streamed {len(features)} features")
for f in features[:5]:
props = f.get("properties", {})
print(f" [{props.get('id')}] {props.get('displayAddress', 'N/A')} - £{props.get('price', '?')}")
if len(features) > 5:
print(f" ... and {len(features) - 5} more")
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
@listings_group.command("refresh")
@click.option("--type", "-t", "listing_type", required=True, type=click.Choice(["RENT", "BUY"]))
@click.option("--min-bedrooms", default=1, type=int)
@click.option("--max-bedrooms", default=10, type=int)
@click.option("--min-price", default=0, type=int)
@click.option("--max-price", default=999_999, type=int)
@click.pass_context
def refresh_listings(
ctx: click.Context,
listing_type: str,
min_bedrooms: int,
max_bedrooms: int,
min_price: int,
max_price: int,
) -> None:
"""Trigger a listing refresh (scrape from Rightmove)."""
cli_ctx: CliContext = ctx.obj["cli_ctx"]
lt = ListingType[listing_type]
qp = QueryParameters(
listing_type=lt,
min_bedrooms=min_bedrooms,
max_bedrooms=max_bedrooms,
min_price=min_price,
max_price=max_price,
)
try:
if cli_ctx.use_http:
resp = httpx.post(
f"{cli_ctx.api_base_url}/api/refresh_listings",
headers=get_http_headers(cli_ctx.user_email),
params={
"listing_type": listing_type,
"min_bedrooms": min_bedrooms,
"max_bedrooms": max_bedrooms,
"min_price": min_price,
"max_price": max_price,
},
)
resp.raise_for_status()
data = resp.json()
else:
repository = ListingRepository(engine=engine)
result = asyncio.run(
listing_service.refresh_listings(
repository, qp, async_mode=False,
)
)
data = {"message": result.message, "new_count": result.new_listings_count}
output(data, cli_ctx.json_output)
except httpx.HTTPStatusError as e:
error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
except Exception as e:
error_output(str(e), cli_ctx.json_output)
```
**Step 2: Register in `main.py`**
Add import `from cli.listings import listings_group` and `debug.add_command(listings_group)`.
**Step 3: Commit**
```bash
git add cli/listings.py main.py
git commit -m "Add debug CLI listings subcommand (list, detail, stream, refresh)"
```
---
### Task 8: Final wiring, run full test suite, commit
**Step 1: Verify `main.py` imports are complete**
The final imports section and registration at the bottom of `main.py` should look like:
```python
from cli._context import CliContext
from cli.districts import districts_group
from cli.decisions import decisions_group
from cli.pois import pois_group
from cli.tasks import tasks_group
from cli.listings import listings_group
# ... debug group definition ...
debug.add_command(districts_group)
debug.add_command(decisions_group)
debug.add_command(pois_group)
debug.add_command(tasks_group)
debug.add_command(listings_group)
```
**Step 2: Run full test suite**
Run: `pytest tests/ -v --tb=short`
Expected: All existing tests still pass, plus new debug CLI tests
**Step 3: Final commit**
```bash
git add -A
git commit -m "Complete debug CLI with all subcommands"
```