"""Command-line interface exposing the listing pipeline steps and exports."""

import asyncio
from datetime import datetime
import json
import os
import pathlib
import click
import importlib
from rec.districts import get_districts
from data_access import Listing
import csv_exporter
from rec.query import ListingType, FurnishType, QueryParameters
from rec.routing import API_KEY_ENVIRONMENT_VARIABLE, TravelMode
from repositories.listing_repository import ListingRepository
from ui_exporter import export_immoweb as export_immoweb_ui
from functools import wraps
from database import engine

# The step modules have names that start with a digit, which is not a valid
# identifier, so they have to be loaded through importlib.
dump_listings_module = importlib.import_module("1_dump_listings")
dump_images_module = importlib.import_module("3_dump_images")
detect_floorplan_module = importlib.import_module("4_detect_floorplan")
routing_module = importlib.import_module("5_routing")


def listing_filter_options(func):
    """Decorator to add common options for filtering listings."""

    @click.option(
        "--type",
        "-t",
        help="Type of listing to scrape",
        type=click.Choice(
            list(ListingType.__members__),
            case_sensitive=False,
        ),
        required=True,
    )
    @click.option(
        "--min-bedrooms",
        default=1,
        help="Minimum number of bedrooms",
        type=click.IntRange(min=1),
    )
    @click.option(
        "--max-bedrooms",
        default=5,
        help="Maximum number of bedrooms",
        type=click.IntRange(min=1),
    )
    @click.option(
        "--min-price",
        default=0,
        help="Minimum price",
        type=click.IntRange(min=0),
    )
    @click.option(
        "--max-price",
        default=1000000,
        help="Maximum price",
        type=click.IntRange(min=0),
    )
    @click.option(
        "--district",
        default=None,
        help="Districts to scrape",
        type=click.Choice(list(get_districts()), case_sensitive=False),
        multiple=True,
    )
    @click.option(
        "--furnish-types",
        "-f",
        help="Furnish types for rented listings",
        type=click.Choice(
            [furnish_type.name for furnish_type in FurnishType.__members__.values()],
            case_sensitive=False,
        ),
        multiple=True,
    )
    @click.option(
        "--available-from",
        help="Let date available from",
        default=None,
        type=click.DateTime(),
    )
    @click.option(
        "--last-seen-days",
        help="Last seen (days). If set, only listings that were seen in the last N days will be included.",
        default=14,
        type=int,
    )
    @click.option(
        "--min-sqm",
        help="Minimum square meters for the listing",
        default=None,
        type=int,
    )
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Pure pass-through: the wrapper only exists to carry the options above.
        return func(*args, **kwargs)

    return wrapper


def _build_query_parameters(
    *,
    listing_type: str,
    district: tuple[str, ...],
    min_bedrooms: int,
    max_bedrooms: int,
    min_price: int,
    max_price: int,
    furnish_types: tuple[str, ...],
    available_from: datetime | None,
    last_seen_days: int,
    min_sqm: int | None,
) -> QueryParameters:
    """Translate the raw values of the shared filter options into QueryParameters.

    ``listing_type`` and ``furnish_types`` are enum member names as returned by
    the corresponding ``click.Choice`` options; they are looked up by name in
    ``ListingType`` and ``FurnishType``.
    """
    return QueryParameters(
        listing_type=ListingType[listing_type],
        district_names=set(district),
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
        furnish_types=[FurnishType[name] for name in furnish_types],
        let_date_available_from=available_from,
        last_seen_days=last_seen_days,
        min_sqm=min_sqm,
    )


@click.group()
@click.option(
    "--data-dir",
    default=pathlib.Path("data/rs/"),
    help="Base directory for listing data and images",
    type=click.Path(
        writable=True,
        file_okay=False,
        dir_okay=True,
        resolve_path=True,
    ),
)
@click.pass_context
def cli(ctx, data_dir: str):
    """Run the listing pipeline steps and exports."""
    # Sub-commands read the data directory back from the shared context object.
    ctx.ensure_object(dict)
    ctx.obj["data_dir"] = data_dir


@cli.command()
@listing_filter_options
@click.pass_context
def dump_listings(
    ctx: click.core.Context,
    district: tuple[str, ...],
    min_bedrooms: int,
    max_bedrooms: int,
    min_price: int,
    max_price: int,
    type: str,
    furnish_types: tuple[str, ...],
    available_from: datetime | None,
    last_seen_days: int,
    min_sqm: int | None = None,
):
    """Run the listing dump step for the given filters."""
    data_dir: str = ctx.obj["data_dir"]
    query_parameters = _build_query_parameters(
        listing_type=type,
        district=district,
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
        furnish_types=furnish_types,
        available_from=available_from,
        last_seen_days=last_seen_days,
        min_sqm=min_sqm,
    )
    click.echo(
        f"Running dump_listings for districts {district}, data dir {data_dir} and parameters: "
        f"{query_parameters}"
    )
    data_dir_path = pathlib.Path(data_dir)
    repository = ListingRepository(engine=engine)
    asyncio.run(
        dump_listings_module.dump_listings(query_parameters, repository, data_dir_path)
    )


@cli.command()
@click.pass_context
def dump_images(ctx: click.core.Context):
    """Run the image dump step for listings stored in the database."""
    data_dir = ctx.obj["data_dir"]
    click.echo(f"Running dump_images for listings stored in {engine.url}")
    repository = ListingRepository(engine=engine)
    asyncio.run(dump_images_module.dump_images(repository, image_base_path=data_dir))


@cli.command()
@click.pass_context
def detect_floorplan(ctx: click.core.Context):
    """Run floorplan detection for listings stored in the database."""
    click.echo(f"Running detect_floorplan for listings stored in {engine.url}")
    repository = ListingRepository(engine=engine)
    asyncio.run(detect_floorplan_module.detect_floorplan(repository))


@cli.command()
@click.option(
    "--destination-address",
    "-d",
    help="Destination address for routing",
    required=True,
    type=click.STRING,
)
@click.option(
    "--travel-mode",
    "-m",
    help="Travel mode for routing",
    type=click.Choice(
        list(TravelMode.__members__),
        case_sensitive=False,
    ),
    required=True,
)
@click.option(
    "--limit",
    "-l",
    help="Limit the number of listings to process",
    type=click.IntRange(min=1),
    default=1,  # by default limit to 1 to avoid accidental API usage
)
@click.pass_context
def routing(
    ctx: click.core.Context, destination_address: str, travel_mode: str, limit: int
):
    """Run route calculation to the destination address for a limited number of listings."""
    data_dir = ctx.obj["data_dir"]
    click.echo(f"Running routing for the first {limit} listings in {data_dir}")

    # Fail before touching the repository when the routing API key is missing.
    if os.environ.get(API_KEY_ENVIRONMENT_VARIABLE) is None:
        raise click.exceptions.MissingParameter(
            f"{API_KEY_ENVIRONMENT_VARIABLE} environment variable is not set. "
            "Please set it to your API key for the routing service."
        )

    repository = ListingRepository(engine=engine)
    asyncio.run(
        routing_module.calculate_route(
            repository,
            destination_address,
            TravelMode[travel_mode],
            limit=limit,
        )
    )


@cli.command()
@click.option(
    "--columns",
    "-C",
    help="Columns to include in the CSV file",
    type=click.Choice(
        Listing.ALL_COLUMNS,
        case_sensitive=False,
    ),
    multiple=True,
    default=Listing.ALL_COLUMNS,
)
@click.option(
    "--output-file",
    "-O",
    help="Path to the output CSV file",
    required=True,
    type=click.Path(
        writable=True,
        file_okay=True,
        dir_okay=False,
        resolve_path=True,
    ),
)
@click.pass_context
@listing_filter_options
def export_csv(
    ctx: click.core.Context,
    output_file: str,
    columns: tuple[str, ...],
    district: tuple[str, ...],
    min_bedrooms: int,
    max_bedrooms: int,
    min_price: int,
    max_price: int,
    type: str,
    furnish_types: tuple[str, ...],
    available_from: datetime | None,
    last_seen_days: int,
    min_sqm: int | None = None,
):
    """Export listings read from the data directory to a CSV file."""
    data_dir = ctx.obj["data_dir"]
    query_parameters = _build_query_parameters(
        listing_type=type,
        district=district,
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
        furnish_types=furnish_types,
        available_from=available_from,
        last_seen_days=last_seen_days,
        min_sqm=min_sqm,
    )
    click.echo(
        f"Exporting data to {output_file} using {data_dir=} and query parameters: {query_parameters}"
    )
    output_file_path = pathlib.Path(output_file)
    # Sorted so the listing files are always read in the same order.
    listing_paths = sorted(pathlib.Path(data_dir).glob("*/listing.json"))
    listings = Listing.get_all_listings(listing_paths)
    asyncio.run(
        csv_exporter.export_to_csv(
            listings,
            output_file_path,
            list(columns),
            query_parameters=query_parameters,
        ),
    )


@cli.command()
@click.option(
    "--output-file",
    "-O",
    help="Path to the output immoweb file",
    required=True,
    type=click.Path(
        writable=True,
        file_okay=True,
        dir_okay=False,
        resolve_path=True,
    ),
)
@listing_filter_options
@click.pass_context
def export_immoweb(
    ctx: click.core.Context,
    output_file: str,
    district: tuple[str, ...],
    min_bedrooms: int,
    max_bedrooms: int,
    min_price: int,
    max_price: int,
    type: str,
    furnish_types: tuple[str, ...],
    available_from: datetime | None,
    last_seen_days: int,
    min_sqm: int | None = None,
):
    """Export listings matching the filters to an immoweb file."""
    query_parameters = _build_query_parameters(
        listing_type=type,
        district=district,
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
        furnish_types=furnish_types,
        available_from=available_from,
        last_seen_days=last_seen_days,
        min_sqm=min_sqm,
    )
    click.echo(
        f"Exporting data to {output_file} that matches the query parameters: {query_parameters}"
    )
    asyncio.run(export_immoweb_ui(ctx, output_file, query_parameters))


@cli.command()
@click.pass_context
def populate_db(
    ctx: click.core.Context,
):
    """Upsert listings found in the data directory into the database."""
    data_dir = ctx.obj["data_dir"]
    click.echo(f"Populating the database with data from {data_dir}")
    repository = ListingRepository(engine=engine)
    listings = Listing.get_all_listings(
        list(pathlib.Path(data_dir).glob("*/listing.json"))
    )
    asyncio.run(repository.upsert_listings(listings))


if __name__ == "__main__":
    cli()