merge dump listings and dump details commands - fetch both details and listings in the same command
This commit is contained in:
parent
29213f3d26
commit
842f7cefbe
7 changed files with 54 additions and 59 deletions
|
|
@ -1,13 +1,18 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import json
|
||||||
import pathlib
|
import pathlib
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from rec.query import listing_query, QueryParameters
|
from rec.query import detail_query, listing_query, QueryParameters
|
||||||
from rec.districts import get_districts
|
from rec.districts import get_districts
|
||||||
|
import repositories
|
||||||
|
from sqlalchemy import Engine
|
||||||
|
from tqdm.asyncio import tqdm
|
||||||
from data_access import Listing
|
from data_access import Listing
|
||||||
|
|
||||||
|
|
||||||
async def dump_listings(
|
async def dump_listings(
|
||||||
parameters: QueryParameters,
|
parameters: QueryParameters,
|
||||||
|
db_engine: Engine,
|
||||||
data_dir: pathlib.Path = pathlib.Path("data/rs/"),
|
data_dir: pathlib.Path = pathlib.Path("data/rs/"),
|
||||||
) -> list[Listing]:
|
) -> list[Listing]:
|
||||||
if parameters.district_names:
|
if parameters.district_names:
|
||||||
|
|
@ -19,23 +24,20 @@ async def dump_listings(
|
||||||
else:
|
else:
|
||||||
districts = get_districts()
|
districts = get_districts()
|
||||||
print("Valid districts to scrape:", districts.keys())
|
print("Valid districts to scrape:", districts.keys())
|
||||||
listings = []
|
|
||||||
|
|
||||||
semaphore = asyncio.Semaphore(5) # if too high, rightmove drops connections
|
semaphore = asyncio.Semaphore(5) # if too high, rightmove drops connections
|
||||||
json_responses = await asyncio.gather(
|
json_responses = await tqdm.gather(
|
||||||
*[
|
*[
|
||||||
_dump_listings_with_semaphore(semaphore, i, parameters, locid)
|
_dump_listings_with_semaphore(semaphore, i, parameters, locid)
|
||||||
for locid in districts.values()
|
for locid in districts.values()
|
||||||
for i in [1, 2]
|
for i in [1, 2]
|
||||||
]
|
],
|
||||||
|
desc="Fetching listings",
|
||||||
)
|
)
|
||||||
listings = []
|
listings: list[Listing] = []
|
||||||
for response_json in json_responses:
|
for response_json in json_responses:
|
||||||
if response_json["totalAvailableResults"] == 0:
|
if response_json["totalAvailableResults"] == 0:
|
||||||
print("No results found")
|
|
||||||
continue
|
continue
|
||||||
if response_json["totalAvailableResults"] > 0:
|
|
||||||
print("totalAvailableResults: ", response_json["totalAvailableResults"])
|
|
||||||
for property in response_json["properties"]:
|
for property in response_json["properties"]:
|
||||||
identifier = property["identifier"]
|
identifier = property["identifier"]
|
||||||
|
|
||||||
|
|
@ -43,6 +45,21 @@ async def dump_listings(
|
||||||
listing.dump_listing(property)
|
listing.dump_listing(property)
|
||||||
listings.append(listing)
|
listings.append(listing)
|
||||||
|
|
||||||
|
# if listing is already in db, do not fetch details again
|
||||||
|
repository = repositories.ListingRepository(db_engine)
|
||||||
|
all_listings = await repository.get_listings(
|
||||||
|
only_ids=[listing.identifier for listing in listings]
|
||||||
|
)
|
||||||
|
all_listing_ids = {listing.id for listing in all_listings}
|
||||||
|
|
||||||
|
await tqdm.gather(
|
||||||
|
*[
|
||||||
|
_dump_detail_with_semaphore(semaphore, listing)
|
||||||
|
for listing in listings
|
||||||
|
# if listing.identifier not in all_listing_ids # One day we will rely solely on the model data
|
||||||
|
],
|
||||||
|
desc="Fetching details",
|
||||||
|
)
|
||||||
return listings
|
return listings
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -53,7 +70,7 @@ async def _dump_listings_with_semaphore(
|
||||||
location_id: str,
|
location_id: str,
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
async with semaphore:
|
async with semaphore:
|
||||||
listing = await listing_query(
|
listing_query_result = await listing_query(
|
||||||
page=page_id,
|
page=page_id,
|
||||||
channel=parameters.listing_type,
|
channel=parameters.listing_type,
|
||||||
min_bedrooms=parameters.min_bedrooms,
|
min_bedrooms=parameters.min_bedrooms,
|
||||||
|
|
@ -66,4 +83,16 @@ async def _dump_listings_with_semaphore(
|
||||||
max_days_since_added=parameters.max_days_since_added,
|
max_days_since_added=parameters.max_days_since_added,
|
||||||
furnish_types=parameters.furnish_types or [],
|
furnish_types=parameters.furnish_types or [],
|
||||||
)
|
)
|
||||||
return listing
|
|
||||||
|
return listing_query_result
|
||||||
|
|
||||||
|
|
||||||
|
async def _dump_detail_with_semaphore(semaphore: asyncio.Semaphore, listing: Listing):
|
||||||
|
if listing.path_detail_json().exists():
|
||||||
|
return
|
||||||
|
|
||||||
|
# for listing in tqdm(filtered_listings):
|
||||||
|
async with semaphore:
|
||||||
|
d = await detail_query(listing.identifier)
|
||||||
|
with open(listing.path_detail_json(), "w") as f:
|
||||||
|
json.dump(d, f)
|
||||||
|
|
|
||||||
|
|
@ -1,28 +0,0 @@
|
||||||
import asyncio
|
|
||||||
import json
|
|
||||||
from rec.query import detail_query
|
|
||||||
from tqdm.asyncio import tqdm
|
|
||||||
|
|
||||||
from data_access import Listing
|
|
||||||
|
|
||||||
# Setting this too high either crashes rightmove or gets us blocked
|
|
||||||
semaphore = asyncio.Semaphore(10)
|
|
||||||
|
|
||||||
|
|
||||||
async def dump_detail(listing_paths: list[str]):
|
|
||||||
listings = Listing.get_all_listings(listing_paths)
|
|
||||||
filtered_listings = await tqdm.gather(
|
|
||||||
*[_dump_detail_for_listing(listing) for listing in listings]
|
|
||||||
)
|
|
||||||
return filtered_listings
|
|
||||||
|
|
||||||
|
|
||||||
async def _dump_detail_for_listing(listing: Listing):
|
|
||||||
if listing.path_detail_json().exists():
|
|
||||||
return
|
|
||||||
|
|
||||||
# for listing in tqdm(filtered_listings):
|
|
||||||
async with semaphore:
|
|
||||||
d = await detail_query(listing.identifier)
|
|
||||||
with open(listing.path_detail_json(), "w") as f:
|
|
||||||
json.dump(d, f)
|
|
||||||
|
|
@ -37,7 +37,7 @@ class Listing:
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_all_listings(
|
def get_all_listings(
|
||||||
listing_paths: list[str],
|
listing_paths: list[pathlib.Path],
|
||||||
seen_in_the_last_n_days: int = 30,
|
seen_in_the_last_n_days: int = 30,
|
||||||
) -> List["Listing"]:
|
) -> List["Listing"]:
|
||||||
identifiers = []
|
identifiers = []
|
||||||
|
|
@ -256,8 +256,11 @@ class Listing:
|
||||||
@property
|
@property
|
||||||
def detailobject(self) -> dict[str, Any]:
|
def detailobject(self) -> dict[str, Any]:
|
||||||
if self._cached is None:
|
if self._cached is None:
|
||||||
with open(self.path_detail_json()) as f:
|
if self.path_detail_json().exists():
|
||||||
self._cached = json.load(f)
|
with open(self.path_detail_json()) as f:
|
||||||
|
self._cached = json.load(f)
|
||||||
|
else:
|
||||||
|
return {}
|
||||||
return self._cached # type: ignore
|
return self._cached # type: ignore
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
|
|
||||||
|
|
@ -11,14 +11,13 @@ from data_access import Listing
|
||||||
import csv_exporter
|
import csv_exporter
|
||||||
from rec.query import ListingType, FurnishType, QueryParameters
|
from rec.query import ListingType, FurnishType, QueryParameters
|
||||||
from rec.routing import API_KEY_ENVIRONMENT_VARIABLE, TravelMode
|
from rec.routing import API_KEY_ENVIRONMENT_VARIABLE, TravelMode
|
||||||
from repositories.listing_repositorty import ListingRepository
|
from repositories.listing_repository import ListingRepository
|
||||||
from ui_exporter import export_immoweb as export_immoweb_ui
|
from ui_exporter import export_immoweb as export_immoweb_ui
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
from database import engine
|
from database import engine
|
||||||
|
|
||||||
|
|
||||||
dump_listings_module = importlib.import_module("1_dump_listings")
|
dump_listings_module = importlib.import_module("1_dump_listings")
|
||||||
dump_detail_module = importlib.import_module("2_dump_detail")
|
|
||||||
dump_images_module = importlib.import_module("3_dump_images")
|
dump_images_module = importlib.import_module("3_dump_images")
|
||||||
detect_floorplan_module = importlib.import_module("4_detect_floorplan")
|
detect_floorplan_module = importlib.import_module("4_detect_floorplan")
|
||||||
routing_module = importlib.import_module("5_routing")
|
routing_module = importlib.import_module("5_routing")
|
||||||
|
|
@ -157,21 +156,12 @@ def dump_listings(
|
||||||
)
|
)
|
||||||
data_dir_path = pathlib.Path(data_dir)
|
data_dir_path = pathlib.Path(data_dir)
|
||||||
listings = asyncio.run(
|
listings = asyncio.run(
|
||||||
dump_listings_module.dump_listings(query_parameters, data_dir_path)
|
dump_listings_module.dump_listings(query_parameters, engine, data_dir_path)
|
||||||
)
|
)
|
||||||
repository = ListingRepository(engine=engine)
|
repository = ListingRepository(engine=engine)
|
||||||
asyncio.run(repository.upsert_listings(listings))
|
asyncio.run(repository.upsert_listings(listings))
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
|
||||||
@click.pass_context
|
|
||||||
def dump_details(ctx: click.core.Context):
|
|
||||||
data_dir = ctx.obj["data_dir"]
|
|
||||||
click.echo(f"Running dump_detail for listings stored in {data_dir}")
|
|
||||||
listing_paths = sorted(list(pathlib.Path(data_dir).glob("*/listing.json")))
|
|
||||||
asyncio.run(dump_detail_module.dump_detail(listing_paths))
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
@cli.command()
|
||||||
@click.pass_context
|
@click.pass_context
|
||||||
def dump_images(ctx: click.core.Context):
|
def dump_images(ctx: click.core.Context):
|
||||||
|
|
@ -298,7 +288,7 @@ def export_csv(
|
||||||
)
|
)
|
||||||
output_file_path = pathlib.Path(output_file)
|
output_file_path = pathlib.Path(output_file)
|
||||||
listing_paths = sorted(list(pathlib.Path(data_dir).glob("*/listing.json")))
|
listing_paths = sorted(list(pathlib.Path(data_dir).glob("*/listing.json")))
|
||||||
listings = Listing.get_all_listings([str(path) for path in listing_paths])
|
listings = Listing.get_all_listings([path for path in listing_paths])
|
||||||
asyncio.run(
|
asyncio.run(
|
||||||
csv_exporter.export_to_csv(
|
csv_exporter.export_to_csv(
|
||||||
listings,
|
listings,
|
||||||
|
|
@ -365,7 +355,7 @@ def populate_db(
|
||||||
click.echo(f"Populating the database with data from {data_dir}")
|
click.echo(f"Populating the database with data from {data_dir}")
|
||||||
repository = ListingRepository(engine=engine)
|
repository = ListingRepository(engine=engine)
|
||||||
listings = Listing.get_all_listings(
|
listings = Listing.get_all_listings(
|
||||||
[str(path) for path in pathlib.Path(data_dir).glob("*/listing.json")]
|
[path for path in pathlib.Path(data_dir).glob("*/listing.json")]
|
||||||
)
|
)
|
||||||
asyncio.run(repository.upsert_listings(listings))
|
asyncio.run(repository.upsert_listings(listings))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -90,7 +90,7 @@ class PropertyType(enum.StrEnum):
|
||||||
TERRACED = "terraced"
|
TERRACED = "terraced"
|
||||||
|
|
||||||
|
|
||||||
async def detail_query(detail_id: int):
|
async def detail_query(detail_id: int) -> dict[str, Any]:
|
||||||
params = {
|
params = {
|
||||||
"apiApplication": "ANDROID",
|
"apiApplication": "ANDROID",
|
||||||
"appVersion": "3.70.0",
|
"appVersion": "3.70.0",
|
||||||
|
|
|
||||||
|
|
@ -3,10 +3,11 @@
|
||||||
set -euxo pipefail
|
set -euxo pipefail
|
||||||
|
|
||||||
DATA_DIR="data/rs"
|
DATA_DIR="data/rs"
|
||||||
LISTING_FILTER_OPTIONS="--min-price 2000 --max-price 4000 --min-bedrooms 2 --max-bedrooms 4 -t rent --available-from $(date +%Y-%m-%d) --last-seen-days 7 --furnish-type furnished"
|
LISTING_FILTER_OPTIONS="--min-price 2000 --max-price 4000 --min-bedrooms 2 --max-bedrooms 4 -t rent --available-from $(date +%Y-%m-%d) --last-seen-days 7 --furnish-types furnished"
|
||||||
|
|
||||||
|
alembic upgrade head # init db
|
||||||
|
|
||||||
python main.py --data-dir $DATA_DIR dump-listings $LISTING_FILTER_OPTIONS
|
python main.py --data-dir $DATA_DIR dump-listings $LISTING_FILTER_OPTIONS
|
||||||
python main.py --data-dir $DATA_DIR dump-details
|
|
||||||
python main.py --data-dir $DATA_DIR dump-images
|
python main.py --data-dir $DATA_DIR dump-images
|
||||||
python main.py --data-dir $DATA_DIR detect-floorplan
|
python main.py --data-dir $DATA_DIR detect-floorplan
|
||||||
#python main.py --data-dir $DATA_DIR routing --destination-address 'Meta Brock Street' -m transit # NOTE: THIS CONSUMES API CALLS; USE CAREFULLY; add -l to limit number of entries
|
#python main.py --data-dir $DATA_DIR routing --destination-address 'Meta Brock Street' -m transit # NOTE: THIS CONSUMES API CALLS; USE CAREFULLY; add -l to limit number of entries
|
||||||
|
|
|
||||||
|
|
@ -49,6 +49,6 @@ async def export_immoweb(
|
||||||
prefix = "var data = "
|
prefix = "var data = "
|
||||||
serialized_data = {"type": "FeatureCollection", "features": immoweb_listings}
|
serialized_data = {"type": "FeatureCollection", "features": immoweb_listings}
|
||||||
result = prefix + json.dumps(serialized_data, indent=4)
|
result = prefix + json.dumps(serialized_data, indent=4)
|
||||||
with open(output_file_path, "w") as f:
|
with open(str(output_file_path), "w") as f:
|
||||||
f.write(result)
|
f.write(result)
|
||||||
# json.dump(serialized_data, f, indent=4)
|
# json.dump(serialized_data, f, indent=4)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue