wrongmove/main.py

504 lines
14 KiB
Python

"""CLI entry point for the Real Estate Crawler."""
import asyncio
from datetime import datetime
import os
import pathlib
from typing import Callable, ParamSpec, TypeVar
import click
from models.listing import FurnishType, ListingType, QueryParameters
from data_access import Listing
from rec.routing import API_KEY_ENVIRONMENT_VARIABLE, TravelMode
from repositories.listing_repository import ListingRepository
from functools import wraps
from database import engine
from services import (
listing_service,
export_service,
district_service,
poi_service,
)
from repositories.poi_repository import POIRepository
from repositories.user_repository import UserRepository
# Parameter specification of a decorated callable; lets a decorator annotate
# the ``*args`` / ``**kwargs`` it forwards.
P = ParamSpec("P")
# Return type of a decorated callable.
R = TypeVar("R")
def build_query_parameters(
    type: str,
    district: list[str] | tuple[str, ...] | None,
    min_bedrooms: int,
    max_bedrooms: int,
    min_price: int,
    max_price: int,
    furnish_types: list[str] | tuple[str, ...],
    available_from: datetime | None,
    last_seen_days: int,
    min_sqm: int | None = None,
    radius: int = 0,
    page_size: int = 500,
    max_days_since_added: int = 14,
) -> QueryParameters:
    """Translate raw CLI option values into a ``QueryParameters`` object.

    ``type`` and each entry of ``furnish_types`` are looked up by member name
    in ``ListingType`` and ``FurnishType`` respectively.  A missing or empty
    ``district`` becomes an empty set, and an empty ``furnish_types`` is
    passed on as ``None``.  Every other argument is forwarded unchanged
    (``available_from`` under the name ``let_date_available_from``).
    """
    listing_type = ListingType[type]
    # ``None`` and empty sequences both collapse to an empty set.
    district_names = set(district or ())
    furnish_filter = (
        [FurnishType[name] for name in furnish_types] if furnish_types else None
    )
    return QueryParameters(
        listing_type=listing_type,
        district_names=district_names,
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
        furnish_types=furnish_filter,
        let_date_available_from=available_from,
        last_seen_days=last_seen_days,
        min_sqm=min_sqm,
        radius=radius,
        page_size=page_size,
        max_days_since_added=max_days_since_added,
    )
def listing_filter_options(func: Callable[P, R]) -> Callable[P, R]:
    """Attach the shared listing-filter options to a click command callback.

    The decorated callback does not receive the individual filter values.
    They are collapsed into a single ``query_parameters: QueryParameters``
    keyword argument built with ``build_query_parameters``; all other
    positional and keyword arguments are forwarded untouched.

    Raises:
        click.UsageError: At invocation time, when ``--min-bedrooms`` is
            greater than ``--max-bedrooms`` or ``--min-price`` is greater
            than ``--max-price``.
    """

    @click.option(
        "--type",
        "-t",
        help="Type of listing to scrape (BUY or RENT)",
        type=click.Choice(
            # Materialised as a list: click documents ``choices`` as a sequence.
            list(ListingType.__members__.keys()),
            case_sensitive=False,
        ),
        required=True,
    )
    @click.option(
        "--min-bedrooms",
        default=1,
        help="Minimum number of bedrooms",
        type=click.IntRange(min=1),
    )
    @click.option(
        "--max-bedrooms",
        default=10,
        help="Maximum number of bedrooms",
        type=click.IntRange(min=1, max=10),
    )
    @click.option(
        "--min-price",
        default=0,
        help="Minimum price in GBP",
        type=click.IntRange(min=0),
    )
    @click.option(
        "--max-price",
        default=999_999,
        help="Maximum price in GBP",
        type=click.IntRange(min=0),
    )
    @click.option(
        "--district",
        default=None,
        help="District to filter by (can be repeated for multiple districts)",
        # The district names are fetched every time this decorator is applied.
        type=click.Choice(district_service.get_district_names(), case_sensitive=False),
        multiple=True,
    )
    @click.option(
        "--furnish-types",
        "-f",
        help="Furnish type filter for rented listings (can be repeated)",
        type=click.Choice(
            [furnish_type.name for furnish_type in FurnishType.__members__.values()],
            case_sensitive=False,
        ),
        multiple=True,
    )
    @click.option(
        "--available-from",
        help="Only include listings available from this date (format: YYYY-MM-DD)",
        default=None,
        type=click.DateTime(),
    )
    @click.option(
        "--last-seen-days",
        help="Only include listings seen in the last N days",
        default=14,
        type=int,
    )
    @click.option(
        "--min-sqm",
        help="Minimum square meters for the listing",
        default=None,
        type=int,
    )
    # ``wraps`` copies the callback's docstring (click's help text) and any
    # click parameters already attached to it onto the wrapper.
    @wraps(func)
    def wrapper(
        *args: P.args,
        type: str,
        district: tuple[str, ...],
        min_bedrooms: int,
        max_bedrooms: int,
        min_price: int,
        max_price: int,
        furnish_types: tuple[str, ...],
        available_from: datetime | None,
        last_seen_days: int,
        min_sqm: int | None,
        **kwargs: P.kwargs,
    ) -> R:
        # Each bound is range-checked on its own by click, but nothing stops
        # the lower bound from exceeding the upper one (e.g. ``--min-price``
        # above the 999_999 default of ``--max-price``).  Reject that here
        # instead of silently running a query with an inverted range.
        if min_bedrooms > max_bedrooms:
            raise click.UsageError(
                f"--min-bedrooms ({min_bedrooms}) cannot be greater than "
                f"--max-bedrooms ({max_bedrooms})"
            )
        if min_price > max_price:
            raise click.UsageError(
                f"--min-price ({min_price}) cannot be greater than "
                f"--max-price ({max_price})"
            )

        query_parameters = build_query_parameters(
            type=type,
            district=district,
            min_bedrooms=min_bedrooms,
            max_bedrooms=max_bedrooms,
            min_price=min_price,
            max_price=max_price,
            furnish_types=furnish_types,
            available_from=available_from,
            last_seen_days=last_seen_days,
            min_sqm=min_sqm,
        )
        return func(*args, query_parameters=query_parameters, **kwargs)

    return wrapper
