# wrongmove/crawler/tasks/listing_tasks.py

import asyncio
import itertools
import logging
from typing import Any
from celery import Task
from celery.schedules import crontab
from celery_app import app
from config.schedule_config import SchedulesConfig
from listing_processor import ListingProcessor
from models.listing import Listing, QueryParameters
from rec.districts import get_districts
from rec.query import listing_query
from repositories.listing_repository import ListingRepository
from database import engine
from services import image_fetcher, floorplan_detector
from utils.redis_lock import redis_lock

logger = logging.getLogger("uvicorn.error")

SCRAPE_LOCK_NAME = "scrape_listings"


@app.task(bind=True, pydantic=True)
def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]:
    """Celery entry point: scrape the listings described by ``parameters_json``.

    Holds a Redis lock so that only one scrape job runs at a time.
    """
    with redis_lock(SCRAPE_LOCK_NAME) as acquired:
        if not acquired:
            logger.warning("Another scrape job is already running, skipping this execution")
            self.update_state(state="SKIPPED", meta={"reason": "Another scrape job is running"})
            return {"status": "skipped", "reason": "another_job_running"}

        logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
        parsed_parameters = QueryParameters.model_validate_json(parameters_json)
        self.update_state(state="Starting...", meta={"progress": 0})
        asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters))
        return {"progress": 0}
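

# Usage sketch (not part of this module): a caller builds the QueryParameters model and
# enqueues the task with its JSON dump. Field names below are taken from the attributes
# accessed later in this file; the values are hypothetical and the exact constructor
# signature of QueryParameters may differ.
#
#     params = QueryParameters(
#         district_names={"Camden"},
#         listing_type="RENT",
#         min_bedrooms=1,
#         max_bedrooms=2,
#         max_price=3000,
#         radius=0.5,
#         page_size=24,
#         max_days_since_added=7,
#         furnish_types=None,
#     )
#     dump_listings_task.delay(params.model_dump_json())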


async def async_dump_listings_task(parameters_json: str) -> dict[str, Any]:
    """Same as ``dump_listings_task`` but awaitable from an already-running event loop."""
    with redis_lock(SCRAPE_LOCK_NAME) as acquired:
        if not acquired:
            logger.warning("Another scrape job is already running, skipping this execution")
            return {"status": "skipped", "reason": "another_job_running"}

        logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
        parsed_parameters = QueryParameters.model_validate_json(parameters_json)
        await dump_listings_full(task=Task(), parameters=parsed_parameters)
        return {"progress": 0}


async def dump_listings_full(
    *, task: Task, parameters: QueryParameters
) -> list[Listing]:
    """Fetch all listings and their images, and detect floorplans."""
    repository = ListingRepository(engine)

    task.update_state(state="Identifying missing listings", meta={"progress": 0})
    ids_to_process = await get_ids_to_process(
        parameters=parameters, repository=repository, task=task
    )
    logger.info(f"Found {len(ids_to_process)} listings to process")

    listing_processor = ListingProcessor(repository)
    logger.info(f"Starting processing {len(ids_to_process)} listings")
    return await dump_listings_and_monitor(
        task=task, listing_processor=listing_processor, missing_ids=ids_to_process
    )


async def dump_listings_and_monitor(
    *, task: Task, listing_processor: ListingProcessor, missing_ids: set[int]
) -> list[Listing]:
    """Process all ``missing_ids`` concurrently while a monitor coroutine reports progress."""
    task_progress = {missing_id: 0 for missing_id in missing_ids}

    async def process(missing_id: int) -> Listing | None:
        listing = await listing_processor.process_listing(missing_id)
        task_progress[missing_id] = 1
        return listing

    async def monitor() -> None:
        while (progress := sum(task_progress.values())) < len(missing_ids):
            progress_ratio = round(progress / len(missing_ids), 2)
            logger.info(
                f"Task progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})"
            )
            task.update_state(
                state=f"Progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})",
                meta={"progress": progress_ratio, "processed": progress, "total": len(missing_ids)},
            )
            await asyncio.sleep(1)

    processed_listings = await asyncio.gather(
        *(process(missing_id) for missing_id in missing_ids), monitor()
    )
    return [listing for listing in processed_listings if listing is not None]
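

# Concurrency note for dump_listings_and_monitor above: asyncio.gather interleaves the
# monitor coroutine with the workers on a single event-loop thread, so the shared
# task_progress dict can be mutated without locks. The monitor's None return value is
# dropped by the final `is not None` filter.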


@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register periodic tasks from environment configuration."""
    try:
        config = SchedulesConfig.from_env()
    except ValueError as e:
        logger.error(f"Failed to load schedule configuration: {e}")
        return

    for schedule in config.get_enabled_schedules():
        logger.info(
            f"Registering periodic task: {schedule.name} at {schedule.hour}:{schedule.minute}"
        )
        sender.add_periodic_task(
            crontab(
                minute=schedule.minute,
                hour=schedule.hour,
                day_of_week=schedule.day_of_week,
            ),
            dump_listings_task.s(schedule.to_query_parameters().model_dump_json()),
            name=schedule.name,
        )
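

# For reference, celery's crontab follows cron semantics: a schedule with minute=0,
# hour=3, day_of_week="*" becomes crontab(minute=0, hour=3, day_of_week="*"), i.e. the
# task fires every day at 03:00 with that schedule's serialized QueryParameters payload.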


async def get_ids_to_process(
    *,
    parameters: QueryParameters,
    repository: ListingRepository,
    task: Task,
) -> set[int]:
    """Query Rightmove for matching listings and return the ids not yet stored in the database."""
    semaphore = asyncio.Semaphore(5)  # if too high, rightmove drops connections
    districts = await get_valid_districts_to_scrape(parameters.district_names)

    task.update_state(state="Fetching listings to scrape", meta={"progress": 0})
    json_responses: list[list[dict[str, Any]]] = await asyncio.gather(
        *[
            _fetch_listings_with_semaphore(
                task=task, semaphore=semaphore, parameters=parameters, district=district
            )
            for district in districts
        ],
    )
    json_responses_flat = list(itertools.chain.from_iterable(json_responses))
    logger.debug(f"Total listings fetched {len(json_responses_flat)}")

    identifiers: set[int] = set()
    for response_json in json_responses_flat:
        if response_json == {}:
            continue
        if response_json["totalAvailableResults"] == 0:
            continue
        for property_json in response_json["properties"]:
            identifiers.add(property_json["identifier"])

    # if a listing is already in the db, do not fetch its details again
    existing_ids = {listing.id for listing in await repository.get_listings()}
    return identifiers - existing_ids
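

# The parsing above assumes a search response shaped roughly like the following sketch,
# inferred from the keys it accesses; the real payload carries many more fields:
#
#     {
#         "totalAvailableResults": 2,
#         "properties": [
#             {"identifier": 123456789, ...},
#             {"identifier": 987654321, ...},
#         ],
#     }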


async def get_valid_districts_to_scrape(
    district_names: set[str] | None,
) -> dict[str, str]:
    """Return the districts to scrape, filtered to ``district_names`` when provided."""
    if district_names:
        districts = {
            district: locid
            for district, locid in get_districts().items()
            if district in district_names
        }
    else:
        districts = get_districts()
    return districts


async def _fetch_listings_with_semaphore(
    *,
    task: Task,
    semaphore: asyncio.Semaphore,
    parameters: QueryParameters,
    district: str,
) -> list[dict[str, Any]]:
    """Fetch all result pages for one district, throttled by ``semaphore``."""
    result = []
    # Split the price range into N bands to work around Rightmove capping results at 1.5k:
    # instead of one query with price between 1k and 5k (capped at 1500 results) we send
    # ten queries, each covering a smaller price band, so each one is far less likely to
    # hit the cap. E.g. with max_price=5000 the bands are 0-500, 500-1000, ..., 4500-5000.
    number_of_steps = 10
    price_step = parameters.max_price // number_of_steps
    for step in range(number_of_steps):
        task.update_state(
            state=f"Fetching listings ({step} out of {number_of_steps})",
            meta={"progress": step / number_of_steps},
        )
        min_price = step * price_step
        max_price = (step + 1) * price_step
        logger.debug(
            f"Step {step} of {number_of_steps} with {min_price=} and {max_price=}"
        )
        # Querying one bedroom count at a time narrows each request further below the cap.
        for num_bedrooms in range(parameters.min_bedrooms, parameters.max_bedrooms + 1):
            for page_id in range(
                1,
                3,  # seems like all searches stop at 1500 entries (page_id * page_size)
            ):
                logger.debug(f"Processing {page_id=} for {district=}")
                async with semaphore:
                    try:
                        listing_query_result = await listing_query(
                            page=page_id,
                            channel=parameters.listing_type,
                            min_bedrooms=num_bedrooms,
                            max_bedrooms=num_bedrooms,
                            radius=parameters.radius,
                            min_price=min_price,
                            max_price=max_price,
                            district=district,
                            page_size=parameters.page_size,
                            max_days_since_added=parameters.max_days_since_added,
                            furnish_types=parameters.furnish_types or [],
                        )
                    except Exception as e:
                        if "GENERIC_ERROR" in str(e):  # page id beyond the last page
                            logger.debug(f"Max page id for {district=}: {page_id - 1}")
                            break
                        raise
                result.append(listing_query_result)
    return result
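

# Local run sketch: the async variant can be driven directly, without a Celery worker,
# from a one-off script (params_json built as in the example after dump_listings_task above):
#
#     if __name__ == "__main__":
#         asyncio.run(async_dump_listings_task(params_json))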