Set up boilerplate for creating background workers, and add one for dumping listings

This commit is contained in:
Viktor Barzin 2025-06-21 17:25:56 +00:00
parent 4e4a5ece15
commit 5bd4562205
No known key found for this signature in database
GPG key ID: 4056458DBDBF8863
2 changed files with 67 additions and 24 deletions

View file

@ -6,7 +6,13 @@ from typing import Annotated
import uuid
from api.auth import get_current_user
from api.config import DEV_TIER_ORIGINS, PROD_TIER_ORIGINS
from api.worker import TaskStatus, dump_listings_worker, task_queue, task_results
from api.worker import (
DumpListingsWorker,
TaskStatus,
WorkerManager,
task_queue,
task_results,
)
from fastapi import Depends, FastAPI, HTTPException, Query
from api.auth import User
from models.listing import QueryParameters
@ -19,7 +25,8 @@ from ui_exporter import export_immoweb
app = FastAPI()
# Start worker thread
Thread(target=asyncio.run, args=[dump_listings_worker()], daemon=True).start()
WorkerManager(DumpListingsWorker()).start()
# Allow CORS (for React frontend)
app.add_middleware(

View file

@ -1,8 +1,13 @@
from __future__ import annotations
from abc import ABC, abstractmethod
import asyncio
import atexit
import enum
import importlib
from pathlib import Path
from queue import Queue
import queue
from threading import Thread
from database import engine
from models.listing import Listing, QueryParameters
@ -16,30 +21,61 @@ task_queue = Queue(maxsize=1) # Disallow multiple in flight requests for now
task_results = {}
class WorkerManager:
    """Owns a WorkerThread and runs its ``run()`` coroutine on a daemon thread.

    ``start()`` launches the thread; ``stop()`` asks the worker to finish and
    joins the thread.  A shutdown hook is registered with ``atexit`` so the
    worker is stopped cleanly on interpreter exit.
    """

    def __init__(self, worker: WorkerThread):
        self._worker = worker
        # Set by start(); None means the thread has not been launched yet.
        self._worker_thread: Thread | None = None
        # Register a plain callable, NOT a pre-built coroutine: the original
        # ``atexit.register(asyncio.run, self.stop())`` created the stop()
        # coroutine eagerly at construction time and raised AttributeError at
        # exit (on ``self._worker_thread``) whenever start() was never called.
        atexit.register(self._shutdown)

    def _shutdown(self) -> None:
        # atexit hook: drive the async stop sequence to completion.
        asyncio.run(self.stop())

    async def stop(self) -> None:
        """Signal the worker to stop and wait for its thread to terminate."""
        await self._worker.stop()
        if self._worker_thread is not None:
            self._worker_thread.join()

    def start(self) -> None:
        """Run the worker's ``run()`` coroutine on a new daemon thread."""
        self._worker_thread = Thread(
            target=asyncio.run, args=(self._worker.run(),), daemon=True
        )
        self._worker_thread.start()
class WorkerThread(ABC):
    """Interface for long-running background workers driven by WorkerManager.

    Deriving from ``ABC`` makes the ``@abstractmethod`` decorators actually
    enforced; without an ABCMeta-based class the original could be
    instantiated with the abstract methods left unimplemented.
    """

    @abstractmethod
    async def stop(self) -> None:
        """Ask the worker to finish its loop and return promptly."""
        ...

    @abstractmethod
    async def run(self) -> None:
        """Worker main loop; executed on a background thread."""
        ...
class DumpListingsWorker(WorkerThread):
    """Worker that drains the module-level ``task_queue`` and dumps listings.

    For each ``(task_id, QueryParameters)`` tuple pulled from the queue, the
    per-task status/result/error is published into the module-level
    ``task_results`` dict keyed by task id.
    """

    def __init__(self) -> None:
        # Per-instance stop flag; the original used a shared class attribute.
        self.should_stop = False

    async def stop(self) -> None:
        """Request the run() loop to exit after the current iteration."""
        self.should_stop = True

    async def run(self) -> None:
        """Background loop: process queued dump tasks until stopped."""
        repository = ListingRepository(engine)
        data_dir_path = Path("data/rs")
        while not self.should_stop:
            try:
                # Poll with a timeout so should_stop is re-checked regularly;
                # the original blocked forever in get(), which made stop() a
                # no-op whenever the queue was empty.
                task_id, task_data = task_queue.get(timeout=1)
            except queue.Empty:
                continue
            task_results[task_id] = {"status": TaskStatus.PROCESSING}
            query_parameters = task_data
            try:
                new_listings = await dump_listings_module.dump_listings_full(
                    query_parameters, repository, data_dir_path
                )
                task_results[task_id] = {
                    # Use the enum, matching PROCESSING/FAILED; the original
                    # hard-coded the bare string "completed" here.
                    "status": TaskStatus.COMPLETED,
                    "result": f"Fetched {len(new_listings)} new listings for query {task_data}",
                }
            except Exception as e:
                # Boundary handler: record the failure in task_results rather
                # than letting an exception kill the worker thread.
                task_results[task_id] = {"status": TaskStatus.FAILED, "error": str(e)}
            finally:
                task_queue.task_done()
class TaskStatus(enum.StrEnum):
QUEUED = "queued"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
async def dump_listings_worker() -> None:
    """Legacy background worker loop (superseded by ``DumpListingsWorker``).

    Blocks forever pulling ``(task_id, QueryParameters)`` tuples from the
    module-level ``task_queue`` and publishes per-task status into the
    module-level ``task_results`` dict.  There is no stop mechanism; the loop
    runs until the process exits.
    """
    repository = ListingRepository(engine)
    data_dir_path = Path("data/rs")
    while True:
        task_id, task_data = task_queue.get()
        task_results[task_id] = {"status": TaskStatus.PROCESSING}
        query_parameters = task_data
        try:
            new_listings = await dump_listings_module.dump_listings_full(
                query_parameters, repository, data_dir_path
            )
            task_results[task_id] = {
                # Use the enum, matching PROCESSING/FAILED above; the original
                # hard-coded the bare string "completed".
                "status": TaskStatus.COMPLETED,
                "result": f"Fetched {len(new_listings)} new listings for query {task_data}",
            }
        except Exception as e:
            # Boundary handler: record the failure rather than killing the loop.
            task_results[task_id] = {"status": TaskStatus.FAILED, "error": str(e)}
        finally:
            task_queue.task_done()