@click.group()
@click.option(
    "--data-dir",
    default=pathlib.Path("data/rs/"),
    help="Data directory for storing listings",
    type=click.Path(writable=True, file_okay=False, dir_okay=True, resolve_path=True),
)
@click.pass_context
def cli(ctx: click.Context, data_dir: str) -> None:
    # Root command group.  It seeds the shared context object with the two
    # entries the subcommands read: "data_dir" and "repository".
    # Documented with comments on purpose: click would surface a docstring
    # here as the group's --help text.
    ctx.ensure_object(dict)
    ctx.obj.update(
        data_dir=pathlib.Path(data_dir),
        repository=ListingRepository(engine=engine),
    )
@cli.command()
@listing_filter_options
@click.option(
    "--include-processing",
    "-p",
    is_flag=True,
    help="Also download images and run floorplan OCR detection",
)
@click.pass_context
def dump_listings(
    ctx: click.Context,
    query_parameters: QueryParameters,
    include_processing: bool,
) -> None:
    """Fetch listings from Rightmove API."""
    # ``query_parameters`` is assembled by ``listing_filter_options`` from the
    # individual filter flags.
    repo: ListingRepository = ctx.obj["repository"]
    click.echo(f"Fetching listings with parameters: {query_parameters}")

    # The -p flag is forwarded as ``full``; the refresh is driven to
    # completion synchronously from this command.
    refresh = listing_service.refresh_listings(
        repo,
        query_parameters,
        full=include_processing,
        async_mode=False,
    )
    outcome = asyncio.run(refresh)
    click.echo(outcome.message)
@cli.command()
@click.pass_context
def dump_images(ctx: click.Context) -> None:
    """Download floorplan images for all listings."""
    # Both values are placed on the context object by the root ``cli`` group.
    target_dir: pathlib.Path = ctx.obj["data_dir"]
    repo: ListingRepository = ctx.obj["repository"]

    click.echo(f"Downloading images to {target_dir}")
    download = listing_service.download_images(repo, target_dir)
    processed = asyncio.run(download)
    click.echo(f"Processed {processed} listings")
@cli.command()
@click.pass_context
def detect_floorplan(ctx: click.Context) -> None:
    """Run OCR on floorplan images to detect square meters."""
    # The repository is placed on the context object by the root ``cli`` group.
    repo: ListingRepository = ctx.obj["repository"]

    click.echo("Running floorplan detection...")
    detection = listing_service.detect_floorplans(repo)
    processed = asyncio.run(detection)
    click.echo(f"Processed {processed} listings")
