Fix stuck Celery tasks and add purge all tasks functionality

This commit is contained in:
Viktor Barzin 2026-02-01 20:40:07 +00:00
parent 93f7f57de3
commit 835a2a9d53
8 changed files with 413 additions and 16 deletions

View file

@ -223,14 +223,14 @@ async def cancel_task(
user: Annotated[User, Depends(get_current_user)], user: Annotated[User, Depends(get_current_user)],
task_id: str = Query(..., description="The task ID to cancel"), task_id: str = Query(..., description="The task ID to cancel"),
) -> dict[str, str | bool]: ) -> dict[str, str | bool]:
"""Cancel a running task.""" """Cancel a running task and remove it from the user's task list."""
# Verify user owns this task # Verify user owns this task
user_tasks = task_service.get_user_tasks(user.email) user_tasks = task_service.get_user_tasks(user.email)
if task_id not in user_tasks: if task_id not in user_tasks:
return {"success": False, "message": "Task not found or not owned by user"} return {"success": False, "message": "Task not found or not owned by user"}
try: try:
task_service.cancel_task(task_id) task_service.cancel_task(task_id, user_email=user.email)
logger.info(f"Task {task_id} cancelled by {user.email}") logger.info(f"Task {task_id} cancelled by {user.email}")
return {"success": True, "message": "Task cancelled"} return {"success": True, "message": "Task cancelled"}
except Exception as e: except Exception as e:
@ -238,6 +238,20 @@ async def cancel_task(
return {"success": False, "message": str(e)} return {"success": False, "message": str(e)}
@app.post("/api/clear_all_tasks")
async def clear_all_tasks(
user: Annotated[User, Depends(get_current_user)],
) -> dict[str, str | int | bool]:
"""Clear all tasks for the current user."""
try:
count = task_service.clear_all_tasks(user.email)
logger.info(f"Cleared {count} tasks for {user.email}")
return {"success": True, "count": count, "message": f"Cleared {count} tasks"}
except Exception as e:
logger.error(f"Failed to clear tasks for {user.email}: {e}")
return {"success": False, "count": 0, "message": str(e)}
@app.get("/api/get_districts") @app.get("/api/get_districts")
async def get_districts( async def get_districts(
user: Annotated[User, Depends(get_current_user)], user: Annotated[User, Depends(get_current_user)],

View file

@ -1,12 +1,12 @@
import { getUser } from '@/auth/authService'; import { getUser } from '@/auth/authService';
import { POLLING_INTERVALS } from '@/constants'; import { POLLING_INTERVALS } from '@/constants';
import { fetchTaskStatus, cancelTask } from '@/services'; import { fetchTaskStatus, cancelTask, clearAllTasks } from '@/services';
import { TaskStatus, type TaskResult } from '@/types'; import { TaskStatus, type TaskResult } from '@/types';
import type { User } from 'oidc-client-ts'; import type { User } from 'oidc-client-ts';
import { useEffect, useState } from 'react'; import { useEffect, useState } from 'react';
import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from './ui/tooltip'; import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from './ui/tooltip';
import { Button } from './ui/button'; import { Button } from './ui/button';
import { Loader2, CheckCircle2, XCircle, X } from 'lucide-react'; import { Loader2, CheckCircle2, XCircle, X, Trash2 } from 'lucide-react';
interface TaskIndicatorProps { interface TaskIndicatorProps {
taskID: string | null; taskID: string | null;
@ -20,6 +20,7 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) {
const [total, setTotal] = useState<number | null>(null); const [total, setTotal] = useState<number | null>(null);
const [taskStatus, setTaskStatus] = useState<TaskStatus | null>(null); const [taskStatus, setTaskStatus] = useState<TaskStatus | null>(null);
const [isCancelling, setIsCancelling] = useState(false); const [isCancelling, setIsCancelling] = useState(false);
const [isClearing, setIsClearing] = useState(false);
useEffect(() => { useEffect(() => {
getUser().then(setUser); getUser().then(setUser);
@ -104,6 +105,23 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) {
} }
}; };
const handleClearAll = async () => {
if (!user || isClearing) return;
setIsClearing(true);
try {
const result = await clearAllTasks(user);
if (result.success) {
setTaskStatus(null);
onTaskCancelled?.();
}
} catch {
// Ignore clear errors
} finally {
setIsClearing(false);
}
};
if (!taskID || !taskStatus) { if (!taskID || !taskStatus) {
return null; return null;
} }
@ -195,6 +213,22 @@ export function TaskIndicator({ taskID, onTaskCancelled }: TaskIndicatorProps) {
</TooltipContent> </TooltipContent>
</Tooltip> </Tooltip>
)} )}
<Tooltip>
<TooltipTrigger asChild>
<Button
variant="ghost"
size="icon"
onClick={handleClearAll}
disabled={isClearing}
className="h-6 w-6 text-muted-foreground hover:text-destructive"
>
<Trash2 className="h-3 w-3" />
</Button>
</TooltipTrigger>
<TooltipContent side="bottom">
<p>Clear all tasks</p>
</TooltipContent>
</Tooltip>
</div> </div>
</TooltipProvider> </TooltipProvider>
); );

