"""CLI entry point for the Real Estate Crawler."""

import asyncio
import os
import pathlib
from datetime import datetime
from functools import wraps
from typing import Callable, ParamSpec, TypeVar

import click

# Local imports keep their original relative order so that any import-time
# side effects (e.g. creating the database engine) happen in the same sequence.
from models.listing import FurnishType, ListingType, QueryParameters
from data_access import Listing
from rec.routing import API_KEY_ENVIRONMENT_VARIABLE, TravelMode
from repositories.listing_repository import ListingRepository
from database import engine
from services import (
    listing_service,
    export_service,
    district_service,
)

P = ParamSpec("P")
R = TypeVar("R")


def build_query_parameters(
    type: str,
    district: list[str] | tuple[str, ...] | None,
    min_bedrooms: int,
    max_bedrooms: int,
    min_price: int,
    max_price: int,
    furnish_types: list[str] | tuple[str, ...],
    available_from: datetime | None,
    last_seen_days: int,
    min_sqm: int | None = None,
    radius: int = 0,
    page_size: int = 500,
    max_days_since_added: int = 14,
) -> QueryParameters:
    """Build QueryParameters from CLI options.

    Args:
        type: Member name of ``ListingType`` (looked up with ``ListingType[type]``).
        district: District names; ``None`` or an empty sequence becomes an
            empty set.
        min_bedrooms: Passed through unchanged.
        max_bedrooms: Passed through unchanged.
        min_price: Passed through unchanged.
        max_price: Passed through unchanged.
        furnish_types: Member names of ``FurnishType``; an empty sequence is
            forwarded as ``None`` rather than an empty list.
        available_from: Forwarded as ``let_date_available_from``.
        last_seen_days: Passed through unchanged.
        min_sqm: Passed through unchanged.
        radius: Passed through unchanged.
        page_size: Passed through unchanged.
        max_days_since_added: Passed through unchanged.

    Returns:
        The populated ``QueryParameters`` instance.

    Raises:
        KeyError: If ``type`` or an entry of ``furnish_types`` is not an exact
            member name (assuming both are standard ``enum.Enum`` classes).
    """
    return QueryParameters(
        listing_type=ListingType[type],
        district_names=set(district) if district else set(),
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
        furnish_types=[FurnishType[ft] for ft in furnish_types]
        if furnish_types
        else None,
        let_date_available_from=available_from,
        last_seen_days=last_seen_days,
        min_sqm=min_sqm,
        radius=radius,
        page_size=page_size,
        max_days_since_added=max_days_since_added,
    )


def _validate_filter_ranges(
    min_bedrooms: int,
    max_bedrooms: int,
    min_price: int,
    max_price: int,
) -> None:
    """Reject inverted min/max filter ranges with a CLI usage error."""
    if min_bedrooms > max_bedrooms:
        raise click.UsageError(
            f"--min-bedrooms ({min_bedrooms}) cannot be greater than "
            f"--max-bedrooms ({max_bedrooms})."
        )
    if min_price > max_price:
        raise click.UsageError(
            f"--min-price ({min_price}) cannot be greater than "
            f"--max-price ({max_price})."
        )


def listing_filter_options(func: Callable[P, R]) -> Callable[P, R]:
    """Decorator that adds common listing filter options and builds QueryParameters.

    The wrapped function receives a `query_parameters: QueryParameters` kwarg
    instead of individual filter values.

    Note that the ``--district`` choices come from
    ``district_service.get_district_names()``, which is called when the
    decorator is applied (i.e. at import time of the decorated command).

    Raises (at command invocation):
        click.UsageError: If a minimum filter exceeds its matching maximum.
    """

    @click.option(
        "--type",
        "-t",
        help="Type of listing to scrape (BUY or RENT)",
        type=click.Choice(
            # A real list rather than a dict_keys view, matching the sequence
            # type click.Choice documents.
            list(ListingType.__members__),
            case_sensitive=False,
        ),
        required=True,
    )
    @click.option(
        "--min-bedrooms",
        default=1,
        help="Minimum number of bedrooms",
        type=click.IntRange(min=1),
    )
    @click.option(
        "--max-bedrooms",
        default=10,
        help="Maximum number of bedrooms",
        type=click.IntRange(min=1, max=10),
    )
    @click.option(
        "--min-price",
        default=0,
        help="Minimum price in GBP",
        type=click.IntRange(min=0),
    )
    @click.option(
        "--max-price",
        default=999_999,
        help="Maximum price in GBP",
        type=click.IntRange(min=0),
    )
    @click.option(
        "--district",
        default=None,
        help="District to filter by (can be repeated for multiple districts)",
        type=click.Choice(district_service.get_district_names(), case_sensitive=False),
        multiple=True,
    )
    @click.option(
        "--furnish-types",
        "-f",
        help="Furnish type filter for rented listings (can be repeated)",
        type=click.Choice(
            [furnish_type.name for furnish_type in FurnishType.__members__.values()],
            case_sensitive=False,
        ),
        multiple=True,
    )
    @click.option(
        "--available-from",
        help="Only include listings available from this date (format: YYYY-MM-DD)",
        default=None,
        type=click.DateTime(),
    )
    @click.option(
        "--last-seen-days",
        help="Only include listings seen in the last N days",
        default=14,
        # Negative day counts are meaningless; validate like the other
        # numeric filters instead of accepting any int.
        type=click.IntRange(min=0),
    )
    @click.option(
        "--min-sqm",
        help="Minimum square meters for the listing",
        default=None,
        # A negative area is meaningless; reject it at the CLI boundary.
        type=click.IntRange(min=0),
    )
    @wraps(func)
    def wrapper(
        *args: P.args,
        type: str,
        district: tuple[str, ...],
        min_bedrooms: int,
        max_bedrooms: int,
        min_price: int,
        max_price: int,
        furnish_types: tuple[str, ...],
        available_from: datetime | None,
        last_seen_days: int,
        min_sqm: int | None,
        **kwargs: P.kwargs,
    ) -> R:
        # Fail early with a readable usage error instead of running a query
        # whose min/max bounds can never match anything.
        _validate_filter_ranges(min_bedrooms, max_bedrooms, min_price, max_price)
        # Collapse the individual filter options into one QueryParameters
        # object; radius, page_size and max_days_since_added keep the defaults
        # of build_query_parameters because no CLI option exposes them.
        query_parameters = build_query_parameters(
            type=type,
            district=district,
            min_bedrooms=min_bedrooms,
            max_bedrooms=max_bedrooms,
            min_price=min_price,
            max_price=max_price,
            furnish_types=furnish_types,
            available_from=available_from,
            last_seen_days=last_seen_days,
            min_sqm=min_sqm,
        )
        return func(*args, query_parameters=query_parameters, **kwargs)

    return wrapper


# Root command group: stores the data directory and a ListingRepository in
# ctx.obj for the subcommands. Documented with a comment rather than a
# docstring so the generated --help output stays unchanged.
@click.group()
@click.option(
    "--data-dir",
    default=pathlib.Path("data/rs/"),
    help="Data directory for storing listings",
    type=click.Path(
        writable=True,
        file_okay=False,
        dir_okay=True,
        resolve_path=True,
    ),
)
@click.pass_context
def cli(ctx: click.Context, data_dir: str) -> None:
    ctx.ensure_object(dict)
    ctx.obj["data_dir"] = pathlib.Path(data_dir)
    ctx.obj["repository"] = ListingRepository(engine=engine)


@cli.command()
@listing_filter_options
@click.option(
    "--include-processing",
    "-p",
    is_flag=True,
    help="Also download images and run floorplan OCR detection",
)
@click.pass_context
def dump_listings(
    ctx: click.Context,
    query_parameters: QueryParameters,
    include_processing: bool,
) -> None:
    """Fetch listings from Rightmove API."""
    repository: ListingRepository = ctx.obj["repository"]
    click.echo(f"Fetching listings with parameters: {query_parameters}")
    # The service call is awaited via asyncio.run, so the CLI drives it to
    # completion before printing the result message.
    result = asyncio.run(
        listing_service.refresh_listings(
            repository,
            query_parameters,
            full=include_processing,
            async_mode=False,
        )
    )
    click.echo(result.message)


@cli.command()
@click.pass_context
def dump_images(ctx: click.Context) -> None:
    """Download floorplan images for all listings."""
    data_dir: pathlib.Path = ctx.obj["data_dir"]
    repository: ListingRepository = ctx.obj["repository"]
    click.echo(f"Downloading images to {data_dir}")
    count = asyncio.run(listing_service.download_images(repository, data_dir))
    click.echo(f"Processed {count} listings")