@cli.command()
@click.option(
    "--destination-address",
    "-d",
    help="Destination address for routing",
    required=True,
    type=click.STRING,
)
@click.option(
    "--travel-mode",
    "-m",
    help="Travel mode for routing (e.g. transit, driving, walking, bicycling)",
    # Materialised as a list: click documents ``choices`` as a sequence.
    type=click.Choice(list(TravelMode.__members__.keys()), case_sensitive=False),
    required=True,
)
@click.option(
    "--limit",
    "-l",
    help="Maximum number of listings to calculate routes for",
    type=click.IntRange(min=1),
    default=1,
)
@click.pass_context
def routing(
    ctx: click.Context,
    destination_address: str,
    travel_mode: str,
    limit: int,
) -> None:
    """Calculate transit routes for listings."""
    # Raises click.ClickException when the routing API key variable is
    # missing or blank.
    repository: ListingRepository = ctx.obj["repository"]

    # Fail fast, before any routing work starts.  A variable that is set but
    # empty (or whitespace only) is just as unusable as a missing one, so it
    # is rejected here too rather than passing the guard.
    api_key = os.environ.get(API_KEY_ENVIRONMENT_VARIABLE, "")
    if not api_key.strip():
        raise click.ClickException(
            f"{API_KEY_ENVIRONMENT_VARIABLE} environment variable is not set or is empty."
        )

    click.echo(f"Calculating routes to '{destination_address}' for {limit} listings")
    count = asyncio.run(
        listing_service.calculate_routes(
            repository,
            destination_address,
            travel_mode,
            limit=limit,
        )
    )
    click.echo(f"Processed {count} listings")
@cli.command()
@click.option(
    "--output-file",
    "-O",
    help="Path to the output CSV file",
    required=True,
    type=click.Path(
        writable=True,
        file_okay=True,
        dir_okay=False,
        resolve_path=True,
    ),
)
@listing_filter_options
@click.pass_context
def export_csv(
    ctx: click.Context,
    output_file: str,
    query_parameters: QueryParameters,
) -> None:
    """Export listings to CSV file."""
    # ``query_parameters`` is assembled by ``listing_filter_options`` from the
    # individual filter flags.
    repo: ListingRepository = ctx.obj["repository"]
    destination = pathlib.Path(output_file)

    click.echo(f"Exporting to {output_file}")
    export = export_service.export_to_csv(repo, destination, query_parameters)
    outcome = asyncio.run(export)
    click.echo(outcome.message)
@cli.command()
@click.option(
    "--output-file",
    "-O",
    help="Path to the output GeoJSON file",
    required=True,
    type=click.Path(
        writable=True,
        file_okay=True,
        dir_okay=False,
        resolve_path=True,
    ),
)
@listing_filter_options
@click.pass_context
def export_immoweb(
    ctx: click.Context,
    output_file: str,
    query_parameters: QueryParameters,
) -> None:
    """Export listings to GeoJSON file for map visualization."""
    # ``query_parameters`` is assembled by ``listing_filter_options`` from the
    # individual filter flags.
    repo: ListingRepository = ctx.obj["repository"]
    destination = pathlib.Path(output_file)

    click.echo(f"Exporting to {output_file}")
    export = export_service.export_to_geojson(
        repo,
        query_parameters=query_parameters,
        output_path=destination,
    )
    outcome = asyncio.run(export)
    click.echo(outcome.message)
@cli.command()
@click.pass_context
def populate_db(ctx: click.Context) -> None:
    """Populate database from filesystem data (legacy migration)."""
    data_dir: pathlib.Path = ctx.obj["data_dir"]
    repository: ListingRepository = ctx.obj["repository"]
    click.echo(f"Populating database from {data_dir}")

    # One ``listing.json`` is expected per immediate sub-directory of the
    # data directory.  ``list()`` materialises the glob directly; the former
    # ``[path for path in ...]`` was an identity comprehension.
    listing_files = list(data_dir.glob("*/listing.json"))
    listings = Listing.get_all_listings(listing_files)

    asyncio.run(repository.upsert_listings_legacy(listings))
    click.echo(f"Imported {len(listings)} listings")
@cli.command()
def list_districts() -> None:
    """List all available districts."""
    districts = district_service.get_all_districts()
    click.echo(f"Available districts ({len(districts)}):")
    # Iterating a mapping yields its keys, so ``.keys()`` is not needed.
    for name in sorted(districts):
        click.echo(f" - {name}")
@cli.command()
@click.option("--name", required=True, help="Name for the POI (e.g., 'Office')")
@click.option("--address", required=True, help="Human-readable address")
# Coordinates are bounded to their valid geographic ranges so that a typo
# such as ``--lat 515.07`` is rejected at the command line instead of being
# stored as a POI.
@click.option(
    "--lat",
    required=True,
    type=click.FloatRange(min=-90.0, max=90.0),
    help="Latitude",
)
@click.option(
    "--lon",
    required=True,
    type=click.FloatRange(min=-180.0, max=180.0),
    help="Longitude",
)
@click.option("--user-email", required=True, help="User email to associate POI with")
def add_poi(name: str, address: str, lat: float, lon: float, user_email: str) -> None:
    """Create a Point of Interest."""
    user_repo = UserRepository(engine)
    db_user = user_repo.get_user_by_email(user_email)
    if db_user is None:
        # Unknown e-mail addresses get a user created on the fly.
        db_user = user_repo.create_user(user_email)

    poi_repo = POIRepository(engine)
    result = poi_service.create_poi(
        poi_repo,
        user_id=db_user.id,  # type: ignore[arg-type]
        name=name,
        address=address,
        latitude=lat,
        longitude=lon,
    )
    click.echo(f"Created POI: {result.poi.name} (id={result.poi.id})")
