40 KiB
Debug CLI Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Add a debug command group to the CLI that mirrors all web UI interactions, with switchable direct/HTTP execution modes and user impersonation.
Architecture: New cli/ package with one module per domain (listings, decisions, pois, tasks, districts). A shared _context.py handles user identity resolution, HTTP client with self-signed JWT, and output formatting. The debug group is registered in main.py.
Tech Stack: Click (existing), httpx (existing), PyJWT (existing), no new dependencies.
Task 1: Create cli/ package with shared context
Files:
- Create: `cli/__init__.py`
- Create: `cli/_context.py`
Step 1: Create cli/__init__.py
"""Debug CLI package — mirrors web UI interactions for debugging."""
Step 2: Create cli/_context.py with CliContext, user resolution, JWT minting, and output helpers
"""Shared context for the debug CLI."""
import json
import os
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any
import jwt
from database import engine
from repositories.user_repository import UserRepository
@dataclass
class CliContext:
    """Options of the ``debug`` group, shared with every subcommand.

    One instance is stored in the Click context object under the
    ``"cli_ctx"`` key and read back by each command.
    """

    # Email address of the user the command acts as.
    user_email: str
    # True: go through the HTTP API; False: call the services directly.
    use_http: bool
    # True: print JSON instead of human-readable text.
    json_output: bool
    # Prefix for API paths when running in HTTP mode.
    api_base_url: str
def resolve_user_id(email: str) -> int:
    """Return the database ID of the user with *email*, creating the user if absent.

    Raises:
        RuntimeError: If the fetched or newly created user has no ID.
    """
    users = UserRepository(engine)
    record = users.get_user_by_email(email)
    if record is None:
        # Unknown email: create the user on the fly so the command can proceed.
        record = users.create_user(email)
    user_id = record.id
    if user_id is None:
        raise RuntimeError(f"Failed to resolve user ID for {email}")
    return user_id