View file

@ -0,0 +1,83 @@
// Color schemes for map visualization
// Different color schemes for different metrics to improve clarity
import { Metric } from "@/components/Parameters";
// For metrics where LOW is GOOD (price, price per sqm): Green → Yellow → Red
export const LOW_IS_GOOD_COLOR_STOPS: [number, string][] = [
[0, 'rgba(34, 197, 94, 0.7)'], // Green - good deal
[25, 'rgba(132, 204, 22, 0.7)'], // Lime
[50, 'rgba(250, 204, 21, 0.7)'], // Yellow - neutral
[75, 'rgba(249, 115, 22, 0.7)'], // Orange
[100, 'rgba(239, 68, 68, 0.7)'], // Red - expensive
];
// For metrics where HIGH is GOOD (size, rooms): Red → Yellow → Green
export const HIGH_IS_GOOD_COLOR_STOPS: [number, string][] = [
[0, 'rgba(239, 68, 68, 0.7)'], // Red - small
[25, 'rgba(249, 115, 22, 0.7)'], // Orange
[50, 'rgba(250, 204, 21, 0.7)'], // Yellow - medium
[75, 'rgba(132, 204, 22, 0.7)'], // Lime
[100, 'rgba(34, 197, 94, 0.7)'], // Green - large
];
// Legacy color stops (for backwards compatibility)
export const LEGACY_COLOR_STOPS: [number, string][] = [
[0, 'rgba(0,185,243,0)'],
[25, 'rgba(0,185,243,0.24)'],
[60, 'rgba(255,223,0,0.3)'],
[100, 'rgba(255,105,0,0.3)'],
];
// Get the appropriate color scheme based on metric type
export function getColorSchemeForMetric(metric: Metric | string): [number, string][] {
switch (metric) {
case Metric.qmprice:
case Metric.price:
case 'qmprice':
case 'total_price':
// Lower price is better → Green for low, Red for high
return LOW_IS_GOOD_COLOR_STOPS;
case Metric.qm:
case Metric.rooms:
case 'qm':
case 'rooms':
// Higher value is better → Green for high, Red for low
return HIGH_IS_GOOD_COLOR_STOPS;
default:
return LOW_IS_GOOD_COLOR_STOPS;
}
}
// Get interpretation text for legend
export function getMetricInterpretation(metric: Metric | string): { low: string; high: string; name: string } {
switch (metric) {
case Metric.qmprice:
case 'qmprice':
return { low: 'Good deal', high: 'Expensive', name: 'Price per m²' };
case Metric.price:
case 'total_price':
return { low: 'Good deal', high: 'Expensive', name: 'Total Price' };
case Metric.qm:
case 'qm':
return { low: 'Small', high: 'Large', name: 'Size (m²)' };
case Metric.rooms:
case 'rooms':
return { low: 'Few rooms', high: 'Many rooms', name: 'Bedrooms' };
default:
return { low: 'Low', high: 'High', name: 'Value' };
}
}
// Color scheme names for display
export const COLOR_SCHEME_NAMES = {
LOW_IS_GOOD: 'Green → Red (low is good)',
HIGH_IS_GOOD: 'Red → Green (high is good)',
LEGACY: 'Classic (blue → orange)',
} as const;

