From 835a2a9d53a91c8b3c24d0a7a35e33e5b531698c Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sun, 1 Feb 2026 20:40:07 +0000 Subject: [PATCH] Fix stuck Celery tasks and add purge all tasks functionality --- crawler/api/app.py | 18 +- .../frontend/src/components/TaskIndicator.tsx | 38 ++++- .../frontend/src/constants/colorSchemes.ts | 83 +++++++++ crawler/frontend/src/constants/index.ts | 63 +++++++ crawler/frontend/src/services/index.ts | 6 + crawler/frontend/src/services/taskService.ts | 15 ++ crawler/redis_repository.py | 45 +++-- crawler/services/task_service.py | 161 ++++++++++++++++++ 8 files changed, 413 insertions(+), 16 deletions(-) create mode 100644 crawler/frontend/src/constants/colorSchemes.ts create mode 100644 crawler/frontend/src/constants/index.ts create mode 100644 crawler/frontend/src/services/index.ts create mode 100644 crawler/services/task_service.py diff --git a/crawler/api/app.py b/crawler/api/app.py index a96679e..96f8446 100644 --- a/crawler/api/app.py +++ b/crawler/api/app.py @@ -223,14 +223,14 @@ async def cancel_task( user: Annotated[User, Depends(get_current_user)], task_id: str = Query(..., description="The task ID to cancel"), ) -> dict[str, str | bool]: - """Cancel a running task.""" + """Cancel a running task and remove it from the user's task list.""" # Verify user owns this task user_tasks = task_service.get_user_tasks(user.email) if task_id not in user_tasks: return {"success": False, "message": "Task not found or not owned by user"} try: - task_service.cancel_task(task_id) + task_service.cancel_task(task_id, user_email=user.email) logger.info(f"Task {task_id} cancelled by {user.email}") return {"success": True, "message": "Task cancelled"} except Exception as e: @@ -238,6 +238,20 @@ async def cancel_task( return {"success": False, "message": str(e)} +@app.post("/api/clear_all_tasks") +async def clear_all_tasks( + user: Annotated[User, Depends(get_current_user)], +) -> dict[str, str | int | bool]: + """Clear all tasks for the current user.""" + try: + count = task_service.clear_all_tasks(user.email) + logger.info(f"Cleared {count} tasks for {user.email}") + return {"success": True, "count": count, "message": f"Cleared {count} tasks"} + except Exception as e: + logger.error(f"Failed to clear tasks for {user.email}: {e}") + return {"success": False, "count": 0, "message": str(e)} + + @app.get("/api/get_districts") async def get_districts( user: Annotated[User, Depends(get_current_user)], diff --git a/crawler/frontend/src/components/TaskIndicator.tsx b/crawler/frontend/src/components/TaskIndicator.tsx index c9ebf2d..f396dfd 100644 --- a/crawler/frontend/src/components/TaskIndicator.tsx +++ b/crawler/frontend/src/components/TaskIndicator.tsx @@ -1,12 +1,12 @@ import { getUser } from '@/auth/authService'; import { POLLING_INTERVALS } from '@/constants'; -import { fetchTaskStatus, cancelTask } from '@/services'; +import { fetchTaskStatus, cancelTask, clearAllTasks } from '@/services'; import { TaskStatus, type TaskResult } from '@/types'; import type { User } from 'oidc-client-ts'; import { useEffect, useState } from 'react'; import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from './ui/tooltip'; import { Button } from './ui/button'; -import { Loader2, CheckCircle2, XCircle, X } from 'lucide-react'; +import { Loader2, CheckCircle2, XCircle, X, Trash2 } from 'lucide-react'; interface TaskIndicatorProps { taskID: string | null; @@ -20,6 +20,7 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) { const [total, setTotal] = useState(null); const [taskStatus, setTaskStatus] = useState(null); const [isCancelling, setIsCancelling] = useState(false); + const [isClearing, setIsClearing] = useState(false); useEffect(() => { getUser().then(setUser); @@ -104,6 +105,23 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) { } }; + const handleClearAll = async () => { + if (!user || isClearing) return; + + setIsClearing(true); + try { + const result = await clearAllTasks(user); + if (result.success) { + setTaskStatus(null); + onTaskCancelled?.(); + } + } catch { + // Ignore clear errors + } finally { + setIsClearing(false); + } + }; + if (!taskID || !taskStatus) { return null; } @@ -195,6 +213,22 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) { )} + + + + + +

