From b4837e160354a46be52864ed295d8e3334a9a5e5 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Fri, 6 Feb 2026 22:37:53 +0000 Subject: [PATCH] Add crawl job progress drawer with phase tracking and live logs - Add phase-aware progress reporting across all crawl phases (splitting, fetching, filtering, processing) with per-step counters - Add TaskProgressDrawer component with phase timeline stepper, detail counters, progress bar with ETA, and live worker log viewer - Add on_step_complete callback to ListingProcessor for granular tracking of details/images/OCR steps - Extend QuerySplitter on_progress callback with structured counter data - Capture celery worker logs via ring buffer handler and inject into task state updates for frontend display - Guard taskResult updates with phase presence check to prevent drawer from blanking during state transitions --- .../frontend/src/components/TaskIndicator.tsx | 62 ++- .../src/components/TaskProgressDrawer.tsx | 363 ++++++++++++++++++ crawler/frontend/src/types/index.ts | 30 +- crawler/listing_processor.py | 27 +- crawler/services/query_splitter.py | 19 + crawler/tasks/listing_tasks.py | 140 ++++++- 6 files changed, 617 insertions(+), 24 deletions(-) create mode 100644 crawler/frontend/src/components/TaskProgressDrawer.tsx diff --git a/crawler/frontend/src/components/TaskIndicator.tsx b/crawler/frontend/src/components/TaskIndicator.tsx index f396dfd..7cfbcf4 100644 --- a/crawler/frontend/src/components/TaskIndicator.tsx +++ b/crawler/frontend/src/components/TaskIndicator.tsx @@ -7,6 +7,7 @@ import { useEffect, useState } from 'react'; import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from './ui/tooltip'; import { Button } from './ui/button'; import { Loader2, CheckCircle2, XCircle, X, Trash2 } from 'lucide-react'; +import { TaskProgressDrawer } from './TaskProgressDrawer'; interface TaskIndicatorProps { taskID: string | null; @@ -19,8 +20,10 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) { const [processed, setProcessed] = useState(null); const [total, setTotal] = useState(null); const [taskStatus, setTaskStatus] = useState(null); + const [taskResult, setTaskResult] = useState(null); const [isCancelling, setIsCancelling] = useState(false); const [isClearing, setIsClearing] = useState(false); + const [drawerOpen, setDrawerOpen] = useState(false); useEffect(() => { getUser().then(setUser); @@ -29,6 +32,7 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) { useEffect(() => { if (!user || !taskID) { setTaskStatus(null); + setTaskResult(null); return; } @@ -37,6 +41,7 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) { setProgressPercentage(0); setProcessed(null); setTotal(null); + setTaskResult(null); const pollTaskStatus = async () => { try { @@ -46,6 +51,20 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) { if (status === TaskStatus.SUCCESS) { setProgressPercentage(100); + // Parse final result for the drawer to show completed state. + // Only update taskResult if the new result has phase info; + // otherwise keep the last in-progress result which has richer data + // than the bare SUCCESS return value. + if (data.result) { + try { + const parsedResult: TaskResult = JSON.parse(data.result); + if (parsedResult.phase) { + setTaskResult(parsedResult); + } + } catch { + // Ignore parsing errors + } + } return true; // Stop polling } @@ -57,7 +76,18 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) { if (data.result) { try { const parsedResult: TaskResult = JSON.parse(data.result); - setProgressPercentage(parsedResult.progress * 100); + // Only update taskResult if the parsed data has a phase field. + // This prevents blanking the drawer when the backend sends a + // state update without phase info (e.g. during brief transitions). + if (parsedResult.phase) { + setTaskResult(parsedResult); + } + // Only update progress/processed/total when the fields are + // actually present — otherwise keep the previous values so + // the UI doesn't flash back to 0 during phase transitions. + if (parsedResult.progress !== undefined) { + setProgressPercentage(parsedResult.progress * 100); + } if (parsedResult.processed !== undefined) { setProcessed(parsedResult.processed); } @@ -113,6 +143,7 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) { const result = await clearAllTasks(user); if (result.success) { setTaskStatus(null); + setTaskResult(null); onTaskCancelled?.(); } } catch { @@ -144,18 +175,27 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) { if (processed !== null && total !== null && total > 0) { return `${processed} / ${total}`; } + if (taskResult?.phase && taskResult.phase !== 'processing') { + const phaseLabels: Record = { + splitting: 'Splitting', + splitting_complete: 'Split done', + fetching: 'Fetching', + filtering: 'Filtering', + }; + return phaseLabels[taskResult.phase] ?? `${Math.round(progressPercentage)}%`; + } return `${Math.round(progressPercentage)}%`; }; const getTooltipContent = () => { if (isInProgress) { if (processed !== null && total !== null && total > 0) { - return `Processing: ${processed} / ${total} listings (${Math.round(progressPercentage)}%)`; + return `Processing: ${processed} / ${total} listings (${Math.round(progressPercentage)}%) — click for details`; } - return `Task running: ${Math.round(progressPercentage)}%`; + return `Task running: ${getProgressText()} — click for details`; } if (taskStatus === TaskStatus.SUCCESS) { - return 'Task completed successfully'; + return 'Task completed successfully — click for details'; } if (taskStatus === TaskStatus.REVOKED) { return 'Task was cancelled'; @@ -168,7 +208,10 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) {
-
+
setDrawerOpen(true)} + > {getStatusIcon()} {isInProgress && (
@@ -230,6 +273,15 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) {
+ ); } diff --git a/crawler/frontend/src/components/TaskProgressDrawer.tsx b/crawler/frontend/src/components/TaskProgressDrawer.tsx new file mode 100644 index 0000000..5a1394a --- /dev/null +++ b/crawler/frontend/src/components/TaskProgressDrawer.tsx @@ -0,0 +1,363 @@ +import { TaskStatus, type TaskPhase, type TaskResult } from '@/types'; +import { + Sheet, + SheetContent, + SheetHeader, + SheetTitle, + SheetDescription, + SheetFooter, +} from './ui/sheet'; +import { Button } from './ui/button'; +import { CheckCircle2, Circle, Loader2, XCircle } from 'lucide-react'; +import { useEffect, useRef } from 'react'; + +interface TaskProgressDrawerProps { + open: boolean; + onOpenChange: (open: boolean) => void; + taskResult: TaskResult | null; + taskStatus: TaskStatus | null; + taskID: string | null; + onCancel: () => void; + isCancelling: boolean; +} + +const PHASES: { key: TaskPhase; label: string }[] = [ + { key: 'splitting', label: 'Splitting queries' }, + { key: 'fetching', label: 'Fetching listings' }, + { key: 'filtering', label: 'Filtering results' }, + { key: 'processing', label: 'Processing listings' }, +]; + +function getPhaseIndex(phase: TaskPhase | undefined): number { + if (!phase) return -1; + if (phase === 'splitting_complete') return 1; // splitting done, fetching is next + if (phase === 'completed') return PHASES.length; + return PHASES.findIndex((p) => p.key === phase); +} + +function formatEta(seconds: number | undefined): string { + if (seconds === undefined || seconds <= 0) return ''; + const mins = Math.floor(seconds / 60); + const secs = Math.round(seconds % 60); + if (mins > 0) { + return `~${mins}m ${secs}s remaining`; + } + return `~${secs}s remaining`; +} + +function StatusBadge({ status }: { status: TaskStatus | null }) { + if (!status) return null; + + const isInProgress = + status !== TaskStatus.SUCCESS && + status !== TaskStatus.FAILURE && + status !== TaskStatus.REVOKED; + + if (isInProgress) { + return ( + + + Running + + ); + } + if (status === TaskStatus.SUCCESS) { + return ( + + + Complete + + ); + } + if (status === TaskStatus.REVOKED) { + return ( + + + Cancelled + + ); + } + return ( + + + Failed + + ); +} + +function PhaseTimeline({ + currentPhase, + taskStatus, +}: { + currentPhase: TaskPhase | undefined; + taskStatus: TaskStatus | null; +}) { + const isTerminal = + taskStatus === TaskStatus.SUCCESS || + taskStatus === TaskStatus.FAILURE || + taskStatus === TaskStatus.REVOKED; + const activeIdx = isTerminal ? PHASES.length : getPhaseIndex(currentPhase); + + return ( +
+ {PHASES.map((phase, idx) => { + const isCompleted = idx < activeIdx; + const isActive = idx === activeIdx && !isTerminal; + const isFuture = idx > activeIdx; + + return ( +
+ {isCompleted && ( + + )} + {isActive && ( + + )} + {isFuture && ( + + )} + + {phase.label} + +
+ ); + })} +
+ ); +} + +function CounterRow({ label, value, total }: { label: string; value?: number; total?: number }) { + if (value === undefined) return null; + return ( +
+ {label} + + {value} + {total !== undefined && ` / ${total}`} + +
+ ); +} + +function PhaseDetails({ result }: { result: TaskResult }) { + const phase = result.phase; + + if (phase === 'splitting' || phase === 'splitting_complete') { + return ( +
+

+ Splitting +

+ + {result.subqueries_total !== undefined && ( + + )} + {result.estimated_results !== undefined && ( + + )} +
+ ); + } + + if (phase === 'fetching') { + return ( +
+

+ Fetching +

+ + + +
+ ); + } + + if (phase === 'filtering') { + return ( +
+

+ Filtering +

+ + + +
+ ); + } + + if (phase === 'processing') { + return ( +
+

+ Processing +

+ + + + {(result.failed ?? 0) > 0 && ( +
+ Failed + {result.failed} +
+ )} +
+ ); + } + + return null; +} + +function LogViewer({ logs }: { logs: string[] }) { + const scrollRef = useRef(null); + const isAutoScrolling = useRef(true); + + const handleScroll = () => { + const el = scrollRef.current; + if (!el) return; + const atBottom = el.scrollHeight - el.scrollTop - el.clientHeight < 30; + isAutoScrolling.current = atBottom; + }; + + useEffect(() => { + if (isAutoScrolling.current && scrollRef.current) { + scrollRef.current.scrollTop = scrollRef.current.scrollHeight; + } + }, [logs]); + + return ( +
+ {logs.length === 0 ? ( + Waiting for logs... + ) : ( + logs.map((line, i) => ( +
+ {line} +
+ )) + )} +
+ ); +} + +export function TaskProgressDrawer({ + open, + onOpenChange, + taskResult, + taskStatus, + taskID, + onCancel, + isCancelling, +}: TaskProgressDrawerProps) { + const isInProgress = + taskStatus !== null && + taskStatus !== TaskStatus.SUCCESS && + taskStatus !== TaskStatus.FAILURE && + taskStatus !== TaskStatus.REVOKED; + + const progressPercent = taskResult + ? Math.min((taskResult.progress ?? 0) * 100, 100) + : 0; + + return ( + + + +
+ Crawl Job Progress + +
+ {taskID && ( + + Task ID: {taskID.slice(0, 8)}... + + )} +
+ + {/* Fixed top section: timeline + counters + progress */} +
+ + + {taskResult && } + + {taskResult && taskResult.phase === 'processing' && ( +
+
+
+
+
+ + {taskResult.processed ?? 0} / {taskResult.total ?? '?'} + + {formatEta(taskResult.eta_seconds)} +
+
+ )} + + {taskResult?.message && ( +

{taskResult.message}

+ )} +
+ + {/* Log viewer fills remaining space */} +
+

+ Worker Logs +

+
+ +
+
+ + {isInProgress && ( + + + + )} + + + ); +} diff --git a/crawler/frontend/src/types/index.ts b/crawler/frontend/src/types/index.ts index 5d3d931..14e8a7b 100644 --- a/crawler/frontend/src/types/index.ts +++ b/crawler/frontend/src/types/index.ts @@ -48,13 +48,41 @@ export enum TaskStatus { export interface TaskStatusResponse { status: TaskStatus; - result: string; // JSON string containing { progress: number } + result: string; // JSON string containing TaskResult + message?: string; } +export type TaskPhase = 'splitting' | 'splitting_complete' | 'fetching' | 'filtering' | 'processing' | 'completed'; + export interface TaskResult { progress: number; processed?: number; total?: number; + phase?: TaskPhase; + message?: string; + // Splitting phase + subqueries_probed?: number; + subqueries_initial?: number; + estimated_results?: number; + subqueries_total?: number; + // Fetching phase + subqueries_completed?: number; + ids_collected?: number; + pages_fetched?: number; + // Filtering phase + total_found?: number; + existing_in_db?: number; + new_listings?: number; + // Processing phase + details_fetched?: number; + images_downloaded?: number; + ocr_completed?: number; + failed?: number; + elapsed_seconds?: number; + rate_per_second?: number; + eta_seconds?: number; + // Live logs + logs?: string[]; } export interface RefreshListingsResponse { diff --git a/crawler/listing_processor.py b/crawler/listing_processor.py index e1151ca..c144c65 100644 --- a/crawler/listing_processor.py +++ b/crawler/listing_processor.py @@ -1,6 +1,7 @@ from __future__ import annotations from abc import abstractmethod import asyncio +from collections.abc import Callable from datetime import datetime import logging import multiprocessing @@ -22,6 +23,13 @@ class ListingProcessor: process_steps: list[Step] listing_repository: ListingRepository + # Map step class names to short names for progress reporting + STEP_NAMES: dict[str, str] = { + "FetchListingDetailsStep": "details", + "FetchImagesStep": "images", + "DetectFloorplanStep": "ocr", + } + def __init__(self, listing_repository: ListingRepository): self.semaphore = asyncio.Semaphore(20) self.listing_repository = listing_repository @@ -33,19 +41,28 @@ class ListingProcessor: DetectFloorplanStep(listing_repository), ] - async def process_listing(self, listing_id: int) -> Listing | None: + async def process_listing( + self, + listing_id: int, + on_step_complete: Callable[[str], None] | None = None, + ) -> Listing | None: await self.listing_repository.mark_seen(listing_id) listing = None for step in self.process_steps: if await step.needs_processing(listing_id): async with self.semaphore: - step_name = step.__class__.__name__ + step_class_name = step.__class__.__name__ try: listing = await step.process(listing_id) - logger.debug(f"[{listing_id}] {step_name} completed") + logger.debug(f"[{listing_id}] {step_class_name} completed") + if on_step_complete: + short_name = self.STEP_NAMES.get( + step_class_name, step_class_name + ) + on_step_complete(short_name) except Exception as e: - logger.error(f"[{listing_id}] {step_name} failed: {e}") - celery_logger.error(f"[{listing_id}] {step_name} failed: {e}") + logger.error(f"[{listing_id}] {step_class_name} failed: {e}") + celery_logger.error(f"[{listing_id}] {step_class_name} failed: {e}") return None return listing diff --git a/crawler/services/query_splitter.py b/crawler/services/query_splitter.py index b183ac2..ac99aa6 100644 --- a/crawler/services/query_splitter.py +++ b/crawler/services/query_splitter.py @@ -238,6 +238,8 @@ class QuerySplitter: parameters: Original query parameters to split. session: aiohttp session for making requests. on_progress: Optional callback for progress updates. + Called as on_progress(phase, message, **kwargs) where kwargs + contains structured data like subqueries_probed, etc. Returns: List of SubQuery objects, each under the result threshold. @@ -260,19 +262,32 @@ class QuerySplitter: on_progress( phase="splitting", message=f"Created {len(initial_subqueries)} initial subqueries", + subqueries_initial=len(initial_subqueries), + subqueries_probed=0, ) # Phase 2: Probe and adaptively split semaphore = asyncio.Semaphore(self.config.max_concurrent_requests) refined_subqueries: list[SubQuery] = [] + probed_count = 0 # Probe all initial subqueries in parallel async def probe_and_split(sq: SubQuery) -> list[SubQuery]: + nonlocal probed_count async with semaphore: await asyncio.sleep(self.config.request_delay_ms / 1000) count = await self.probe_result_count(sq, session, parameters) sq = replace(sq, estimated_results=count) + probed_count += 1 + + if on_progress: + on_progress( + phase="splitting", + message=f"Probed {probed_count}/{len(initial_subqueries)} subqueries", + subqueries_initial=len(initial_subqueries), + subqueries_probed=probed_count, + ) if count > self.config.split_threshold: logger.info( @@ -294,10 +309,14 @@ class QuerySplitter: f"Refined to {len(refined_subqueries)} subqueries after splitting" ) + total_estimated = self.calculate_total_estimated_results(refined_subqueries) + if on_progress: on_progress( phase="splitting_complete", message=f"Refined to {len(refined_subqueries)} subqueries", + subqueries_total=len(refined_subqueries), + estimated_results=total_estimated, ) return refined_subqueries diff --git a/crawler/tasks/listing_tasks.py b/crawler/tasks/listing_tasks.py index 60bf2e6..f3f088e 100644 --- a/crawler/tasks/listing_tasks.py +++ b/crawler/tasks/listing_tasks.py @@ -1,6 +1,7 @@ import asyncio import logging import time +from collections import deque from typing import Any from celery import Task from celery.schedules import crontab @@ -31,6 +32,21 @@ if not celery_logger.handlers: celery_logger.setLevel(logging.INFO) SCRAPE_LOCK_NAME = "scrape_listings" +LOG_BUFFER_MAX_LINES = 200 + + +class TaskLogHandler(logging.Handler): + """Captures log records into a deque for inclusion in task state updates.""" + + def __init__(self, buffer: deque[str]) -> None: + super().__init__() + self.buffer = buffer + + def emit(self, record: logging.LogRecord) -> None: + try: + self.buffer.append(self.format(record)) + except Exception: + pass @app.task(bind=True, pydantic=True) @@ -49,9 +65,9 @@ def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]: parsed_parameters = QueryParameters.model_validate_json(parameters_json) celery_logger.info(f"Starting scrape with parameters: {parsed_parameters}") - self.update_state(state="Starting...", meta={"progress": 0}) + self.update_state(state="Starting...", meta={"phase": "splitting", "progress": 0}) asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters)) - return {"progress": 0} + return {"phase": "completed", "progress": 1} async def async_dump_listings_task(parameters_json: str) -> dict[str, Any]: @@ -70,6 +86,39 @@ async def dump_listings_full( *, task: Task, parameters: QueryParameters ) -> list[Listing]: """Fetches all listings, images as well as detects floorplans""" + # Set up log capture: a ring buffer handler that we inject into every + # task.update_state() call so the frontend can display live logs. + log_buffer: deque[str] = deque(maxlen=LOG_BUFFER_MAX_LINES) + log_handler = TaskLogHandler(log_buffer) + log_handler.setFormatter( + logging.Formatter("%(asctime)s %(message)s", datefmt="%H:%M:%S") + ) + celery_logger.addHandler(log_handler) + + # Wrap task.update_state so every call automatically includes logs + _original_update_state = task.update_state + + def _update_state_with_logs( + state: str | None = None, meta: dict[str, Any] | None = None, **kwargs: Any + ) -> None: + if meta is None: + meta = {} + meta["logs"] = list(log_buffer) + _original_update_state(state=state, meta=meta, **kwargs) + + task.update_state = _update_state_with_logs # type: ignore[assignment] + + try: + return await _dump_listings_full_inner(task=task, parameters=parameters) + finally: + celery_logger.removeHandler(log_handler) + task.update_state = _original_update_state # type: ignore[assignment] + + +async def _dump_listings_full_inner( + *, task: Task, parameters: QueryParameters +) -> list[Listing]: + """Inner implementation — called with log-capturing update_state wrapper.""" start_time = time.time() celery_logger.info("=" * 60) celery_logger.info("PHASE 1: Initializing listing fetch") @@ -77,7 +126,7 @@ async def dump_listings_full( repository = ListingRepository(engine) - task.update_state(state="Identifying missing listings", meta={"progress": 0}) + task.update_state(state="Identifying missing listings", meta={"phase": "splitting", "progress": 0}) celery_logger.info("Querying Rightmove API to identify new listings...") ids_to_process = await get_ids_to_process( parameters=parameters, repository=repository, task=task @@ -92,7 +141,7 @@ async def dump_listings_full( invalidate_cache() task.update_state( state="No new listings found", - meta={"progress": 1, "processed": 0, "total": 0, "message": "All listings are up to date"}, + meta={"phase": "completed", "progress": 1, "processed": 0, "total": 0, "message": "All listings are up to date"}, ) return [] @@ -115,6 +164,18 @@ async def dump_listings_full( invalidate_cache() + # Send final state so the frontend has rich data even after task completes + task.update_state( + state="Completed", + meta={ + "phase": "completed", + "progress": 1, + "processed": len(result), + "total": len(ids_to_process), + "message": f"Processed {len(result)} listings in {elapsed:.0f}s", + }, + ) + return result @@ -124,11 +185,26 @@ async def dump_listings_and_monitor( task_progress = {missing_id: 0 for missing_id in missing_ids} processed_count = 0 failed_count = 0 + details_fetched = 0 + images_downloaded = 0 + ocr_completed = 0 start_time = time.time() async def process(missing_id: int) -> Listing | None: nonlocal processed_count, failed_count - listing = await listing_processor.process_listing(missing_id) + + def step_callback(step_name: str) -> None: + nonlocal details_fetched, images_downloaded, ocr_completed + if step_name == "details": + details_fetched += 1 + elif step_name == "images": + images_downloaded += 1 + elif step_name == "ocr": + ocr_completed += 1 + + listing = await listing_processor.process_listing( + missing_id, on_step_complete=step_callback + ) task_progress[missing_id] = 1 if listing is not None: processed_count += 1 @@ -141,12 +217,12 @@ async def dump_listings_and_monitor( while (progress := sum(task_progress.values())) < len(missing_ids): progress_ratio = round(progress / len(missing_ids), 2) + elapsed = time.time() - start_time + rate = progress / elapsed if elapsed > 0 else 0 + eta = (len(missing_ids) - progress) / rate if rate > 0 else 0 + # Log every 10% progress or at least every update if progress_ratio >= last_progress + 0.1 or progress == 1: - elapsed = time.time() - start_time - rate = progress / elapsed if elapsed > 0 else 0 - eta = (len(missing_ids) - progress) / rate if rate > 0 else 0 - celery_logger.info( f"Progress: {progress_ratio * 100:.0f}% " f"({progress}/{len(missing_ids)}) " @@ -158,7 +234,19 @@ async def dump_listings_and_monitor( task.update_state( state=f"Processing: {progress_ratio * 100:.0f}% ({progress}/{len(missing_ids)})", - meta={"progress": progress_ratio, "processed": progress, "total": len(missing_ids)}, + meta={ + "phase": "processing", + "progress": progress_ratio, + "processed": progress, + "total": len(missing_ids), + "details_fetched": details_fetched, + "images_downloaded": images_downloaded, + "ocr_completed": ocr_completed, + "failed": failed_count, + "elapsed_seconds": round(elapsed, 1), + "rate_per_second": round(rate, 2), + "eta_seconds": round(eta, 1), + }, ) await asyncio.sleep(1) @@ -224,8 +312,10 @@ async def get_ids_to_process( # Reset throttle metrics reset_throttle_metrics() - def on_progress(phase: str, message: str) -> None: - task.update_state(state=message, meta={"phase": phase}) + def on_progress(phase: str, message: str, **kwargs: Any) -> None: + meta: dict[str, Any] = {"phase": phase, "message": message} + meta.update(kwargs) + task.update_state(state=message, meta=meta) celery_logger.info(f"[{phase}] {message}") celery_logger.info("Starting query splitting and probing...") @@ -254,7 +344,10 @@ async def get_ids_to_process( state=f"Fetching listings from {len(subqueries)} subqueries...", meta={ "phase": "fetching", - "subqueries": len(subqueries), + "subqueries_completed": 0, + "subqueries_total": len(subqueries), + "ids_collected": 0, + "pages_fetched": 0, "estimated_results": total_estimated, }, ) @@ -275,6 +368,16 @@ async def get_ids_to_process( estimated = sq.estimated_results or 0 if estimated == 0: completed_subqueries += 1 + task.update_state( + state=f"Fetching: {completed_subqueries}/{len(subqueries)} subqueries", + meta={ + "phase": "fetching", + "subqueries_completed": completed_subqueries, + "subqueries_total": len(subqueries), + "ids_collected": len(identifiers), + "pages_fetched": total_pages_fetched, + }, + ) return results # Fetch pages up to max_pages_per_query or until no more results @@ -333,6 +436,16 @@ async def get_ids_to_process( break completed_subqueries += 1 + task.update_state( + state=f"Fetching: {completed_subqueries}/{len(subqueries)} subqueries", + meta={ + "phase": "fetching", + "subqueries_completed": completed_subqueries, + "subqueries_total": len(subqueries), + "ids_collected": len(identifiers), + "pages_fetched": total_pages_fetched, + }, + ) return results # Fetch all subqueries concurrently @@ -391,6 +504,7 @@ async def get_ids_to_process( meta={ "phase": "filtering", "total_found": len(identifiers), + "existing_in_db": len(all_listing_ids), "new_listings": len(new_ids), }, )