@cli.command()
@click.option("--user-email", required=True, help="User email to list POIs for")
def list_pois(user_email: str) -> None:
    """List POIs for a user."""
    owner = UserRepository(engine).get_user_by_email(user_email)
    if owner is None:
        # An unknown user is reported as a plain message; the command still
        # returns normally.
        click.echo(f"User '{user_email}' not found")
        return

    user_pois = poi_service.get_user_pois(POIRepository(engine), owner.id)  # type: ignore[arg-type]
    if not user_pois:
        click.echo("No POIs found")
        return

    for poi in user_pois:
        click.echo(f" [{poi.id}] {poi.name} - {poi.address} ({poi.latitude}, {poi.longitude})")
@cli.command()
@click.option("--poi-id", required=True, type=int, help="POI ID to calculate distances for")
@click.option(
    "--travel-modes",
    required=True,
    help="Comma-separated travel modes (WALK, BICYCLE, TRANSIT)",
)
@click.option(
    "--listing-type",
    required=True,
    # Materialised as a list: click documents ``choices`` as a sequence.
    type=click.Choice(list(ListingType.__members__.keys()), case_sensitive=False),
    help="Listing type (BUY or RENT)",
)
def calculate_poi(poi_id: int, travel_modes: str, listing_type: str) -> None:
    """Calculate distances from listings to a POI."""
    # Raises click.BadParameter when ``--travel-modes`` contains no usable
    # entry.
    # These imports were already function-local in the original code.
    # NOTE(review): presumably to avoid import cost or a cycle - confirm.
    from services.poi_distance_calculator import calculate_poi_distances as calc
    from config.routing_config import RoutingConfig

    # Normalise the comma-separated list: trim and upper-case each entry,
    # drop blanks (e.g. from a trailing comma) and remove duplicates while
    # keeping the order the user gave.  Previously "WALK," produced an empty
    # mode and "WALK,WALK" computed the same distances twice.
    modes = list(
        dict.fromkeys(
            token.strip().upper()
            for token in travel_modes.split(",")
            if token.strip()
        )
    )
    if not modes:
        raise click.BadParameter(
            "at least one travel mode is required",
            param_hint="--travel-modes",
        )

    poi_repo = POIRepository(engine)
    poi = poi_repo.get_poi_by_id(poi_id)
    if poi is None:
        click.echo(f"POI {poi_id} not found")
        return

    listing_repo = ListingRepository(engine=engine)
    lt = ListingType[listing_type]
    click.echo(f"Calculating {modes} distances for POI '{poi.name}' ({lt.value} listings)...")
    total = asyncio.run(
        calc(
            listing_repo=listing_repo,
            poi_repo=poi_repo,
            poi=poi,
            travel_modes=modes,
            listing_type=lt,
            config=RoutingConfig.from_env(),
        )
    )
    click.echo(f"Computed {total} distances")
from cli._context import CliContext
from cli.districts import districts_group
from cli.decisions import decisions_group
from cli.pois import pois_group
from cli.tasks import tasks_group
@cli.group("debug")
@click.option(
    "--user-email",
    "-u",
    required=True,
    help="Email of user to impersonate",
)
@click.option(
    "--http",
    "use_http",
    is_flag=True,
    default=False,
    help="Use HTTP requests instead of direct service calls",
)
@click.option(
    "--json",
    "json_output",
    is_flag=True,
    default=False,
    help="Output in JSON format",
)
@click.option(
    "--api-url",
    default="http://localhost:8000",
    help="API base URL for HTTP mode",
)
@click.pass_context
def debug(
    ctx: click.Context,
    user_email: str,
    use_http: bool,
    json_output: bool,
    api_url: str,
) -> None:
    """Debug CLI — mirrors web UI interactions with superuser access."""
    # The group-level options are bundled into one ``CliContext`` and stored
    # on the click context object under "cli_ctx".
    ctx.ensure_object(dict)
    cli_context = CliContext(
        user_email=user_email,
        use_http=use_http,
        json_output=json_output,
        api_base_url=api_url,
    )
    ctx.obj["cli_ctx"] = cli_context
# Register the sub-groups imported from the ``cli`` package as children of
# the ``debug`` group.
debug.add_command(districts_group)
debug.add_command(decisions_group)
debug.add_command(pois_group)
debug.add_command(tasks_group)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    cli()