Add comprehensive logging to Celery tasks and listing processor

This commit is contained in:
Viktor Barzin 2026-02-02 23:01:13 +00:00
parent 33ae5c91a2
commit 3267adca66
2 changed files with 269 additions and 107 deletions

View file

@ -13,6 +13,9 @@ from repositories.listing_repository import ListingRepository
logger = logging.getLogger("uvicorn.error") logger = logging.getLogger("uvicorn.error")
# Also use celery task logger for visibility in worker output
celery_logger = logging.getLogger("celery.task")
class ListingProcessor: class ListingProcessor:
semaphore: asyncio.Semaphore semaphore: asyncio.Semaphore
@ -36,15 +39,16 @@ class ListingProcessor:
for step in self.process_steps: for step in self.process_steps:
if await step.needs_processing(listing_id): if await step.needs_processing(listing_id):
async with self.semaphore: async with self.semaphore:
step_name = step.__class__.__name__
try: try:
listing = await step.process(listing_id) listing = await step.process(listing_id)
logger.debug(f"[{listing_id}] {step_name} completed")
except Exception as e: except Exception as e:
logger.error(f"Failed to process {listing_id=}: {e}") logger.error(f"[{listing_id}] {step_name} failed: {e}")
celery_logger.error(f"[{listing_id}] {step_name} failed: {e}")
return None return None
return listing return listing
async def listing_exists(self, listing_id: int) -> bool: ...
class Step: class Step:
listing_repository: ListingRepository listing_repository: ListingRepository
@ -65,19 +69,23 @@ class FetchListingDetailsStep(Step):
existing_listings = await self.listing_repository.get_listings( existing_listings = await self.listing_repository.get_listings(
only_ids=[listing_id] only_ids=[listing_id]
) )
if (existing_listings) == 0: if len(existing_listings) == 0:
return True return True
return False return False
async def process(self, listing_id: int) -> Listing: async def process(self, listing_id: int) -> Listing:
logger.debug(f"Fetching details for {listing_id=}") logger.debug(f"[{listing_id}] Fetching property details from API")
celery_logger.info(f"[{listing_id}] Fetching details...")
existing_listings = await self.listing_repository.get_listings( existing_listings = await self.listing_repository.get_listings(
only_ids=[listing_id] only_ids=[listing_id]
) )
now = datetime.now() now = datetime.now()
if len(existing_listings) > 0: if len(existing_listings) > 0:
# listing exists, do not refresh # listing exists, do not refresh
logger.debug(f"[{listing_id}] Already exists, skipping refresh")
return existing_listings[0] return existing_listings[0]
listing_details = await detail_query(listing_id) listing_details = await detail_query(listing_id)
furnish_type_str = listing_details["property"].get("letFurnishType", "unknown") furnish_type_str = listing_details["property"].get("letFurnishType", "unknown")
@ -124,7 +132,12 @@ class FetchListingDetailsStep(Step):
additional_info=listing_details, additional_info=listing_details,
) )
await self.listing_repository.upsert_listings([listing]) await self.listing_repository.upsert_listings([listing])
logger.debug(f"Completed fetching details for {listing_id=}")
celery_logger.info(
f"[{listing_id}] Details fetched: £{listing.price}, "
f"{listing.number_of_bedrooms}BR, {listing.agency}"
)
logger.debug(f"[{listing_id}] Details fetch complete")
# TODO: dump to filesystem # TODO: dump to filesystem
return listing return listing
@ -140,7 +153,8 @@ class FetchImagesStep(Step):
return len(listing.floorplan_image_paths) == 0 return len(listing.floorplan_image_paths) == 0
async def process(self, listing_id: int) -> Listing: async def process(self, listing_id: int) -> Listing:
logger.debug(f"Fetching images for {listing_id=}") logger.debug(f"[{listing_id}] Fetching floorplan images")
existing_listings = await self.listing_repository.get_listings( existing_listings = await self.listing_repository.get_listings(
only_ids=[listing_id] only_ids=[listing_id]
) )
@ -152,6 +166,12 @@ class FetchImagesStep(Step):
all_floorplans = listing.additional_info.get("property", {}).get( all_floorplans = listing.additional_info.get("property", {}).get(
"floorplans", [] "floorplans", []
) )
if len(all_floorplans) == 0:
logger.debug(f"[{listing_id}] No floorplans available")
return listing
downloaded = 0
client_timeout = aiohttp.ClientTimeout(total=30) client_timeout = aiohttp.ClientTimeout(total=30)
for floorplan_obj in all_floorplans: for floorplan_obj in all_floorplans:
url = floorplan_obj["url"] url = floorplan_obj["url"]
@ -169,8 +189,12 @@ class FetchImagesStep(Step):
with open(floorplan_path, "wb") as f: with open(floorplan_path, "wb") as f:
f.write(await response.read()) f.write(await response.read())
listing.floorplan_image_paths.append(str(floorplan_path)) listing.floorplan_image_paths.append(str(floorplan_path))
downloaded += 1
await self.listing_repository.upsert_listings([listing]) await self.listing_repository.upsert_listings([listing])
logger.debug(f"Completed fetching images for {listing_id=}")
celery_logger.info(f"[{listing_id}] Downloaded {downloaded} floorplan images")
logger.debug(f"[{listing_id}] Image fetch complete")
return listing return listing
@ -188,11 +212,19 @@ class DetectFloorplanStep(Step):
return listings[0].square_meters is None return listings[0].square_meters is None
async def process(self, listing_id: int) -> Listing: async def process(self, listing_id: int) -> Listing:
logger.debug(f"Running floorplan detection for {listing_id=}") logger.debug(f"[{listing_id}] Running OCR on floorplans")
listings = await self.listing_repository.get_listings(only_ids=[listing_id]) listings = await self.listing_repository.get_listings(only_ids=[listing_id])
if len(listings) == 0: if len(listings) == 0:
raise ValueError(f"Listing {listing_id} does not exist") raise ValueError(f"Listing {listing_id} does not exist")
listing = listings[0] listing = listings[0]
if len(listing.floorplan_image_paths) == 0:
logger.debug(f"[{listing_id}] No floorplan images to process")
listing.square_meters = 0
await self.listing_repository.upsert_listings([listing])
return listing
sqms = [] sqms = []
for floorplan_path in listing.floorplan_image_paths: for floorplan_path in listing.floorplan_image_paths:
async with self.ocr_semaphore: async with self.ocr_semaphore:
@ -201,9 +233,15 @@ class DetectFloorplanStep(Step):
) )
if estimated_sqm is not None: if estimated_sqm is not None:
sqms.append(estimated_sqm) sqms.append(estimated_sqm)
max_sqm = max(sqms, default=0) # try once, if we fail, keep as 0 max_sqm = max(sqms, default=0) # try once, if we fail, keep as 0
# if max_sqm is not None:
listing.square_meters = max_sqm listing.square_meters = max_sqm
await self.listing_repository.upsert_listings([listing]) await self.listing_repository.upsert_listings([listing])
logger.debug(f"Completed running floorplan detection for {listing_id=}")
if max_sqm > 0:
celery_logger.info(f"[{listing_id}] OCR detected {max_sqm} sqm")
else:
logger.debug(f"[{listing_id}] OCR: no square meters detected")
logger.debug(f"[{listing_id}] OCR complete")
return listing return listing

