"""FastAPI application exposing listing, task and district endpoints.

Every route declares a ``get_current_user`` dependency. Listing data is read
through ``ListingRepository``; refreshes are queued via
``listing_tasks.dump_listings_task`` and polled through ``/api/task_status``.
"""

import asyncio
import dataclasses
import json
import logging
import logging.config
import queue
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from threading import Thread
from typing import Annotated

from alembic import command
from alembic.config import Config
from celery.exceptions import TaskRevokedError
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware

from api.auth import User, get_current_user
from api.config import DEV_TIER_ORIGINS, PROD_TIER_ORIGINS
from database import engine
from models.listing import QueryParameters
from rec import districts
from repositories.listing_repository import ListingRepository
from tasks import listing_tasks
from ui_exporter import export_immoweb

load_dotenv()

logger = logging.getLogger("uvicorn")

# Disabled startup hook that ran the alembic migrations before serving.
# Kept for reference; re-enable by passing ``lifespan=lifespan`` to FastAPI.
#
# @asynccontextmanager
# async def lifespan(app: FastAPI):
#     alembic_cfg = Config("./alembic.ini")
#     logger.info("Running alembic migrations")
#     command.upgrade(alembic_cfg, "head")
#     logger.info("Finished running alembic migrations")
#     yield
#     logger.warning("Shutting down")
#
# app = FastAPI(lifespan=lifespan)

app = FastAPI()

# Allow CORS (for React frontend)
app.add_middleware(
    CORSMiddleware,
    allow_origins=[*DEV_TIER_ORIGINS, *PROD_TIER_ORIGINS],
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/api/listing")
async def get_listing(user: Annotated[User, Depends(get_current_user)]):
    """Return up to five listings fetched through the repository.

    Args:
        user: Resolved by the ``get_current_user`` dependency; not used in
            the body (presumably it only gates access -- confirm in api.auth).

    Returns:
        A dict with a single ``"listings"`` key holding the fetched listings.
    """
    repository = ListingRepository(engine)
    listings = await repository.get_listings(limit=5)
    logger.info("Fetched %s listings", len(listings))
    return {"listings": listings}


@app.get("/api/listing_geojson")
async def get_listing_geojson(
    user: Annotated[User, Depends(get_current_user)],
    query_parameters: Annotated[QueryParameters, Query()],
):
    """Return the output of ``export_immoweb`` for the given query parameters.

    Args:
        user: Resolved by the ``get_current_user`` dependency; not used in
            the body.
        query_parameters: Filter model populated from the query string.

    Returns:
        Whatever ``export_immoweb`` produces (GeoJSON, judging by the route
        name -- confirm in ui_exporter). ``limit=None`` is passed through.
    """
    repository = ListingRepository(engine)
    geojson_data = await export_immoweb(
        repository, query_parameters=query_parameters, limit=None
    )
    return geojson_data


@app.post("/api/refresh_listings")
async def refresh_listings(
    user: Annotated[User, Depends(get_current_user)],
    query_parameters: Annotated[QueryParameters, Query()],
) -> dict[str, str]:
    """Queue a listing dump task and return its id.

    Args:
        user: Resolved by the ``get_current_user`` dependency; not used in
            the body.
        query_parameters: Filter model populated from the query string; it is
            handed to the task as a JSON string.

    Returns:
        ``{"task_id": <id of the queued task>}``.
    """
    # TODO: rate limit
    # Use an aware UTC timestamp so the expiry does not depend on the local
    # timezone of whichever host evaluates it.
    expiry_time = datetime.now(timezone.utc) + timedelta(minutes=10)
    task = listing_tasks.dump_listings_task.apply_async(
        args=(query_parameters.model_dump_json(),),
        expires=expiry_time,
    )
    return {"task_id": task.id}


@app.get("/api/task_status")
async def get_task_status(
    user: Annotated[User, Depends(get_current_user)],
    task_id: str,
) -> dict[str, str]:
    """Report the status and result of a previously queued dump task.

    Args:
        user: Resolved by the ``get_current_user`` dependency; not used in
            the body.
        task_id: Id returned by ``/api/refresh_listings``.

    Returns:
        A dict with ``"task_id"``, ``"status"`` and ``"result"``. The result
        is JSON-encoded when possible and falls back to ``str()`` otherwise.
    """
    task_result = listing_tasks.dump_listings_task.AsyncResult(task_id)
    raw_result = task_result.result
    try:
        result = json.dumps(raw_result)
    except (TypeError, ValueError):
        # json.dumps raises TypeError for unserializable objects and
        # ValueError for circular references; anything else (cancellation,
        # interrupts) must propagate.
        result = str(raw_result)
    return {
        "task_id": task_id,
        "status": task_result.status,
        "result": result,
    }


@app.get("/api/get_districts")
async def get_districts(
    user: Annotated[User, Depends(get_current_user)],
) -> dict[str, str]:
    """Return the value of ``districts.get_districts()`` unchanged.

    Args:
        user: Resolved by the ``get_current_user`` dependency; not used in
            the body.
    """
    return districts.get_districts()