# wrongmove/crawler/api/worker.py

from __future__ import annotations

from abc import ABC, abstractmethod
import asyncio
import atexit
import enum
import importlib
from pathlib import Path
from queue import Queue
import queue
from threading import Thread

from database import engine
from repositories.listing_repository import ListingRepository

# The module name starts with a digit, so it cannot be imported with a
# plain `import` statement.
dump_listings_module = importlib.import_module("1_dump_listings")

# In-memory task queue and results store, shared between the API and the worker
task_queue = Queue(maxsize=1)  # Disallow multiple in-flight requests for now
task_results: dict = {}
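

# Producer-side sketch (hypothetical, not part of the original file): roughly
# how an API endpoint might hand work to the worker below. `enqueue_task` and
# its parameters are illustrative names only. Because the queue has maxsize=1,
# put_nowait raises queue.Full when a task is already in flight, which the
# caller can surface as a "busy" response.
def enqueue_task(task_id: str, query_parameters: dict) -> None:
    task_results[task_id] = {"status": TaskStatus.QUEUED}
    task_queue.put_nowait((task_id, query_parameters))  # raises queue.Full if busy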


class WorkerManager:
    """Owns a worker and the daemon thread that runs its event loop."""

    def __init__(self, worker: WorkerThread):
        self._worker = worker
        self._worker_thread: Thread | None = None
        # Register shutdown via a lambda so the stop() coroutine is only
        # created (and run) at interpreter exit, not at construction time.
        atexit.register(lambda: asyncio.run(self.stop()))

    def start(self) -> None:
        self._worker_thread = Thread(
            target=asyncio.run, args=(self._worker.run(),), daemon=True
        )
        self._worker_thread.start()

    async def stop(self) -> None:
        await self._worker.stop()
        if self._worker_thread is not None:
            self._worker_thread.join()


class WorkerThread(ABC):
    """Interface for long-running async workers driven by WorkerManager."""

    @abstractmethod
    async def stop(self) -> None: ...

    @abstractmethod
    async def run(self) -> None: ...


class DumpListingsWorker(WorkerThread):
    should_stop = False

    async def stop(self) -> None:
        self.should_stop = True

    async def run(self) -> None:
        """Background worker that processes tasks; updates the module-level
        task_results store."""
        repository = ListingRepository(engine)
        data_dir_path = Path("data/rs")
        while not self.should_stop:
            try:
                # Poll with a timeout so a stop() request is noticed even when
                # the queue stays empty; an unbounded blocking get() would hang
                # this thread (and join() at shutdown) forever.
                task_id, task_data = task_queue.get(timeout=1)
            except queue.Empty:
                continue
            task_results[task_id] = {"status": TaskStatus.PROCESSING}
            query_parameters = task_data
            try:
                new_listings = await dump_listings_module.dump_listings_full(
                    query_parameters, repository, data_dir_path
                )
                task_results[task_id] = {
                    "status": TaskStatus.COMPLETED,
                    "result": f"Fetched {len(new_listings)} new listings for query {task_data}",
                }
            except Exception as e:
                task_results[task_id] = {"status": TaskStatus.FAILED, "error": str(e)}
            finally:
                task_queue.task_done()


class TaskStatus(enum.StrEnum):  # enum.StrEnum requires Python 3.11+
    QUEUED = "queued"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
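

# Usage sketch (hypothetical, not part of the original file): wiring the worker
# into an application. An API layer would typically call start() once at
# startup and read task_results to answer status polls; the query parameters
# below are placeholder values.
if __name__ == "__main__":
    import time
    import uuid

    manager = WorkerManager(DumpListingsWorker())
    manager.start()

    task_id = str(uuid.uuid4())
    task_results[task_id] = {"status": TaskStatus.QUEUED}
    task_queue.put((task_id, {"location": "example"}))  # placeholder query

    # Poll until the worker reports a terminal status (COMPLETED or FAILED).
    while task_results[task_id]["status"] in (TaskStatus.QUEUED, TaskStatus.PROCESSING):
        time.sleep(0.5)
    print(task_results[task_id])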