migrate processing to a pipeline approach: each listing is processed through a pipeline of steps in parallel, and status is reported back to track progress

Viktor Barzin 2025-07-27 18:33:39 +00:00
parent 4fa09e31c8
commit 91a0436f7f
6 changed files with 347 additions and 26 deletions
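
Before the per-file diffs, here is the shape of the change in one condensed, hedged sketch (illustrative names only, not the exact code from this commit): listings are fanned out with asyncio.gather, a semaphore bounds concurrency, and a monitor coroutine reports progress while the workers run.

import asyncio

async def process_one(sem: asyncio.Semaphore, item: int, progress: dict[int, int]) -> int:
    async with sem:              # bound concurrency, as the pipeline does
        await asyncio.sleep(0)   # stand-in for the fetch/images/OCR steps
    progress[item] = 1           # mark this listing done
    return item

async def run(items: list[int]) -> list[int]:
    sem = asyncio.Semaphore(20)
    progress = {i: 0 for i in items}

    async def monitor() -> None:
        while sum(progress.values()) < len(items):
            print(f"{sum(progress.values())}/{len(items)} done")
            await asyncio.sleep(1)

    results = await asyncio.gather(*(process_one(sem, i, progress) for i in items), monitor())
    return [r for r in results if r is not None]  # drop monitor()'s None

if __name__ == "__main__":
    print(asyncio.run(run([1, 2, 3])))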

@@ -27,7 +27,7 @@ async def dump_images(
@retry(wait=wait_random(min=1, max=2), stop=stop_after_attempt(3))
async def dump_images_for_listing(listing: Listing, base_path: Path) -> Listing | None:
all_floorplans = listing.additional_info["property"]["floorplans"]
all_floorplans = listing.additional_info.get("property", {}).get("floorplans", [])
for floorplan in all_floorplans:
url = floorplan["url"]
picname = url.split("/")[-1]

@@ -91,6 +91,8 @@ async def refresh_listings(
await send_notification(
f"{user.email} refreshing listings with query parameters {query_parameters.model_dump_json()}"
)
# await listing_tasks.async_dump_listings_task(query_parameters.model_dump_json()) # Use this for local debugging - run task in sync
# return {}
# TODO: rate limit
expiry_time = datetime.now() + timedelta(minutes=10)
task = listing_tasks.dump_listings_task.apply_async(

@@ -0,0 +1,184 @@
from __future__ import annotations
from abc import abstractmethod
import asyncio
from datetime import datetime
import logging
import multiprocessing
from pathlib import Path
import aiohttp
from models.listing import FurnishType, Listing, ListingSite, RentListing
from rec import floorplan
from rec.query import detail_query
from repositories.listing_repository import ListingRepository
logger = logging.getLogger("uvicorn.error")
class ListingProcessor:
semaphore: asyncio.Semaphore
process_steps: list[Step]
def __init__(self, listing_repository: ListingRepository):
self.semaphore = asyncio.Semaphore(20)
# Register new processing steps here
# Order is important
self.process_steps = [
FetchListingDetailsStep(listing_repository),
FetchImagesStep(listing_repository),
DetectFloorplanStep(listing_repository),
]
async def process_listing(self, listing_id: int) -> Listing | None:
listing = None
for step in self.process_steps:
if await step.needs_processing(listing_id):
async with self.semaphore:
listing = await step.process(listing_id)
return listing
async def listing_exists(self, listing_id: int) -> bool: ...
class Step:
listing_repository: ListingRepository
def __init__(self, listing_repository: ListingRepository):
self.listing_repository = listing_repository
@abstractmethod
async def process(self, listing_id: int) -> Listing: ...
async def needs_processing(self, listing_id: int) -> bool:
# Default implementation (not abstract): steps run unconditionally unless a
# subclass such as DetectFloorplanStep overrides this. Marking it
# @abstractmethod while two subclasses never override it would make those
# classes uninstantiable.
return True
class FetchListingDetailsStep(Step):
async def process(self, listing_id: int) -> Listing:
logger.debug(f"Fetching details for {listing_id=}")
existing_listings = await self.listing_repository.get_listings(
only_ids=[listing_id]
)
now = datetime.now()
if len(existing_listings) > 0:
# listing exists, do not refresh
return existing_listings[0]
listing_details = await detail_query(listing_id)
furnish_type_str = listing_details["property"].get("letFurnishType", "unknown")
if furnish_type_str is None:
furnish_type_str = "unknown"
elif "landlord" in furnish_type_str.lower():
furnish_type_str = "ask landlord"
else:
furnish_type_str = furnish_type_str.lower()
furnish_type = FurnishType(furnish_type_str)
available_from: datetime | None = None
available_from_str: str | None = listing_details["property"]["letDateAvailable"]
if available_from_str is None:
available_from = None
elif available_from_str.lower() == "now":
available_from = datetime.now()
else:
try:
available_from = datetime.strptime(available_from_str, "%d/%m/%Y")
except ValueError:
# If the date format is not as expected, return None
available_from = None
photos = listing_details["property"]["photos"]
# listing = Listing(
listing = RentListing( # TODO: should pick based on price?
id=listing_id,
price=listing_details["property"]["price"],
number_of_bedrooms=listing_details["property"]["bedrooms"],
square_meters=None, # populated later
agency=listing_details["property"]["branch"]["brandName"],
council_tax_band=listing_details["property"]["councilTaxInfo"]["content"][
0
]["value"],
longtitude=listing_details["property"]["longitude"],
latitude=listing_details["property"]["latitude"],
price_history_json="{}", # TODO: should upsert from existing
listing_site=ListingSite.RIGHTMOVE,
last_seen=now,
photo_thumbnail=photos[0]["thumbnailUrl"] if len(photos) > 0 else None,
furnish_type=furnish_type,
available_from=available_from,
additional_info=listing_details,
)
await self.listing_repository.upsert_listings([listing])
logger.debug(f"Completed fetching details for {listing_id=}")
# TODO: dump to filesystem
return listing
class FetchImagesStep(Step):
async def process(self, listing_id: int) -> Listing:
logger.debug(f"Fetching images for {listing_id=}")
existing_listings = await self.listing_repository.get_listings(
only_ids=[listing_id]
)
if len(existing_listings) == 0:
raise ValueError(f"Listing {listing_id} not found")
listing = existing_listings[0]
base_path = Path("data/rs/")
all_floorplans = listing.additional_info.get("property", {}).get(
"floorplans", []
)
for floorplan in all_floorplans:
url = floorplan["url"]
picname = url.split("/")[-1]
floorplan_path = Path(base_path, str(listing.id), "floorplans", picname)
if floorplan_path.exists():
continue
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 404:
return listing
if response.status != 200:
raise Exception(f"Error for {url}: {response.status}")
floorplan_path.parent.mkdir(parents=True, exist_ok=True)
with open(floorplan_path, "wb") as f:
f.write(await response.read())
listing.floorplan_image_paths.append(str(floorplan_path))
await self.listing_repository.upsert_listings([listing])
logger.debug(f"Completed fetching images for {listing_id=}")
return listing
class DetectFloorplanStep(Step):
ocr_semaphore: asyncio.Semaphore
def __init__(self, listing_repository: ListingRepository):
super().__init__(listing_repository)
self.ocr_semaphore = asyncio.Semaphore(max(1, multiprocessing.cpu_count() // 4))  # guard against Semaphore(0) on <4-core machines
async def needs_processing(self, listing_id: int) -> bool:
listings = await self.listing_repository.get_listings(only_ids=[listing_id])
if len(listings) == 0:
return False
return listings[0].square_meters is None
async def process(self, listing_id: int) -> Listing:
logger.debug(f"Running floorplan detection for {listing_id=}")
listings = await self.listing_repository.get_listings(only_ids=[listing_id])
if len(listings) == 0:
raise ValueError(f"Listing {listing_id} does not exist")
listing = listings[0]
sqms = []
for floorplan_path in listing.floorplan_image_paths:
async with self.ocr_semaphore:
estimated_sqm, _ = await asyncio.to_thread(
floorplan.calculate_ocr, floorplan_path
)
if estimated_sqm is not None:
sqms.append(estimated_sqm)
max_sqm = max(sqms, default=0) # try once, if we fail, keep as 0
# if max_sqm is not None:
listing.square_meters = max_sqm
await self.listing_repository.upsert_listings([listing])
logger.debug(f"Completed running floorplan detection for {listing_id=}")
return listing
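
The process_steps list is the extension point ("Register new processing steps here", and order is important), so a new pipeline stage is just another Step subclass appended in __init__. A hedged sketch using the repository API shown above; EnrichCouncilTaxStep and its body are hypothetical:

class EnrichCouncilTaxStep(Step):
    # Hypothetical stage: fill in council_tax_band when the detail fetch left it empty.
    async def needs_processing(self, listing_id: int) -> bool:
        listings = await self.listing_repository.get_listings(only_ids=[listing_id])
        return len(listings) > 0 and listings[0].council_tax_band is None

    async def process(self, listing_id: int) -> Listing:
        listing = (await self.listing_repository.get_listings(only_ids=[listing_id]))[0]
        # ... look up the band here (elided) ...
        await self.listing_repository.upsert_listings([listing])
        return listing

# Registered in ListingProcessor.__init__, after the existing steps:
#     self.process_steps.append(EnrichCouncilTaxStep(listing_repository))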

@@ -6,6 +6,7 @@ import pathlib
import click
import importlib
import listing_processor
from models.listing import FurnishType, ListingType, QueryParameters
from rec.districts import get_districts
from data_access import Listing

@@ -17,7 +17,7 @@ class Slack(Surface):
@lru_cache(maxsize=None)
def get_notifier(surfaces: list[Surface] | None = None) -> apprise.Apprise:
surfaces = surfaces or [Slack()]
surfaces = surfaces or list[Surface]([Slack()])
obj = apprise.Apprise()
for surface in surfaces:
if conn := surface.connection_string():

@@ -1,16 +1,17 @@
import asyncio
import importlib
import itertools
import logging
from pathlib import Path
from typing import Any
from celery import Celery, Task
from celery import Task
from celery_app import app
from models.listing import FurnishType, Listing, ListingType, QueryParameters
from listing_processor import ListingProcessor
from models.listing import Listing, ListingType, QueryParameters
from rec.districts import get_districts
from rec.query import listing_query
from repositories.listing_repository import ListingRepository
from database import engine
from tasks.task_state import TaskStatus
dump_listings_module = importlib.import_module("1_dump_listings")
dump_images_module = importlib.import_module("3_dump_images")
detect_floorplan_module = importlib.import_module("4_detect_floorplan")
@@ -20,28 +21,61 @@ logger = logging.getLogger("uvicorn.error")
@app.task(bind=True, pydantic=True)
def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]:
parsed_parameters = QueryParameters.model_validate_json(parameters_json)
asyncio.run(dump_listings_full(self, parsed_parameters))
return {"progress": 1}
asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters))
self.update_state(state="Starting...", meta={"progress": 0})
return {"progress": 0}
async def dump_listings_full(self: Task, parameters: QueryParameters) -> list[Listing]:
async def async_dump_listings_task(parameters_json: str) -> dict[str, Any]:
parsed_parameters = QueryParameters.model_validate_json(parameters_json)
await dump_listings_full(task=Task(), parameters=parsed_parameters)
return {"progress": 0}
async def dump_listings_full(
*, task: Task, parameters: QueryParameters
) -> list[Listing]:
"""Fetches all listings, images as well as detects floorplans"""
self.update_state(state="FETCHING_LISTINGS", meta={"progress": 0.1})
repository = ListingRepository(engine)
new_listings = await dump_listings_module.dump_listings(parameters, repository)
self.update_state(state="FETCHING_FLOORPLANS", meta={"progress": 0.3})
logger.debug(f"Upserted {len(new_listings)} new listings")
logger.debug("Starting to fetch floorplans")
await dump_images_module.dump_images(repository)
self.update_state(state="RUNNING_OCR_ON_FLOORPLANS", meta={"progress": 0.6})
logger.debug("Completed fetching floorplans")
logger.debug("Starting floorplan detection")
await detect_floorplan_module.detect_floorplan(repository)
logger.debug("Completed floorplan detection")
# refresh listings
listings = await repository.get_listings(parameters) # this can be better
new_listings = [l for l in listings if l.id in new_listings]
return new_listings
missing_ids = await get_missing_listing_ids(parameters, repository)
logger.info(f"Found {len(missing_ids)} missing listings")
listing_processor = ListingProcessor(repository)
logger.info(f"Starting processing {len(missing_ids)} new listings")
return await dump_listings_and_monitor(
task=task, listing_processor=listing_processor, missing_ids=missing_ids
)
async def dump_listings_and_monitor(
*, task: Task, listing_processor: ListingProcessor, missing_ids: set[int]
) -> list[Listing]:
task_progress = {missing_id: 0 for missing_id in missing_ids}
async def process(missing_id: int) -> Listing | None:
listing = await listing_processor.process_listing(missing_id)
task_progress[missing_id] = 1
return listing
async def monitor() -> None:
while (progress := sum(task_progress.values())) < len(missing_ids):
progress_ratio = progress / len(missing_ids)
logger.debug(
f"Task progress: {progress_ratio * 100:.0f}% ({progress} out of {len(missing_ids)})"
)
task.update_state(
state=f"Progress: {progress_ratio * 100:.0f}% ({progress} out of {len(missing_ids)})",
meta={"progress": progress_ratio},
)
await asyncio.sleep(1)
processed_listings = await asyncio.gather(
*[process(missing_id) for missing_id in missing_ids], monitor()
)
filtered_listings = [l for l in processed_listings if l is not None]
return filtered_listings
@app.on_after_finalize.connect
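
Since dump_listings_and_monitor pushes custom states via task.update_state, a caller can surface them with Celery's standard AsyncResult. A minimal sketch, assuming the task id returned by apply_async; the 1s polling interval is illustrative:

import time
from celery.result import AsyncResult
from celery_app import app

def watch_dump(task_id: str) -> None:
    result = AsyncResult(task_id, app=app)
    while not result.ready():
        # For custom states, .state is the string passed to update_state
        # and .info is the meta dict, e.g. {"progress": 0.4}.
        print(result.state, result.info)
        time.sleep(1)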
@ -57,5 +91,105 @@ def setup_periodic_tasks(sender, **kwargs):
max_price=4000,
).model_dump_json()
),
name='Daily dump of interesting rent listings',
name="Daily dump of interesting rent listings",
)
async def get_missing_listing_ids(
parameters: QueryParameters,
repository: ListingRepository,
) -> set[int]:
semaphore = asyncio.Semaphore(5) # if too high, rightmove drops connections
districts = await get_valid_districts_to_scrape(parameters.district_names)
json_responses: list[list[dict[str, Any]]] = await asyncio.gather(
*[
_fetch_listings_with_semaphore(semaphore, parameters, district)
for district in districts.keys()
],
)
json_responses_flat = list(itertools.chain.from_iterable(json_responses))
logger.debug(f"Total listings fetched {len(json_responses_flat)}")
identifiers: set[int] = set()
for response_json in json_responses_flat:
if response_json == {}:
continue
if response_json["totalAvailableResults"] == 0:
continue
for property in response_json["properties"]:
identifier = property["identifier"]
identifiers.add(identifier)
# if listing is already in db, do not fetch details again
all_listing_ids = {l.id for l in await repository.get_listings()}
missing_ids = identifiers - all_listing_ids  # scraped ids that are not yet in the db
return missing_ids
async def get_valid_districts_to_scrape(
district_names: set[str] | None,
) -> dict[str, str]:
if district_names:
districts = {
district: locid
for district, locid in get_districts().items()
if district in district_names
}
else:
districts = get_districts()
return districts
async def _fetch_listings_with_semaphore(
semaphore: asyncio.Semaphore,
parameters: QueryParameters,
district: str,
) -> list[dict[str, Any]]:
result = []
# Split the price range into N bands to avoid Rightmove's cap of ~1.5k results per
# search: instead of one query covering e.g. 1k-5k (capped at 1500 results), we issue
# N narrower queries, so we send more requests but each is far less likely to hit the cap.
number_of_steps = 1  # NOTE: 1 means a single band (no split); raise e.g. to 10 to enable banding
price_step = parameters.max_price // number_of_steps
for step in range(number_of_steps):
min_price = step * price_step
max_price = (step + 1) * price_step
logger.debug(
f"Step {step} of {number_of_steps} with {min_price=} and {max_price=}"
)
for num_bedrooms in range(parameters.min_bedrooms, parameters.max_bedrooms + 1):
for page_id in range(
1,
3, # seems like all searches stop at 1500 entries (page_id * page_size)
):
logger.debug(f"Processing {page_id=} for {district=}")
async with semaphore:
try:
listing_query_result = await listing_query(
page=page_id,
channel=parameters.listing_type,
# min_bedrooms=parameters.min_bedrooms,
# max_bedrooms=parameters.max_bedrooms,
min_bedrooms=num_bedrooms,
max_bedrooms=num_bedrooms,
radius=parameters.radius,
min_price=min_price,
max_price=max_price,
district=district,
page_size=parameters.page_size,
max_days_since_added=parameters.max_days_since_added,
furnish_types=parameters.furnish_types or [],
)
except Exception as e:
if "GENERIC_ERROR" in str(e): # Too big page id
logger.debug(f"Max page id for {district=}: {page_id-1}")
break
raise e
result.append(listing_query_result)
return result
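
For concreteness, the banding arithmetic with hypothetical numbers: max_price=4000 and number_of_steps=4 give price_step=1000 and bands (0, 1000), (1000, 2000), (2000, 3000), (3000, 4000). Band edges touch, so a listing priced exactly on an edge can be returned by two queries; the identifiers set in get_missing_listing_ids deduplicates it.

# Worked example of the banding loop above (numbers are hypothetical).
max_price, number_of_steps = 4000, 4
price_step = max_price // number_of_steps  # 1000
bands = [(step * price_step, (step + 1) * price_step) for step in range(number_of_steps)]
print(bands)  # [(0, 1000), (1000, 2000), (2000, 3000), (3000, 4000)]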