Replace the sequential fetch-all-then-process pipeline with a streaming architecture in which listing processing starts as soon as IDs become available from each subquery. A producer task fetches pages and enqueues new IDs (filtered inline against the IDs already stored in the DB), while 20 consumer workers process listings concurrently from the queue. - Add ListingRepository.get_listing_ids() for a fast ID-only projection - Refactor listing_tasks.py: remove get_ids_to_process/dump_listings_and_monitor and replace them with a unified producer/worker/monitor pipeline - Apply the same pattern to the CLI path in listing_fetcher.py - Remove the 'filtering' phase from the frontend and show combined fetch+process metrics - Add a fetching_done flag to TaskResult for phase-transition tracking
458 lines
18 KiB
Python
458 lines
18 KiB
Python
import asyncio
|
|
import logging
|
|
import time
|
|
from collections import deque
|
|
from typing import Any
|
|
from celery import Task
|
|
from celery.schedules import crontab
|
|
from celery_app import app
|
|
from config.schedule_config import SchedulesConfig
|
|
from config.scraper_config import ScraperConfig
|
|
from listing_processor import ListingProcessor
|
|
from models.listing import Listing, QueryParameters
|
|
from rec.query import create_session, listing_query
|
|
from rec.exceptions import CircuitBreakerOpenError, ThrottlingError
|
|
from rec.throttle_detector import get_throttle_metrics, reset_throttle_metrics
|
|
from repositories.listing_repository import ListingRepository
|
|
from database import engine
|
|
from services.query_splitter import QuerySplitter, SubQuery
|
|
from utils.redis_lock import redis_lock
|
|
from services.listing_cache import invalidate_cache
|
|
|
|
# Module logger; reuses the logger named "uvicorn.error".
logger = logging.getLogger("uvicorn.error")

# Also configure a dedicated "celery.task" logger for task progress output.
# NOTE: logging.StreamHandler() with no stream argument writes to sys.stderr,
# not stdout. The handler and the INFO level are applied only when the logger
# has no handlers yet, so an existing handler configuration is left untouched.
celery_logger = logging.getLogger("celery.task")
if not celery_logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(
        "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    ))
    celery_logger.addHandler(handler)
    celery_logger.setLevel(logging.INFO)
|
|
|
|
# Redis lock name taken by every scrape entry point in this module; a run is
# skipped when the lock is already held, so only one scrape runs at a time.
SCRAPE_LOCK_NAME = "scrape_listings"
# Maximum number of captured log lines kept (and sent with each task-state
# update); the bounded deque discards the oldest lines beyond this.
LOG_BUFFER_MAX_LINES = 200

# Module-level log buffer — active only during task execution.
# dump_listings_full() points this at the deque its TaskLogHandler appends to;
# _update_task_state() copies the current lines into the "logs" meta field.
_active_log_buffer: deque[str] | None = None
|
|
|
|
|
|
class TaskLogHandler(logging.Handler):
    """Captures log records into a deque for inclusion in task state updates.

    The deque is supplied by the caller; if it was created with ``maxlen``,
    only the most recent formatted lines are retained.
    """

    def __init__(self, buffer: deque[str]) -> None:
        super().__init__()
        self.buffer = buffer

    def emit(self, record: logging.LogRecord) -> None:
        """Format *record* and append the resulting line to the buffer."""
        try:
            self.buffer.append(self.format(record))
        except Exception:
            # Follow the standard logging error policy instead of silently
            # dropping the record: handleError() reports the failure (when
            # logging.raiseExceptions is set) but never propagates it, so a
            # bad log call still cannot break the task.
            self.handleError(record)
|
|
|
|
|
|
def _update_task_state(task: Task, state: str, meta: dict[str, Any]) -> None:
    """Send a Celery state update, attaching captured log lines when available.

    If a log buffer is currently active, a snapshot of its lines is stored in
    ``meta["logs"]`` (the caller's dict is updated in place) before the update
    is pushed via ``task.update_state``.
    """
    captured = _active_log_buffer
    if captured is not None:
        meta["logs"] = [*captured]
    task.update_state(state=state, meta=meta)
|
|
|
|
|
|
@app.task(bind=True, pydantic=True)
def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]:
    """Celery entry point: run one full scrape unless another scrape holds the lock.

    Args:
        parameters_json: JSON-serialized ``QueryParameters``.

    Returns:
        ``{"status": "skipped", "reason": "another_job_running"}`` when the
        scrape lock is already held; otherwise
        ``{"phase": "completed", "progress": 1}`` after the pipeline finishes.
    """
    with redis_lock(SCRAPE_LOCK_NAME) as lock_held:
        if lock_held:
            acquired_msg = f"Acquired lock: {SCRAPE_LOCK_NAME}"
            celery_logger.info(acquired_msg)
            logger.info(acquired_msg)

            query = QueryParameters.model_validate_json(parameters_json)
            celery_logger.info(f"Starting scrape with parameters: {query}")

            self.update_state(state="Starting...", meta={"phase": "splitting", "progress": 0})
            asyncio.run(dump_listings_full(task=self, parameters=query))
            return {"phase": "completed", "progress": 1}

        skip_msg = "Another scrape job is already running, skipping this execution"
        logger.warning(skip_msg)
        celery_logger.warning(skip_msg)
        self.update_state(state="SKIPPED", meta={"reason": "Another scrape job is running"})
        return {"status": "skipped", "reason": "another_job_running"}
