From 5bd4562205ac59a7ed7013a58ae5b3c346076927 Mon Sep 17 00:00:00 2001
From: Viktor Barzin
Date: Sat, 21 Jun 2025 17:25:56 +0000
Subject: [PATCH] setup boilerplate for creating background workers and add 1
 for dumping listings

---
 crawler/api/app.py    | 11 ++++--
 crawler/api/worker.py | 97 ++++++++++++++++++++++++++++++++++---------
 2 files changed, 84 insertions(+), 24 deletions(-)

diff --git a/crawler/api/app.py b/crawler/api/app.py
index fd0ad16..289c5db 100644
--- a/crawler/api/app.py
+++ b/crawler/api/app.py
@@ -6,7 +6,13 @@ from typing import Annotated
 import uuid
 from api.auth import get_current_user
 from api.config import DEV_TIER_ORIGINS, PROD_TIER_ORIGINS
-from api.worker import TaskStatus, dump_listings_worker, task_queue, task_results
+from api.worker import (
+    DumpListingsWorker,
+    TaskStatus,
+    WorkerManager,
+    task_queue,
+    task_results,
+)
 from fastapi import Depends, FastAPI, HTTPException, Query
 from api.auth import User
 from models.listing import QueryParameters
@@ -19,7 +25,8 @@ from ui_exporter import export_immoweb
 app = FastAPI()
 
 # Start worker thread
-Thread(target=asyncio.run, args=[dump_listings_worker()], daemon=True).start()
+WorkerManager(DumpListingsWorker()).start()
+
 
 # Allow CORS (for React frontend)
 app.add_middleware(
diff --git a/crawler/api/worker.py b/crawler/api/worker.py
index 5404a09..a8e7fe5 100644
--- a/crawler/api/worker.py
+++ b/crawler/api/worker.py
@@ -1,8 +1,13 @@
+from __future__ import annotations
+from abc import ABC, abstractmethod
 import asyncio
+import atexit
 import enum
 import importlib
 from pathlib import Path
 from queue import Queue
+import queue
+from threading import Thread
 
 from database import engine
 from models.listing import Listing, QueryParameters
@@ -16,30 +21,78 @@
 task_queue = Queue(maxsize=1)  # Disallow multiple in flight requests for now
 task_results = {}
 
+
+class WorkerManager:
+    """Runs a WorkerThread on a daemon thread and stops it at interpreter exit."""
+
+    def __init__(self, worker: WorkerThread):
+        self._worker = worker
+        self._worker_thread: Thread | None = None
+        # Register a callable, not a pre-built coroutine: `self.stop()` here
+        # would create the coroutine immediately and never await it.
+        atexit.register(lambda: asyncio.run(self.stop()))
+
+    async def stop(self) -> None:
+        """Signal the worker to stop and wait for its thread to exit."""
+        await self._worker.stop()
+        if self._worker_thread is not None:  # stop() may run before start()
+            self._worker_thread.join()
+
+    def start(self) -> None:
+        """Run the worker's async loop on a new daemon thread."""
+        self._worker_thread = Thread(
+            target=asyncio.run, args=(self._worker.run(),), daemon=True
+        )
+        self._worker_thread.start()
+
+
+class WorkerThread(ABC):
+    """Interface for cooperative background workers."""
+
+    @abstractmethod
+    async def stop(self) -> None: ...
+
+    @abstractmethod
+    async def run(self) -> None: ...
+
+
+class DumpListingsWorker(WorkerThread):
+    """Worker that drains task_queue and dumps listings for each request."""
+
+    should_stop = False  # flipped by stop(); re-checked each loop iteration
+
+    async def stop(self) -> None:
+        self.should_stop = True
+
+    async def run(self) -> None:  # global task_results is updated
+        """Background worker that processes tasks"""
+        repository = ListingRepository(engine)
+        data_dir_path = Path("data/rs")
+        while not self.should_stop:
+            try:
+                # Poll with a timeout so should_stop is re-checked instead of
+                # blocking forever on an empty queue (stop() would never work).
+                task_id, task_data = task_queue.get(timeout=1)
+            except queue.Empty:
+                continue
+            task_results[task_id] = {"status": TaskStatus.PROCESSING}
+            query_parameters = task_data
+            try:
+                new_listings = await dump_listings_module.dump_listings_full(
+                    query_parameters, repository, data_dir_path
+                )
+                task_results[task_id] = {
+                    "status": TaskStatus.COMPLETED,
+                    "result": f"Fetched {len(new_listings)} new listings for query {task_data}",
+                }
+            except Exception as e:
+                task_results[task_id] = {"status": TaskStatus.FAILED, "error": str(e)}
+            finally:
+                task_queue.task_done()
+
+
 class TaskStatus(enum.StrEnum):
     QUEUED = "queued"
     PROCESSING = "processing"
     COMPLETED = "completed"
     FAILED = "failed"
-
-
-async def dump_listings_worker() -> None:  # global results is updated
-    """Background worker that processes tasks"""
-    repository = ListingRepository(engine)
-    data_dir_path = Path("data/rs")
-    while True:
-        task_id, task_data = task_queue.get()
-        task_results[task_id] = {"status": TaskStatus.PROCESSING}
-        query_parameters = task_data
-        try:
-            new_listings = await dump_listings_module.dump_listings_full(
-                query_parameters, repository, data_dir_path
-            )
-            task_results[task_id] = {
-                "status": "completed",
-                "result": f"Fetched {len(new_listings)} new listings for query {task_data}",
-            }
-        except Exception as e:
-            task_results[task_id] = {"status": TaskStatus.FAILED, "error": str(e)}
-        finally:
-            task_queue.task_done()