import asyncio
import logging
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Any

from celery import Task
from celery.schedules import crontab

from celery_app import app
from config.schedule_config import SchedulesConfig
from config.scraper_config import ScraperConfig
from listing_processor import ListingProcessor
from models.listing import Listing, QueryParameters
from rec.query import create_session, listing_query
from rec.exceptions import CircuitBreakerOpenError, InvalidResponseError, ThrottlingError
from rec.throttle_detector import get_throttle_metrics, reset_throttle_metrics
from repositories.listing_repository import ListingRepository
from database import engine
from services.query_splitter import QuerySplitter, SubQuery
from utils.redis_lock import redis_lock
from services.listing_cache import invalidate_cache
from services.task_progress_publisher import publish_task_progress

# Named after uvicorn's error logger — presumably so task messages land next
# to the API server's logs; confirm if this module is also used outside uvicorn.
logger = logging.getLogger("uvicorn.error")

# Also configure a celery-specific logger with its own console handler.
# NOTE: logging.StreamHandler() with no argument writes to sys.stderr (not
# stdout). The handler and INFO level are only applied when the logger has no
# handlers yet, so an already-configured logger is left untouched.
celery_logger = logging.getLogger("celery.task")
if not celery_logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(
        "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    ))
    celery_logger.addHandler(handler)
    celery_logger.setLevel(logging.INFO)

# Name of the Redis lock that prevents two scrape jobs from running at once.
SCRAPE_LOCK_NAME = "scrape_listings"
# Maximum number of captured log lines kept (oldest dropped first) and
# attached to task state updates.
LOG_BUFFER_MAX_LINES = 200

# Number of concurrent consumer workers that process listings from the queue.
NUM_WORKERS = 20

# Phase constants for task state reporting
PHASE_SPLITTING = "splitting"
PHASE_FETCHING = "fetching"
PHASE_PROCESSING = "processing"
PHASE_COMPLETED = "completed"

# Module-level log buffer — active only during task execution.
# This is safe as module-level mutable state because Celery workers use a
# prefork pool: each worker process handles one task at a time, so there is
# no concurrent access within a single process.
# The TaskLogHandler appends here; _update_task_state reads from here.
_active_log_buffer: deque[str] | None = None


@dataclass
class _PipelineState:
    """Shared mutable state for the streaming fetch-and-process pipeline.

    Mutated in place by the fetch coroutines, the producer and the consumer
    workers; read by ``ProgressReporter`` to build progress snapshots.
    """

    # --- fetch side ---
    ids_collected: int = 0  # new (not already in the DB) IDs put on the queue
    completed_subqueries: int = 0
    total_pages_fetched: int = 0
    fetching_done: bool = False  # set by the producer once every subquery finished
    # --- processing side ---
    processed_count: int = 0  # process_listing returned a Listing
    failed_count: int = 0  # process_listing returned None or raised
    # per-step counters, bumped from the on_step_complete callback
    details_fetched: int = 0
    images_downloaded: int = 0
    ocr_completed: int = 0
    processed_listings: list[Listing] = field(default_factory=list)


class ProgressReporter:
    """Event-driven, throttled progress reporter.

    Call ``notify()`` whenever pipeline state changes (page fetched, item
    processed, phase change). Publishes are throttled to at most one every
    ``min_interval`` seconds to avoid flooding Redis / WebSocket. Use
    ``force=True`` for phase changes, completion, and errors so they are
    published immediately.

    ``run_background_flush`` is a long-running coroutine that publishes every
    ``background_interval`` seconds during quiet periods (keeps ETA / elapsed
    fields current even when no items are being processed).
    """

    def __init__(
        self,
        task: Task,
        state: _PipelineState,
        subqueries_total: int,
        start_time: float,
        min_interval: float = 0.25,
        background_interval: float = 2.0,
    ) -> None:
        """
        Args:
            task: Celery task whose state is updated on every publish.
            state: Shared pipeline counters to report on.
            subqueries_total: Number of subqueries, reported as-is.
            start_time: ``time.time()`` at pipeline start (for elapsed/rate/ETA).
            min_interval: Minimum seconds between throttled publishes.
            background_interval: Keepalive period used by ``run_background_flush``.
        """
        self._task = task
        self._state = state
        self._subqueries_total = subqueries_total
        self._start_time = start_time
        self._min_interval = min_interval
        self._background_interval = background_interval
        self._last_publish: float = 0.0
        # Progress ratio at which the last milestone line was logged.
        self._last_log_progress: float = 0.0
        # Whether the one-off "first item finished" line has been logged.
        self._logged_first_item = False
        # True when state changed since the last publish.
        self._dirty = False
        # Trailing publish scheduled by notify() inside the throttle window.
        self._deferred: asyncio.TimerHandle | None = None
        # Captured by run_background_flush; used to schedule deferred publishes.
        self._loop: asyncio.AbstractEventLoop | None = None
        # Once True, notify() and deferred callbacks become no-ops.
        self._stopped = False

    # -- internal helpers ---------------------------------------------------

    def _build_meta(self) -> tuple[str, dict[str, Any]]:
        """Snapshot the pipeline state as ``(label, meta)``.

        ``label`` is a human-readable state string; ``meta`` holds the raw
        counters plus the derived progress ratio, throughput and ETA.
        """
        state = self._state
        total = state.ids_collected
        done = state.processed_count + state.failed_count
        phase = PHASE_PROCESSING if state.fetching_done else PHASE_FETCHING
        progress_ratio = round(done / total, 2) if total > 0 else 0.0

        elapsed = time.time() - self._start_time
        rate = done / elapsed if elapsed > 0 else 0
        remaining = (total - done) if total > 0 else 0
        # ETA extrapolates the average rate so far; while fetching is still
        # running, ``total`` can keep growing.
        eta = remaining / rate if rate > 0 else 0

        label = (
            f"{'Processing' if state.fetching_done else 'Fetching & processing'}: "
            f"{done}/{total}"
        )

        meta: dict[str, Any] = {
            "phase": phase,
            "progress": progress_ratio,
            "processed": done,
            "total": total,
            "subqueries_completed": state.completed_subqueries,
            "subqueries_total": self._subqueries_total,
            "ids_collected": state.ids_collected,
            "pages_fetched": state.total_pages_fetched,
            "fetching_done": state.fetching_done,
            "details_fetched": state.details_fetched,
            "images_downloaded": state.images_downloaded,
            "ocr_completed": state.ocr_completed,
            "failed": state.failed_count,
            "elapsed_seconds": round(elapsed, 1),
            "rate_per_second": round(rate, 2),
            "eta_seconds": round(eta, 1),
        }
        return label, meta

    def _publish(self) -> None:
        """Push a state update immediately and log progress milestones."""
        label, meta = self._build_meta()

        progress_ratio = meta["progress"]
        done = meta["processed"]
        # Log at every 10% of progress, plus once when the first item
        # finishes. The first-item line is guarded by a flag: without it,
        # every publish while ``processed`` is still 1 (page fetches,
        # keepalives) would log a duplicate line and flood the log buffer.
        first_item = done == 1 and not self._logged_first_item
        if first_item or progress_ratio >= self._last_log_progress + 0.1:
            total = meta["total"]
            elapsed = meta["elapsed_seconds"]
            rate = meta["rate_per_second"]
            eta = meta["eta_seconds"]
            celery_logger.info(
                f"Progress: {progress_ratio * 100:.0f}% "
                f"({done}/{total}) "
                f"| Elapsed: {elapsed:.0f}s "
                f"| Rate: {rate:.1f}/s "
                f"| ETA: {eta:.0f}s"
            )
            self._last_log_progress = progress_ratio
            if first_item:
                self._logged_first_item = True

        _update_task_state(self._task, label, meta)
        self._last_publish = time.time()
        self._dirty = False

    def _cancel_deferred(self) -> None:
        """Cancel a pending deferred publish, if any."""
        if self._deferred is not None:
            self._deferred.cancel()
            self._deferred = None

    def _deferred_callback(self) -> None:
        """Called by asyncio.call_later when the throttle window expires."""
        self._deferred = None
        if self._dirty and not self._stopped:
            self._publish()

    # -- public API ---------------------------------------------------------

    def notify(self, force: bool = False) -> None:
        """Signal that pipeline state has changed.

        If ``force`` is True the publish happens immediately (use for phase
        changes and completion). Otherwise the publish is throttled and a
        single trailing publish is scheduled so the last update isn't lost.
        Must be called from code running on the pipeline's event loop.
        """
        if self._stopped:
            return
        self._dirty = True
        now = time.time()
        if force or (now - self._last_publish) >= self._min_interval:
            self._cancel_deferred()
            self._publish()
        elif self._deferred is None:
            # Schedule a deferred flush so trailing updates aren't lost.
            loop = self._loop or asyncio.get_running_loop()
            delay = self._min_interval - (now - self._last_publish)
            self._deferred = loop.call_later(delay, self._deferred_callback)

    def flush(self) -> None:
        """Force a final publish (call at pipeline end)."""
        self._cancel_deferred()
        self._publish()

    async def run_background_flush(self) -> None:
        """Background keepalive: publish every ``background_interval`` seconds.

        This keeps ETA / elapsed fields current during quiet periods. Exits
        when the pipeline is done (fetching complete and all items processed,
        or fetching complete with nothing to process), then stops the reporter
        so later ``notify()`` calls are ignored.
        """
        self._loop = asyncio.get_running_loop()
        while True:
            state = self._state
            total = state.ids_collected
            done = state.processed_count + state.failed_count
            if state.fetching_done and done >= total and total > 0:
                break
            if state.fetching_done and total == 0:
                break
            await asyncio.sleep(self._background_interval)
            # Publish only if nothing else has published recently.
            if not self._stopped:
                now = time.time()
                if (now - self._last_publish) >= self._background_interval:
                    self._publish()
        self._stopped = True
        self._cancel_deferred()


class TaskLogHandler(logging.Handler):
    """Captures formatted log records into a deque for task state updates."""

    def __init__(self, buffer: deque[str]) -> None:
        super().__init__()
        self.buffer = buffer

    def emit(self, record: logging.LogRecord) -> None:
        try:
            self.buffer.append(self.format(record))
        except Exception:
            # Standard Handler contract: report through handleError instead
            # of silently swallowing; it never propagates to the log caller.
            self.handleError(record)


def _update_task_state(task: Task, state: str, meta: dict[str, Any]) -> None:
    """Report ``state``/``meta`` via Celery and the progress publisher.

    When a log capture buffer is active, its lines are injected into ``meta``
    under ``"logs"`` (the caller's dict is mutated). Nothing is reported when
    the task has no request id — e.g. the bare ``Task()`` used by
    ``async_dump_listings_task`` — because ``Task.update_state`` falls back to
    ``task.request.id`` and there is no task result to store against.
    """
    if _active_log_buffer is not None:
        meta["logs"] = list(_active_log_buffer)
    # getattr with a default also absorbs an AttributeError raised while
    # resolving ``request`` on a task that is not executing as a request.
    task_id = getattr(getattr(task, "request", None), "id", None)
    if not task_id:
        return
    task.update_state(state=state, meta=meta)
    publish_task_progress(task_id, state, meta)


async def _fetch_subquery(
    sq: SubQuery,
    parameters: QueryParameters,
    session: object,
    config: ScraperConfig,
    semaphore: asyncio.Semaphore,
    existing_ids: set[int],
    queue: asyncio.Queue[int | None],
    state: _PipelineState,
    reporter: ProgressReporter,
) -> None:
    """Fetch pages for a single subquery and enqueue new listing IDs.

    Pages are requested one at a time under ``semaphore`` with a
    ``request_delay_ms`` pause before each request. IDs not yet in
    ``existing_ids`` are added to it (deduplicating across subqueries) and
    put on ``queue``. Paging stops at a short page or on any error; errors
    are logged, never raised.
    """
    estimated = sq.estimated_results or 0
    if estimated == 0:
        state.completed_subqueries += 1
        reporter.notify()
        return

    page_size = parameters.page_size
    # One page beyond the estimate's full pages, capped by configuration.
    max_pages = min(
        config.max_pages_per_query,
        (estimated // page_size) + 1,
    )

    for page_id in range(1, max_pages + 1):
        async with semaphore:
            await asyncio.sleep(config.request_delay_ms / 1000)
            try:
                result = await listing_query(
                    page=page_id,
                    channel=parameters.listing_type,
                    min_bedrooms=sq.min_bedrooms,
                    max_bedrooms=sq.max_bedrooms,
                    radius=parameters.radius,
                    min_price=sq.min_price,
                    max_price=sq.max_price,
                    district=sq.district,
                    page_size=page_size,
                    max_days_since_added=parameters.max_days_since_added,
                    furnish_types=parameters.furnish_types or [],
                    session=session,
                    config=config,
                )
                state.total_pages_fetched += 1
                reporter.notify()

                properties = result.get("properties", [])
                for prop in properties:
                    identifier = prop.get("identifier")
                    if identifier and identifier not in existing_ids:
                        existing_ids.add(identifier)
                        state.ids_collected += 1
                        await queue.put(identifier)

                # A short page means there are no further results.
                if len(properties) < page_size:
                    break
            except CircuitBreakerOpenError as e:
                celery_logger.error(f"Circuit breaker open: {e}")
                break
            except ThrottlingError as e:
                celery_logger.warning(
                    f"Throttling on {sq.district} page {page_id}: {e}"
                )
                break
            except InvalidResponseError:
                celery_logger.debug(
                    f"Max page for {sq.district}: {page_id - 1}"
                )
                break
            except Exception as e:
                celery_logger.warning(
                    f"Error fetching page {page_id} for "
                    f"{sq.district}: {e}"
                )
                break

    state.completed_subqueries += 1
    reporter.notify()


async def _process_worker(
    queue: asyncio.Queue[int | None],
    processor: ListingProcessor,
    state: _PipelineState,
    reporter: ProgressReporter,
) -> None:
    """Consumer worker: pull listing IDs from the queue and process them.

    Runs until a ``None`` sentinel is received. A listing whose processing
    raises is logged and counted as failed instead of killing the worker
    (which would abort the whole pipeline) — the same best-effort policy
    ``_fetch_subquery`` applies to page errors.
    """

    def step_callback(step_name: str) -> None:
        # Per-step counters reported by the processor as each stage finishes.
        if step_name == "details":
            state.details_fetched += 1
        elif step_name == "images":
            state.images_downloaded += 1
        elif step_name == "ocr":
            state.ocr_completed += 1

    while True:
        listing_id = await queue.get()
        if listing_id is None:
            break

        try:
            listing = await processor.process_listing(
                listing_id, on_step_complete=step_callback
            )
        except Exception as e:
            celery_logger.warning(f"Error processing listing {listing_id}: {e}")
            listing = None

        if listing is not None:
            state.processed_count += 1
            state.processed_listings.append(listing)
        else:
            state.failed_count += 1
        reporter.notify()


@app.task(bind=True, pydantic=True)
def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]:
    """Celery entry point: run a full scrape unless one is already running.

    Args:
        parameters_json: JSON-serialised ``QueryParameters``.

    Returns:
        ``{"status": "skipped", "reason": "another_job_running"}`` when the
        scrape lock is held elsewhere, otherwise
        ``{"phase": PHASE_COMPLETED, "progress": 1}``.
    """
    with redis_lock(SCRAPE_LOCK_NAME) as acquired:
        if not acquired:
            msg = "Another scrape job is already running, skipping this execution"
            celery_logger.warning(msg)
            _update_task_state(
                self, "SKIPPED", {"reason": "Another scrape job is running"}
            )
            return {"status": "skipped", "reason": "another_job_running"}

        celery_logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
        parsed_parameters = QueryParameters.model_validate_json(parameters_json)
        celery_logger.info(f"Starting scrape with parameters: {parsed_parameters}")
        _update_task_state(
            self, "Starting...", {"phase": PHASE_SPLITTING, "progress": 0}
        )
        asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters))
        result = {"phase": PHASE_COMPLETED, "progress": 1}
        publish_task_progress(self.request.id, "SUCCESS", result)
        return result


