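"""Celery tasks for scraping listings.

Defines the scheduled scrape task, an async entry point for direct invocation,
and helpers that split a search into sub-queries, page through the results,
filter out listings already stored, and process the remaining IDs while
reporting progress through the task state.
"""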
import asyncio
import logging
from typing import Any

from celery import Task
from celery.schedules import crontab

from celery_app import app
from config.schedule_config import SchedulesConfig
from config.scraper_config import ScraperConfig
from database import engine
from listing_processor import ListingProcessor
from models.listing import Listing, QueryParameters
from rec.query import create_session, listing_query
from repositories.listing_repository import ListingRepository
from services.query_splitter import QuerySplitter, SubQuery
from utils.redis_lock import redis_lock

logger = logging.getLogger("uvicorn.error")

SCRAPE_LOCK_NAME = "scrape_listings"
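# `redis_lock` (from utils.redis_lock) is assumed to be a context manager that
# yields True when the named lock was acquired and False when another worker
# already holds it, releasing the lock on exit.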


@app.task(bind=True, pydantic=True)
def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]:
    """Scrape listings described by ``parameters_json``, guarded by a Redis lock."""
    with redis_lock(SCRAPE_LOCK_NAME) as acquired:
        if not acquired:
            logger.warning("Another scrape job is already running, skipping this execution")
            self.update_state(state="SKIPPED", meta={"reason": "Another scrape job is running"})
            return {"status": "skipped", "reason": "another_job_running"}

        logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
        parsed_parameters = QueryParameters.model_validate_json(parameters_json)
        self.update_state(state="Starting...", meta={"progress": 0})
        asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters))
        return {"progress": 0}


async def async_dump_listings_task(parameters_json: str) -> dict[str, Any]:
    """Async variant of ``dump_listings_task`` that can be awaited directly (not registered as a Celery task)."""
    with redis_lock(SCRAPE_LOCK_NAME) as acquired:
        if not acquired:
            logger.warning("Another scrape job is already running, skipping this execution")
            return {"status": "skipped", "reason": "another_job_running"}

        logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
        parsed_parameters = QueryParameters.model_validate_json(parameters_json)
        # NOTE: a bare Task() is passed only so dump_listings_full can call
        # update_state; there is no real Celery task behind it.
        await dump_listings_full(task=Task(), parameters=parsed_parameters)
        return {"progress": 0}


async def dump_listings_full(
    *, task: Task, parameters: QueryParameters
) -> list[Listing]:
    """Fetch all listings and their images, and detect floorplans."""
    repository = ListingRepository(engine)

    task.update_state(state="Identifying missing listings", meta={"progress": 0})
    ids_to_process = await get_ids_to_process(
        parameters=parameters, repository=repository, task=task
    )
    logger.info(f"Found {len(ids_to_process)} listings to process")

    if len(ids_to_process) == 0:
        task.update_state(
            state="No new listings found",
            meta={"progress": 1, "processed": 0, "total": 0, "message": "All listings are up to date"},
        )
        return []

    listing_processor = ListingProcessor(repository)
    logger.info(f"Starting to process {len(ids_to_process)} listings")
    return await dump_listings_and_monitor(
        task=task, listing_processor=listing_processor, missing_ids=ids_to_process
    )


async def dump_listings_and_monitor(
    *, task: Task, listing_processor: ListingProcessor, missing_ids: set[int]
) -> list[Listing]:
    """Process the given listing IDs concurrently while reporting progress once per second."""
    task_progress = {missing_id: 0 for missing_id in missing_ids}

    async def process(missing_id: int) -> Listing | None:
        listing = await listing_processor.process_listing(missing_id)
        task_progress[missing_id] = 1
        return listing

    async def monitor() -> None:
        # Poll the shared progress map until every listing has been processed.
        while (progress := sum(task_progress.values())) < len(missing_ids):
            progress_ratio = round(progress / len(missing_ids), 2)
            logger.info(
                f"Task progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})"
            )
            task.update_state(
                state=f"Progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})",
                meta={"progress": progress_ratio, "processed": progress, "total": len(missing_ids)},
            )
            await asyncio.sleep(1)

    processed_listings = await asyncio.gather(
        *[process(missing_id) for missing_id in missing_ids], monitor()
    )
    # Drop failed listings and the None returned by monitor().
    filtered_listings = [listing for listing in processed_listings if listing is not None]

    return filtered_listings


@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register periodic tasks from environment configuration."""
    try:
        config = SchedulesConfig.from_env()
    except ValueError as e:
        logger.error(f"Failed to load schedule configuration: {e}")
        return

    for schedule in config.get_enabled_schedules():
        logger.info(
            f"Registering periodic task: {schedule.name} at {schedule.hour}:{schedule.minute}"
        )

        sender.add_periodic_task(
            crontab(
                minute=schedule.minute,
                hour=schedule.hour,
                day_of_week=schedule.day_of_week,
            ),
            dump_listings_task.s(schedule.to_query_parameters().model_dump_json()),
            name=schedule.name,
        )


async def get_ids_to_process(
    *,
    parameters: QueryParameters,
    repository: ListingRepository,
    task: Task,
) -> set[int]:
    """Fetch all listing IDs using intelligent query splitting.

    Uses the QuerySplitter to adaptively split large queries and maximize
    data extraction while respecting Rightmove's result caps.

    Args:
        parameters: Query parameters for the search.
        repository: Repository for checking existing listings.
        task: Celery task for progress updates.

    Returns:
        Set of new listing IDs that need to be processed.
    """
    config = ScraperConfig.from_env()
    splitter = QuerySplitter(config)

    def on_progress(phase: str, message: str) -> None:
        task.update_state(state=message, meta={"phase": phase})

    async with create_session(config) as session:
        # Phase 1 & 2: Split and probe queries
        task.update_state(
            state="Analyzing query and splitting by price bands...",
            meta={"phase": "splitting", "progress": 0},
        )
        subqueries = await splitter.split(parameters, session, on_progress)

        total_estimated = splitter.calculate_total_estimated_results(subqueries)
        logger.info(
            f"Split into {len(subqueries)} subqueries, "
            f"estimated {total_estimated} total results"
        )

        # Phase 3: Fetch all pages for each subquery
        task.update_state(
            state=f"Fetching listings from {len(subqueries)} subqueries...",
            meta={
                "phase": "fetching",
                "subqueries": len(subqueries),
                "estimated_results": total_estimated,
            },
        )

        semaphore = asyncio.Semaphore(config.max_concurrent_requests)
        identifiers: set[int] = set()

        async def fetch_subquery(sq: SubQuery) -> list[dict[str, Any]]:
            """Fetch all pages for a single subquery."""
            results: list[dict[str, Any]] = []

            # Calculate how many pages we need based on estimated results
            estimated = sq.estimated_results or 0
            if estimated == 0:
                return results

            # Fetch pages up to max_pages_per_query or until no more results
            page_size = parameters.page_size
            max_pages = min(
                config.max_pages_per_query,
                (estimated // page_size) + 1,
            )

            for page_id in range(1, max_pages + 1):
                async with semaphore:
                    await asyncio.sleep(config.request_delay_ms / 1000)
                    try:
                        result = await listing_query(
                            page=page_id,
                            channel=parameters.listing_type,
                            min_bedrooms=sq.min_bedrooms,
                            max_bedrooms=sq.max_bedrooms,
                            radius=parameters.radius,
                            min_price=sq.min_price,
                            max_price=sq.max_price,
                            district=sq.district,
                            page_size=page_size,
                            max_days_since_added=parameters.max_days_since_added,
                            furnish_types=parameters.furnish_types or [],
                            session=session,
                        )
                        results.append(result)

                        # Check if we've received all results
                        properties = result.get("properties", [])
                        if len(properties) < page_size:
                            # No more results on the next page
                            break

                    except Exception as e:
                        if "GENERIC_ERROR" in str(e):
                            # Reached end of results
                            logger.debug(
                                f"Max page for {sq.district}: {page_id - 1}"
                            )
                            break
                        logger.warning(
                            f"Error fetching page {page_id} for {sq.district}: {e}"
                        )
                        break

            return results

        # Fetch all subqueries concurrently
        all_results = await asyncio.gather(
            *[fetch_subquery(sq) for sq in subqueries]
        )

        # Extract identifiers from all results
        for subquery_results in all_results:
            for response_json in subquery_results:
                if not response_json:
                    continue
                if response_json.get("totalAvailableResults", 0) == 0:
                    continue
                for property_data in response_json.get("properties", []):
                    identifier = property_data.get("identifier")
                    if identifier:
                        identifiers.add(identifier)

        logger.info(f"Found {len(identifiers)} unique listings")

        # Filter out listings already in the database
        existing_ids = {listing.id for listing in await repository.get_listings()}
        new_ids = identifiers - existing_ids

        task.update_state(
            state=f"Found {len(new_ids)} new listings to process",
            meta={
                "phase": "filtering",
                "total_found": len(identifiers),
                "new_listings": len(new_ids),
            },
        )

        return new_ids
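
# Usage sketch (hypothetical values; assumes QueryParameters exposes the search
# filters referenced above):
#
#   from models.listing import QueryParameters
#
#   params_json = QueryParameters(...).model_dump_json()  # fill in search filters
#   dump_listings_task.delay(params_json)                  # enqueue via Celery
#   # or, outside a worker:
#   # result = await async_dump_listings_task(params_json)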