wrongmove/crawler/main.py
2025-06-08 18:18:38 +00:00

366 lines
9.9 KiB
Python

import asyncio
from datetime import datetime
import json
import os
import pathlib
import click
import importlib
from models.listing import FurnishType, ListingType, QueryParameters
from rec.districts import get_districts
from data_access import Listing
import csv_exporter
from rec.routing import API_KEY_ENVIRONMENT_VARIABLE, TravelMode
from repositories.listing_repository import ListingRepository
from ui_exporter import export_immoweb as export_immoweb_ui
from functools import wraps
from database import engine
# The module names below start with a digit ("1_dump_listings", ...), so they
# are not valid Python identifiers and cannot be named in a regular `import`
# statement; importlib.import_module loads them by string instead.
dump_listings_module = importlib.import_module("1_dump_listings")
dump_images_module = importlib.import_module("3_dump_images")
detect_floorplan_module = importlib.import_module("4_detect_floorplan")
routing_module = importlib.import_module("5_routing")
def listing_filter_options(func):
    """Attach the shared listing-filter options to a click callback.

    Adds --type/-t (required), --min-bedrooms, --max-bedrooms, --min-price,
    --max-price, --district (repeatable), --furnish-types/-f (repeatable),
    --available-from, --last-seen-days and --min-sqm. The returned callable
    forwards every argument to ``func`` unchanged.
    """
    # Written in the order a stack of @click.option decorators would be read,
    # top to bottom.
    filter_options = [
        click.option(
            "--type",
            "-t",
            help="Type of listing to scrape",
            type=click.Choice(
                ListingType.__members__.keys(),
                case_sensitive=False,
            ),
            required=True,
        ),
        click.option(
            "--min-bedrooms",
            default=1,
            help="Minimum number of bedrooms",
            type=click.IntRange(min=1),
        ),
        click.option(
            "--max-bedrooms",
            default=5,
            help="Maximum number of bedrooms",
            type=click.IntRange(min=1),
        ),
        click.option(
            "--min-price",
            default=0,
            help="Minimum price",
            type=click.IntRange(min=0),
        ),
        click.option(
            "--max-price",
            default=1000000,
            help="Maximum price",
            type=click.IntRange(min=0),
        ),
        click.option(
            "--district",
            default=None,
            help="Districts to scrape",
            type=click.Choice(get_districts().keys(), case_sensitive=False),
            multiple=True,
        ),
        click.option(
            "--furnish-types",
            "-f",
            help="Furnish types for rented listings",
            type=click.Choice(
                [member.name for member in FurnishType.__members__.values()],
                case_sensitive=False,
            ),
            multiple=True,
        ),
        click.option(
            "--available-from",
            help="Let date available from",
            default=None,
            type=click.DateTime(),
        ),
        click.option(
            "--last-seen-days",
            help="Last seen (days). If set, only listings that were seen in the last N days will be included.",
            default=14,
            type=int,
        ),
        click.option(
            "--min-sqm",
            help="Minimum square meters for the listing",
            default=None,
            type=int,
        ),
    ]

    @wraps(func)
    def forward(*args, **kwargs):
        return func(*args, **kwargs)

    # Stacked decorators are applied bottom-up, so the list is walked in
    # reverse to register the options in the same order.
    decorated = forward
    for attach_option in reversed(filter_options):
        decorated = attach_option(decorated)
    return decorated
@click.group()
@click.option(
    "--data-dir",
    default=pathlib.Path("data/rs/"),
    # The previous help text ("Districts to scrape") was copied from the
    # --district option and did not describe this option.
    help="Directory where listing data and images are stored",
    type=click.Path(
        writable=True,
        file_okay=False,
        dir_okay=True,
        resolve_path=True,
    ),
)
@click.pass_context
def cli(ctx, data_dir: str):
    """Command-line tools for crawling, enriching and exporting listings."""
    # Subcommands read the directory back from ctx.obj["data_dir"].
    ctx.ensure_object(dict)
    ctx.obj["data_dir"] = data_dir
