"""In-memory task queue and background worker for dumping listings."""

import asyncio
import enum
import importlib
from pathlib import Path
from queue import Empty, Queue

from database import engine
from models.listing import Listing, QueryParameters
from pydantic import BaseModel
from repositories.listing_repository import ListingRepository

# The module name starts with a digit, so it cannot be loaded with a plain
# ``import`` statement and has to go through importlib.
dump_listings_module = importlib.import_module("1_dump_listings")

# In-memory task queue and results store
task_queue = Queue(maxsize=1)  # Disallow multiple in flight requests for now
task_results = {}  # task_id -> {"status": ..., plus "result" or "error"}

# How long the worker sleeps between polls while the queue is empty.
_QUEUE_POLL_INTERVAL_SECONDS = 0.1


class TaskStatus(enum.StrEnum):
    """Lifecycle states recorded under the "status" key in ``task_results``."""

    QUEUED = "queued"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


async def dump_listings_worker() -> None:
    """Run forever, processing queued dump-listings tasks one at a time.

    Each queue item is a ``(task_id, task_data)`` pair. ``task_data`` is
    passed to ``dump_listings_full`` as the query parameters. The outcome
    is written to the module-level ``task_results`` dict under ``task_id``:

    * ``{"status": PROCESSING}`` as soon as the task is picked up,
    * ``{"status": COMPLETED, "result": <summary>}`` on success,
    * ``{"status": FAILED, "error": <message>}`` if an exception is raised.

    The coroutine never returns on its own.
    """
    repository = ListingRepository(engine)
    data_dir_path = Path("data/rs")

    while True:
        # ``task_queue`` is a thread-oriented ``queue.Queue``; a blocking
        # ``get()`` here would stall the event loop this coroutine runs on
        # for as long as the queue is empty. Poll without blocking and
        # yield to the loop between attempts instead.
        try:
            task_id, task_data = task_queue.get_nowait()
        except Empty:
            await asyncio.sleep(_QUEUE_POLL_INTERVAL_SECONDS)
            continue

        task_results[task_id] = {"status": TaskStatus.PROCESSING}
        query_parameters = task_data
        try:
            new_listings = await dump_listings_module.dump_listings_full(
                query_parameters, repository, data_dir_path
            )
            task_results[task_id] = {
                # StrEnum member: still compares equal to "completed".
                "status": TaskStatus.COMPLETED,
                "result": f"Fetched {len(new_listings)} new listings for query {task_data}",
            }
        except Exception as e:
            # Top-level worker boundary: record the failure for this task
            # and keep serving the queue rather than letting the worker die.
            task_results[task_id] = {"status": TaskStatus.FAILED, "error": str(e)}
        finally:
            # Always balance the get() so Queue.join() callers are released.
            task_queue.task_done()