delete 1 dump listings as it is renamed
This commit is contained in:
parent
4d7ca7b920
commit
61e1832bc1
1 changed files with 0 additions and 157 deletions
|
|
@ -1,157 +0,0 @@
|
||||||
import asyncio
|
|
||||||
import importlib
|
|
||||||
import itertools
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
import pathlib
|
|
||||||
from typing import Any
|
|
||||||
from listing_processor import ListingProcessor
|
|
||||||
from rec.query import listing_query
|
|
||||||
from models.listing import QueryParameters
|
|
||||||
from rec.districts import get_districts
|
|
||||||
from repositories import ListingRepository
|
|
||||||
from tqdm.asyncio import tqdm
|
|
||||||
from data_access import Listing
|
|
||||||
from models import Listing as modelListing
|
|
||||||
|
|
||||||
# Loaded through importlib because the module name "3_dump_images" starts with
# a digit and therefore cannot be written in a regular `import` statement.
dump_images_module = importlib.import_module("3_dump_images")
|
|
||||||
# Loaded through importlib because the module name "4_detect_floorplan" starts
# with a digit and therefore cannot be written in a regular `import` statement.
detect_floorplan_module = importlib.import_module("4_detect_floorplan")
|
|
||||||
|
|
||||||
# Module-wide logger bound to the "uvicorn.error" logger name rather than
# __name__ -- presumably so messages share uvicorn's handlers (TODO confirm).
logger = logging.getLogger("uvicorn.error")
|
|
||||||
|
|
||||||
|
|
||||||
async def dump_listings_full(
    parameters: QueryParameters,
    repository: ListingRepository,
    data_dir: pathlib.Path = pathlib.Path("data/rs/"),
) -> list[modelListing]:
    """Fetch new listings, then return their refreshed repository records.

    Args:
        parameters: Search parameters forwarded to ``dump_listings`` and to
            ``repository.get_listings``.
        repository: Repository used both to store and to re-read listings.
        data_dir: Base directory forwarded to ``dump_listings``.

    Returns:
        The listings returned by ``repository.get_listings(parameters)`` whose
        ``id`` belongs to a listing that ``dump_listings`` just processed.
    """
    new_listings = await dump_listings(parameters, repository, data_dir)
    logger.debug("Upserted %d new listings", len(new_listings))

    # Image download and floorplan detection are currently disabled:
    # await dump_images_module.dump_images(repository, image_base_path=data_dir)
    # await detect_floorplan_module.detect_floorplan(repository)

    # Compare ids with ids. ``dump_listings`` is annotated as returning model
    # objects, so testing ``x.id in new_listings`` directly would compare an id
    # against whole objects. ``getattr`` keeps this working if the processor
    # ever hands back bare ids instead.
    new_ids = {getattr(item, "id", item) for item in new_listings}

    # Refresh from the repository so callers get the stored representation.
    listings = await repository.get_listings(parameters)  # this can be better
    return [listing for listing in listings if listing.id in new_ids]
|
|
||||||
|
|
||||||
|
|
||||||
async def dump_listings(
    parameters: QueryParameters,
    repository: ListingRepository,
    data_dir: pathlib.Path = pathlib.Path("data/rs/"),
) -> list[modelListing]:
    """Query every selected district and process listings not yet stored.

    Args:
        parameters: Search parameters; when ``district_names`` is set only
            those districts from ``get_districts()`` are queried.
        repository: Repository used to look up already-known listing ids and
            handed to ``ListingProcessor``.
        data_dir: Base directory passed to each ``Listing`` wrapper.

    Returns:
        The non-``None`` results of ``ListingProcessor.process_listing`` for
        every fetched identifier that is not already in the repository.
    """
    districts = get_districts()
    if parameters.district_names:
        districts = {
            name: locid
            for name, locid in districts.items()
            if name in parameters.district_names
        }
    # Needs an explicit placeholder: extra positional arguments without one
    # make the logging module report a formatting error instead of the message.
    logger.debug("Valid districts to scrape: %s", list(districts))

    semaphore = asyncio.Semaphore(5)  # if too high, rightmove drops connections
    json_responses: list[list[dict[str, Any]]] = await tqdm.gather(
        *[
            _fetch_listings_with_semaphore(semaphore, parameters, district)
            for district in districts
        ],
        desc="Fetching listings",
    )
    json_responses_flat = list(itertools.chain.from_iterable(json_responses))
    logger.debug("Total listings fetched %d", len(json_responses_flat))

    listings: list[Listing] = []
    for response_json in json_responses_flat:
        # Skip empty payloads and searches that matched nothing.
        if not response_json or response_json["totalAvailableResults"] == 0:
            continue
        listings.extend(
            Listing(entry["identifier"], data_dir=data_dir, _listing_object=entry)
            for entry in response_json["properties"]
        )

    # If a listing is already in the db, do not fetch its details again.
    stored_listings = await repository.get_listings()
    known_ids = {stored.id for stored in stored_listings}

    # The same identifier can be fetched more than once (several responses are
    # merged above), so de-duplicate while keeping first-seen order.
    # Assumes identifiers are hashable -- TODO confirm.
    missing_ids = list(
        dict.fromkeys(
            listing.identifier
            for listing in listings
            if listing.identifier not in known_ids
        )
    )

    listing_processor = ListingProcessor(repository)
    logger.info("Starting processing %d new listings", len(missing_ids))
    # Every missing listing is processed; an empty list is a no-op.
    processed_listings = await tqdm.gather(
        *[listing_processor.process_listing(listing_id) for listing_id in missing_ids]
    )
    return [item for item in processed_listings if item is not None]
