"""Unified task service - shared between CLI and HTTP API. Manages background task operations using Celery. """ from dataclasses import dataclass from typing import Any import json @dataclass class TaskStatus: """Status of a background task.""" task_id: str status: str # PENDING, STARTED, SUCCESS, FAILURE, REVOKED, SKIPPED result: Any | None progress: float | None # 0.0 to 1.0 processed: int | None # Number of items processed total: int | None # Total number of items message: str | None # Human-readable status message (e.g., "Fetching listings") error: str | None # Error message if failed traceback: str | None # Full traceback if failed def get_task_status(task_id: str) -> TaskStatus: """Get the status of a background task. Used by: - API: GET /api/task_status Args: task_id: The Celery task ID Returns: TaskStatus with current state """ from tasks.listing_tasks import dump_listings_task task_result = dump_listings_task.AsyncResult(task_id) # Try to serialize result result = None error = None if task_result.failed(): # Extract error message from failed task error = str(task_result.result) if task_result.result else None else: try: result = json.loads(json.dumps(task_result.result)) except (TypeError, json.JSONDecodeError): result = str(task_result.result) if task_result.result else None # Extract traceback if available task_traceback = task_result.traceback if task_result.failed() else None # Extract progress, processed, total, and message from task meta progress = None processed = None total = None message = None if task_result.info and isinstance(task_result.info, dict): progress = task_result.info.get("progress") processed = task_result.info.get("processed") total = task_result.info.get("total") # Use 'message' if available, fall back to 'reason' for SKIPPED tasks message = task_result.info.get("message") or task_result.info.get("reason") # For custom states (like "Fetching listings"), use the state as message # if no message was provided in info if not message and task_result.status not in ( "PENDING", "STARTED", "SUCCESS", "FAILURE", "REVOKED", "RETRY" ): message = task_result.status return TaskStatus( task_id=task_id, status=task_result.status, result=result, progress=progress, processed=processed, total=total, message=message, error=error, traceback=task_traceback, ) def get_user_tasks(user_email: str) -> list[str]: """Get all task IDs for a user. Used by: - API: GET /api/tasks_for_user Args: user_email: The user's email address Returns: List of task IDs """ from redis_repository import RedisRepository from api.auth import User redis_repo = RedisRepository.instance() # Create a minimal User object for the lookup user = User(sub="", email=user_email, name="") return redis_repo.get_tasks_for_user(user) def add_task_for_user(user_email: str, task_id: str) -> None: """Associate a task with a user. Used by: - API: POST /api/refresh_listings Args: user_email: The user's email address task_id: The Celery task ID """ from redis_repository import RedisRepository from api.auth import User redis_repo = RedisRepository.instance() user = User(sub="", email=user_email, name="") redis_repo.add_task_for_user(user, task_id) def cancel_task(task_id: str, user_email: str | None = None) -> bool: """Cancel a running task and remove it from the user's task list. Args: task_id: The Celery task ID user_email: Optional user email to remove task from their list Returns: True if task was cancelled successfully """ from celery_app import app as celery_app # Revoke the task in Celery celery_app.control.revoke(task_id, terminate=True) # Also remove from user's task list if user_email provided if user_email: remove_task_from_user(user_email, task_id) return True def remove_task_from_user(user_email: str, task_id: str) -> bool: """Remove a task from a user's task list without cancelling it. Use this to clean up stuck tasks that can't be cancelled via Celery. Args: user_email: The user's email address task_id: The Celery task ID Returns: True if task was removed, False if not found """ from redis_repository import RedisRepository from api.auth import User redis_repo = RedisRepository.instance() user = User(sub="", email=user_email, name="") return redis_repo.remove_task_for_user(user, task_id) def clear_all_tasks(user_email: str, revoke: bool = True) -> int: """Clear all tasks for a user. Args: user_email: The user's email address revoke: If True, also attempt to revoke tasks in Celery Returns: Number of tasks cleared """ from redis_repository import RedisRepository from celery_app import app as celery_app from api.auth import User redis_repo = RedisRepository.instance() user = User(sub="", email=user_email, name="") # Get tasks before clearing to revoke them if revoke: tasks = redis_repo.get_tasks_for_user(user) for task_id in tasks: try: celery_app.control.revoke(task_id, terminate=True) except Exception: pass # Best effort, continue clearing return redis_repo.clear_tasks_for_user(user)