View file

@ -0,0 +1,63 @@
// Application constants and configuration
// Re-export color schemes
export * from './colorSchemes';
// API endpoints
export const API_ENDPOINTS = {
LISTING_GEOJSON: '/api/listing_geojson',
LISTING_GEOJSON_STREAM: '/api/listing_geojson/stream',
REFRESH_LISTINGS: '/api/refresh_listings',
TASK_STATUS: '/api/task_status',
TASKS_FOR_USER: '/api/tasks_for_user',
CANCEL_TASK: '/api/cancel_task',
CLEAR_ALL_TASKS: '/api/clear_all_tasks',
GET_DISTRICTS: '/api/get_districts',
} as const;
// Map configuration
export const MAP_CONFIG = {
MAPBOX_TOKEN: import.meta.env.VITE_MAPBOX_TOKEN || 'pk.eyJ1IjoiZGktdG8iLCJhIjoiY2o0bnBoYXcxMW1mNzJ3bDhmc2xiNWttaiJ9.ZccatVk_4shzoAsEUXXecA',
DEFAULT_CENTER: [13.38032, 49.994210] as [number, number],
DEFAULT_ZOOM: 5,
STYLE: 'mapbox://styles/mapbox/light-v9',
} as const;
// Heatmap configuration
export const HEATMAP_CONFIG = {
INTENSITY: 9,
SPREAD: 0.05,
CELL_DENSITY: 0.5, // Smaller value = bigger hexagons
SEARCH_BUFFER: 0.001, // ~100m for click detection
} as const;
// Percentile configuration for data visualization
export const PERCENTILE_CONFIG = {
MIN_BOUND: 0.05, // 5th percentile for color scale minimum
MAX_BOUND: 0.95, // 95th percentile for color scale maximum
BOUNDS_CLIP_MIN: 0.01, // 1st percentile for bounding box
BOUNDS_CLIP_MAX: 0.99, // 99th percentile for bounding box
} as const;
// Heatmap color gradient stops
export const HEATMAP_COLOR_STOPS: [number, string][] = [
[0, 'rgba(0,185,243,0)'],
[25, 'rgba(0,185,243,0.24)'],
[60, 'rgba(255,223,0,0.3)'],
[100, 'rgba(255,105,0,0.3)'],
];
// Default form values
export const DEFAULT_FORM_VALUES = {
min_bedrooms: 1,
max_bedrooms: 3,
max_price: 3000,
min_price: 2000,
min_sqm: 50,
last_seen_days: 28,
} as const;
// Polling intervals
export const POLLING_INTERVALS = {
TASK_STATUS_MS: 5000, // 5 seconds
} as const;

View file

@ -0,0 +1,6 @@
// Re-export all services
export { apiRequest } from './apiClient';
export { fetchListingGeoJSON, refreshListings } from './listingService';
export { streamListingGeoJSON, type StreamingProgress } from './streamingService';
export { fetchTasksForUser, fetchTaskStatus, cancelTask, clearAllTasks, type CancelTaskResponse, type ClearAllTasksResponse } from './taskService';
export { checkBackendHealth, type HealthStatus, type HealthCheckResult } from './healthService';

View file