View file

@ -1,5 +1,6 @@
import asyncio import asyncio
import logging import logging
import time
from typing import Any from typing import Any
from celery import Task from celery import Task
from celery.schedules import crontab from celery.schedules import crontab
@ -9,6 +10,8 @@ from config.scraper_config import ScraperConfig
from listing_processor import ListingProcessor from listing_processor import ListingProcessor
from models.listing import Listing, QueryParameters from models.listing import Listing, QueryParameters
from rec.query import create_session, listing_query from rec.query import create_session, listing_query
from rec.exceptions import CircuitBreakerOpenError, ThrottlingError
from rec.throttle_detector import get_throttle_metrics, reset_throttle_metrics
from repositories.listing_repository import ListingRepository from repositories.listing_repository import ListingRepository
from database import engine from database import engine
from services.query_splitter import QuerySplitter, SubQuery from services.query_splitter import QuerySplitter, SubQuery
@ -16,6 +19,16 @@ from utils.redis_lock import redis_lock
logger = logging.getLogger("uvicorn.error") logger = logging.getLogger("uvicorn.error")
# Also give the celery task logger its own StreamHandler when it has no handlers yet (StreamHandler() writes to stderr by default, not stdout)
celery_logger = logging.getLogger("celery.task")
if not celery_logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s: %(message)s"
))
celery_logger.addHandler(handler)
celery_logger.setLevel(logging.INFO)
SCRAPE_LOCK_NAME = "scrape_listings" SCRAPE_LOCK_NAME = "scrape_listings"
@ -23,12 +36,18 @@ SCRAPE_LOCK_NAME = "scrape_listings"
def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]: def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]:
with redis_lock(SCRAPE_LOCK_NAME) as acquired: with redis_lock(SCRAPE_LOCK_NAME) as acquired:
if not acquired: if not acquired:
logger.warning("Another scrape job is already running, skipping this execution") msg = "Another scrape job is already running, skipping this execution"
logger.warning(msg)
celery_logger.warning(msg)
self.update_state(state="SKIPPED", meta={"reason": "Another scrape job is running"}) self.update_state(state="SKIPPED", meta={"reason": "Another scrape job is running"})
return {"status": "skipped", "reason": "another_job_running"} return {"status": "skipped", "reason": "another_job_running"}
celery_logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}") logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
parsed_parameters = QueryParameters.model_validate_json(parameters_json) parsed_parameters = QueryParameters.model_validate_json(parameters_json)
celery_logger.info(f"Starting scrape with parameters: {parsed_parameters}")
self.update_state(state="Starting...", meta={"progress": 0}) self.update_state(state="Starting...", meta={"progress": 0})
asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters)) asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters))
return {"progress": 0} return {"progress": 0}
@ -50,46 +69,91 @@ async def dump_listings_full(
*, task: Task, parameters: QueryParameters *, task: Task, parameters: QueryParameters
) -> list[Listing]: ) -> list[Listing]:
"""Fetches all listings, images as well as detects floorplans""" """Fetches all listings, images as well as detects floorplans"""
start_time = time.time()
celery_logger.info("=" * 60)
celery_logger.info("PHASE 1: Initializing listing fetch")
celery_logger.info("=" * 60)
repository = ListingRepository(engine) repository = ListingRepository(engine)
task.update_state(state="Identifying missing listings", meta={"progress": 0}) task.update_state(state="Identifying missing listings", meta={"progress": 0})
celery_logger.info("Querying Rightmove API to identify new listings...")
ids_to_process = await get_ids_to_process( ids_to_process = await get_ids_to_process(
parameters=parameters, repository=repository, task=task parameters=parameters, repository=repository, task=task
) )
celery_logger.info(f"Found {len(ids_to_process)} new listings to process")
logger.info(f"Found {len(ids_to_process)} listings to process") logger.info(f"Found {len(ids_to_process)} listings to process")
if len(ids_to_process) == 0: if len(ids_to_process) == 0:
elapsed = time.time() - start_time
celery_logger.info(f"No new listings found. Completed in {elapsed:.1f}s")
task.update_state( task.update_state(
state="No new listings found", state="No new listings found",
meta={"progress": 1, "processed": 0, "total": 0, "message": "All listings are up to date"}, meta={"progress": 1, "processed": 0, "total": 0, "message": "All listings are up to date"},
) )
return [] return []
celery_logger.info("=" * 60)
celery_logger.info("PHASE 2: Processing listings (fetch details, images, OCR)")
celery_logger.info("=" * 60)
listing_processor = ListingProcessor(repository) listing_processor = ListingProcessor(repository)
celery_logger.info(f"Starting processing {len(ids_to_process)} listings")
logger.info(f"Starting processing {len(ids_to_process)} listings") logger.info(f"Starting processing {len(ids_to_process)} listings")
return await dump_listings_and_monitor(
result = await dump_listings_and_monitor(
task=task, listing_processor=listing_processor, missing_ids=ids_to_process task=task, listing_processor=listing_processor, missing_ids=ids_to_process
) )
elapsed = time.time() - start_time
celery_logger.info("=" * 60)
celery_logger.info(f"COMPLETED: Processed {len(result)} listings in {elapsed:.1f}s")
celery_logger.info("=" * 60)
return result
async def dump_listings_and_monitor( async def dump_listings_and_monitor(
*, task: Task, listing_processor: ListingProcessor, missing_ids: set[int] *, task: Task, listing_processor: ListingProcessor, missing_ids: set[int]
) -> list[Listing]: ) -> list[Listing]:
task_progress = {missing_id: 0 for missing_id in missing_ids} task_progress = {missing_id: 0 for missing_id in missing_ids}
processed_count = 0
failed_count = 0
start_time = time.time()
async def process(missing_id: int) -> Listing | None: async def process(missing_id: int) -> Listing | None:
nonlocal processed_count, failed_count
listing = await listing_processor.process_listing(missing_id) listing = await listing_processor.process_listing(missing_id)
task_progress[missing_id] = 1 task_progress[missing_id] = 1
if listing is not None:
processed_count += 1
else:
failed_count += 1
return listing return listing
async def monitor() -> None: async def monitor() -> None:
last_progress = 0
while (progress := sum(task_progress.values())) < len(missing_ids): while (progress := sum(task_progress.values())) < len(missing_ids):
progress_ratio = round(progress / len(missing_ids), 2) progress_ratio = round(progress / len(missing_ids), 2)
logger.error(
f"Task progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})" # Log when progress has advanced by at least 10% since the last logged value, or while exactly one listing has finished
) if progress_ratio >= last_progress + 0.1 or progress == 1:
elapsed = time.time() - start_time
rate = progress / elapsed if elapsed > 0 else 0
eta = (len(missing_ids) - progress) / rate if rate > 0 else 0
celery_logger.info(
f"Progress: {progress_ratio * 100:.0f}% "
f"({progress}/{len(missing_ids)}) "
f"| Elapsed: {elapsed:.0f}s "
f"| Rate: {rate:.1f}/s "
f"| ETA: {eta:.0f}s"
)
last_progress = progress_ratio
task.update_state( task.update_state(
state=f"Progress: {progress_ratio * 100}% ({progress} out of {len(missing_ids)})", state=f"Processing: {progress_ratio * 100:.0f}% ({progress}/{len(missing_ids)})",
meta={"progress": progress_ratio, "processed": progress, "total": len(missing_ids)}, meta={"progress": progress_ratio, "processed": progress, "total": len(missing_ids)},
) )
await asyncio.sleep(1) await asyncio.sleep(1)
@ -97,7 +161,11 @@ async def dump_listings_and_monitor(
processed_listings = await asyncio.gather( processed_listings = await asyncio.gather(
*[process(id) for id in missing_ids], *[monitor()] *[process(id) for id in missing_ids], *[monitor()]
) )
filtered_listings = [l for l in processed_listings if l is not None] filtered_listings = [listing for listing in processed_listings if listing is not None]
celery_logger.info(
f"Processing complete: {processed_count} successful, {failed_count} failed"
)
return filtered_listings return filtered_listings
@ -149,115 +217,171 @@ async def get_ids_to_process(
config = ScraperConfig.from_env() config = ScraperConfig.from_env()
splitter = QuerySplitter(config) splitter = QuerySplitter(config)
# Reset throttle metrics
reset_throttle_metrics()
def on_progress(phase: str, message: str) -> None: def on_progress(phase: str, message: str) -> None:
task.update_state(state=message, meta={"phase": phase}) task.update_state(state=message, meta={"phase": phase})
celery_logger.info(f"[{phase}] {message}")
async with create_session(config) as session: celery_logger.info("Starting query splitting and probing...")
# Phase 1 & 2: Split and probe queries
task.update_state(
state="Analyzing query and splitting by price bands...",
meta={"phase": "splitting", "progress": 0},
)
subqueries = await splitter.split(parameters, session, on_progress)
total_estimated = splitter.calculate_total_estimated_results(subqueries) try:
logger.info( async with create_session(config) as session:
f"Split into {len(subqueries)} subqueries, " # Phase 1 & 2: Split and probe queries
f"estimated {total_estimated} total results" task.update_state(
) state="Analyzing query and splitting by price bands...",
meta={"phase": "splitting", "progress": 0},
)
subqueries = await splitter.split(parameters, session, on_progress)
# Phase 3: Fetch all pages for each subquery total_estimated = splitter.calculate_total_estimated_results(subqueries)
task.update_state( celery_logger.info(
state=f"Fetching listings from {len(subqueries)} subqueries...", f"Query split complete: {len(subqueries)} subqueries, "
meta={ f"~{total_estimated} estimated total results"
"phase": "fetching", )
"subqueries": len(subqueries), logger.info(
"estimated_results": total_estimated, f"Split into {len(subqueries)} subqueries, "
}, f"estimated {total_estimated} total results"
)
semaphore = asyncio.Semaphore(config.max_concurrent_requests)
identifiers: set[int] = set()
async def fetch_subquery(sq: SubQuery) -> list[dict[str, Any]]:
"""Fetch all pages for a single subquery."""
results: list[dict[str, Any]] = []
# Calculate how many pages we need based on estimated results
estimated = sq.estimated_results or 0
if estimated == 0:
return results
# Fetch pages up to max_pages_per_query or until no more results
page_size = parameters.page_size
max_pages = min(
config.max_pages_per_query,
(estimated // page_size) + 1,
) )
for page_id in range(1, max_pages + 1): # Phase 3: Fetch all pages for each subquery
async with semaphore: task.update_state(
await asyncio.sleep(config.request_delay_ms / 1000) state=f"Fetching listings from {len(subqueries)} subqueries...",
try: meta={
result = await listing_query( "phase": "fetching",
page=page_id, "subqueries": len(subqueries),
channel=parameters.listing_type, "estimated_results": total_estimated,
min_bedrooms=sq.min_bedrooms, },
max_bedrooms=sq.max_bedrooms, )
radius=parameters.radius,
min_price=sq.min_price,
max_price=sq.max_price,
district=sq.district,
page_size=page_size,
max_days_since_added=parameters.max_days_since_added,
furnish_types=parameters.furnish_types or [],
session=session,
)
results.append(result)
# Check if we've received all results celery_logger.info(f"Fetching pages from {len(subqueries)} subqueries...")
properties = result.get("properties", [])
if len(properties) < page_size: semaphore = asyncio.Semaphore(config.max_concurrent_requests)
# No more results on next page identifiers: set[int] = set()
completed_subqueries = 0
total_pages_fetched = 0
async def fetch_subquery(sq: SubQuery) -> list[dict[str, Any]]:
"""Fetch all pages for a single subquery."""
nonlocal completed_subqueries, total_pages_fetched
results: list[dict[str, Any]] = []
# Calculate how many pages we need based on estimated results
estimated = sq.estimated_results or 0
if estimated == 0:
completed_subqueries += 1
return results
# Fetch pages up to max_pages_per_query or until no more results
page_size = parameters.page_size
max_pages = min(
config.max_pages_per_query,
(estimated // page_size) + 1,
)
for page_id in range(1, max_pages + 1):
async with semaphore:
await asyncio.sleep(config.request_delay_ms / 1000)
try:
result = await listing_query(
page=page_id,
channel=parameters.listing_type,
min_bedrooms=sq.min_bedrooms,
max_bedrooms=sq.max_bedrooms,
radius=parameters.radius,
min_price=sq.min_price,
max_price=sq.max_price,
district=sq.district,
page_size=page_size,
max_days_since_added=parameters.max_days_since_added,
furnish_types=parameters.furnish_types or [],
session=session,
config=config,
)
results.append(result)
total_pages_fetched += 1
# Check if we've received all results
properties = result.get("properties", [])
if len(properties) < page_size:
# No more results on next page
break
except CircuitBreakerOpenError as e:
celery_logger.error(f"Circuit breaker open: {e}")
break break
except ThrottlingError as e:
except Exception as e: celery_logger.warning(
if "GENERIC_ERROR" in str(e): f"Throttling on {sq.district} page {page_id}: {e}"
# Reached end of results )
logger.debug( break
f"Max page for {sq.district}: {page_id - 1}" except Exception as e:
if "GENERIC_ERROR" in str(e):
# Reached end of results
logger.debug(
f"Max page for {sq.district}: {page_id - 1}"
)
break
logger.warning(
f"Error fetching page {page_id} for {sq.district}: {e}"
) )
break break
logger.warning(
f"Error fetching page {page_id} for {sq.district}: {e}"
)
break
return results completed_subqueries += 1
return results
# Fetch all subqueries concurrently # Fetch all subqueries concurrently
all_results = await asyncio.gather( all_results = await asyncio.gather(
*[fetch_subquery(sq) for sq in subqueries] *[fetch_subquery(sq) for sq in subqueries]
) )
# Extract identifiers from all results celery_logger.info(
for subquery_results in all_results: f"Fetch complete: {total_pages_fetched} pages from "
for response_json in subquery_results: f"{completed_subqueries} subqueries"
if not response_json: )
continue
if response_json.get("totalAvailableResults", 0) == 0:
continue
for property_data in response_json.get("properties", []):
identifier = property_data.get("identifier")
if identifier:
identifiers.add(identifier)
# Extract identifiers from all results
for subquery_results in all_results:
for response_json in subquery_results:
if not response_json:
continue
if response_json.get("totalAvailableResults", 0) == 0:
continue
for property_data in response_json.get("properties", []):
identifier = property_data.get("identifier")
if identifier:
identifiers.add(identifier)
except CircuitBreakerOpenError as e:
celery_logger.error(f"Circuit breaker prevented query: {e}")
# Log throttle metrics
metrics = get_throttle_metrics()
if metrics.total_requests > 0:
celery_logger.info(metrics.summary())
return set()
finally:
# Log throttle metrics
metrics = get_throttle_metrics()
if metrics.total_requests > 0:
celery_logger.info(f"API Stats: {metrics.total_requests} requests, "
f"avg {metrics.average_response_time:.2f}s, "
f"{metrics.total_throttling_events} throttled")
celery_logger.info(f"Found {len(identifiers)} unique listing IDs from API")
logger.info(f"Found {len(identifiers)} unique listings") logger.info(f"Found {len(identifiers)} unique listings")
# Filter out listings already in the database # Filter out listings already in the database
all_listing_ids = {l.id for l in await repository.get_listings()} celery_logger.info("Checking database for existing listings...")
all_listing_ids = {listing.id for listing in await repository.get_listings()}
new_ids = identifiers - all_listing_ids new_ids = identifiers - all_listing_ids
celery_logger.info(
f"Filtering: {len(identifiers)} total, "
f"{len(all_listing_ids)} existing in DB, "
f"{len(new_ids)} new to process"
)
task.update_state( task.update_state(
state=f"Found {len(new_ids)} new listings to process", state=f"Found {len(new_ids)} new listings to process",
meta={ meta={