# wrongmove/crawler/main.py (418 lines, 11 KiB, Python)

"""CLI entry point for the Real Estate Crawler."""
2025-05-17 21:55:42 +00:00
import asyncio
from datetime import datetime
import os
import pathlib
from typing import Callable, ParamSpec, TypeVar
import click
from models.listing import FurnishType, ListingType, QueryParameters
2025-05-17 20:13:28 +00:00
from data_access import Listing
from rec.routing import API_KEY_ENVIRONMENT_VARIABLE, TravelMode
from repositories.listing_repository import ListingRepository
from functools import wraps
from database import engine
from services import (
listing_service,
export_service,
district_service,
)
# Generic placeholders used to annotate listing_filter_options: P stands for
# the wrapped callable's parameters and R for its return type.
P = ParamSpec("P")
R = TypeVar("R")
def listing_filter_options(func: Callable[P, R]) -> Callable[P, R]:
    """Attach the shared listing-filter options to a click command callback.

    The returned wrapper forwards every call to ``func`` unchanged and carries
    these options: ``--type``/``-t`` (required), ``--min-bedrooms``,
    ``--max-bedrooms``, ``--min-price``, ``--max-price``, ``--district``
    (repeatable), ``--furnish-types``/``-f`` (repeatable),
    ``--available-from``, ``--last-seen-days`` and ``--min-sqm``.
    """
    # Choice values are gathered up front, in the same order the option
    # definitions below consume them.
    listing_type_names = ListingType.__members__.keys()
    district_names = district_service.get_district_names()
    furnish_type_names = [member.name for member in FurnishType.__members__.values()]

    # Listed in the order they should be declared on the command.
    shared_options = (
        click.option(
            "--type",
            "-t",
            required=True,
            type=click.Choice(listing_type_names, case_sensitive=False),
            help="Type of listing to scrape",
        ),
        click.option(
            "--min-bedrooms",
            type=click.IntRange(min=1),
            default=1,
            help="Minimum number of bedrooms",
        ),
        click.option(
            "--max-bedrooms",
            type=click.IntRange(min=1, max=10),
            default=10,
            help="Maximum number of bedrooms",
        ),
        click.option(
            "--min-price",
            type=click.IntRange(min=0),
            default=0,
            help="Minimum price",
        ),
        click.option(
            "--max-price",
            type=click.IntRange(min=0),
            default=999_999,
            help="Maximum price",
        ),
        click.option(
            "--district",
            type=click.Choice(district_names, case_sensitive=False),
            multiple=True,
            default=None,
            help="Districts to scrape",
        ),
        click.option(
            "--furnish-types",
            "-f",
            type=click.Choice(furnish_type_names, case_sensitive=False),
            multiple=True,
            help="Furnish types for rented listings",
        ),
        click.option(
            "--available-from",
            type=click.DateTime(),
            default=None,
            help="Let date available from",
        ),
        click.option(
            "--last-seen-days",
            type=int,
            default=14,
            help="Last seen (days). If set, only listings that were seen in the last N days will be included.",
        ),
        click.option(
            "--min-sqm",
            type=int,
            default=None,
            help="Minimum square meters for the listing",
        ),
    )

    @wraps(func)
    def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        return func(*args, **kwargs)

    # Apply bottom-up, exactly as a stack of decorators would, so the order
    # in which the options are registered on the wrapper is unchanged.
    decorated: Callable[P, R] = wrapper
    for attach_option in reversed(shared_options):
        decorated = attach_option(decorated)
    return decorated
def build_query_parameters(
    type: str,
    district: list[str],
    min_bedrooms: int,
    max_bedrooms: int,
    min_price: int,
    max_price: int,
    furnish_types: list[str],
    available_from: datetime | None,
    last_seen_days: int,
    min_sqm: int | None = None,
    radius: int = 0,
    page_size: int = 500,
    max_days_since_added: int = 14,
) -> QueryParameters:
    """Translate raw CLI option values into a QueryParameters instance.

    ``type`` and each entry of ``furnish_types`` are looked up by member name
    in ListingType and FurnishType respectively, so an unknown name raises
    KeyError. Empty ``district`` / ``furnish_types`` collections are passed
    on as None. All remaining values are forwarded unchanged.
    """
    # Resolve the enum members and optional collections first, in the same
    # order the fields are passed below.
    listing_type = ListingType[type]
    district_names = set(district) if district else None
    if furnish_types:
        selected_furnish_types = [FurnishType[name] for name in furnish_types]
    else:
        selected_furnish_types = None

    return QueryParameters(
        listing_type=listing_type,
        district_names=district_names,
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
        furnish_types=selected_furnish_types,
        let_date_available_from=available_from,
        last_seen_days=last_seen_days,
        min_sqm=min_sqm,
        radius=radius,
        page_size=page_size,
        max_days_since_added=max_days_since_added,
    )
@click.group()
@click.option(
    "--data-dir",
    default=pathlib.Path("data/rs/"),
    help="Data directory for storing listings",
    type=click.Path(
        writable=True,
        file_okay=False,
        dir_okay=True,
        resolve_path=True,
    ),
)
@click.pass_context
def cli(ctx: click.Context, data_dir: str) -> None:
    """Real Estate Crawler command-line interface."""
    # The docstring above is what click prints as the group description in
    # `--help`; without it the top-level help has no summary line.
    #
    # Shared state for the subcommands lives in ctx.obj:
    #   "data_dir"   -> pathlib.Path built from the --data-dir option
    #   "repository" -> ListingRepository bound to the module-level engine
    ctx.ensure_object(dict)
    ctx.obj["data_dir"] = pathlib.Path(data_dir)
    ctx.obj["repository"] = ListingRepository(engine=engine)
@cli.command()
@listing_filter_options
@click.option("--full", is_flag=True, help="Include images and floorplan detection")
@click.pass_context
def dump_listings(
    ctx: click.Context,
    full: bool,
    district: list[str],
    min_bedrooms: int,
    max_bedrooms: int,
    min_price: int,
    max_price: int,
    type: str,
    furnish_types: list[str],
    available_from: datetime | None,
    last_seen_days: int,
    min_sqm: int | None = None,
) -> None:
    """Fetch listings from Rightmove API."""
    # Only the repository is read from the shared context; this command does
    # not touch the data directory.
    repository: ListingRepository = ctx.obj["repository"]

    # Fold the shared filter options into a single QueryParameters value.
    query_parameters = build_query_parameters(
        type=type,
        district=district,
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
        furnish_types=furnish_types,
        available_from=available_from,
        last_seen_days=last_seen_days,
        min_sqm=min_sqm,
    )
    click.echo(f"Fetching listings with parameters: {query_parameters}")

    # refresh_listings is a coroutine function; drive it to completion here
    # and report the message carried by its result.
    result = asyncio.run(
        listing_service.refresh_listings(
            repository,
            query_parameters,
            full=full,
            async_mode=False,
        )
    )
    click.echo(result.message)
@cli.command()
@click.pass_context
def dump_images(ctx: click.Context) -> None:
    """Download floorplan images for all listings."""
    # Both values are placed on the context by the `cli` group callback.
    shared = ctx.obj
    target_dir: pathlib.Path = shared["data_dir"]
    listing_repo: ListingRepository = shared["repository"]

    click.echo(f"Downloading images to {target_dir}")

    # Run the download coroutine to completion and report its return value.
    download_job = listing_service.download_images(listing_repo, target_dir)
    processed = asyncio.run(download_job)
    click.echo(f"Processed {processed} listings")
@cli.command()
@click.pass_context
def detect_floorplan(ctx: click.Context) -> None:
    """Run OCR on floorplan images to detect square meters."""
    # The repository is placed on the context by the `cli` group callback.
    listing_repo: ListingRepository = ctx.obj["repository"]

    click.echo("Running floorplan detection...")

    # Run the detection coroutine to completion and report its return value.
    detection_job = listing_service.detect_floorplans(listing_repo)
    processed = asyncio.run(detection_job)
    click.echo(f"Processed {processed} listings")
@cli.command()
@click.option(
    "--destination-address",
    "-d",
    type=click.STRING,
    required=True,
    help="Destination address for routing",
)
@click.option(
    "--travel-mode",
    "-m",
    type=click.Choice(TravelMode.__members__.keys(), case_sensitive=False),
    required=True,
    help="Travel mode for routing",
)
@click.option(
    "--limit",
    "-l",
    type=click.IntRange(min=1),
    default=1,
    help="Limit the number of listings to process",
)
@click.pass_context
def routing(
    ctx: click.Context,
    destination_address: str,
    travel_mode: str,
    limit: int,
) -> None:
    """Calculate transit routes for listings."""
    listing_repo: ListingRepository = ctx.obj["repository"]

    # Abort with a click error before doing any work when the API key
    # variable is absent from the environment.
    api_key = os.environ.get(API_KEY_ENVIRONMENT_VARIABLE)
    if api_key is None:
        raise click.ClickException(
            f"{API_KEY_ENVIRONMENT_VARIABLE} environment variable is not set."
        )

    click.echo(f"Calculating routes to '{destination_address}' for {limit} listings")

    # Run the routing coroutine to completion and report its return value.
    routing_job = listing_service.calculate_routes(
        listing_repo,
        destination_address,
        travel_mode,
        limit=limit,
    )
    processed = asyncio.run(routing_job)
    click.echo(f"Processed {processed} listings")
2025-05-17 20:13:28 +00:00
@cli.command()
@click.option(
    "--output-file",
    "-O",
    type=click.Path(writable=True, file_okay=True, dir_okay=False, resolve_path=True),
    required=True,
    help="Path to the output CSV file",
)
@listing_filter_options
@click.pass_context
def export_csv(
    ctx: click.Context,
    output_file: str,
    district: list[str],
    min_bedrooms: int,
    max_bedrooms: int,
    min_price: int,
    max_price: int,
    type: str,
    furnish_types: list[str],
    available_from: datetime | None,
    last_seen_days: int,
    min_sqm: int | None = None,
) -> None:
    """Export listings to CSV file."""
    listing_repo: ListingRepository = ctx.obj["repository"]

    # Fold the shared filter options into a single QueryParameters value.
    filters = build_query_parameters(
        type=type,
        district=district,
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
        furnish_types=furnish_types,
        available_from=available_from,
        last_seen_days=last_seen_days,
        min_sqm=min_sqm,
    )

    click.echo(f"Exporting to {output_file}")

    # Run the export coroutine to completion and print the message carried
    # by its result.
    export_job = export_service.export_to_csv(
        listing_repo,
        pathlib.Path(output_file),
        filters,
    )
    outcome = asyncio.run(export_job)
    click.echo(outcome.message)
2025-05-31 23:50:43 +00:00
@cli.command()
@click.option(
    "--output-file",
    "-O",
    type=click.Path(writable=True, file_okay=True, dir_okay=False, resolve_path=True),
    required=True,
    help="Path to the output GeoJSON file",
)
@listing_filter_options
@click.pass_context
def export_immoweb(
    ctx: click.Context,
    output_file: str,
    district: list[str],
    min_bedrooms: int,
    max_bedrooms: int,
    min_price: int,
    max_price: int,
    type: str,
    furnish_types: list[str],
    available_from: datetime | None,
    last_seen_days: int,
    min_sqm: int | None = None,
) -> None:
    """Export listings to GeoJSON file for map visualization."""
    listing_repo: ListingRepository = ctx.obj["repository"]

    # Fold the shared filter options into a single QueryParameters value.
    filters = build_query_parameters(
        type=type,
        district=district,
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
        furnish_types=furnish_types,
        available_from=available_from,
        last_seen_days=last_seen_days,
        min_sqm=min_sqm,
    )

    click.echo(f"Exporting to {output_file}")

    # Run the export coroutine to completion and print the message carried
    # by its result.
    export_job = export_service.export_to_geojson(
        listing_repo,
        query_parameters=filters,
        output_path=pathlib.Path(output_file),
    )
    outcome = asyncio.run(export_job)
    click.echo(outcome.message)
2025-05-17 20:13:28 +00:00
@cli.command()
@click.pass_context
def populate_db(ctx: click.Context) -> None:
    """Populate database from filesystem data (legacy migration)."""
    data_dir: pathlib.Path = ctx.obj["data_dir"]
    repository: ListingRepository = ctx.obj["repository"]

    click.echo(f"Populating database from {data_dir}")

    # Collect every listing.json that sits one directory level below
    # data_dir; list() materializes the glob generator directly.
    listing_paths = list(data_dir.glob("*/listing.json"))
    listings = Listing.get_all_listings(listing_paths)

    # upsert_listings_legacy is awaited to completion before reporting.
    asyncio.run(repository.upsert_listings_legacy(listings))
    click.echo(f"Imported {len(listings)} listings")
@cli.command()
def list_districts() -> None:
    """List all available districts."""
    available = district_service.get_all_districts()

    # Header with the total count, then one bullet per name in sorted order.
    click.echo(f"Available districts ({len(available)}):")
    ordered_names = sorted(available.keys())
    for district_name in ordered_names:
        click.echo(f" - {district_name}")
2025-05-31 23:50:43 +00:00
# Invoke the click command group only when this file is run as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    cli()