Add crawl job progress drawer with phase tracking and live logs

- Add phase-aware progress reporting across all crawl phases (splitting,
  fetching, filtering, processing) with per-step counters
- Add TaskProgressDrawer component with phase timeline stepper, detail
  counters, progress bar with ETA, and live worker log viewer
- Add on_step_complete callback to ListingProcessor for granular tracking
  of details/images/OCR steps
- Extend QuerySplitter on_progress callback with structured counter data
- Capture celery worker logs via ring buffer handler and inject into task
  state updates for frontend display
- Guard taskResult updates with phase presence check to prevent drawer
  from blanking during state transitions
This commit is contained in:
Viktor Barzin 2026-02-06 22:37:53 +00:00
parent 4018503723
commit b4837e1603
No known key found for this signature in database
GPG key ID: 0EB088298288D958
6 changed files with 617 additions and 24 deletions

View file

@ -7,6 +7,7 @@ import { useEffect, useState } from 'react';
import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from './ui/tooltip';
import { Button } from './ui/button';
import { Loader2, CheckCircle2, XCircle, X, Trash2 } from 'lucide-react';
import { TaskProgressDrawer } from './TaskProgressDrawer';
interface TaskIndicatorProps {
taskID: string | null;
@ -19,8 +20,10 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) {
const [processed, setProcessed] = useState<number | null>(null);
const [total, setTotal] = useState<number | null>(null);
const [taskStatus, setTaskStatus] = useState<TaskStatus | null>(null);
const [taskResult, setTaskResult] = useState<TaskResult | null>(null);
const [isCancelling, setIsCancelling] = useState(false);
const [isClearing, setIsClearing] = useState(false);
const [drawerOpen, setDrawerOpen] = useState(false);
useEffect(() => {
getUser().then(setUser);
@ -29,6 +32,7 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) {
useEffect(() => {
if (!user || !taskID) {
setTaskStatus(null);
setTaskResult(null);
return;
}
@ -37,6 +41,7 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) {
setProgressPercentage(0);
setProcessed(null);
setTotal(null);
setTaskResult(null);
const pollTaskStatus = async () => {
try {
@ -46,6 +51,20 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) {
if (status === TaskStatus.SUCCESS) {
setProgressPercentage(100);
// Parse final result for the drawer to show completed state.
// Only update taskResult if the new result has phase info;
// otherwise keep the last in-progress result which has richer data
// than the bare SUCCESS return value.
if (data.result) {
try {
const parsedResult: TaskResult = JSON.parse(data.result);
if (parsedResult.phase) {
setTaskResult(parsedResult);
}
} catch {
// Ignore parsing errors
}
}
return true; // Stop polling
}
@ -57,7 +76,18 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) {
if (data.result) {
try {
const parsedResult: TaskResult = JSON.parse(data.result);
setProgressPercentage(parsedResult.progress * 100);
// Only update taskResult if the parsed data has a phase field.
// This prevents blanking the drawer when the backend sends a
// state update without phase info (e.g. during brief transitions).
if (parsedResult.phase) {
setTaskResult(parsedResult);
}
// Only update progress/processed/total when the fields are
// actually present — otherwise keep the previous values so
// the UI doesn't flash back to 0 during phase transitions.
if (parsedResult.progress !== undefined) {
setProgressPercentage(parsedResult.progress * 100);
}
if (parsedResult.processed !== undefined) {
setProcessed(parsedResult.processed);
}
@ -113,6 +143,7 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) {
const result = await clearAllTasks(user);
if (result.success) {
setTaskStatus(null);
setTaskResult(null);
onTaskCancelled?.();
}
} catch {
@ -144,18 +175,27 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) {
if (processed !== null && total !== null && total > 0) {
return `${processed} / ${total}`;
}
if (taskResult?.phase && taskResult.phase !== 'processing') {
const phaseLabels: Record<string, string> = {
splitting: 'Splitting',
splitting_complete: 'Split done',
fetching: 'Fetching',
filtering: 'Filtering',
};
return phaseLabels[taskResult.phase] ?? `${Math.round(progressPercentage)}%`;
}
return `${Math.round(progressPercentage)}%`;
};
const getTooltipContent = () => {
if (isInProgress) {
if (processed !== null && total !== null && total > 0) {
return `Processing: ${processed} / ${total} listings (${Math.round(progressPercentage)}%)`;
return `Processing: ${processed} / ${total} listings (${Math.round(progressPercentage)}%) — click for details`;
}
return `Task running: ${Math.round(progressPercentage)}%`;
return `Task running: ${getProgressText()} — click for details`;
}
if (taskStatus === TaskStatus.SUCCESS) {
return 'Task completed successfully';
return 'Task completed successfully — click for details';
}
if (taskStatus === TaskStatus.REVOKED) {
return 'Task was cancelled';
@ -168,7 +208,10 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) {
<div className="flex items-center gap-2">
<Tooltip>
<TooltipTrigger asChild>
<div className="flex items-center gap-2 cursor-default">
<div
className="flex items-center gap-2 cursor-pointer"
onClick={() => setDrawerOpen(true)}
>
{getStatusIcon()}
{isInProgress && (
<div className="flex items-center gap-2">
@ -230,6 +273,15 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) {
</TooltipContent>
</Tooltip>
</div>
<TaskProgressDrawer
open={drawerOpen}
onOpenChange={setDrawerOpen}
taskResult={taskResult}
taskStatus={taskStatus}
taskID={taskID}
onCancel={handleCancel}
isCancelling={isCancelling}
/>
</TooltipProvider>
);
}

View file

@ -0,0 +1,363 @@
import { TaskStatus, type TaskPhase, type TaskResult } from '@/types';
import {
Sheet,
SheetContent,
SheetHeader,
SheetTitle,
SheetDescription,
SheetFooter,
} from './ui/sheet';
import { Button } from './ui/button';
import { CheckCircle2, Circle, Loader2, XCircle } from 'lucide-react';
import { useEffect, useRef } from 'react';
interface TaskProgressDrawerProps {
open: boolean;
onOpenChange: (open: boolean) => void;
taskResult: TaskResult | null;
taskStatus: TaskStatus | null;
taskID: string | null;
onCancel: () => void;
isCancelling: boolean;
}
const PHASES: { key: TaskPhase; label: string }[] = [
{ key: 'splitting', label: 'Splitting queries' },
{ key: 'fetching', label: 'Fetching listings' },
{ key: 'filtering', label: 'Filtering results' },
{ key: 'processing', label: 'Processing listings' },
];
function getPhaseIndex(phase: TaskPhase | undefined): number {
if (!phase) return -1;
if (phase === 'splitting_complete') return 1; // splitting done, fetching is next
if (phase === 'completed') return PHASES.length;
return PHASES.findIndex((p) => p.key === phase);
}
function formatEta(seconds: number | undefined): string {
if (seconds === undefined || seconds <= 0) return '';
const mins = Math.floor(seconds / 60);
const secs = Math.round(seconds % 60);
if (mins > 0) {
return `~${mins}m ${secs}s remaining`;
}
return `~${secs}s remaining`;
}
function StatusBadge({ status }: { status: TaskStatus | null }) {
if (!status) return null;
const isInProgress =
status !== TaskStatus.SUCCESS &&
status !== TaskStatus.FAILURE &&
status !== TaskStatus.REVOKED;
if (isInProgress) {
return (
<span className="inline-flex items-center gap-1 rounded-full bg-blue-100 px-2 py-0.5 text-xs font-medium text-blue-700">
<Loader2 className="h-3 w-3 animate-spin" />
Running
</span>
);
}
if (status === TaskStatus.SUCCESS) {
return (
<span className="inline-flex items-center gap-1 rounded-full bg-green-100 px-2 py-0.5 text-xs font-medium text-green-700">
<CheckCircle2 className="h-3 w-3" />
Complete
</span>
);
}
if (status === TaskStatus.REVOKED) {
return (
<span className="inline-flex items-center gap-1 rounded-full bg-yellow-100 px-2 py-0.5 text-xs font-medium text-yellow-700">
<XCircle className="h-3 w-3" />
Cancelled
</span>
);
}
return (
<span className="inline-flex items-center gap-1 rounded-full bg-red-100 px-2 py-0.5 text-xs font-medium text-red-700">
<XCircle className="h-3 w-3" />
Failed
</span>
);
}
function PhaseTimeline({
currentPhase,
taskStatus,
}: {
currentPhase: TaskPhase | undefined;
taskStatus: TaskStatus | null;
}) {
const isTerminal =
taskStatus === TaskStatus.SUCCESS ||
taskStatus === TaskStatus.FAILURE ||
taskStatus === TaskStatus.REVOKED;
const activeIdx = isTerminal ? PHASES.length : getPhaseIndex(currentPhase);
return (
<div className="flex flex-col gap-1">
{PHASES.map((phase, idx) => {
const isCompleted = idx < activeIdx;
const isActive = idx === activeIdx && !isTerminal;
const isFuture = idx > activeIdx;
return (
<div key={phase.key} className="flex items-center gap-2">
{isCompleted && (
<CheckCircle2 className="h-4 w-4 text-green-500 shrink-0" />
)}
{isActive && (
<Loader2 className="h-4 w-4 animate-spin text-blue-500 shrink-0" />
)}
{isFuture && (
<Circle className="h-4 w-4 text-muted-foreground/40 shrink-0" />
)}
<span
className={
isActive
? 'text-sm font-medium text-foreground'
: isCompleted
? 'text-sm text-muted-foreground'
: 'text-sm text-muted-foreground/40'
}
>
{phase.label}
</span>
</div>
);
})}
</div>
);
}
function CounterRow({ label, value, total }: { label: string; value?: number; total?: number }) {
if (value === undefined) return null;
return (
<div className="flex justify-between text-sm">
<span className="text-muted-foreground">{label}</span>
<span className="font-mono tabular-nums">
{value}
{total !== undefined && ` / ${total}`}
</span>
</div>
);
}
function PhaseDetails({ result }: { result: TaskResult }) {
const phase = result.phase;
if (phase === 'splitting' || phase === 'splitting_complete') {
return (
<div className="rounded-md border p-3 space-y-1">
<p className="text-xs font-medium text-muted-foreground uppercase tracking-wide mb-2">
Splitting
</p>
<CounterRow
label="Subqueries probed"
value={result.subqueries_probed}
total={result.subqueries_initial}
/>
{result.subqueries_total !== undefined && (
<CounterRow label="Final subqueries" value={result.subqueries_total} />
)}
{result.estimated_results !== undefined && (
<CounterRow label="Estimated results" value={result.estimated_results} />
)}
</div>
);
}
if (phase === 'fetching') {
return (
<div className="rounded-md border p-3 space-y-1">
<p className="text-xs font-medium text-muted-foreground uppercase tracking-wide mb-2">
Fetching
</p>
<CounterRow
label="Subqueries completed"
value={result.subqueries_completed}
total={result.subqueries_total}
/>
<CounterRow label="IDs collected" value={result.ids_collected} />
<CounterRow label="Pages fetched" value={result.pages_fetched} />
</div>
);
}
if (phase === 'filtering') {
return (
<div className="rounded-md border p-3 space-y-1">
<p className="text-xs font-medium text-muted-foreground uppercase tracking-wide mb-2">
Filtering
</p>
<CounterRow label="Total from API" value={result.total_found} />
<CounterRow label="Already in DB" value={result.existing_in_db} />
<CounterRow label="New to process" value={result.new_listings} />
</div>
);
}
if (phase === 'processing') {
return (
<div className="rounded-md border p-3 space-y-1">
<p className="text-xs font-medium text-muted-foreground uppercase tracking-wide mb-2">
Processing
</p>
<CounterRow
label="Details fetched"
value={result.details_fetched}
total={result.total}
/>
<CounterRow label="Images downloaded" value={result.images_downloaded} />
<CounterRow label="OCR completed" value={result.ocr_completed} />
{(result.failed ?? 0) > 0 && (
<div className="flex justify-between text-sm">
<span className="text-red-500">Failed</span>
<span className="font-mono tabular-nums text-red-500">{result.failed}</span>
</div>
)}
</div>
);
}
return null;
}
function LogViewer({ logs }: { logs: string[] }) {
const scrollRef = useRef<HTMLDivElement>(null);
const isAutoScrolling = useRef(true);
const handleScroll = () => {
const el = scrollRef.current;
if (!el) return;
const atBottom = el.scrollHeight - el.scrollTop - el.clientHeight < 30;
isAutoScrolling.current = atBottom;
};
useEffect(() => {
if (isAutoScrolling.current && scrollRef.current) {
scrollRef.current.scrollTop = scrollRef.current.scrollHeight;
}
}, [logs]);
return (
<div
ref={scrollRef}
onScroll={handleScroll}
className="rounded-md bg-zinc-950 p-3 overflow-y-auto font-mono text-[11px] leading-4 text-zinc-300 min-h-[100px] h-full"
>
{logs.length === 0 ? (
<span className="text-zinc-500 italic">Waiting for logs...</span>
) : (
logs.map((line, i) => (
<div key={i} className="whitespace-pre-wrap break-all">
{line}
</div>
))
)}
</div>
);
}
export function TaskProgressDrawer({
open,
onOpenChange,
taskResult,
taskStatus,
taskID,
onCancel,
isCancelling,
}: TaskProgressDrawerProps) {
const isInProgress =
taskStatus !== null &&
taskStatus !== TaskStatus.SUCCESS &&
taskStatus !== TaskStatus.FAILURE &&
taskStatus !== TaskStatus.REVOKED;
const progressPercent = taskResult
? Math.min((taskResult.progress ?? 0) * 100, 100)
: 0;
return (
<Sheet open={open} onOpenChange={onOpenChange}>
<SheetContent side="right" className="flex flex-col w-full sm:!max-w-lg">
<SheetHeader>
<div className="flex items-center justify-between pr-6">
<SheetTitle>Crawl Job Progress</SheetTitle>
<StatusBadge status={taskStatus} />
</div>
{taskID && (
<SheetDescription>
Task ID: {taskID.slice(0, 8)}...
</SheetDescription>
)}
</SheetHeader>
{/* Fixed top section: timeline + counters + progress */}
<div className="space-y-3 px-4 shrink-0">
<PhaseTimeline
currentPhase={taskResult?.phase}
taskStatus={taskStatus}
/>
{taskResult && <PhaseDetails result={taskResult} />}
{taskResult && taskResult.phase === 'processing' && (
<div className="space-y-1">
<div className="w-full h-2 bg-primary/20 rounded-full overflow-hidden">
<div
className="h-full bg-primary transition-all duration-300 ease-out rounded-full"
style={{ width: `${progressPercent}%` }}
/>
</div>
<div className="flex justify-between text-xs text-muted-foreground">
<span>
{taskResult.processed ?? 0} / {taskResult.total ?? '?'}
</span>
<span>{formatEta(taskResult.eta_seconds)}</span>
</div>
</div>
)}
{taskResult?.message && (
<p className="text-sm text-muted-foreground">{taskResult.message}</p>
)}
</div>
{/* Log viewer fills remaining space */}
<div className="flex-1 min-h-0 flex flex-col gap-1 px-4 pb-2">
<p className="text-xs font-medium text-muted-foreground uppercase tracking-wide shrink-0">
Worker Logs
</p>
<div className="flex-1 min-h-0">
<LogViewer logs={taskResult?.logs ?? []} />
</div>
</div>
{isInProgress && (
<SheetFooter>
<Button
variant="destructive"
onClick={onCancel}
disabled={isCancelling}
className="w-full"
>
{isCancelling ? (
<>
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
Cancelling...
</>
) : (
'Cancel Job'
)}
</Button>
</SheetFooter>
)}
</SheetContent>
</Sheet>
);
}

View file

@ -48,13 +48,41 @@ export enum TaskStatus {
export interface TaskStatusResponse {
status: TaskStatus;
result: string; // JSON string containing { progress: number }
result: string; // JSON string containing TaskResult
message?: string;
}
export type TaskPhase = 'splitting' | 'splitting_complete' | 'fetching' | 'filtering' | 'processing' | 'completed';
export interface TaskResult {
progress: number;
processed?: number;
total?: number;
phase?: TaskPhase;
message?: string;
// Splitting phase
subqueries_probed?: number;
subqueries_initial?: number;
estimated_results?: number;
subqueries_total?: number;
// Fetching phase
subqueries_completed?: number;
ids_collected?: number;
pages_fetched?: number;
// Filtering phase
total_found?: number;
existing_in_db?: number;
new_listings?: number;
// Processing phase
details_fetched?: number;
images_downloaded?: number;
ocr_completed?: number;
failed?: number;
elapsed_seconds?: number;
rate_per_second?: number;
eta_seconds?: number;
// Live logs
logs?: string[];
}
export interface RefreshListingsResponse {

View file

@ -1,6 +1,7 @@
from __future__ import annotations
from abc import abstractmethod
import asyncio
from collections.abc import Callable
from datetime import datetime
import logging
import multiprocessing
@ -22,6 +23,13 @@ class ListingProcessor:
process_steps: list[Step]
listing_repository: ListingRepository
# Map step class names to short names for progress reporting
STEP_NAMES: dict[str, str] = {
"FetchListingDetailsStep": "details",
"FetchImagesStep": "images",
"DetectFloorplanStep": "ocr",
}
def __init__(self, listing_repository: ListingRepository):
self.semaphore = asyncio.Semaphore(20)
self.listing_repository = listing_repository
@ -33,19 +41,28 @@ class ListingProcessor:
DetectFloorplanStep(listing_repository),
]
async def process_listing(self, listing_id: int) -> Listing | None:
async def process_listing(
self,
listing_id: int,
on_step_complete: Callable[[str], None] | None = None,
) -> Listing | None:
await self.listing_repository.mark_seen(listing_id)
listing = None
for step in self.process_steps:
if await step.needs_processing(listing_id):
async with self.semaphore:
step_name = step.__class__.__name__
step_class_name = step.__class__.__name__
try:
listing = await step.process(listing_id)
logger.debug(f"[{listing_id}] {step_name} completed")
logger.debug(f"[{listing_id}] {step_class_name} completed")
if on_step_complete:
short_name = self.STEP_NAMES.get(
step_class_name, step_class_name
)
on_step_complete(short_name)
except Exception as e:
logger.error(f"[{listing_id}] {step_name} failed: {e}")
celery_logger.error(f"[{listing_id}] {step_name} failed: {e}")
logger.error(f"[{listing_id}] {step_class_name} failed: {e}")
celery_logger.error(f"[{listing_id}] {step_class_name} failed: {e}")
return None
return listing

View file

@ -238,6 +238,8 @@ class QuerySplitter:
parameters: Original query parameters to split.
session: aiohttp session for making requests.
on_progress: Optional callback for progress updates.
Called as on_progress(phase, message, **kwargs) where kwargs
contains structured data like subqueries_probed, etc.
Returns:
List of SubQuery objects, each under the result threshold.
@ -260,19 +262,32 @@ class QuerySplitter:
on_progress(
phase="splitting",
message=f"Created {len(initial_subqueries)} initial subqueries",
subqueries_initial=len(initial_subqueries),
subqueries_probed=0,
)
# Phase 2: Probe and adaptively split
semaphore = asyncio.Semaphore(self.config.max_concurrent_requests)
refined_subqueries: list[SubQuery] = []
probed_count = 0
# Probe all initial subqueries in parallel
async def probe_and_split(sq: SubQuery) -> list[SubQuery]:
nonlocal probed_count
async with semaphore:
await asyncio.sleep(self.config.request_delay_ms / 1000)
count = await self.probe_result_count(sq, session, parameters)
sq = replace(sq, estimated_results=count)
probed_count += 1
if on_progress:
on_progress(
phase="splitting",
message=f"Probed {probed_count}/{len(initial_subqueries)} subqueries",
subqueries_initial=len(initial_subqueries),
subqueries_probed=probed_count,
)
if count > self.config.split_threshold:
logger.info(
@ -294,10 +309,14 @@ class QuerySplitter:
f"Refined to {len(refined_subqueries)} subqueries after splitting"
)
total_estimated = self.calculate_total_estimated_results(refined_subqueries)
if on_progress:
on_progress(
phase="splitting_complete",
message=f"Refined to {len(refined_subqueries)} subqueries",
subqueries_total=len(refined_subqueries),
estimated_results=total_estimated,
)
return refined_subqueries

View file

@ -1,6 +1,7 @@
import asyncio
import logging
import time
from collections import deque
from typing import Any
from celery import Task
from celery.schedules import crontab
@ -31,6 +32,21 @@ if not celery_logger.handlers:
celery_logger.setLevel(logging.INFO)
SCRAPE_LOCK_NAME = "scrape_listings"
LOG_BUFFER_MAX_LINES = 200
class TaskLogHandler(logging.Handler):
"""Captures log records into a deque for inclusion in task state updates."""
def __init__(self, buffer: deque[str]) -> None:
super().__init__()
self.buffer = buffer
def emit(self, record: logging.LogRecord) -> None:
try:
self.buffer.append(self.format(record))
except Exception:
pass
@app.task(bind=True, pydantic=True)
@ -49,9 +65,9 @@ def dump_listings_task(self: Task, parameters_json: str) -> dict[str, Any]:
parsed_parameters = QueryParameters.model_validate_json(parameters_json)
celery_logger.info(f"Starting scrape with parameters: {parsed_parameters}")
self.update_state(state="Starting...", meta={"progress": 0})
self.update_state(state="Starting...", meta={"phase": "splitting", "progress": 0})
asyncio.run(dump_listings_full(task=self, parameters=parsed_parameters))
return {"progress": 0}
return {"phase": "completed", "progress": 1}
async def async_dump_listings_task(parameters_json: str) -> dict[str, Any]:
@ -70,6 +86,39 @@ async def dump_listings_full(
*, task: Task, parameters: QueryParameters
) -> list[Listing]:
"""Fetches all listings, images as well as detects floorplans"""
# Set up log capture: a ring buffer handler that we inject into every
# task.update_state() call so the frontend can display live logs.
log_buffer: deque[str] = deque(maxlen=LOG_BUFFER_MAX_LINES)
log_handler = TaskLogHandler(log_buffer)
log_handler.setFormatter(
logging.Formatter("%(asctime)s %(message)s", datefmt="%H:%M:%S")
)
celery_logger.addHandler(log_handler)
# Wrap task.update_state so every call automatically includes logs
_original_update_state = task.update_state
def _update_state_with_logs(
state: str | None = None, meta: dict[str, Any] | None = None, **kwargs: Any
) -> None:
if meta is None:
meta = {}
meta["logs"] = list(log_buffer)
_original_update_state(state=state, meta=meta, **kwargs)
task.update_state = _update_state_with_logs # type: ignore[assignment]
try:
return await _dump_listings_full_inner(task=task, parameters=parameters)
finally:
celery_logger.removeHandler(log_handler)
task.update_state = _original_update_state # type: ignore[assignment]
async def _dump_listings_full_inner(
*, task: Task, parameters: QueryParameters
) -> list[Listing]:
"""Inner implementation — called with log-capturing update_state wrapper."""
start_time = time.time()
celery_logger.info("=" * 60)
celery_logger.info("PHASE 1: Initializing listing fetch")
@ -77,7 +126,7 @@ async def dump_listings_full(
repository = ListingRepository(engine)
task.update_state(state="Identifying missing listings", meta={"progress": 0})
task.update_state(state="Identifying missing listings", meta={"phase": "splitting", "progress": 0})
celery_logger.info("Querying Rightmove API to identify new listings...")
ids_to_process = await get_ids_to_process(
parameters=parameters, repository=repository, task=task
@ -92,7 +141,7 @@ async def dump_listings_full(
invalidate_cache()
task.update_state(
state="No new listings found",
meta={"progress": 1, "processed": 0, "total": 0, "message": "All listings are up to date"},
meta={"phase": "completed", "progress": 1, "processed": 0, "total": 0, "message": "All listings are up to date"},
)
return []
@ -115,6 +164,18 @@ async def dump_listings_full(
invalidate_cache()
# Send final state so the frontend has rich data even after task completes
task.update_state(
state="Completed",
meta={
"phase": "completed",
"progress": 1,
"processed": len(result),
"total": len(ids_to_process),
"message": f"Processed {len(result)} listings in {elapsed:.0f}s",
},
)
return result
@ -124,11 +185,26 @@ async def dump_listings_and_monitor(
task_progress = {missing_id: 0 for missing_id in missing_ids}
processed_count = 0
failed_count = 0
details_fetched = 0
images_downloaded = 0
ocr_completed = 0
start_time = time.time()
async def process(missing_id: int) -> Listing | None:
nonlocal processed_count, failed_count
listing = await listing_processor.process_listing(missing_id)
def step_callback(step_name: str) -> None:
nonlocal details_fetched, images_downloaded, ocr_completed
if step_name == "details":
details_fetched += 1
elif step_name == "images":
images_downloaded += 1
elif step_name == "ocr":
ocr_completed += 1
listing = await listing_processor.process_listing(
missing_id, on_step_complete=step_callback
)
task_progress[missing_id] = 1
if listing is not None:
processed_count += 1
@ -141,12 +217,12 @@ async def dump_listings_and_monitor(
while (progress := sum(task_progress.values())) < len(missing_ids):
progress_ratio = round(progress / len(missing_ids), 2)
elapsed = time.time() - start_time
rate = progress / elapsed if elapsed > 0 else 0
eta = (len(missing_ids) - progress) / rate if rate > 0 else 0
# Log every 10% progress or at least every update
if progress_ratio >= last_progress + 0.1 or progress == 1:
elapsed = time.time() - start_time
rate = progress / elapsed if elapsed > 0 else 0
eta = (len(missing_ids) - progress) / rate if rate > 0 else 0
celery_logger.info(
f"Progress: {progress_ratio * 100:.0f}% "
f"({progress}/{len(missing_ids)}) "
@ -158,7 +234,19 @@ async def dump_listings_and_monitor(
task.update_state(
state=f"Processing: {progress_ratio * 100:.0f}% ({progress}/{len(missing_ids)})",
meta={"progress": progress_ratio, "processed": progress, "total": len(missing_ids)},
meta={
"phase": "processing",
"progress": progress_ratio,
"processed": progress,
"total": len(missing_ids),
"details_fetched": details_fetched,
"images_downloaded": images_downloaded,
"ocr_completed": ocr_completed,
"failed": failed_count,
"elapsed_seconds": round(elapsed, 1),
"rate_per_second": round(rate, 2),
"eta_seconds": round(eta, 1),
},
)
await asyncio.sleep(1)
@ -224,8 +312,10 @@ async def get_ids_to_process(
# Reset throttle metrics
reset_throttle_metrics()
def on_progress(phase: str, message: str) -> None:
task.update_state(state=message, meta={"phase": phase})
def on_progress(phase: str, message: str, **kwargs: Any) -> None:
meta: dict[str, Any] = {"phase": phase, "message": message}
meta.update(kwargs)
task.update_state(state=message, meta=meta)
celery_logger.info(f"[{phase}] {message}")
celery_logger.info("Starting query splitting and probing...")
@ -254,7 +344,10 @@ async def get_ids_to_process(
state=f"Fetching listings from {len(subqueries)} subqueries...",
meta={
"phase": "fetching",
"subqueries": len(subqueries),
"subqueries_completed": 0,
"subqueries_total": len(subqueries),
"ids_collected": 0,
"pages_fetched": 0,
"estimated_results": total_estimated,
},
)
@ -275,6 +368,16 @@ async def get_ids_to_process(
estimated = sq.estimated_results or 0
if estimated == 0:
completed_subqueries += 1
task.update_state(
state=f"Fetching: {completed_subqueries}/{len(subqueries)} subqueries",
meta={
"phase": "fetching",
"subqueries_completed": completed_subqueries,
"subqueries_total": len(subqueries),
"ids_collected": len(identifiers),
"pages_fetched": total_pages_fetched,
},
)
return results
# Fetch pages up to max_pages_per_query or until no more results
@ -333,6 +436,16 @@ async def get_ids_to_process(
break
completed_subqueries += 1
task.update_state(
state=f"Fetching: {completed_subqueries}/{len(subqueries)} subqueries",
meta={
"phase": "fetching",
"subqueries_completed": completed_subqueries,
"subqueries_total": len(subqueries),
"ids_collected": len(identifiers),
"pages_fetched": total_pages_fetched,
},
)
return results
# Fetch all subqueries concurrently
@ -391,6 +504,7 @@ async def get_ids_to_process(
meta={
"phase": "filtering",
"total_found": len(identifiers),
"existing_in_db": len(all_listing_ids),
"new_listings": len(new_ids),
},
)