@cli.command()
@listing_filter_options
@click.pass_context
def dump_listings(
    ctx: click.core.Context,
    district: list[str],
    min_bedrooms: int,
    max_bedrooms: int,
    min_price: int,
    max_price: int,
    type: str,
    furnish_types: list[str],
    available_from: datetime | None,
    last_seen_days: int,
    min_sqm: int | None = None,
):
    # Builds QueryParameters from the filter options and runs
    # dump_listings_module.dump_listings with a ListingRepository bound to the
    # module-level engine and the configured data directory.
    # NOTE: documented with comments rather than a docstring, because click
    # would surface a docstring as this command's --help text.
    target_dir: str = ctx.obj["data_dir"]
    selected_furnish_types = [FurnishType[name] for name in furnish_types]
    filters = QueryParameters(
        listing_type=ListingType[type],
        district_names=set(district),
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
        furnish_types=selected_furnish_types,
        let_date_available_from=available_from,
        last_seen_days=last_seen_days,
        min_sqm=min_sqm,
    )
    click.echo(
        f"Running dump_listings for districts {district}, data dir {target_dir} and parameters: "
        f"{filters}"
    )
    target_path = pathlib.Path(target_dir)
    listing_repository = ListingRepository(engine=engine)
    job = dump_listings_module.dump_listings(filters, listing_repository, target_path)
    asyncio.run(job)
@cli.command()
@click.pass_context
def dump_images(ctx: click.core.Context):
    # Runs dump_images_module.dump_images against the database behind the
    # module-level engine, using the configured data directory as the image
    # base path.
    # NOTE: documented with comments rather than a docstring, because click
    # would surface a docstring as this command's --help text.
    image_root = ctx.obj["data_dir"]
    click.echo(f"Running dump_images for listings stored in {engine.url}")
    listing_repository = ListingRepository(engine=engine)
    job = dump_images_module.dump_images(
        listing_repository, image_base_path=image_root
    )
    asyncio.run(job)
@cli.command()
@click.pass_context
def detect_floorplan(ctx: click.core.Context):
    """Run floorplan detection for the listings stored in the database."""
    # ctx is accepted to keep the callback signature stable; the data
    # directory it carries was read here before but never used, so that dead
    # lookup has been dropped.
    click.echo(f"Running detect_floorplan for listings stored in {engine.url}")
    repository = ListingRepository(engine=engine)
    asyncio.run(detect_floorplan_module.detect_floorplan(repository))
@cli.command()
@click.option(
    "--destination-address",
    "-d",
    help="Destination address for routing",
    required=True,
    type=click.STRING,
)
@click.option(
    "--travel-mode",
    "-m",
    help="Travel mode for routing",
    type=click.Choice(
        TravelMode.__members__.keys(),
        case_sensitive=False,
    ),
    required=True,
)
@click.option(
    "--limit",
    "-l",
    help="Limit the number of listings to process",
    type=click.IntRange(min=1),
    default=1,  # by default limit to 1 to avoid accidental API usage
)
@click.pass_context
def routing(
    ctx: click.core.Context, destination_address: str, travel_mode: str, limit: int
):
    """Calculate routes to a destination address for up to --limit listings."""
    # Fail fast: without the API key nothing below can succeed, so check it
    # before announcing or starting any work.
    if API_KEY_ENVIRONMENT_VARIABLE not in os.environ:
        raise click.exceptions.MissingParameter(
            f"{API_KEY_ENVIRONMENT_VARIABLE} environment variable is not set. "
            "Please set it to your API key for the routing service."
        )

    data_dir = ctx.obj["data_dir"]
    click.echo(f"Running routing for the first {limit} listings in {data_dir}")

    # The listings are handed to calculate_route through the repository; the
    # former glob over "*/listing.json" produced a list that was never used
    # and has been removed.
    repository = ListingRepository(engine=engine)
    asyncio.run(
        routing_module.calculate_route(
            repository,
            destination_address,
            TravelMode[travel_mode],
            limit=limit,
        )
    )
