"""FastAPI application exposing the listing, task and district endpoints."""

from datetime import datetime, timedelta, timezone
import json
import logging
import logging.config
from typing import Annotated

from api.auth import get_current_user
from api.config import DEV_TIER_ORIGINS, PROD_TIER_ORIGINS
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, Query
from api.auth import User
from models.listing import QueryParameters
from notifications import send_notification
from rec import districts
from redis_repository import RedisRepository
from repositories.listing_repository import ListingRepository
from database import engine
from fastapi.middleware.cors import CORSMiddleware
from tasks import listing_tasks
from ui_exporter import export_immoweb

load_dotenv()

logger = logging.getLogger("uvicorn")

# Disabled: run the alembic migrations on application start-up.
# @asynccontextmanager
# async def lifespan(app: FastAPI):
#     alembic_cfg = Config("./alembic.ini")
#     logger.info("Running alembic migrations")
#     command.upgrade(alembic_cfg, "head")
#     logger.info("Finished running alembic migrations")
#     yield
#     logger.warning("Shutting down")

# app = FastAPI(lifespan=lifespan)
app = FastAPI()

# Allow CORS (for React frontend)
app.add_middleware(
    CORSMiddleware,
    allow_origins=[*DEV_TIER_ORIGINS, *PROD_TIER_ORIGINS],
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/api/status")
async def get_status():
    """Return a static payload signalling that the API is up."""
    return {"status": "OK"}


@app.get("/api/listing")
async def get_listing(user: Annotated[User, Depends(get_current_user)]):
    """Return listings fetched from the listing repository (limit=5).

    ``user`` is unused in the body; it is declared so that the
    ``get_current_user`` dependency runs for this route.
    """
    repository = ListingRepository(engine)
    listings = await repository.get_listings(limit=5)
    # Lazy %-formatting: the message is only built if the record is emitted.
    logger.info("Fetched %s listings", len(listings))
    return {"listings": listings}


@app.get("/api/listing_geojson")
async def get_listing_geojson(
    user: Annotated[User, Depends(get_current_user)],
    query_parameters: Annotated[QueryParameters, Query()],
):
    """Return the result of ``export_immoweb`` for the given query parameters.

    The export is requested with ``limit=None``.
    """
    repository = ListingRepository(engine)
    geojson_data = await export_immoweb(
        repository, query_parameters=query_parameters, limit=None
    )
    return geojson_data


@app.post("/api/refresh_listings")
async def refresh_listings(
    user: Annotated[User, Depends(get_current_user)],
    query_parameters: Annotated[QueryParameters, Query()],
) -> dict[str, str]:
    """Queue a listing-dump task and record it against the calling user.

    Sends a notification naming the user and the query parameters, enqueues
    ``dump_listings_task`` with the JSON-serialised parameters and a
    10-minute expiry, stores the task id for the user via
    ``RedisRepository`` and returns it as ``{"task_id": ...}``.
    """
    await send_notification(
        f"{user.email} refreshing listings with query parameters {query_parameters.model_dump_json()}"
    )
    # await listing_tasks.async_dump_listings_task(query_parameters.model_dump_json())  # Use this for local debugging - run task in sync
    # return {}

    # TODO: rate limit

    # Use a timezone-aware UTC timestamp: a naive ``datetime.now()`` is local
    # wall-clock time, but Celery interprets naive datetimes as UTC, which
    # shifts the expiry by the host's UTC offset.
    expiry_time = datetime.now(timezone.utc) + timedelta(minutes=10)
    # NOTE(review): apply_async and the Redis call below are not awaited, i.e.
    # they run synchronously inside this async handler.
    task = listing_tasks.dump_listings_task.apply_async(
        args=(query_parameters.model_dump_json(),),
        expires=expiry_time,
    )
    redis_repository = RedisRepository.instance()
    redis_repository.add_task_for_user(user, task.id)
    return {"task_id": task.id}


@app.get("/api/task_status")
async def get_task_status(
    user: Annotated[User, Depends(get_current_user)],
    task_id: str,
) -> dict[str, str]:
    """Return the status and result of the task identified by ``task_id``.

    The result is JSON-encoded when possible; otherwise its ``str()`` form
    is returned instead.
    """
    # NOTE(review): task_id is not checked against the tasks recorded for
    # ``user``; any authenticated caller can query any task id - confirm
    # whether that is intended.
    task_result = listing_tasks.dump_listings_task.AsyncResult(task_id)
    raw_result = task_result.result  # read once; reused by the fallback below
    try:
        result = json.dumps(raw_result)
    except (TypeError, ValueError):
        # Not JSON-serialisable (TypeError) or circular (ValueError).
        result = str(raw_result)
    return {
        "task_id": task_id,
        "status": task_result.status,
        "result": result,
    }


@app.get("/api/tasks_for_user")
async def get_tasks_for_user(
    user: Annotated[User, Depends(get_current_user)],
) -> list[str]:
    """Return the tasks that ``RedisRepository`` has stored for the caller."""
    redis_repository = RedisRepository.instance()
    user_tasks = redis_repository.get_tasks_for_user(user)
    return user_tasks


@app.get("/api/get_districts")
async def get_districts(
    user: Annotated[User, Depends(get_current_user)],
) -> dict[str, str]:
    """Return the value of ``districts.get_districts()``.

    ``user`` is unused in the body; it is declared so that the
    ``get_current_user`` dependency runs for this route.
    """
    return districts.get_districts()