|
|
|
|
|
|
async def async_dump_listings_task(parameters_json: str) -> dict[str, Any]:
    """Run the scrape pipeline directly in the caller's event loop.

    Takes the same Redis scrape lock as ``dump_listings_task`` and skips the
    run when that lock is already held.

    Args:
        parameters_json: JSON-serialized ``QueryParameters``.

    Returns:
        ``{"status": "skipped", "reason": "another_job_running"}`` if skipped,
        otherwise ``{"progress": 0}``.
    """
    with redis_lock(SCRAPE_LOCK_NAME) as lock_held:
        if lock_held:
            logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
            query = QueryParameters.model_validate_json(parameters_json)
            # NOTE(review): a bare Task() receives the progress updates here;
            # confirm its update_state() works outside a Celery worker.
            await dump_listings_full(task=Task(), parameters=query)
            # NOTE(review): reports progress 0 even after a completed run,
            # unlike dump_listings_task — confirm callers rely on this.
            return {"progress": 0}

        logger.warning("Another scrape job is already running, skipping this execution")
        return {"status": "skipped", "reason": "another_job_running"}
|
|
|
|
|
|
async def dump_listings_full(
    *, task: Task, parameters: QueryParameters
) -> list[Listing]:
    """Run the full scrape pipeline with log capture into the task state.

    While ``_dump_listings_full_inner`` runs, a ``TaskLogHandler`` is attached
    to both module loggers and its bounded deque is published through the
    module-level ``_active_log_buffer``, so every ``_update_task_state`` call
    carries the latest log lines. Both loggers are lowered to INFO if they
    would otherwise drop INFO records (Celery's worker setup may leave
    ``celery.task`` at WARNING). Handlers, levels, and the global are restored
    afterwards, even on error.

    Returns:
        Whatever ``_dump_listings_full_inner`` returns.
    """
    global _active_log_buffer

    captured: deque[str] = deque(maxlen=LOG_BUFFER_MAX_LINES)
    capture_handler = TaskLogHandler(captured)
    capture_handler.setFormatter(
        logging.Formatter("%(asctime)s %(message)s", datefmt="%H:%M:%S")
    )

    watched_loggers = (celery_logger, logger)
    saved_levels = [lg.level for lg in watched_loggers]
    for lg in watched_loggers:
        lg.addHandler(capture_handler)
    for lg in watched_loggers:
        if lg.level == logging.NOTSET or lg.level > logging.INFO:
            lg.setLevel(logging.INFO)

    _active_log_buffer = captured
    try:
        return await _dump_listings_full_inner(task=task, parameters=parameters)
    finally:
        _active_log_buffer = None
        for lg in watched_loggers:
            lg.removeHandler(capture_handler)
        for lg, level in zip(watched_loggers, saved_levels):
            lg.setLevel(level)
|
|
|
|
|
|
async def _dump_listings_full_inner(
    *, task: Task, parameters: QueryParameters
) -> list[Listing]:
    """Inner implementation with log capture active.

    Uses a streaming pipeline: an asyncio.Queue bridges the fetcher (producer)
    and processor workers (consumers) so that listing processing starts as
    soon as IDs become available from each subquery. A monitor coroutine
    pushes combined fetch/process progress into the task state about once
    per second.

    Args:
        task: Celery task whose state receives progress updates.
        parameters: Search parameters; split into subqueries by ``QuerySplitter``.

    Returns:
        Every listing for which ``ListingProcessor.process_listing`` returned a
        non-None result, or an empty list if a ``CircuitBreakerOpenError``
        escapes splitting or the pipeline.
    """
    start_time = time.time()
    NUM_WORKERS = 20

    celery_logger.info("=" * 60)
    celery_logger.info("PHASE 1: Splitting queries")
    celery_logger.info("=" * 60)

    repository = ListingRepository(engine)
    config = ScraperConfig.from_env()
    splitter = QuerySplitter(config)

    # Reset throttle metrics so the API stats logged at the end cover this run.
    reset_throttle_metrics()

    def on_progress(phase: str, message: str, **kwargs: Any) -> None:
        meta: dict[str, Any] = {"phase": phase, "message": message}
        meta.update(kwargs)
        _update_task_state(task, message, meta)
        celery_logger.info(f"[{phase}] {message}")

    _update_task_state(task, "Analyzing query and splitting by price bands...", {
        "phase": "splitting", "progress": 0,
    })
    celery_logger.info("Starting query splitting and probing...")

    try:
        async with create_session(config) as session:
            subqueries = await splitter.split(parameters, session, on_progress)

            total_estimated = splitter.calculate_total_estimated_results(subqueries)
            celery_logger.info(
                f"Query split complete: {len(subqueries)} subqueries, "
                f"~{total_estimated} estimated total results"
            )

            # Load existing IDs (fast, ID-only projection). IDs are added to this
            # collection as they are enqueued, so each ID is processed at most once.
            celery_logger.info("Loading existing listing IDs from database...")
            existing_ids = repository.get_listing_ids(parameters.listing_type)
            celery_logger.info(f"Found {len(existing_ids)} existing listings in DB")

            celery_logger.info("=" * 60)
            celery_logger.info("PHASE 2: Streaming fetch & process")
            celery_logger.info("=" * 60)

            # Shared pipeline state. All coroutines run on one event loop, so
            # these counters are updated via ``nonlocal`` without locking.
            queue: asyncio.Queue[int | None] = asyncio.Queue()
            ids_collected = 0
            completed_subqueries = 0
            total_pages_fetched = 0
            fetching_done = False
            processed_count = 0
            failed_count = 0
            details_fetched = 0
            images_downloaded = 0
            ocr_completed = 0
            processed_listings: list[Listing] = []
            semaphore = asyncio.Semaphore(config.max_concurrent_requests)

            _update_task_state(
                task,
                f"Fetching listings from {len(subqueries)} subqueries...",
                {
                    "phase": "fetching",
                    "subqueries_completed": 0,
                    "subqueries_total": len(subqueries),
                    "ids_collected": 0,
                    "pages_fetched": 0,
                    "estimated_results": total_estimated,
                    "fetching_done": False,
                },
            )

            listing_processor = ListingProcessor(repository)

            def step_callback(step_name: str) -> None:
                """Count per-listing steps reported complete by the processor."""
                nonlocal details_fetched, images_downloaded, ocr_completed
                if step_name == "details":
                    details_fetched += 1
                elif step_name == "images":
                    images_downloaded += 1
                elif step_name == "ocr":
                    ocr_completed += 1

            async def fetch_subquery(sq: SubQuery) -> None:
                """Page through one subquery, enqueueing IDs not seen before."""
                nonlocal ids_collected, completed_subqueries, total_pages_fetched

                estimated = sq.estimated_results or 0
                if estimated == 0:
                    completed_subqueries += 1
                    return

                page_size = parameters.page_size
                max_pages = min(
                    config.max_pages_per_query,
                    (estimated // page_size) + 1,
                )

                for page_id in range(1, max_pages + 1):
                    # The semaphore bounds concurrent requests; the delay is
                    # taken while holding it to space requests out.
                    async with semaphore:
                        await asyncio.sleep(config.request_delay_ms / 1000)
                        try:
                            result = await listing_query(
                                page=page_id,
                                channel=parameters.listing_type,
                                min_bedrooms=sq.min_bedrooms,
                                max_bedrooms=sq.max_bedrooms,
                                radius=parameters.radius,
                                min_price=sq.min_price,
                                max_price=sq.max_price,
                                district=sq.district,
                                page_size=page_size,
                                max_days_since_added=parameters.max_days_since_added,
                                furnish_types=parameters.furnish_types or [],
                                session=session,
                                config=config,
                            )
                            total_pages_fetched += 1

                            # Extract and enqueue new IDs inline
                            properties = result.get("properties", [])
                            for prop in properties:
                                identifier = prop.get("identifier")
                                if identifier and identifier not in existing_ids:
                                    existing_ids.add(identifier)
                                    ids_collected += 1
                                    await queue.put(identifier)

                            # A short page means this subquery is exhausted.
                            if len(properties) < page_size:
                                break

                        except CircuitBreakerOpenError as e:
                            celery_logger.error(f"Circuit breaker open: {e}")
                            break
                        except ThrottlingError as e:
                            celery_logger.warning(
                                f"Throttling on {sq.district} page {page_id}: {e}"
                            )
                            break
                        except Exception as e:
                            if "GENERIC_ERROR" in str(e):
                                # Treated as "no more pages" for this subquery.
                                logger.debug(
                                    f"Max page for {sq.district}: {page_id - 1}"
                                )
                                break
                            logger.warning(
                                f"Error fetching page {page_id} for "
                                f"{sq.district}: {e}"
                            )
                            break

                completed_subqueries += 1

            # --- Producer: fetch subquery pages and enqueue new IDs ---
            async def producer() -> None:
                nonlocal fetching_done
                try:
                    # Fetch all subqueries concurrently
                    await asyncio.gather(
                        *[fetch_subquery(sq) for sq in subqueries]
                    )
                    celery_logger.info(
                        f"Fetch complete: {total_pages_fetched} pages from "
                        f"{completed_subqueries} subqueries, "
                        f"{ids_collected} new IDs"
                    )
                finally:
                    # Always release the workers with one sentinel each, even if
                    # fetching failed, so none blocks forever on queue.get().
                    fetching_done = True
                    for _ in range(NUM_WORKERS):
                        queue.put_nowait(None)

            # --- Consumer workers: process listings from queue ---
            async def worker() -> None:
                nonlocal processed_count, failed_count

                while True:
                    listing_id = await queue.get()
                    if listing_id is None:
                        break

                    try:
                        listing = await listing_processor.process_listing(
                            listing_id, on_step_complete=step_callback
                        )
                    except CircuitBreakerOpenError:
                        # An open circuit breaker aborts the whole run; it is
                        # handled by the outer ``except`` below.
                        raise
                    except Exception:
                        # One bad listing must not kill this worker: an uncaught
                        # error here would abort the entire pipeline.
                        celery_logger.exception(
                            f"Failed to process listing {listing_id}"
                        )
                        listing = None

                    if listing is not None:
                        processed_count += 1
                        processed_listings.append(listing)
                    else:
                        failed_count += 1

            # --- Monitor: reports combined progress ---
            async def monitor() -> None:
                last_progress = 0.0
                last_logged_done = -1

                while True:
                    total = ids_collected
                    done = processed_count + failed_count

                    # Finished once fetching is over and every collected ID
                    # has been processed (this also covers total == 0).
                    if fetching_done and done >= total:
                        break

                    # Determine phase label
                    phase = "processing" if fetching_done else "fetching"

                    progress_ratio = round(done / total, 2) if total > 0 else 0.0

                    elapsed = time.time() - start_time
                    rate = done / elapsed if elapsed > 0 else 0
                    remaining = (total - done) if total > 0 else 0
                    eta = remaining / rate if rate > 0 else 0

                    # Log at the first completion and every +10%, but never
                    # repeat a line when nothing has been processed since.
                    should_log = done != last_logged_done and (
                        progress_ratio >= last_progress + 0.1 or done == 1
                    )
                    if should_log:
                        celery_logger.info(
                            f"Progress: {progress_ratio * 100:.0f}% "
                            f"({done}/{total}) "
                            f"| Elapsed: {elapsed:.0f}s "
                            f"| Rate: {rate:.1f}/s "
                            f"| ETA: {eta:.0f}s"
                        )
                        last_progress = progress_ratio
                        last_logged_done = done

                    _update_task_state(
                        task,
                        f"{'Processing' if fetching_done else 'Fetching & processing'}: "
                        f"{done}/{total}",
                        {
                            "phase": phase,
                            "progress": progress_ratio,
                            "processed": done,
                            "total": total,
                            "subqueries_completed": completed_subqueries,
                            "subqueries_total": len(subqueries),
                            "ids_collected": ids_collected,
                            "pages_fetched": total_pages_fetched,
                            "fetching_done": fetching_done,
                            "details_fetched": details_fetched,
                            "images_downloaded": images_downloaded,
                            "ocr_completed": ocr_completed,
                            "failed": failed_count,
                            "elapsed_seconds": round(elapsed, 1),
                            "rate_per_second": round(rate, 2),
                            "eta_seconds": round(eta, 1),
                        },
                    )
                    await asyncio.sleep(1)

            # Run producer, workers, and monitor concurrently
            stages = [
                asyncio.create_task(producer()),
                *[asyncio.create_task(worker()) for _ in range(NUM_WORKERS)],
                asyncio.create_task(monitor()),
            ]
            try:
                await asyncio.gather(*stages)
            except BaseException:
                # gather() re-raises the first failure but leaves the other
                # stages running (e.g. the monitor's polling loop). Cancel and
                # reap them so nothing outlives this call or the session.
                for stage in stages:
                    stage.cancel()
                await asyncio.gather(*stages, return_exceptions=True)
                raise

    except CircuitBreakerOpenError as e:
        celery_logger.error(f"Circuit breaker prevented query: {e}")
        metrics = get_throttle_metrics()
        if metrics.total_requests > 0:
            celery_logger.info(metrics.summary())
        return []
    finally:
        metrics = get_throttle_metrics()
        if metrics.total_requests > 0:
            celery_logger.info(
                f"API Stats: {metrics.total_requests} requests, "
                f"avg {metrics.average_response_time:.2f}s, "
                f"{metrics.total_throttling_events} throttled"
            )

    elapsed = time.time() - start_time
    celery_logger.info("=" * 60)
    celery_logger.info(
        f"COMPLETED: Processed {len(processed_listings)} listings in {elapsed:.1f}s"
    )
    celery_logger.info("=" * 60)

    invalidate_cache()

    _update_task_state(task, "Completed", {
        "phase": "completed", "progress": 1,
        "processed": len(processed_listings), "total": ids_collected,
        "message": f"Processed {len(processed_listings)} listings in {elapsed:.0f}s",
    })

    return processed_listings
|
|
|
|
|
|
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register one periodic scrape task per enabled schedule.

    Schedules come from ``SchedulesConfig.from_env()``. If that raises
    ``ValueError``, the error is logged and nothing is registered. Each
    enabled schedule becomes a crontab entry that enqueues
    ``dump_listings_task`` with the schedule's query parameters as JSON.
    """
    try:
        schedules_config = SchedulesConfig.from_env()
    except ValueError as e:
        logger.error(f"Failed to load schedule configuration: {e}")
        return

    for sched in schedules_config.get_enabled_schedules():
        logger.info(
            f"Registering periodic task: {sched.name} at {sched.hour}:{sched.minute}"
        )
        trigger = crontab(
            minute=sched.minute,
            hour=sched.hour,
            day_of_week=sched.day_of_week,
        )
        scrape_signature = dump_listings_task.s(
            sched.to_query_parameters().model_dump_json()
        )
        sender.add_periodic_task(trigger, scrape_signature, name=sched.name)
|