import asyncio from pathlib import Path import queue from threading import Thread from typing import Annotated import uuid from api.auth import get_current_user from api.config import DEV_TIER_ORIGINS, PROD_TIER_ORIGINS from api.worker import ( DumpListingsWorker, TaskStatus, WorkerManager, task_queue, task_results, ) from fastapi import Depends, FastAPI, HTTPException, Query from api.auth import User from logger import get_logger from models.listing import QueryParameters from repositories.listing_repository import ListingRepository from repositories.listing_repository import ListingRepository from database import engine from fastapi.middleware.cors import CORSMiddleware from ui_exporter import export_immoweb from alembic import command from alembic.config import Config from contextlib import asynccontextmanager logger = get_logger(__file__) @asynccontextmanager async def lifespan(app: FastAPI): alembic_cfg = Config("./alembic.ini") logger.info("Running alembic migrations") command.upgrade(alembic_cfg, "head") logger.info("Finished running alembic migrations") yield logger.warning("Shutting down") app = FastAPI(lifespan=lifespan) # Start worker thread WorkerManager(DumpListingsWorker()).start() # Allow CORS (for React frontend) app.add_middleware( CORSMiddleware, allow_origins=[*DEV_TIER_ORIGINS, *PROD_TIER_ORIGINS], allow_methods=["*"], allow_headers=["*"], ) @app.get("/api/listing") async def get_listing(user: Annotated[User, Depends(get_current_user)]): repository = ListingRepository(engine) listings = await repository.get_listings(limit=5) logger.info(f"Fetched {len(listings)} listings") return {"listings": listings} @app.get("/api/listing_geojson") async def get_listing_geojson( user: Annotated[User, Depends(get_current_user)], query_parameters: Annotated[QueryParameters, Query()], ): repository = ListingRepository(engine) geojson_data = await export_immoweb( repository, query_parameters=query_parameters, limit=None ) return geojson_data @app.post("/api/refresh_listings") async def refresh_listings( user: Annotated[User, Depends(get_current_user)], query_parameters: Annotated[QueryParameters, Query()], ) -> dict[str, str]: # Submit processing task task_id = str(uuid.uuid4()) task_results[task_id] = {"status": TaskStatus.QUEUED} try: task_queue.put_nowait( (task_id, query_parameters), ) except queue.Full: raise HTTPException( status_code=429, detail="Already processing at maximum capacity. Please try again later", ) return {"task_id": task_id} @app.get("/api/task_status") async def get_task_status( user: Annotated[User, Depends(get_current_user)], task_id: str, ) -> dict[str, str]: if task_id not in task_results: return {"status": "not_found"} return task_results[task_id]