@cli.command()
@click.pass_context
def detect_floorplan(ctx: click.Context) -> None:
    """Run OCR on floorplan images to detect square meters."""
    repository: ListingRepository = ctx.obj["repository"]
    click.echo("Running floorplan detection...")
    count = asyncio.run(listing_service.detect_floorplans(repository))
    click.echo(f"Processed {count} listings")


@cli.command()
@click.option(
    "--destination-address",
    "-d",
    help="Destination address for routing",
    required=True,
    type=click.STRING,
)
@click.option(
    "--travel-mode",
    "-m",
    help="Travel mode for routing (e.g. transit, driving, walking, bicycling)",
    type=click.Choice(list(TravelMode.__members__), case_sensitive=False),
    required=True,
)
@click.option(
    "--limit",
    "-l",
    help="Maximum number of listings to calculate routes for",
    type=click.IntRange(min=1),
    default=1,
)
@click.pass_context
def routing(
    ctx: click.Context,
    destination_address: str,
    travel_mode: str,
    limit: int,
) -> None:
    """Calculate transit routes for listings."""
    repository: ListingRepository = ctx.obj["repository"]
    # Treat an empty value the same as an unset variable: an empty API key
    # cannot authenticate, so fail here with a clear message.
    if not os.environ.get(API_KEY_ENVIRONMENT_VARIABLE):
        raise click.ClickException(
            f"{API_KEY_ENVIRONMENT_VARIABLE} environment variable is not set."
        )
    click.echo(f"Calculating routes to '{destination_address}' for {limit} listings")
    # NOTE(review): travel_mode is forwarded as the TravelMode member *name*
    # (a str); confirm calculate_routes expects the name and not the enum.
    count = asyncio.run(
        listing_service.calculate_routes(
            repository,
            destination_address,
            travel_mode,
            limit=limit,
        )
    )
    click.echo(f"Processed {count} listings")


@cli.command()
@click.option(
    "--output-file",
    "-O",
    help="Path to the output CSV file",
    required=True,
    type=click.Path(writable=True, file_okay=True, dir_okay=False, resolve_path=True),
)
@listing_filter_options
@click.pass_context
def export_csv(
    ctx: click.Context,
    output_file: str,
    query_parameters: QueryParameters,
) -> None:
    """Export listings to CSV file."""
    repository: ListingRepository = ctx.obj["repository"]
    click.echo(f"Exporting to {output_file}")
    result = asyncio.run(
        export_service.export_to_csv(
            repository,
            pathlib.Path(output_file),
            query_parameters,
        )
    )
    click.echo(result.message)


@cli.command()
@click.option(
    "--output-file",
    "-O",
    help="Path to the output GeoJSON file",
    required=True,
    type=click.Path(writable=True, file_okay=True, dir_okay=False, resolve_path=True),
)
@listing_filter_options
@click.pass_context
def export_immoweb(
    ctx: click.Context,
    output_file: str,
    query_parameters: QueryParameters,
) -> None:
    """Export listings to GeoJSON file for map visualization."""
    repository: ListingRepository = ctx.obj["repository"]
    click.echo(f"Exporting to {output_file}")
    result = asyncio.run(
        export_service.export_to_geojson(
            repository,
            query_parameters=query_parameters,
            output_path=pathlib.Path(output_file),
        )
    )
    click.echo(result.message)


@cli.command()
@click.pass_context
def populate_db(ctx: click.Context) -> None:
    """Populate database from filesystem data (legacy migration)."""
    data_dir: pathlib.Path = ctx.obj["data_dir"]
    repository: ListingRepository = ctx.obj["repository"]
    click.echo(f"Populating database from {data_dir}")
    # Each listing lives in its own sub-directory as <dir>/listing.json.
    listings = Listing.get_all_listings(list(data_dir.glob("*/listing.json")))
    asyncio.run(repository.upsert_listings_legacy(listings))
    click.echo(f"Imported {len(listings)} listings")


@cli.command()
def list_districts() -> None:
    """List all available districts."""
    districts = district_service.get_all_districts()
    click.echo(f"Available districts ({len(districts)}):")
    # Iterating the mapping yields its keys (the district names).
    for name in sorted(districts):
        click.echo(f"  - {name}")


if __name__ == "__main__":
    cli()