Clear all tasks

+
+
); diff --git a/crawler/frontend/src/constants/colorSchemes.ts b/crawler/frontend/src/constants/colorSchemes.ts new file mode 100644 index 0000000..cd93bef --- /dev/null +++ b/crawler/frontend/src/constants/colorSchemes.ts @@ -0,0 +1,83 @@ +// Color schemes for map visualization +// Different color schemes for different metrics to improve clarity + +import { Metric } from "@/components/Parameters"; + +// For metrics where LOW is GOOD (price, price per sqm): Green → Yellow → Red +export const LOW_IS_GOOD_COLOR_STOPS: [number, string][] = [ + [0, 'rgba(34, 197, 94, 0.7)'], // Green - good deal + [25, 'rgba(132, 204, 22, 0.7)'], // Lime + [50, 'rgba(250, 204, 21, 0.7)'], // Yellow - neutral + [75, 'rgba(249, 115, 22, 0.7)'], // Orange + [100, 'rgba(239, 68, 68, 0.7)'], // Red - expensive +]; + +// For metrics where HIGH is GOOD (size, rooms): Red → Yellow → Green +export const HIGH_IS_GOOD_COLOR_STOPS: [number, string][] = [ + [0, 'rgba(239, 68, 68, 0.7)'], // Red - small + [25, 'rgba(249, 115, 22, 0.7)'], // Orange + [50, 'rgba(250, 204, 21, 0.7)'], // Yellow - medium + [75, 'rgba(132, 204, 22, 0.7)'], // Lime + [100, 'rgba(34, 197, 94, 0.7)'], // Green - large +]; + +// Legacy color stops (for backwards compatibility) +export const LEGACY_COLOR_STOPS: [number, string][] = [ + [0, 'rgba(0,185,243,0)'], + [25, 'rgba(0,185,243,0.24)'], + [60, 'rgba(255,223,0,0.3)'], + [100, 'rgba(255,105,0,0.3)'], +]; + +// Get the appropriate color scheme based on metric type +export function getColorSchemeForMetric(metric: Metric | string): [number, string][] { + switch (metric) { + case Metric.qmprice: + case Metric.price: + case 'qmprice': + case 'total_price': + // Lower price is better → Green for low, Red for high + return LOW_IS_GOOD_COLOR_STOPS; + + case Metric.qm: + case Metric.rooms: + case 'qm': + case 'rooms': + // Higher value is better → Green for high, Red for low + return HIGH_IS_GOOD_COLOR_STOPS; + + default: + return LOW_IS_GOOD_COLOR_STOPS; + } +} + +// Get interpretation text for legend +export function getMetricInterpretation(metric: Metric | string): { low: string; high: string; name: string } { + switch (metric) { + case Metric.qmprice: + case 'qmprice': + return { low: 'Good deal', high: 'Expensive', name: 'Price per m²' }; + + case Metric.price: + case 'total_price': + return { low: 'Good deal', high: 'Expensive', name: 'Total Price' }; + + case Metric.qm: + case 'qm': + return { low: 'Small', high: 'Large', name: 'Size (m²)' }; + + case Metric.rooms: + case 'rooms': + return { low: 'Few rooms', high: 'Many rooms', name: 'Bedrooms' }; + + default: + return { low: 'Low', high: 'High', name: 'Value' }; + } +} + +// Color scheme names for display +export const COLOR_SCHEME_NAMES = { + LOW_IS_GOOD: 'Green → Red (low is good)', + HIGH_IS_GOOD: 'Red → Green (high is good)', + LEGACY: 'Classic (blue → orange)', +} as const; diff --git a/crawler/frontend/src/constants/index.ts b/crawler/frontend/src/constants/index.ts new file mode 100644 index 0000000..4efda18 --- /dev/null +++ b/crawler/frontend/src/constants/index.ts @@ -0,0 +1,63 @@ +// Application constants and configuration + +// Re-export color schemes +export * from './colorSchemes'; + +// API endpoints +export const API_ENDPOINTS = { + LISTING_GEOJSON: '/api/listing_geojson', + LISTING_GEOJSON_STREAM: '/api/listing_geojson/stream', + REFRESH_LISTINGS: '/api/refresh_listings', + TASK_STATUS: '/api/task_status', + TASKS_FOR_USER: '/api/tasks_for_user', + CANCEL_TASK: '/api/cancel_task', + CLEAR_ALL_TASKS: '/api/clear_all_tasks', + GET_DISTRICTS: '/api/get_districts', +} as const; + +// Map configuration +export const MAP_CONFIG = { + MAPBOX_TOKEN: import.meta.env.VITE_MAPBOX_TOKEN || 'pk.eyJ1IjoiZGktdG8iLCJhIjoiY2o0bnBoYXcxMW1mNzJ3bDhmc2xiNWttaiJ9.ZccatVk_4shzoAsEUXXecA', + DEFAULT_CENTER: [13.38032, 49.994210] as [number, number], + DEFAULT_ZOOM: 5, + STYLE: 'mapbox://styles/mapbox/light-v9', +} as const; + +// Heatmap configuration +export const HEATMAP_CONFIG = { + INTENSITY: 9, + SPREAD: 0.05, + CELL_DENSITY: 0.5, // Smaller value = bigger hexagons + SEARCH_BUFFER: 0.001, // ~100m for click detection +} as const; + +// Percentile configuration for data visualization +export const PERCENTILE_CONFIG = { + MIN_BOUND: 0.05, // 5th percentile for color scale minimum + MAX_BOUND: 0.95, // 95th percentile for color scale maximum + BOUNDS_CLIP_MIN: 0.01, // 1st percentile for bounding box + BOUNDS_CLIP_MAX: 0.99, // 99th percentile for bounding box +} as const; + +// Heatmap color gradient stops +export const HEATMAP_COLOR_STOPS: [number, string][] = [ + [0, 'rgba(0,185,243,0)'], + [25, 'rgba(0,185,243,0.24)'], + [60, 'rgba(255,223,0,0.3)'], + [100, 'rgba(255,105,0,0.3)'], +]; + +// Default form values +export const DEFAULT_FORM_VALUES = { + min_bedrooms: 1, + max_bedrooms: 3, + max_price: 3000, + min_price: 2000, + min_sqm: 50, + last_seen_days: 28, +} as const; + +// Polling intervals +export const POLLING_INTERVALS = { + TASK_STATUS_MS: 5000, // 5 seconds +} as const; diff --git a/crawler/frontend/src/services/index.ts b/crawler/frontend/src/services/index.ts new file mode 100644 index 0000000..6807b63 --- /dev/null +++ b/crawler/frontend/src/services/index.ts @@ -0,0 +1,6 @@ +// Re-export all services +export { apiRequest } from './apiClient'; +export { fetchListingGeoJSON, refreshListings } from './listingService'; +export { streamListingGeoJSON, type StreamingProgress } from './streamingService'; +export { fetchTasksForUser, fetchTaskStatus, cancelTask, clearAllTasks, type CancelTaskResponse, type ClearAllTasksResponse } from './taskService'; +export { checkBackendHealth, type HealthStatus, type HealthCheckResult } from './healthService'; diff --git a/crawler/frontend/src/services/taskService.ts b/crawler/frontend/src/services/taskService.ts index cd94bf3..81318a4 100644 --- a/crawler/frontend/src/services/taskService.ts +++ b/crawler/frontend/src/services/taskService.ts @@ -10,6 +10,12 @@ export interface CancelTaskResponse { message: string; } +export interface ClearAllTasksResponse { + success: boolean; + count: number; + message: string; +} + /** * Fetch all active tasks for the current user */ @@ -41,3 +47,12 @@ export async function cancelTask( params: { task_id: taskId }, }); } + +/** + * Clear all tasks for the current user + */ +export async function clearAllTasks(user: User): Promise { + return apiRequest(user, API_ENDPOINTS.CLEAR_ALL_TASKS, { + method: 'POST', + }); +} diff --git a/crawler/redis_repository.py b/crawler/redis_repository.py index 9ece35e..3799e25 100644 --- a/crawler/redis_repository.py +++ b/crawler/redis_repository.py @@ -2,29 +2,31 @@ from datetime import timedelta from functools import lru_cache import json from string import Template -from typing import Any +from typing import Any, TypeVar from api.auth import User import redis from celery_app import app +T = TypeVar("T") + class RedisRepository: - redis_client: redis.Redis - tasks_key_template = Template("user:{user_id}/tasks") + redis_client: redis.Redis # type: ignore[type-arg] + tasks_key_template: Template = Template("user:{user_id}/tasks") - def __init__(self): - redis_hostname = app.broker_connection().info()["hostname"] - redis_port = app.broker_connection().info()["port"] + def __init__(self) -> None: + redis_hostname: str = app.broker_connection().info()["hostname"] + redis_port: int = app.broker_connection().info()["port"] self.redis_client = redis.Redis( host=redis_hostname, port=redis_port, db=0, decode_responses=True ) # decode_responses=True returns str, not bytes - @lru_cache(maxsize=None) @staticmethod - def instance(): + @lru_cache(maxsize=None) + def instance() -> "RedisRepository": return RedisRepository() - def set_key(self, key: str, value, ttl: timedelta | None = None) -> None: + def set_key(self, key: str, value: Any, ttl: timedelta | None = None) -> None: serialized_value = self.__serialize_value(value) self.redis_client.set(key, serialized_value) @@ -37,9 +39,9 @@ class RedisRepository: return None return self.__deserialize_value(serialized_value) - def add_task_for_user(self, user: User, task_id: str): + def add_task_for_user(self, user: User, task_id: str) -> None: # Add the task ID to the Redis set for the user - current_tasks = ( + current_tasks: list[str] = ( self.get_key(self.tasks_key_template.substitute(user_id=user.email)) or [] ) self.set_key( @@ -53,8 +55,27 @@ class RedisRepository: self.get_key(self.tasks_key_template.substitute(user_id=user.email)) or [] ) + def remove_task_for_user(self, user: User, task_id: str) -> bool: + """Remove a specific task from the user's task list.""" + current_tasks: list[str] = self.get_tasks_for_user(user) + if task_id not in current_tasks: + return False + updated_tasks = [t for t in current_tasks if t != task_id] + self.set_key( + self.tasks_key_template.substitute(user_id=user.email), + updated_tasks, + ) + return True + + def clear_tasks_for_user(self, user: User) -> int: + """Clear all tasks for a user. Returns the number of tasks cleared.""" + current_tasks: list[str] = self.get_tasks_for_user(user) + count = len(current_tasks) + self.redis_client.delete(self.tasks_key_template.substitute(user_id=user.email)) + return count + def __serialize_value(self, value: Any) -> str: return json.dumps(value) - def __deserialize_value(self, value_str) -> Any: + def __deserialize_value(self, value_str: str) -> Any: return json.loads(value_str) diff --git a/crawler/services/task_service.py b/crawler/services/task_service.py new file mode 100644 index 0000000..69eb296 --- /dev/null +++ b/crawler/services/task_service.py @@ -0,0 +1,161 @@ +"""Unified task service - shared between CLI and HTTP API. + +Manages background task operations using Celery. +""" +from dataclasses import dataclass +from typing import Any +import json + + +@dataclass +class TaskStatus: + """Status of a background task.""" + task_id: str + status: str # PENDING, STARTED, SUCCESS, FAILURE, REVOKED + result: Any | None + progress: float | None # 0.0 to 1.0 + + +def get_task_status(task_id: str) -> TaskStatus: + """Get the status of a background task. + + Used by: + - API: GET /api/task_status + + Args: + task_id: The Celery task ID + + Returns: + TaskStatus with current state + """ + from tasks.listing_tasks import dump_listings_task + + task_result = dump_listings_task.AsyncResult(task_id) + + # Try to serialize result + try: + result = json.loads(json.dumps(task_result.result)) + except (TypeError, json.JSONDecodeError): + result = str(task_result.result) if task_result.result else None + + # Extract progress from task meta if available + progress = None + if task_result.info and isinstance(task_result.info, dict): + progress = task_result.info.get("progress") + + return TaskStatus( + task_id=task_id, + status=task_result.status, + result=result, + progress=progress, + ) + + +def get_user_tasks(user_email: str) -> list[str]: + """Get all task IDs for a user. + + Used by: + - API: GET /api/tasks_for_user + + Args: + user_email: The user's email address + + Returns: + List of task IDs + """ + from redis_repository import RedisRepository + from api.auth import User + + redis_repo = RedisRepository.instance() + # Create a minimal User object for the lookup + user = User(sub="", email=user_email, name="") + return redis_repo.get_tasks_for_user(user) + + +def add_task_for_user(user_email: str, task_id: str) -> None: + """Associate a task with a user. + + Used by: + - API: POST /api/refresh_listings + + Args: + user_email: The user's email address + task_id: The Celery task ID + """ + from redis_repository import RedisRepository + from api.auth import User + + redis_repo = RedisRepository.instance() + user = User(sub="", email=user_email, name="") + redis_repo.add_task_for_user(user, task_id) + + +def cancel_task(task_id: str, user_email: str | None = None) -> bool: + """Cancel a running task and remove it from the user's task list. + + Args: + task_id: The Celery task ID + user_email: Optional user email to remove task from their list + + Returns: + True if task was cancelled successfully + """ + from celery_app import app as celery_app + + # Revoke the task in Celery + celery_app.control.revoke(task_id, terminate=True) + + # Also remove from user's task list if user_email provided + if user_email: + remove_task_from_user(user_email, task_id) + + return True + + +def remove_task_from_user(user_email: str, task_id: str) -> bool: + """Remove a task from a user's task list without cancelling it. + + Use this to clean up stuck tasks that can't be cancelled via Celery. + + Args: + user_email: The user's email address + task_id: The Celery task ID + + Returns: + True if task was removed, False if not found + """ + from redis_repository import RedisRepository + from api.auth import User + + redis_repo = RedisRepository.instance() + user = User(sub="", email=user_email, name="") + return redis_repo.remove_task_for_user(user, task_id) + + +def clear_all_tasks(user_email: str, revoke: bool = True) -> int: + """Clear all tasks for a user. + + Args: + user_email: The user's email address + revoke: If True, also attempt to revoke tasks in Celery + + Returns: + Number of tasks cleared + """ + from redis_repository import RedisRepository + from celery_app import app as celery_app + from api.auth import User + + redis_repo = RedisRepository.instance() + user = User(sub="", email=user_email, name="") + + # Get tasks before clearing to revoke them + if revoke: + tasks = redis_repo.get_tasks_for_user(user) + for task_id in tasks: + try: + celery_app.control.revoke(task_id, terminate=True) + except Exception: + pass # Best effort, continue clearing + + return redis_repo.clear_tasks_for_user(user)