async def async_dump_listings_task(parameters_json: str) -> dict[str, Any]:
    """Run a full scrape from inside an existing event loop (no Celery worker).

    Takes the same scrape lock as ``dump_listings_task``. A bare ``Task()`` is
    passed down; it is not executing as a Celery request, so
    ``_update_task_state`` skips state reporting on this path.
    """
    with redis_lock(SCRAPE_LOCK_NAME) as acquired:
        if not acquired:
            celery_logger.warning("Another scrape job is already running, skipping this execution")
            return {"status": "skipped", "reason": "another_job_running"}

        celery_logger.info(f"Acquired lock: {SCRAPE_LOCK_NAME}")
        parsed_parameters = QueryParameters.model_validate_json(parameters_json)
        await dump_listings_full(task=Task(), parameters=parsed_parameters)
        # NOTE(review): the Celery variant returns
        # {"phase": PHASE_COMPLETED, "progress": 1} on success; confirm whether
        # callers rely on this path returning {"progress": 0}.
        return {"progress": 0}


async def dump_listings_full(
    *, task: Task, parameters: QueryParameters
) -> list[Listing]:
    """Fetches all listings, images as well as detects floorplans.

    Wraps ``_dump_listings_full_inner`` with log capture: a TaskLogHandler
    writing into a bounded deque (``LOG_BUFFER_MAX_LINES``) is attached to
    both module loggers and exposed as ``_active_log_buffer`` so every
    ``_update_task_state`` call carries the recent log lines. Handlers,
    logger levels and the buffer are restored even if the pipeline raises.
    """
    global _active_log_buffer

    log_buffer: deque[str] = deque(maxlen=LOG_BUFFER_MAX_LINES)
    log_handler = TaskLogHandler(log_buffer)
    log_handler.setFormatter(
        logging.Formatter("%(asctime)s %(message)s", datefmt="%H:%M:%S")
    )

    # Attach handler to both loggers used in the codebase, and ensure
    # they accept INFO-level messages (Celery's worker setup may leave
    # the celery.task logger at WARNING).
    _prev_celery_level = celery_logger.level
    _prev_logger_level = logger.level
    celery_logger.addHandler(log_handler)
    logger.addHandler(log_handler)
    if celery_logger.level == logging.NOTSET or celery_logger.level > logging.INFO:
        celery_logger.setLevel(logging.INFO)
    if logger.level == logging.NOTSET or logger.level > logging.INFO:
        logger.setLevel(logging.INFO)

    _active_log_buffer = log_buffer
    try:
        return await _dump_listings_full_inner(task=task, parameters=parameters)
    finally:
        _active_log_buffer = None
        celery_logger.removeHandler(log_handler)
        logger.removeHandler(log_handler)
        celery_logger.setLevel(_prev_celery_level)
        logger.setLevel(_prev_logger_level)