|
|
||||||
|
|
||||||
|
|
||||||
async def _fetch_listings_with_semaphore(
    semaphore: asyncio.Semaphore,
    parameters: QueryParameters,
    district: str,
    *,
    number_of_steps: int = 1,
    max_pages: int = 2,
) -> list[dict[str, Any]]:
    """Run the paged listing queries for one district under a shared semaphore.

    The price range ``0..parameters.max_price`` is split into
    ``number_of_steps`` bands and each bedroom count between
    ``parameters.min_bedrooms`` and ``parameters.max_bedrooms`` is queried
    separately. Sending more, narrower queries lowers the chance that any
    single one hits the result cap observed on rightmove (searches seem to
    stop at about 1500 entries, i.e. ``page_id * page_size``).

    Args:
        semaphore: Limits how many queries run at the same time.
        parameters: Search parameters forwarded to ``listing_query``.
        district: District to search.
        number_of_steps: Number of price bands; must be at least 1.
        max_pages: Highest page id requested per (band, bedroom count) pair.

    Returns:
        One ``listing_query`` result per successfully fetched page.

    Raises:
        ValueError: If ``number_of_steps`` is smaller than 1.
        Exception: Any ``listing_query`` failure whose message does not
            contain ``"GENERIC_ERROR"`` is propagated unchanged.
    """
    if number_of_steps < 1:
        raise ValueError("number_of_steps must be at least 1")

    result: list[dict[str, Any]] = []
    price_step = parameters.max_price // number_of_steps

    for step in range(number_of_steps):
        min_price = step * price_step
        # The last band ends at the requested maximum so that whatever integer
        # division dropped from the top of the range is still covered.
        if step == number_of_steps - 1:
            max_price = parameters.max_price
        else:
            max_price = (step + 1) * price_step
        logger.debug(
            f"Step {step} of {number_of_steps} with {min_price=} and {max_price=}"
        )

        for num_bedrooms in range(parameters.min_bedrooms, parameters.max_bedrooms + 1):
            for page_id in range(1, max_pages + 1):
                logger.debug(f"Processing {page_id=} for {district=}")

                async with semaphore:
                    try:
                        listing_query_result = await listing_query(
                            page=page_id,
                            channel=parameters.listing_type,
                            # One bedroom count per query, not the full range.
                            min_bedrooms=num_bedrooms,
                            max_bedrooms=num_bedrooms,
                            radius=parameters.radius,
                            min_price=min_price,
                            max_price=max_price,
                            district=district,
                            page_size=parameters.page_size,
                            max_days_since_added=parameters.max_days_since_added,
                            furnish_types=parameters.furnish_types or [],
                        )
                    except Exception as e:
                        if "GENERIC_ERROR" in str(e):  # Too big page id
                            logger.debug(f"Max page id for {district=}: {page_id-1}")
                            break
                        # Bare raise keeps the original traceback intact.
                        raise
                result.append(listing_query_result)
    return result
|
|
||||||
|
|
||||||
|
|
||||||
async def dump_listings_to_fs(listings: list[Listing]) -> None:
    """Write every listing, and its details JSON, to the filesystem.

    For each listing this calls ``dump_listing()`` and then serialises the
    listing's ``_details_object`` (indent 4) into the file returned by
    ``path_detail_json()``, overwriting any existing file.

    Args:
        listings: Listings to persist; progress is shown with tqdm.
    """
    progress = tqdm(listings, desc="Dumping listings to FS")
    for entry in progress:
        entry.dump_listing()
        detail_path = entry.path_detail_json()
        # The details file is always rewritten, even when it already exists.
        with open(detail_path, "w") as handle:
            json.dump(entry._details_object, handle, indent=4)
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue