The `get_ids_to_process` function was using set union instead of set difference, causing it to return all existing listing IDs along with the new ones. This meant:

1. When there were no new listings, the task would iterate through all existing listings, find nothing to process for each, and complete almost instantly.
2. The task showed no progress because processing finished too quickly.

Fixed by:

- Changing `all_listing_ids.union(identifiers)` to `identifiers - all_listing_ids` so that only IDs that are NOT already in the database are returned.
- Adding an explicit check for an empty set with the informative task state "No new listings found", so users understand why the task completed quickly.
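As a minimal sketch of the behavioural difference (the ID values below are made up; only the set operations mirror the code):

```python
# Illustrative values only: identifiers scraped from the site vs. IDs already stored
identifiers = {101, 102, 103}   # returned by the latest scrape
all_listing_ids = {101, 102}    # already in the database

# Before: union returned every known ID, so existing listings were iterated again
print(all_listing_ids.union(identifiers))  # {101, 102, 103}

# After: difference keeps only the IDs that still need processing
print(identifiers - all_listing_ids)       # {103}
```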
239 lines · 9.1 KiB · Python
import asyncio
import itertools
import logging
from typing import Any

from celery import Task
from celery.schedules import crontab

from celery_app import app
from config.schedule_config import SchedulesConfig
from listing_processor import ListingProcessor
from models.listing import Listing, QueryParameters
from rec.districts import get_districts
from rec.query import listing_query
from repositories.listing_repository import ListingRepository
from database import engine
from services import image_fetcher, floorplan_detector
from utils.redis_lock import redis_lock

logger = logging.getLogger("uvicorn.error")

SCRAPE_LOCK_NAME = "scrape_listings"


@app.task(bind=True, pydantic=True)
def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]:
    """Scrape listings described by ``parameters_json``, guarded by a Redis lock."""
    with redis_lock(SCRAPE_LOCK_NAME) as acquired:
        if not acquired:
            logger.warning("Another scrape job is already running, skipping this execution")
            self.update_state(state="SKIPPED", meta={"reason": "Another scrape job is running"})
            return {"status": "skipped", "reason": "another_job_running"}

        logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
        parsed_parameters = QueryParameters.model_validate_json(parameters_json)
        self.update_state(state="Starting...", meta={"progress": 0})
        asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters))
        return {"progress": 0}


async def async_dump_listings_task(parameters_json: str) -> dict[str, Any]:
    with redis_lock(SCRAPE_LOCK_NAME) as acquired:
        if not acquired:
            logger.warning("Another scrape job is already running, skipping this execution")
            return {"status": "skipped", "reason": "another_job_running"}

        logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
        parsed_parameters = QueryParameters.model_validate_json(parameters_json)
        await dump_listings_full(task=Task(), parameters=parsed_parameters)
        return {"progress": 0}


async def dump_listings_full(
    *, task: Task, parameters: QueryParameters
) -> list[Listing]:
    """Fetch all missing listings and their images, and detect floorplans."""
    repository = ListingRepository(engine)

    task.update_state(state="Identifying missing listings", meta={"progress": 0})
    ids_to_process = await get_ids_to_process(
        parameters=parameters, repository=repository, task=task
    )
    logger.info(f"Found {len(ids_to_process)} listings to process")

    if len(ids_to_process) == 0:
        task.update_state(
            state="No new listings found",
            meta={"progress": 1, "processed": 0, "total": 0, "message": "All listings are up to date"},
        )
        return []

    listing_processor = ListingProcessor(repository)
    logger.info(f"Starting processing {len(ids_to_process)} listings")
    return await dump_listings_and_monitor(
        task=task, listing_processor=listing_processor, missing_ids=ids_to_process
    )


async def dump_listings_and_monitor(
    *, task: Task, listing_processor: ListingProcessor, missing_ids: set[int]
) -> list[Listing]:
    """Process the missing listings concurrently while reporting progress on the task."""
    task_progress = {missing_id: 0 for missing_id in missing_ids}

    async def process(missing_id: int) -> Listing | None:
        listing = await listing_processor.process_listing(missing_id)
        task_progress[missing_id] = 1
        return listing

    async def monitor() -> None:
        while (progress := sum(task_progress.values())) < len(missing_ids):
            progress_ratio = round(progress / len(missing_ids), 2)
            logger.error(
                f"Task progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})"
            )
            task.update_state(
                state=f"Progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})",
                meta={"progress": progress_ratio, "processed": progress, "total": len(missing_ids)},
            )
            await asyncio.sleep(1)

    processed_listings = await asyncio.gather(
        *[process(id) for id in missing_ids], *[monitor()]
    )
    filtered_listings = [l for l in processed_listings if l is not None]

    return filtered_listings


@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register periodic tasks from environment configuration."""
    try:
        config = SchedulesConfig.from_env()
    except ValueError as e:
        logger.error(f"Failed to load schedule configuration: {e}")
        return

    for schedule in config.get_enabled_schedules():
        logger.info(
            f"Registering periodic task: {schedule.name} at {schedule.hour}:{schedule.minute}"
        )

        sender.add_periodic_task(
            crontab(
                minute=schedule.minute,
                hour=schedule.hour,
                day_of_week=schedule.day_of_week,
            ),
            dump_listings_task.s(schedule.to_query_parameters().model_dump_json()),
            name=schedule.name,
        )


async def get_ids_to_process(
    *,
    parameters: QueryParameters,
    repository: ListingRepository,
    task: Task,
) -> set[int]:
    """Fetch listing identifiers matching ``parameters`` and return only those not already in the database."""
    semaphore = asyncio.Semaphore(5)  # if too high, rightmove drops connections
    districts = await get_valid_districts_to_scrape(parameters.district_names)
    task.update_state(state="Fetching listings to scrape", meta={"progress": 0})
    json_responses: list[list[dict[str, Any]]] = await asyncio.gather(
        *[
            _fetch_listings_with_semaphore(
                task=task, semaphore=semaphore, parameters=parameters, district=district
            )
            for district in districts.keys()
        ],
    )
    json_responses_flat = list(itertools.chain.from_iterable(json_responses))
    logger.debug(f"Total listings fetched {len(json_responses_flat)}")

    identifiers: set[int] = set()
    for response_json in json_responses_flat:
        if response_json == {}:
            continue
        if response_json["totalAvailableResults"] == 0:
            continue
        for property in response_json["properties"]:
            identifier = property["identifier"]
            identifiers.add(identifier)

    # if a listing is already in the db, do not fetch its details again
    all_listing_ids = {l.id for l in await repository.get_listings()}
    new_ids = identifiers - all_listing_ids
    return new_ids


async def get_valid_districts_to_scrape(
    district_names: set[str] | None,
) -> dict[str, str]:
    if district_names:
        districts = {
            district: locid
            for district, locid in get_districts().items()
            if district in district_names
        }
    else:
        districts = get_districts()
    return districts


async def _fetch_listings_with_semaphore(
    *,
    task: Task,
    semaphore: asyncio.Semaphore,
    parameters: QueryParameters,
    district: str,
) -> list[dict[str, Any]]:
    result = []
    # Split the price range into N bands to avoid Rightmove's ~1.5k result cap:
    # instead of one query for e.g. 1k-5k that gets capped at 1500 results,
    # we issue 10 queries with narrower price ranges, so we send more queries
    # but each one is less likely to return more than 1.5k results.

    number_of_steps = 10
    price_step = parameters.max_price // number_of_steps

    for step in range(number_of_steps):
        task.update_state(
            state=f"Fetching listings ({step} out of {number_of_steps})",
            meta={"progress": step / number_of_steps},
        )
        min_price = step * price_step
        max_price = (step + 1) * price_step
        logger.debug(
            f"Step {step} of {number_of_steps} with {min_price=} and {max_price=}"
        )

        for num_bedrooms in range(parameters.min_bedrooms, parameters.max_bedrooms + 1):
            for page_id in range(
                1,
                3,  # seems like all searches stop at 1500 entries (page_id * page_size)
            ):
                logger.debug(f"Processing {page_id=} for {district=}")

                async with semaphore:
                    try:
                        listing_query_result = await listing_query(
                            page=page_id,
                            channel=parameters.listing_type,
                            # min_bedrooms=parameters.min_bedrooms,
                            # max_bedrooms=parameters.max_bedrooms,
                            min_bedrooms=num_bedrooms,
                            max_bedrooms=num_bedrooms,
                            radius=parameters.radius,
                            min_price=min_price,
                            max_price=max_price,
                            district=district,
                            page_size=parameters.page_size,
                            max_days_since_added=parameters.max_days_since_added,
                            furnish_types=parameters.furnish_types or [],
                        )
                    except Exception as e:
                        if "GENERIC_ERROR" in str(e):  # Too big page id
                            logger.debug(f"Max page id for {district=}: {page_id-1}")
                            break
                        raise e
                result.append(listing_query_result)
    return result