async def _dump_listings_full_inner(
    *, task: Task, parameters: QueryParameters
) -> list[Listing]:
    """Inner implementation with log capture active.

    Uses a streaming pipeline: an asyncio.Queue bridges the fetcher
    (producer) and processor workers (consumers) so that listing processing
    starts as soon as IDs become available from each subquery.

    Returns the successfully processed listings, or ``[]`` when a
    ``CircuitBreakerOpenError`` reaches this level (e.g. while splitting).
    """
    start_time = time.time()
    state = _PipelineState()

    celery_logger.info("=" * 60)
    celery_logger.info("PHASE 1: Splitting queries")
    celery_logger.info("=" * 60)

    repository = ListingRepository(engine)
    config = ScraperConfig.from_env()
    splitter = QuerySplitter(config)
    reset_throttle_metrics()

    def on_progress(phase: str, message: str, **kwargs: Any) -> None:
        # Progress callback handed to QuerySplitter.split.
        meta: dict[str, Any] = {"phase": phase, "message": message}
        meta.update(kwargs)
        _update_task_state(task, message, meta)
        celery_logger.info(f"[{phase}] {message}")

    _update_task_state(task, "Analyzing query and splitting by price bands...", {
        "phase": PHASE_SPLITTING,
        "progress": 0,
    })
    celery_logger.info("Starting query splitting and probing...")

    try:
        async with create_session(config) as session:
            subqueries = await splitter.split(parameters, session, on_progress)
            total_estimated = splitter.calculate_total_estimated_results(subqueries)
            celery_logger.info(
                f"Query split complete: {len(subqueries)} subqueries, "
                f"~{total_estimated} estimated total results"
            )

            celery_logger.info("Loading existing listing IDs from database...")
            existing_ids = repository.get_listing_ids(parameters.listing_type)
            celery_logger.info(f"Found {len(existing_ids)} existing listings in DB")

            celery_logger.info("=" * 60)
            celery_logger.info("PHASE 2: Streaming fetch & process")
            celery_logger.info("=" * 60)

            queue: asyncio.Queue[int | None] = asyncio.Queue()
            semaphore = asyncio.Semaphore(config.max_concurrent_requests)

            _update_task_state(
                task,
                f"Fetching listings from {len(subqueries)} subqueries...",
                {
                    "phase": PHASE_FETCHING,
                    "subqueries_completed": 0,
                    "subqueries_total": len(subqueries),
                    "ids_collected": 0,
                    "pages_fetched": 0,
                    "estimated_results": total_estimated,
                    "fetching_done": False,
                },
            )

            listing_processor = ListingProcessor(repository, parameters.listing_type)
            reporter = ProgressReporter(
                task,
                state,
                len(subqueries),
                start_time,
            )

            # Producer: fetch all subqueries concurrently, then signal workers
            # to stop by sending one None sentinel per worker.
            async def producer() -> None:
                await asyncio.gather(
                    *[
                        _fetch_subquery(
                            sq,
                            parameters,
                            session,
                            config,
                            semaphore,
                            existing_ids,
                            queue,
                            state,
                            reporter,
                        )
                        for sq in subqueries
                    ]
                )
                celery_logger.info(
                    f"Fetch complete: {state.total_pages_fetched} pages from "
                    f"{state.completed_subqueries} subqueries, "
                    f"{state.ids_collected} new IDs"
                )
                state.fetching_done = True
                reporter.notify(force=True)
                for _ in range(NUM_WORKERS):
                    await queue.put(None)

            await asyncio.gather(
                producer(),
                *[
                    _process_worker(queue, listing_processor, state, reporter)
                    for _ in range(NUM_WORKERS)
                ],
                reporter.run_background_flush(),
            )
            reporter.flush()

    except CircuitBreakerOpenError as e:
        celery_logger.error(f"Circuit breaker prevented query: {e}")
        metrics = get_throttle_metrics()
        if metrics.total_requests > 0:
            celery_logger.info(metrics.summary())
        return []
    finally:
        metrics = get_throttle_metrics()
        if metrics.total_requests > 0:
            celery_logger.info(
                f"API Stats: {metrics.total_requests} requests, "
                f"avg {metrics.average_response_time:.2f}s, "
                f"{metrics.total_throttling_events} throttled"
            )

    elapsed = time.time() - start_time
    celery_logger.info("=" * 60)
    celery_logger.info(
        f"COMPLETED: Processed {len(state.processed_listings)} listings in {elapsed:.1f}s"
    )
    celery_logger.info("=" * 60)

    invalidate_cache()

    _update_task_state(task, "Completed", {
        "phase": PHASE_COMPLETED,
        "progress": 1,
        "processed": len(state.processed_listings),
        "total": state.ids_collected,
        "message": f"Processed {len(state.processed_listings)} listings in {elapsed:.0f}s",
    })

    return state.processed_listings


@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register periodic tasks from environment configuration.

    Each enabled schedule becomes a crontab-driven ``dump_listings_task``
    call carrying that schedule's query parameters as JSON. If the schedule
    configuration fails to load, the error is logged and nothing is
    registered.
    """
    try:
        config = SchedulesConfig.from_env()
    except ValueError as e:
        celery_logger.error(f"Failed to load schedule configuration: {e}")
        return

    for schedule in config.get_enabled_schedules():
        celery_logger.info(
            f"Registering periodic task: {schedule.name} at {schedule.hour}:{schedule.minute}"
        )
        sender.add_periodic_task(
            crontab(
                minute=schedule.minute,
                hour=schedule.hour,
                day_of_week=schedule.day_of_week,
            ),
            dump_listings_task.s(schedule.to_query_parameters().model_dump_json()),
            name=schedule.name,
        )