@cli.command()
# TODO: a "--columns/-C" option for choosing the exported CSV columns was
# sketched here but is not wired up yet; all columns are exported.
@click.option(
    "--output-file",
    "-O",
    help="Path to the output CSV file",
    required=True,
    type=click.Path(
        writable=True,
        file_okay=True,
        dir_okay=False,
        resolve_path=True,
    ),
)
# Decorator order matches the other filter-based commands in this file
# (listing_filter_options above pass_context).
@listing_filter_options
@click.pass_context
def export_csv(
    ctx: click.core.Context,
    output_file: str,
    district: list[str],
    min_bedrooms: int,
    max_bedrooms: int,
    min_price: int,
    max_price: int,
    type: str,
    furnish_types: list[str],
    available_from: datetime | None,
    last_seen_days: int,
    min_sqm: int | None = None,
):
    """Export the listings matching the filter options to a CSV file."""
    # data_dir is only reported in the progress message; the rows themselves
    # are read through the repository.
    data_dir = ctx.obj["data_dir"]
    query_parameters = QueryParameters(
        listing_type=ListingType[type],
        district_names=set(district),
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
        furnish_types=[FurnishType[furnish_type] for furnish_type in furnish_types],
        let_date_available_from=available_from,
        last_seen_days=last_seen_days,
        min_sqm=min_sqm,
    )
    click.echo(
        f"Exporting data to {output_file} using {data_dir=} and query parameters: {query_parameters}"
    )
    output_file_path = pathlib.Path(output_file)
    repository = ListingRepository(engine=engine)
    asyncio.run(
        csv_exporter.export_to_csv(
            repository,
            output_file_path,
            query_parameters=query_parameters,
        ),
    )
@cli.command()
@click.option(
    "--output-file",
    "-O",
    help="Path to the output immoweb file",
    required=True,
    type=click.Path(
        writable=True,
        file_okay=True,
        dir_okay=False,
        resolve_path=True,
    ),
)
@listing_filter_options
@click.pass_context
def export_immoweb(
    ctx: click.core.Context,
    output_file: str,
    district: list[str],
    min_bedrooms: int,
    max_bedrooms: int,
    min_price: int,
    max_price: int,
    type: str,
    furnish_types: list[str],
    available_from: datetime | None,
    last_seen_days: int,
    min_sqm: int | None = None,
):
    # Builds QueryParameters from the filter options and passes them, with a
    # ListingRepository and the output path, to the immoweb UI exporter.
    # NOTE: documented with comments rather than a docstring, because click
    # would surface a docstring as this command's --help text.
    selected_furnish_types = [FurnishType[name] for name in furnish_types]
    filters = QueryParameters(
        listing_type=ListingType[type],
        district_names=set(district),
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
        furnish_types=selected_furnish_types,
        let_date_available_from=available_from,
        last_seen_days=last_seen_days,
        min_sqm=min_sqm,
    )
    click.echo(
        f"Exporting data to {output_file} for listings stored in {engine.url} that match the query parameters: {filters}"
    )
    listing_repository = ListingRepository(engine=engine)
    export_job = export_immoweb_ui(listing_repository, output_file, filters)
    asyncio.run(export_job)
@cli.command()
@click.pass_context
def populate_db(
    ctx: click.core.Context,
):
    # Loads every "<data_dir>/*/listing.json" via Listing.get_all_listings and
    # upserts the result through the repository.
    # NOTE: documented with comments rather than a docstring, because click
    # would surface a docstring as this command's --help text.
    source_dir = ctx.obj["data_dir"]
    click.echo(f"Populating the database with data from {source_dir}")
    listing_repository = ListingRepository(engine=engine)
    listing_files = list(pathlib.Path(source_dir).glob("*/listing.json"))
    loaded_listings = Listing.get_all_listings(listing_files)
    asyncio.run(listing_repository.upsert_listings(loaded_listings))
# Run the click command group only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    cli()