wrongmove/crawler/api/app.py

105 lines
2.9 KiB
Python
Raw Normal View History

import asyncio
from pathlib import Path
import queue
from threading import Thread
2025-06-11 21:08:11 +00:00
from typing import Annotated
import uuid
from api.auth import get_current_user
from api.config import DEV_TIER_ORIGINS, PROD_TIER_ORIGINS
from api.worker import (
DumpListingsWorker,
TaskStatus,
WorkerManager,
task_queue,
task_results,
)
from fastapi import Depends, FastAPI, HTTPException, Query
from api.auth import User
from logger import get_logger
from models.listing import QueryParameters
from repositories.listing_repository import ListingRepository
from repositories.listing_repository import ListingRepository
from database import engine
from fastapi.middleware.cors import CORSMiddleware
from ui_exporter import export_immoweb
2025-06-21 21:52:51 +00:00
from alembic import command
from alembic.config import Config
from contextlib import asynccontextmanager
logger = get_logger(__file__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Apply database migrations on startup and log on shutdown.

    Passed to ``FastAPI(lifespan=...)``: the code before ``yield`` runs once
    before the application starts serving, the code after it on shutdown.

    Args:
        app: The application instance (required by FastAPI's lifespan
            signature; not used here).
    """
    # Relative path: resolved against the process working directory.
    alembic_cfg = Config("./alembic.ini")
    logger.info("Running alembic migrations")
    # ``command.upgrade`` is synchronous. Run it in a worker thread so it does
    # not block the event loop, and so an env.py that starts its own event
    # loop (e.g. via ``asyncio.run``) does not clash with the running one.
    await asyncio.to_thread(command.upgrade, alembic_cfg, "head")
    logger.info("Finished running alembic migrations")

    yield

    logger.warning("Shutting down")
app = FastAPI(lifespan=lifespan)

# Start the background worker thread.
# NOTE(review): this runs as an import-time side effect, once per process that
# imports this module; consider starting it inside ``lifespan`` instead.
WorkerManager(DumpListingsWorker()).start()

# Allow CORS (for the React frontend) from both dev- and prod-tier origins.
_allowed_origins = [*DEV_TIER_ORIGINS, *PROD_TIER_ORIGINS]
app.add_middleware(
    CORSMiddleware,
    allow_origins=_allowed_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/api/listing")
async def get_listing(user: Annotated[User, Depends(get_current_user)]):
    """Return up to five listings from the repository.

    Args:
        user: Resolved by the ``get_current_user`` dependency; not used in the
            body (presumably declared so the route requires authentication).

    Returns:
        A dict of the form ``{"listings": [...]}``.
    """
    found = await ListingRepository(engine).get_listings(limit=5)
    count = len(found)
    logger.info(f"Fetched {count} listings")
    return {"listings": found}
@app.get("/api/listing_geojson")
async def get_listing_geojson(
    user: Annotated[User, Depends(get_current_user)],
    query_parameters: Annotated[QueryParameters, Query()],
):
    """Export the listings matching ``query_parameters`` via ``export_immoweb``.

    Args:
        user: Resolved by the ``get_current_user`` dependency; not used in the
            body (presumably declared so the route requires authentication).
        query_parameters: Filters parsed from the request's query string.

    Returns:
        Whatever ``export_immoweb`` produces (presumably a GeoJSON structure,
        judging by the route name; confirm). ``limit=None`` is forwarded, which
        presumably means "no cap" in that helper.
    """
    return await export_immoweb(
        ListingRepository(engine),
        query_parameters=query_parameters,
        limit=None,
    )
@app.post("/api/refresh_listings")
async def refresh_listings(
    user: Annotated[User, Depends(get_current_user)],
    query_parameters: Annotated[QueryParameters, Query()],
) -> dict[str, str]:
    """Queue a background listing-refresh task and return its id.

    Args:
        user: Resolved by the ``get_current_user`` dependency; not used in the
            body (presumably declared so the route requires authentication).
        query_parameters: Filters forwarded to the worker with the task.

    Returns:
        ``{"task_id": <uuid4 string>}``; poll ``/api/task_status`` with it.

    Raises:
        HTTPException: 429 when ``task_queue`` is full.
    """
    task_id = str(uuid.uuid4())
    # Record the status *before* enqueueing. Once the item is on the queue the
    # worker thread may take it immediately; if the worker updates
    # task_results (presumably it does), writing QUEUED afterwards could
    # overwrite a newer status.
    task_results[task_id] = {"status": TaskStatus.QUEUED}
    try:
        task_queue.put_nowait((task_id, query_parameters))
    except queue.Full:
        # The task was never enqueued, so remove its placeholder; otherwise
        # every rejected request leaves a stale QUEUED entry behind forever.
        task_results.pop(task_id, None)
        raise HTTPException(
            status_code=429,
            detail="Already processing at maximum capacity. Please try again later",
        ) from None
    return {"task_id": task_id}
@app.get("/api/task_status")
async def get_task_status(
    user: Annotated[User, Depends(get_current_user)],
    task_id: str,
) -> dict[str, str]:
    """Look up the recorded state of a task submitted via /api/refresh_listings.

    Args:
        user: Resolved by the ``get_current_user`` dependency; not used in the
            body (presumably declared so the route requires authentication).
        task_id: Id returned by ``refresh_listings``.

    Returns:
        The entry stored in ``task_results`` for ``task_id``, or
        ``{"status": "not_found"}`` when no entry exists.
    """
    known = task_id in task_results
    return task_results[task_id] if known else {"status": "not_found"}