@ -10,6 +10,12 @@ export interface CancelTaskResponse {
message: string; message: string;
} }
export interface ClearAllTasksResponse {
success: boolean;
count: number;
message: string;
}
/** /**
* Fetch all active tasks for the current user * Fetch all active tasks for the current user
*/ */
@ -41,3 +47,12 @@ export async function cancelTask(
params: { task_id: taskId }, params: { task_id: taskId },
}); });
} }
/**
* Clear all tasks for the current user
*/
export async function clearAllTasks(user: User): Promise<ClearAllTasksResponse> {
return apiRequest<ClearAllTasksResponse>(user, API_ENDPOINTS.CLEAR_ALL_TASKS, {
method: 'POST',
});
}

View file

@ -2,29 +2,31 @@ from datetime import timedelta
from functools import lru_cache from functools import lru_cache
import json import json
from string import Template from string import Template
from typing import Any from typing import Any, TypeVar
from api.auth import User from api.auth import User
import redis import redis
from celery_app import app from celery_app import app
T = TypeVar("T")
class RedisRepository: class RedisRepository:
redis_client: redis.Redis redis_client: redis.Redis # type: ignore[type-arg]
tasks_key_template = Template("user:{user_id}/tasks") tasks_key_template: Template = Template("user:{user_id}/tasks")
def __init__(self): def __init__(self) -> None:
redis_hostname = app.broker_connection().info()["hostname"] redis_hostname: str = app.broker_connection().info()["hostname"]
redis_port = app.broker_connection().info()["port"] redis_port: int = app.broker_connection().info()["port"]
self.redis_client = redis.Redis( self.redis_client = redis.Redis(
host=redis_hostname, port=redis_port, db=0, decode_responses=True host=redis_hostname, port=redis_port, db=0, decode_responses=True
) # decode_responses=True returns str, not bytes ) # decode_responses=True returns str, not bytes
@lru_cache(maxsize=None)
@staticmethod @staticmethod
def instance(): @lru_cache(maxsize=None)
def instance() -> "RedisRepository":
return RedisRepository() return RedisRepository()
def set_key(self, key: str, value, ttl: timedelta | None = None) -> None: def set_key(self, key: str, value: Any, ttl: timedelta | None = None) -> None:
serialized_value = self.__serialize_value(value) serialized_value = self.__serialize_value(value)
self.redis_client.set(key, serialized_value) self.redis_client.set(key, serialized_value)
@ -37,9 +39,9 @@ class RedisRepository:
return None return None
return self.__deserialize_value(serialized_value) return self.__deserialize_value(serialized_value)
def add_task_for_user(self, user: User, task_id: str): def add_task_for_user(self, user: User, task_id: str) -> None:
# Add the task ID to the Redis set for the user # Add the task ID to the Redis set for the user
current_tasks = ( current_tasks: list[str] = (
self.get_key(self.tasks_key_template.substitute(user_id=user.email)) or [] self.get_key(self.tasks_key_template.substitute(user_id=user.email)) or []
) )
self.set_key( self.set_key(
@ -53,8 +55,27 @@ class RedisRepository:
self.get_key(self.tasks_key_template.substitute(user_id=user.email)) or [] self.get_key(self.tasks_key_template.substitute(user_id=user.email)) or []
) )
def remove_task_for_user(self, user: User, task_id: str) -> bool:
"""Remove a specific task from the user's task list."""
current_tasks: list[str] = self.get_tasks_for_user(user)
if task_id not in current_tasks:
return False
updated_tasks = [t for t in current_tasks if t != task_id]
self.set_key(
self.tasks_key_template.substitute(user_id=user.email),
updated_tasks,
)
return True
def clear_tasks_for_user(self, user: User) -> int:
"""Clear all tasks for a user. Returns the number of tasks cleared."""
current_tasks: list[str] = self.get_tasks_for_user(user)
count = len(current_tasks)
self.redis_client.delete(self.tasks_key_template.substitute(user_id=user.email))
return count
def __serialize_value(self, value: Any) -> str: def __serialize_value(self, value: Any) -> str:
return json.dumps(value) return json.dumps(value)
def __deserialize_value(self, value_str) -> Any: def __deserialize_value(self, value_str: str) -> Any:
return json.loads(value_str) return json.loads(value_str)

View file

@ -0,0 +1,161 @@
"""Unified task service - shared between CLI and HTTP API.
Manages background task operations using Celery.
"""
from dataclasses import dataclass
from typing import Any
import json
@dataclass
class TaskStatus:
"""Status of a background task."""
task_id: str
status: str # PENDING, STARTED, SUCCESS, FAILURE, REVOKED
result: Any | None
progress: float | None # 0.0 to 1.0
def get_task_status(task_id: str) -> TaskStatus:
"""Get the status of a background task.
Used by:
- API: GET /api/task_status
Args:
task_id: The Celery task ID
Returns:
TaskStatus with current state
"""
from tasks.listing_tasks import dump_listings_task
task_result = dump_listings_task.AsyncResult(task_id)
# Try to serialize result
try:
result = json.loads(json.dumps(task_result.result))
except (TypeError, json.JSONDecodeError):
result = str(task_result.result) if task_result.result else None
# Extract progress from task meta if available
progress = None
if task_result.info and isinstance(task_result.info, dict):
progress = task_result.info.get("progress")
return TaskStatus(
task_id=task_id,
status=task_result.status,
result=result,
progress=progress,
)
def get_user_tasks(user_email: str) -> list[str]:
"""Get all task IDs for a user.
Used by:
- API: GET /api/tasks_for_user
Args:
user_email: The user's email address
Returns:
List of task IDs
"""
from redis_repository import RedisRepository
from api.auth import User
redis_repo = RedisRepository.instance()
# Create a minimal User object for the lookup
user = User(sub="", email=user_email, name="")
return redis_repo.get_tasks_for_user(user)
def add_task_for_user(user_email: str, task_id: str) -> None:
"""Associate a task with a user.
Used by:
- API: POST /api/refresh_listings
Args:
user_email: The user's email address
task_id: The Celery task ID
"""
from redis_repository import RedisRepository
from api.auth import User
redis_repo = RedisRepository.instance()
user = User(sub="", email=user_email, name="")
redis_repo.add_task_for_user(user, task_id)
def cancel_task(task_id: str, user_email: str | None = None) -> bool:
"""Cancel a running task and remove it from the user's task list.
Args:
task_id: The Celery task ID
user_email: Optional user email to remove task from their list
Returns:
True if task was cancelled successfully
"""
from celery_app import app as celery_app
# Revoke the task in Celery
celery_app.control.revoke(task_id, terminate=True)
# Also remove from user's task list if user_email provided
if user_email:
remove_task_from_user(user_email, task_id)
return True
def remove_task_from_user(user_email: str, task_id: str) -> bool:
"""Remove a task from a user's task list without cancelling it.
Use this to clean up stuck tasks that can't be cancelled via Celery.
Args:
user_email: The user's email address
task_id: The Celery task ID
Returns:
True if task was removed, False if not found
"""
from redis_repository import RedisRepository
from api.auth import User
redis_repo = RedisRepository.instance()
user = User(sub="", email=user_email, name="")
return redis_repo.remove_task_for_user(user, task_id)
def clear_all_tasks(user_email: str, revoke: bool = True) -> int:
"""Clear all tasks for a user.
Args:
user_email: The user's email address
revoke: If True, also attempt to revoke tasks in Celery
Returns:
Number of tasks cleared
"""
from redis_repository import RedisRepository
from celery_app import app as celery_app
from api.auth import User
redis_repo = RedisRepository.instance()
user = User(sub="", email=user_email, name="")
# Get tasks before clearing to revoke them
if revoke:
tasks = redis_repo.get_tasks_for_user(user)
for task_id in tasks:
try:
celery_app.control.revoke(task_id, terminate=True)
except Exception:
pass # Best effort, continue clearing
return redis_repo.clear_tasks_for_user(user)