"""Unified task service - shared between CLI and HTTP API. Manages background task operations using Celery. """ from dataclasses import dataclass from typing import Any import json import logging logger = logging.getLogger(__name__) # Standard Celery states; anything else is treated as a custom state # whose name is used as the human-readable status message. _CELERY_STANDARD_STATES = frozenset( {"PENDING", "STARTED", "SUCCESS", "FAILURE", "REVOKED", "RETRY"} ) @dataclass class TaskStatus: """Status of a background task.""" task_id: str status: str # PENDING, STARTED, SUCCESS, FAILURE, REVOKED, SKIPPED result: Any | None progress: float | None # 0.0 to 1.0 processed: int | None # Number of items processed total: int | None # Total number of items message: str | None # Human-readable status message (e.g., "Fetching listings") error: str | None # Error message if failed traceback: str | None # Full traceback if failed def _make_system_user(email: str) -> Any: """Create a minimal User object used only for Redis key generation. These are *not* real authenticated users -- they exist solely so that RedisRepository can derive the per-user storage key from the email. """ # Lazy import: api.auth imports from api.app which eventually imports # services, so importing at module level would create a circular dependency. from api.auth import User return User(sub="", email=email, name="") def _extract_result(task_result: Any) -> tuple[Any, str | None]: """Extract a serialisable result and an error string from a Celery AsyncResult. Returns: (result, error) -- exactly one of the two will be non-None (or both None for tasks that haven't produced output yet). """ if task_result.failed(): error = str(task_result.result) if task_result.result else None return None, error try: result = json.loads(json.dumps(task_result.result)) except (TypeError, json.JSONDecodeError): result = str(task_result.result) if task_result.result else None return result, None def _extract_progress_info(task_result: Any) -> dict[str, Any]: """Extract progress metadata from a Celery AsyncResult's ``info`` dict. Returns a dict with keys ``progress``, ``processed``, ``total``, and ``message`` (any of which may be None). """ progress: float | None = None processed: int | None = None total: int | None = None message: str | None = None if task_result.info and isinstance(task_result.info, dict): progress = task_result.info.get("progress") processed = task_result.info.get("processed") total = task_result.info.get("total") # Use 'message' if available, fall back to 'reason' for SKIPPED tasks message = task_result.info.get("message") or task_result.info.get("reason") # For custom states (like "Fetching listings"), use the state as message # if no message was provided in info if not message and task_result.status not in _CELERY_STANDARD_STATES: message = task_result.status return { "progress": progress, "processed": processed, "total": total, "message": message, } def get_task_status(task_id: str) -> TaskStatus: """Get the status of a background task. Used by: - API: GET /api/task_status Args: task_id: The Celery task ID Returns: TaskStatus with current state """ # Lazy import: listing_tasks imports the Celery app which in turn # pulls in broker configuration; importing at module level would # create a circular dependency chain. from tasks.listing_tasks import dump_listings_task task_result = dump_listings_task.AsyncResult(task_id) result, error = _extract_result(task_result) task_traceback = task_result.traceback if task_result.failed() else None progress_info = _extract_progress_info(task_result) return TaskStatus( task_id=task_id, status=task_result.status, result=result, error=error, traceback=task_traceback, **progress_info, ) def get_user_tasks(user_email: str) -> list[str]: """Get all task IDs for a user. Used by: - API: GET /api/tasks_for_user Args: user_email: The user's email address Returns: List of task IDs """ # Lazy import: RedisRepository depends on redis which may not be # available at import time in all contexts (CLI, tests). from redis_repository import RedisRepository redis_repo = RedisRepository.instance() user = _make_system_user(user_email) return redis_repo.get_tasks_for_user(user) def add_task_for_user(user_email: str, task_id: str) -> None: """Associate a task with a user. Used by: - API: POST /api/refresh_listings Args: user_email: The user's email address task_id: The Celery task ID """ # Lazy import: see get_user_tasks for rationale. from redis_repository import RedisRepository redis_repo = RedisRepository.instance() user = _make_system_user(user_email) redis_repo.add_task_for_user(user, task_id) def cancel_task(task_id: str, user_email: str | None = None) -> bool: """Cancel a running task and remove it from the user's task list. Args: task_id: The Celery task ID user_email: Optional user email to remove task from their list Returns: True if task was cancelled successfully """ # Lazy import: celery_app bootstraps the broker connection. from celery_app import app as celery_app logger.info("Cancelling task %s (user=%s)", task_id, user_email) # Revoke the task in Celery celery_app.control.revoke(task_id, terminate=True) # Also remove from user's task list if user_email provided if user_email: remove_task_from_user(user_email, task_id) return True def remove_task_from_user(user_email: str, task_id: str) -> bool: """Remove a task from a user's task list without cancelling it. Use this to clean up stuck tasks that can't be cancelled via Celery. Args: user_email: The user's email address task_id: The Celery task ID Returns: True if task was removed, False if not found """ # Lazy import: see get_user_tasks for rationale. from redis_repository import RedisRepository redis_repo = RedisRepository.instance() user = _make_system_user(user_email) return redis_repo.remove_task_for_user(user, task_id) def clear_all_tasks(user_email: str, revoke: bool = True) -> int: """Clear all tasks for a user. Args: user_email: The user's email address revoke: If True, also attempt to revoke tasks in Celery Returns: Number of tasks cleared """ # Lazy imports: see get_user_tasks and cancel_task for rationale. from redis_repository import RedisRepository from celery_app import app as celery_app redis_repo = RedisRepository.instance() user = _make_system_user(user_email) logger.info("Clearing all tasks for user %s (revoke=%s)", user_email, revoke) # Get tasks before clearing to revoke them if revoke: tasks = redis_repo.get_tasks_for_user(user) for task_id in tasks: try: celery_app.control.revoke(task_id, terminate=True) except Exception as e: logger.warning( "Failed to revoke task %s: %s", task_id, e ) return redis_repo.clear_tasks_for_user(user)