def mint_jwt(email: str) -> str:
    """Create a self-signed JWT for HTTP mode (passkey-style HS256 token).

    The signing secret, issuer and algorithm are read from the
    ``JWT_SECRET``, ``JWT_ISSUER`` and ``JWT_ALGORITHM`` environment
    variables, falling back to the defaults below. The token uses *email*
    for the ``sub``, ``email`` and ``name`` claims and expires one hour
    after it is issued.

    Args:
        email: Email of the user to impersonate.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    # NOTE(review): the fallback secret is a placeholder; the server presumably
    # has to be configured with the same JWT_SECRET for the token to be
    # accepted — confirm.
    secret = os.getenv("JWT_SECRET", "change-me-in-production")
    issuer = os.getenv("JWT_ISSUER", "wrongmove")
    algorithm = os.getenv("JWT_ALGORITHM", "HS256")

    # Take one timestamp so that "exp" is exactly one hour after "iat".
    issued_at = datetime.now(timezone.utc)
    payload = {
        "sub": email,
        "email": email,
        "name": email,
        "iss": issuer,
        "iat": issued_at,
        "exp": issued_at + timedelta(hours=1),
    }
    return jwt.encode(payload, secret, algorithm=algorithm)
def get_http_headers(email: str) -> dict[str, str]:
    """Return JSON request headers carrying a freshly minted bearer token for *email*."""
    bearer = f"Bearer {mint_jwt(email)}"
    return {"Authorization": bearer, "Content-Type": "application/json"}
def output(data: Any, json_mode: bool) -> None:
    """Print data in JSON or human-readable format.

    Args:
        data: Value to print.
        json_mode: When true, print ``data`` as indented JSON (values JSON
            cannot encode are converted with ``str``). Otherwise a list of
            dicts is printed as a fixed-width table, any other list one item
            per line, a dict as ``key: value`` lines, and anything else as-is.
    """
    if json_mode:
        print(json.dumps(data, indent=2, default=str))
        return

    if isinstance(data, list):
        if not data:
            print("No results.")
        elif all(isinstance(row, dict) for row in data):
            _print_table(data)
        else:
            # Mixed or non-dict items: fall back to one item per line rather
            # than failing on a row that has no ``get``.
            for item in data:
                print(f" {item}")
    elif isinstance(data, dict):
        for key, value in data.items():
            print(f" {key}: {value}")
    else:
        print(data)


def _print_table(rows: list[dict]) -> None:
    """Print *rows* as a fixed-width table with one column per distinct key."""
    # Union of keys in first-seen order, so a key that only shows up in a
    # later row still gets a column instead of being silently dropped.
    columns = list(dict.fromkeys(key for row in rows for key in row))
    print(" ".join(f"{col:<20}" for col in columns))
    print(" ".join("-" * 20 for _ in columns))
    for row in rows:
        print(" ".join(f"{str(row.get(col, '')):<20}" for col in columns))
def error_output(message: str, json_mode: bool) -> None:
    """Write *message* to stderr, wrapped in a JSON object when *json_mode* is set."""
    rendered = json.dumps({"error": message}) if json_mode else f"Error: {message}"
    print(rendered, file=sys.stderr)
Step 3: Commit
git add cli/__init__.py cli/_context.py
git commit -m "Add cli/ package with shared debug context"
Task 2: Create cli/districts.py
This is the simplest subcommand — good to wire up the full pattern first.
Files:
- Create: `cli/districts.py`
Step 1: Create cli/districts.py
"""Debug CLI — district commands."""
import click
import httpx
from cli._context import CliContext, get_http_headers, output, error_output
from services import district_service
@click.group("districts")
def districts_group() -> None:
    """District management commands."""
    # Container only: subcommands attach via @districts_group.command.
@districts_group.command("list")
@click.pass_context
def list_districts(ctx: click.Context) -> None:
    """List all available districts."""
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    try:
        if not cli_ctx.use_http:
            districts = district_service.get_all_districts()
        else:
            response = httpx.get(
                f"{cli_ctx.api_base_url}/api/get_districts",
                headers=get_http_headers(cli_ctx.user_email),
            )
            response.raise_for_status()
            districts = response.json()

        if cli_ctx.json_output:
            output(districts, json_mode=True)
            return

        # Human-readable mode: a count header, then one "name: value" line
        # per district in alphabetical order.
        print(f"Available districts ({len(districts)}):")
        for name in sorted(districts.keys()):
            print(f" {name}: {districts[name]}")
    except httpx.HTTPStatusError as exc:
        error_output(f"HTTP {exc.response.status_code}: {exc.response.text}", cli_ctx.json_output)
    except Exception as exc:
        error_output(str(exc), cli_ctx.json_output)
Step 2: Commit
git add cli/districts.py
git commit -m "Add debug CLI districts subcommand"
Task 3: Register debug group in main.py
Files:
- Modify: `main.py` (add imports and the debug group at the bottom, before `if __name__`)
Step 1: Add the debug group and register all subcommand groups
Add the following to `main.py`, after the existing commands and before the `if __name__ == "__main__":` guard:
from cli._context import CliContext
from cli.districts import districts_group
@cli.group("debug")
@click.option("--user-email", "-u", required=True, help="Email of user to impersonate")
@click.option("--http", "use_http", is_flag=True, default=False, help="Use HTTP requests instead of direct service calls")
@click.option("--json", "json_output", is_flag=True, default=False, help="Output in JSON format")
@click.option("--api-url", default="http://localhost:8000", help="API base URL for HTTP mode")
@click.pass_context
def debug(ctx: click.Context, user_email: str, use_http: bool, json_output: bool, api_url: str) -> None:
    """Debug CLI — mirrors web UI interactions with superuser access."""
    # Make sure ctx.obj is a dict, then publish the group-level options so
    # every subcommand can read them back via ctx.obj["cli_ctx"].
    ctx.ensure_object(dict)
    shared = CliContext(
        user_email=user_email,
        use_http=use_http,
        json_output=json_output,
        api_base_url=api_url,
    )
    ctx.obj["cli_ctx"] = shared
debug.add_command(districts_group)
Step 2: Write test for debug group registration
Create tests/unit/test_debug_cli.py:
"""Tests for the debug CLI."""
from unittest.mock import MagicMock, patch
from click.testing import CliRunner
from main import cli
class TestDebugGroup:
    """Tests for the debug command group."""

    @patch("main.engine", new_callable=MagicMock)
    def test_debug_help_shows_subcommands(self, mock_engine: MagicMock) -> None:
        # The group help must list the registered subcommand groups.
        result = CliRunner().invoke(cli, ["debug", "--help"])
        assert result.exit_code == 0
        assert "districts" in result.output

    @patch("main.engine", new_callable=MagicMock)
    def test_debug_requires_user_email(self, mock_engine: MagicMock) -> None:
        # Omitting --user-email must fail with a usage error.
        result = CliRunner().invoke(cli, ["debug", "districts", "list"])
        assert result.exit_code != 0
        text = result.output
        assert "user-email" in text.lower() or "Missing" in text
class TestDistrictsCommand:
    """Tests for debug districts subcommand."""

    @patch("cli.districts.district_service.get_all_districts")
    @patch("main.engine", new_callable=MagicMock)
    def test_list_districts_direct(
        self, mock_engine: MagicMock, mock_get: MagicMock
    ) -> None:
        # Direct mode: the service result is printed in human-readable form.
        mock_get.return_value = {"London": "R1", "Camden": "R2"}
        argv = ["debug", "-u", "test@example.com", "districts", "list"]
        result = CliRunner().invoke(cli, argv)
        assert result.exit_code == 0
        assert "London" in result.output

    @patch("cli.districts.district_service.get_all_districts")
    @patch("main.engine", new_callable=MagicMock)
    def test_list_districts_json(
        self, mock_engine: MagicMock, mock_get: MagicMock
    ) -> None:
        # --json: the whole output must parse as JSON and contain the district.
        import json

        mock_get.return_value = {"London": "R1"}
        argv = ["debug", "-u", "test@example.com", "--json", "districts", "list"]
        result = CliRunner().invoke(cli, argv)
        assert result.exit_code == 0
        assert "London" in json.loads(result.output)
Step 3: Run tests
Run: pytest tests/unit/test_debug_cli.py -v
Expected: All pass
Step 4: Commit
git add main.py tests/unit/test_debug_cli.py
git commit -m "Register debug group in main.py with districts subcommand"
Task 4: Create cli/decisions.py
Files:
- Create: `cli/decisions.py`
Step 1: Create cli/decisions.py
"""Debug CLI — decision commands (like/dislike)."""
import click
import httpx
from cli._context import CliContext, get_http_headers, output, error_output, resolve_user_id
from database import engine
from repositories.decision_repository import DecisionRepository
from services import decision_service
@click.group("decisions")
def decisions_group() -> None:
    """Like/dislike decision commands."""
    # Container only: subcommands attach via @decisions_group.command.
@decisions_group.command("set")
@click.argument("listing_id", type=int)
@click.argument("decision", type=click.Choice(["liked", "disliked"]))
@click.option("--type", "-t", "listing_type", required=True, type=click.Choice(["RENT", "BUY"]))
@click.pass_context
def set_decision(ctx: click.Context, listing_id: int, decision: str, listing_type: str) -> None:
    """Set a like/dislike decision for a listing."""
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    try:
        if cli_ctx.use_http:
            response = httpx.put(
                f"{cli_ctx.api_base_url}/api/decisions/{listing_id}",
                headers=get_http_headers(cli_ctx.user_email),
                json={"decision": decision, "listing_type": listing_type},
            )
            response.raise_for_status()
            payload = response.json()
        else:
            user_id = resolve_user_id(cli_ctx.user_email)
            saved = decision_service.set_decision(
                DecisionRepository(engine), user_id, listing_id, listing_type, decision
            )
            # Flatten the stored decision into a plain dict; timestamps are
            # stringified for printing.
            payload = {
                "listing_id": saved.listing_id,
                "listing_type": saved.listing_type,
                "decision": saved.decision,
                "created_at": str(saved.created_at),
                "updated_at": str(saved.updated_at),
            }
        output(payload, cli_ctx.json_output)
    except httpx.HTTPStatusError as exc:
        error_output(f"HTTP {exc.response.status_code}: {exc.response.text}", cli_ctx.json_output)
    except Exception as exc:
        error_output(str(exc), cli_ctx.json_output)
@decisions_group.command("list")
@click.pass_context
def list_decisions(ctx: click.Context) -> None:
    """List all decisions for the user."""
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    try:
        if cli_ctx.use_http:
            response = httpx.get(
                f"{cli_ctx.api_base_url}/api/decisions",
                headers=get_http_headers(cli_ctx.user_email),
            )
            response.raise_for_status()
            rows = response.json()
        else:
            user_id = resolve_user_id(cli_ctx.user_email)
            stored = decision_service.get_user_decisions(DecisionRepository(engine), user_id)
            # One plain dict per decision so output() can render a table.
            rows = [
                {
                    "listing_id": item.listing_id,
                    "listing_type": item.listing_type,
                    "decision": item.decision,
                    "created_at": str(item.created_at),
                    "updated_at": str(item.updated_at),
                }
                for item in stored
            ]
        output(rows, cli_ctx.json_output)
    except httpx.HTTPStatusError as exc:
        error_output(f"HTTP {exc.response.status_code}: {exc.response.text}", cli_ctx.json_output)
    except Exception as exc:
        error_output(str(exc), cli_ctx.json_output)
@decisions_group.command("remove")
@click.argument("listing_id", type=int)
@click.option("--type", "-t", "listing_type", required=True, type=click.Choice(["RENT", "BUY"]))
@click.pass_context
def remove_decision(ctx: click.Context, listing_id: int, listing_type: str) -> None:
    """Remove a decision (un-like/un-dislike)."""
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    try:
        if cli_ctx.use_http:
            response = httpx.delete(
                f"{cli_ctx.api_base_url}/api/decisions/{listing_id}",
                headers=get_http_headers(cli_ctx.user_email),
                params={"listing_type": listing_type},
            )
            response.raise_for_status()
            payload = response.json()
        else:
            user_id = resolve_user_id(cli_ctx.user_email)
            was_deleted = decision_service.remove_decision(
                DecisionRepository(engine), user_id, listing_id, listing_type
            )
            if not was_deleted:
                # Nothing was removed: report it instead of printing success.
                error_output("Decision not found", cli_ctx.json_output)
                return
            payload = {"success": True}
        output(payload, cli_ctx.json_output)
    except httpx.HTTPStatusError as exc:
        error_output(f"HTTP {exc.response.status_code}: {exc.response.text}", cli_ctx.json_output)
    except Exception as exc:
        error_output(str(exc), cli_ctx.json_output)
Step 2: Register in main.py
Add import from cli.decisions import decisions_group and debug.add_command(decisions_group).
Step 3: Add tests to tests/unit/test_debug_cli.py
class TestDecisionsCommand:
    """Tests for debug decisions subcommand."""

    @patch("cli.decisions.decision_service.set_decision")
    @patch("cli.decisions.resolve_user_id", return_value=1)
    @patch("main.engine", new_callable=MagicMock)
    def test_set_decision_direct(
        self, mock_engine: MagicMock, mock_resolve: MagicMock, mock_set: MagicMock
    ) -> None:
        from datetime import datetime

        # The service returns an object exposing the attributes the command reads.
        stamp = datetime(2025, 1, 1)
        mock_set.return_value = MagicMock(
            listing_id=123,
            listing_type="RENT",
            decision="liked",
            created_at=stamp,
            updated_at=stamp,
        )
        argv = ["debug", "-u", "test@example.com", "decisions", "set", "123", "liked", "-t", "RENT"]
        result = CliRunner().invoke(cli, argv)
        assert result.exit_code == 0
        mock_set.assert_called_once()

    @patch("cli.decisions.decision_service.get_user_decisions")
    @patch("cli.decisions.resolve_user_id", return_value=1)
    @patch("main.engine", new_callable=MagicMock)
    def test_list_decisions_direct(
        self, mock_engine: MagicMock, mock_resolve: MagicMock, mock_list: MagicMock
    ) -> None:
        # An empty decision list must still exit cleanly.
        mock_list.return_value = []
        argv = ["debug", "-u", "test@example.com", "decisions", "list"]
        result = CliRunner().invoke(cli, argv)
        assert result.exit_code == 0
Step 4: Run tests
Run: pytest tests/unit/test_debug_cli.py -v
Expected: All pass
Step 5: Commit
git add cli/decisions.py main.py tests/unit/test_debug_cli.py
git commit -m "Add debug CLI decisions subcommand (set, list, remove)"
Task 5: Create cli/pois.py
Files:
- Create: `cli/pois.py`
Step 1: Create cli/pois.py
"""Debug CLI — POI commands."""
import click
import httpx
from cli._context import CliContext, get_http_headers, output, error_output, resolve_user_id
from database import engine
from models.listing import ListingType
from repositories.poi_repository import POIRepository
from services import poi_service, task_service
@click.group("pois")
def pois_group() -> None:
    """Point of Interest management commands."""
    # Container only: subcommands attach via @pois_group.command.
@pois_group.command("list")
@click.pass_context
def list_pois(ctx: click.Context) -> None:
    """List all POIs for the user."""
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    try:
        if cli_ctx.use_http:
            response = httpx.get(
                f"{cli_ctx.api_base_url}/api/poi",
                headers=get_http_headers(cli_ctx.user_email),
            )
            response.raise_for_status()
            rows = response.json()
        else:
            owner_id = resolve_user_id(cli_ctx.user_email)
            user_pois = poi_service.get_user_pois(POIRepository(engine), owner_id)
            # One plain dict per POI so output() can render a table.
            rows = [
                {
                    "id": poi.id,
                    "name": poi.name,
                    "address": poi.address,
                    "latitude": poi.latitude,
                    "longitude": poi.longitude,
                    "created_at": str(poi.created_at),
                }
                for poi in user_pois
            ]
        output(rows, cli_ctx.json_output)
    except httpx.HTTPStatusError as exc:
        error_output(f"HTTP {exc.response.status_code}: {exc.response.text}", cli_ctx.json_output)
    except Exception as exc:
        error_output(str(exc), cli_ctx.json_output)
@pois_group.command("create")
@click.option("--name", required=True, help="POI name")
@click.option("--address", required=True, help="Address")
@click.option("--lat", required=True, type=float, help="Latitude")
@click.option("--lon", required=True, type=float, help="Longitude")
@click.pass_context
def create_poi(ctx: click.Context, name: str, address: str, lat: float, lon: float) -> None:
    """Create a new POI."""
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    try:
        if cli_ctx.use_http:
            response = httpx.post(
                f"{cli_ctx.api_base_url}/api/poi",
                headers=get_http_headers(cli_ctx.user_email),
                json={"name": name, "address": address, "latitude": lat, "longitude": lon},
            )
            response.raise_for_status()
            payload = response.json()
        else:
            owner_id = resolve_user_id(cli_ctx.user_email)
            created = poi_service.create_poi(POIRepository(engine), owner_id, name, address, lat, lon)
            # The service result wraps the new POI in its ``poi`` attribute.
            poi = created.poi
            payload = {
                "id": poi.id,
                "name": poi.name,
                "address": poi.address,
                "latitude": poi.latitude,
                "longitude": poi.longitude,
            }
        output(payload, cli_ctx.json_output)
    except httpx.HTTPStatusError as exc:
        error_output(f"HTTP {exc.response.status_code}: {exc.response.text}", cli_ctx.json_output)
    except Exception as exc:
        error_output(str(exc), cli_ctx.json_output)
@pois_group.command("update")
@click.argument("poi_id", type=int)
@click.option("--name", default=None, help="New name")
@click.option("--address", default=None, help="New address")
@click.option("--lat", default=None, type=float, help="New latitude")
@click.option("--lon", default=None, type=float, help="New longitude")
@click.pass_context
def update_poi(ctx: click.Context, poi_id: int, name: str | None, address: str | None, lat: float | None, lon: float | None) -> None:
    """Update an existing POI."""
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    try:
        if cli_ctx.use_http:
            # Send only the fields that were actually given on the command line.
            candidates = {"name": name, "address": address, "latitude": lat, "longitude": lon}
            body: dict = {field: value for field, value in candidates.items() if value is not None}
            response = httpx.put(
                f"{cli_ctx.api_base_url}/api/poi/{poi_id}",
                headers=get_http_headers(cli_ctx.user_email),
                json=body,
            )
            response.raise_for_status()
            payload = response.json()
        else:
            owner_id = resolve_user_id(cli_ctx.user_email)
            updated = poi_service.update_poi(
                POIRepository(engine), poi_id, owner_id, name, address, lat, lon
            )
            if updated is None:
                # The service returned nothing for this POI id and user.
                error_output("POI not found", cli_ctx.json_output)
                return
            poi = updated.poi
            payload = {
                "id": poi.id,
                "name": poi.name,
                "address": poi.address,
                "latitude": poi.latitude,
                "longitude": poi.longitude,
            }
        output(payload, cli_ctx.json_output)
    except httpx.HTTPStatusError as exc:
        error_output(f"HTTP {exc.response.status_code}: {exc.response.text}", cli_ctx.json_output)
    except Exception as exc:
        error_output(str(exc), cli_ctx.json_output)
@pois_group.command("delete")
@click.argument("poi_id", type=int)
@click.pass_context
def delete_poi(ctx: click.Context, poi_id: int) -> None:
    """Delete a POI."""
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    try:
        if cli_ctx.use_http:
            response = httpx.delete(
                f"{cli_ctx.api_base_url}/api/poi/{poi_id}",
                headers=get_http_headers(cli_ctx.user_email),
            )
            response.raise_for_status()
            payload = response.json()
        else:
            owner_id = resolve_user_id(cli_ctx.user_email)
            was_deleted = poi_service.delete_poi(POIRepository(engine), poi_id, owner_id)
            if not was_deleted:
                # Nothing was removed: report it instead of printing success.
                error_output("POI not found", cli_ctx.json_output)
                return
            payload = {"success": True}
        output(payload, cli_ctx.json_output)
    except httpx.HTTPStatusError as exc:
        error_output(f"HTTP {exc.response.status_code}: {exc.response.text}", cli_ctx.json_output)
    except Exception as exc:
        error_output(str(exc), cli_ctx.json_output)
@pois_group.command("calculate")
@click.argument("poi_id", type=int)
@click.option("--travel-modes", required=True, help="Comma-separated: WALK,BICYCLE,TRANSIT")
@click.option("--type", "-t", "listing_type", required=True, type=click.Choice(["RENT", "BUY"]))
@click.pass_context
def calculate_distances(ctx: click.Context, poi_id: int, travel_modes: str, listing_type: str) -> None:
    """Trigger distance calculation for a POI."""
    # HTTP mode posts the request to the API. Direct mode calls
    # poi_service.trigger_calculation and, when a task id comes back,
    # records it for the user via task_service.add_task_for_user.
    cli_ctx: CliContext = ctx.obj["cli_ctx"]

    # Normalise the comma-separated list and drop blank entries, so that
    # "WALK," or "WALK,,TRANSIT" does not send an empty string as a mode.
    modes = [mode for mode in (part.strip().upper() for part in travel_modes.split(",")) if mode]
    if not modes:
        raise click.BadParameter("at least one travel mode is required", param_hint="--travel-modes")

    lt = ListingType[listing_type]
    try:
        if cli_ctx.use_http:
            resp = httpx.post(
                f"{cli_ctx.api_base_url}/api/poi/{poi_id}/calculate",
                headers=get_http_headers(cli_ctx.user_email),
                json={"travel_modes": modes, "listing_type": listing_type},
            )
            resp.raise_for_status()
            data = resp.json()
        else:
            result = poi_service.trigger_calculation(
                poi_id=poi_id,
                travel_modes=modes,
                listing_type=lt,
                user_email=cli_ctx.user_email,
            )
            if result.task_id:
                task_service.add_task_for_user(cli_ctx.user_email, result.task_id)
            # A missing task id is printed as an empty string.
            data = {"task_id": result.task_id or "", "message": result.message}
        output(data, cli_ctx.json_output)
    except httpx.HTTPStatusError as e:
        error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
    except Exception as e:
        error_output(str(e), cli_ctx.json_output)
@pois_group.command("distances")
@click.argument("listing_id", type=int)
@click.option("--type", "-t", "listing_type", default="RENT", type=click.Choice(["RENT", "BUY"]))
@click.pass_context
def get_distances(ctx: click.Context, listing_id: int, listing_type: str) -> None:
    """Get POI distances for a specific listing."""
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    # Enum member looked up by name; only the direct branch uses it.
    kind = ListingType[listing_type]
    try:
        if cli_ctx.use_http:
            response = httpx.get(
                f"{cli_ctx.api_base_url}/api/poi/distances",
                headers=get_http_headers(cli_ctx.user_email),
                params={"listing_id": listing_id, "listing_type": listing_type},
            )
            response.raise_for_status()
            rows = response.json()
        else:
            owner_id = resolve_user_id(cli_ctx.user_email)
            found = poi_service.get_distances_for_listing(
                POIRepository(engine), listing_id, kind, owner_id
            )
            rows = [
                {
                    "poi_id": entry.poi_id,
                    "travel_mode": entry.travel_mode,
                    "duration_seconds": entry.duration_seconds,
                    "distance_meters": entry.distance_meters,
                }
                for entry in found
            ]
        output(rows, cli_ctx.json_output)
    except httpx.HTTPStatusError as exc:
        error_output(f"HTTP {exc.response.status_code}: {exc.response.text}", cli_ctx.json_output)
    except Exception as exc:
        error_output(str(exc), cli_ctx.json_output)
Step 2: Register in main.py
Add import from cli.pois import pois_group and debug.add_command(pois_group).
Step 3: Add tests
class TestPOIsCommand:
    """Tests for debug pois subcommand."""

    @patch("cli.pois.poi_service.get_user_pois")
    @patch("cli.pois.resolve_user_id", return_value=1)
    @patch("main.engine", new_callable=MagicMock)
    def test_list_pois_empty(
        self, mock_engine: MagicMock, mock_resolve: MagicMock, mock_pois: MagicMock
    ) -> None:
        # With no POIs the human-readable output is the empty-list message.
        mock_pois.return_value = []
        argv = ["debug", "-u", "test@example.com", "pois", "list"]
        result = CliRunner().invoke(cli, argv)
        assert result.exit_code == 0
        assert "No results" in result.output
Step 4: Run tests and commit
pytest tests/unit/test_debug_cli.py -v
git add cli/pois.py main.py tests/unit/test_debug_cli.py
git commit -m "Add debug CLI POI subcommand (list, create, update, delete, calculate, distances)"
Task 6: Create cli/tasks.py
Files:
- Create: `cli/tasks.py`
Step 1: Create cli/tasks.py
"""Debug CLI — task management commands."""
import click
import httpx
from cli._context import CliContext, get_http_headers, output, error_output
from services import task_service
@click.group("tasks")
def tasks_group() -> None:
    """Background task management commands."""
    # Container only: subcommands attach via @tasks_group.command.
@tasks_group.command("status")
@click.argument("task_id")
@click.pass_context
def task_status(ctx: click.Context, task_id: str) -> None:
    """Get the status of a background task."""
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    try:
        if cli_ctx.use_http:
            response = httpx.get(
                f"{cli_ctx.api_base_url}/api/task_status",
                headers=get_http_headers(cli_ctx.user_email),
                params={"task_id": task_id},
            )
            response.raise_for_status()
            payload = response.json()
        else:
            info = task_service.get_task_status(task_id)
            # Copy the status fields into a plain dict for printing.
            payload = {
                "task_id": info.task_id,
                "status": info.status,
                "progress": info.progress,
                "processed": info.processed,
                "total": info.total,
                "message": info.message,
                "error": info.error,
            }
        output(payload, cli_ctx.json_output)
    except httpx.HTTPStatusError as exc:
        error_output(f"HTTP {exc.response.status_code}: {exc.response.text}", cli_ctx.json_output)
    except Exception as exc:
        error_output(str(exc), cli_ctx.json_output)
@tasks_group.command("list")
@click.pass_context
def list_tasks(ctx: click.Context) -> None:
    """List all task IDs for the user."""
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    try:
        if not cli_ctx.use_http:
            # Direct mode: tasks are looked up by the user's email.
            task_ids = task_service.get_user_tasks(cli_ctx.user_email)
        else:
            response = httpx.get(
                f"{cli_ctx.api_base_url}/api/tasks_for_user",
                headers=get_http_headers(cli_ctx.user_email),
            )
            response.raise_for_status()
            task_ids = response.json()
        output(task_ids, cli_ctx.json_output)
    except httpx.HTTPStatusError as exc:
        error_output(f"HTTP {exc.response.status_code}: {exc.response.text}", cli_ctx.json_output)
    except Exception as exc:
        error_output(str(exc), cli_ctx.json_output)
@tasks_group.command("cancel")
@click.argument("task_id")
@click.pass_context
def cancel_task(ctx: click.Context, task_id: str) -> None:
    """Cancel a running task."""
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    try:
        if not cli_ctx.use_http:
            task_service.cancel_task(task_id, user_email=cli_ctx.user_email)
            # The service call returned without raising: report success.
            payload = {"success": True, "message": "Task cancelled"}
        else:
            response = httpx.post(
                f"{cli_ctx.api_base_url}/api/cancel_task",
                headers=get_http_headers(cli_ctx.user_email),
                params={"task_id": task_id},
            )
            response.raise_for_status()
            payload = response.json()
        output(payload, cli_ctx.json_output)
    except httpx.HTTPStatusError as exc:
        error_output(f"HTTP {exc.response.status_code}: {exc.response.text}", cli_ctx.json_output)
    except Exception as exc:
        error_output(str(exc), cli_ctx.json_output)
@tasks_group.command("clear")
@click.pass_context
def clear_tasks(ctx: click.Context) -> None:
    """Clear all tasks for the user."""
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    try:
        if not cli_ctx.use_http:
            # The service's return value is reported as the number cleared.
            cleared = task_service.clear_all_tasks(cli_ctx.user_email)
            payload = {"success": True, "count": cleared, "message": f"Cleared {cleared} tasks"}
        else:
            response = httpx.post(
                f"{cli_ctx.api_base_url}/api/clear_all_tasks",
                headers=get_http_headers(cli_ctx.user_email),
            )
            response.raise_for_status()
            payload = response.json()
        output(payload, cli_ctx.json_output)
    except httpx.HTTPStatusError as exc:
        error_output(f"HTTP {exc.response.status_code}: {exc.response.text}", cli_ctx.json_output)
    except Exception as exc:
        error_output(str(exc), cli_ctx.json_output)
Step 2: Register in main.py
Add import from cli.tasks import tasks_group and debug.add_command(tasks_group).
Step 3: Commit
git add cli/tasks.py main.py
git commit -m "Add debug CLI tasks subcommand (status, list, cancel, clear)"
Task 7: Create cli/listings.py
Files:
- Create: `cli/listings.py`
Step 1: Create cli/listings.py
"""Debug CLI — listing commands."""
import asyncio
import json
import click
import httpx
from cli._context import CliContext, get_http_headers, output, error_output, resolve_user_id
from database import engine
from models.listing import ListingType, QueryParameters
from repositories.listing_repository import ListingRepository
from services import listing_service, export_service, task_service
@click.group("listings")
def listings_group() -> None:
    """Listing browsing and management commands."""
    # Container only: subcommands attach via @listings_group.command.
@listings_group.command("list")
@click.option("--type", "-t", "listing_type", default="RENT", type=click.Choice(["RENT", "BUY"]))
@click.option("--limit", default=10, type=int, help="Max listings to return")
@click.pass_context
def list_listings(ctx: click.Context, listing_type: str, limit: int) -> None:
    """List listings from the database."""
    # NOTE(review): listing_type is accepted but not forwarded in either
    # mode — confirm whether the API/service should filter by it.
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    try:
        if cli_ctx.use_http:
            response = httpx.get(
                f"{cli_ctx.api_base_url}/api/listing",
                headers=get_http_headers(cli_ctx.user_email),
                params={"limit": limit},
            )
            response.raise_for_status()
            payload = response.json()
        else:
            repository = ListingRepository(engine=engine)
            # The service call is a coroutine; run it to completion here.
            page = asyncio.run(listing_service.get_listings(repository, limit=limit))
            summaries = [
                {
                    "id": item.id,
                    "price": item.price,
                    "bedrooms": item.number_of_bedrooms,
                    "sqm": item.square_meters,
                    "url": item.url,
                }
                for item in page.listings
            ]
            payload = {"total_count": page.total_count, "listings": summaries}
        output(payload, cli_ctx.json_output)
    except httpx.HTTPStatusError as exc:
        error_output(f"HTTP {exc.response.status_code}: {exc.response.text}", cli_ctx.json_output)
    except Exception as exc:
        error_output(str(exc), cli_ctx.json_output)
@listings_group.command("detail")
@click.argument("listing_id", type=int)
@click.option("--type", "-t", "listing_type", default="RENT", type=click.Choice(["RENT", "BUY"]))
@click.pass_context
def listing_detail(ctx: click.Context, listing_id: int, listing_type: str) -> None:
    """Get detailed information for a single listing."""
    # HTTP mode fetches the detail endpoint. Direct mode asks the repository
    # for the single id and prints a fixed set of fields, or reports
    # "Listing not found" when nothing comes back.
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    try:
        if cli_ctx.use_http:
            resp = httpx.get(
                f"{cli_ctx.api_base_url}/api/listing/{listing_id}/detail",
                headers=get_http_headers(cli_ctx.user_email),
                params={"listing_type": listing_type},
            )
            resp.raise_for_status()
            data = resp.json()
        else:
            repository = ListingRepository(engine=engine)
            # Look the enum member up by NAME, as the other debug commands do
            # (ListingType[...]); a value lookup only works if every member's
            # value happens to equal its name.
            lt = ListingType[listing_type]
            listings = asyncio.run(repository.get_listings(only_ids=[listing_id], listing_type=lt))
            if not listings:
                error_output("Listing not found", cli_ctx.json_output)
                return
            listing = listings[0]
            data = {
                "id": listing.id,
                "price": listing.price,
                "bedrooms": listing.number_of_bedrooms,
                "sqm": listing.square_meters,
                "agency": listing.agency,
                "url": listing.url,
                "council_tax_band": listing.council_tax_band,
            }
        output(data, cli_ctx.json_output)
    except httpx.HTTPStatusError as e:
        error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
    except Exception as e:
        error_output(str(e), cli_ctx.json_output)
@listings_group.command("stream")
@click.option("--type", "-t", "listing_type", default="RENT", type=click.Choice(["RENT", "BUY"]))
@click.option("--batch-size", default=50, type=int)
@click.option("--limit", default=None, type=int)
@click.option("--min-bedrooms", default=1, type=int)
@click.option("--max-bedrooms", default=999, type=int)
@click.option("--min-price", default=0, type=int)
@click.option("--max-price", default=10_000_000, type=int)
@click.pass_context
def stream_listings(
    ctx: click.Context,
    listing_type: str,
    batch_size: int,
    limit: int | None,
    min_bedrooms: int,
    max_bedrooms: int,
    min_price: int,
    max_price: int,
) -> None:
    """Stream listings as NDJSON (mirrors the streaming API endpoint)."""
    # HTTP mode echoes every non-blank line of the streaming endpoint.
    # Direct mode runs the GeoJSON export and prints either the full JSON
    # (--json) or a short summary of the first five features.
    # NOTE(review): batch_size is only forwarded in HTTP mode; the direct
    # branch does not use it — confirm this is intended.
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    lt = ListingType[listing_type]
    qp = QueryParameters(
        listing_type=lt,
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
    )
    try:
        if cli_ctx.use_http:
            params: dict = {
                "listing_type": listing_type,
                "batch_size": batch_size,
                "min_bedrooms": min_bedrooms,
                "max_bedrooms": max_bedrooms,
                "min_price": min_price,
                "max_price": max_price,
            }
            if limit is not None:
                params["limit"] = limit
            with httpx.stream(
                "GET",
                f"{cli_ctx.api_base_url}/api/listing_geojson/stream",
                headers=get_http_headers(cli_ctx.user_email),
                params=params,
            ) as resp:
                try:
                    resp.raise_for_status()
                except httpx.HTTPStatusError:
                    # A streamed response has no body loaded yet. Read it
                    # while the stream is still open so the handler below can
                    # use e.response.text without raising ResponseNotRead.
                    resp.read()
                    raise
                for line in resp.iter_lines():
                    if line.strip():
                        print(line)
        else:
            result = asyncio.run(
                export_service.export_to_geojson(
                    repository=ListingRepository(engine=engine),
                    query_parameters=qp,
                    limit=limit,
                )
            )
            if cli_ctx.json_output:
                output(result.data, json_mode=True)
            else:
                features = result.data.get("features", []) if result.data else []
                print(f"Streamed {len(features)} features")
                # Preview only the first five features to keep the output short.
                for feature in features[:5]:
                    props = feature.get("properties", {})
                    print(f" [{props.get('id')}] {props.get('displayAddress', 'N/A')} - £{props.get('price', '?')}")
                if len(features) > 5:
                    print(f" ... and {len(features) - 5} more")
    except httpx.HTTPStatusError as e:
        error_output(f"HTTP {e.response.status_code}: {e.response.text}", cli_ctx.json_output)
    except Exception as e:
        error_output(str(e), cli_ctx.json_output)
@listings_group.command("refresh")
@click.option("--type", "-t", "listing_type", required=True, type=click.Choice(["RENT", "BUY"]))
@click.option("--min-bedrooms", default=1, type=int)
@click.option("--max-bedrooms", default=10, type=int)
@click.option("--min-price", default=0, type=int)
@click.option("--max-price", default=999_999, type=int)
@click.pass_context
def refresh_listings(
    ctx: click.Context,
    listing_type: str,
    min_bedrooms: int,
    max_bedrooms: int,
    min_price: int,
    max_price: int,
) -> None:
    """Trigger a listing refresh (scrape from Rightmove)."""
    cli_ctx: CliContext = ctx.obj["cli_ctx"]
    # The query object is built up front; only the direct branch uses it.
    query = QueryParameters(
        listing_type=ListingType[listing_type],
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
    )
    try:
        if cli_ctx.use_http:
            filters = {
                "listing_type": listing_type,
                "min_bedrooms": min_bedrooms,
                "max_bedrooms": max_bedrooms,
                "min_price": min_price,
                "max_price": max_price,
            }
            response = httpx.post(
                f"{cli_ctx.api_base_url}/api/refresh_listings",
                headers=get_http_headers(cli_ctx.user_email),
                params=filters,
            )
            response.raise_for_status()
            payload = response.json()
        else:
            repository = ListingRepository(engine=engine)
            # The service is called with async_mode=False and its coroutine is
            # run to completion before the result is printed.
            outcome = asyncio.run(
                listing_service.refresh_listings(repository, query, async_mode=False)
            )
            payload = {"message": outcome.message, "new_count": outcome.new_listings_count}
        output(payload, cli_ctx.json_output)
    except httpx.HTTPStatusError as exc:
        error_output(f"HTTP {exc.response.status_code}: {exc.response.text}", cli_ctx.json_output)
    except Exception as exc:
        error_output(str(exc), cli_ctx.json_output)
Step 2: Register in main.py
Add import from cli.listings import listings_group and debug.add_command(listings_group).
Step 3: Commit
git add cli/listings.py main.py
git commit -m "Add debug CLI listings subcommand (list, detail, stream, refresh)"
Task 8: Final wiring, run full test suite, commit
Step 1: Verify main.py imports are complete
The final imports section and registration at the bottom of main.py should look like:
from cli._context import CliContext
from cli.districts import districts_group
from cli.decisions import decisions_group
from cli.pois import pois_group
from cli.tasks import tasks_group
from cli.listings import listings_group
# ... debug group definition ...
debug.add_command(districts_group)
debug.add_command(decisions_group)
debug.add_command(pois_group)
debug.add_command(tasks_group)
debug.add_command(listings_group)
Step 2: Run full test suite
Run: pytest tests/ -v --tb=short
Expected: All existing tests still pass, plus new debug CLI tests
Step 3: Final commit
git add -A
git commit -m "Complete debug CLI with all subcommands"