"""FastAPI application for the Real Estate Crawler API.""" from datetime import datetime, timedelta import json import logging import logging.config from typing import Annotated, Optional from api.auth import get_current_user from api.config import DEV_TIER_ORIGINS, PROD_TIER_ORIGINS from dotenv import load_dotenv from fastapi import Depends, FastAPI, Query from fastapi.responses import StreamingResponse from api.auth import User from models.listing import QueryParameters, ListingType, FurnishType from notifications import send_notification from repositories.listing_repository import ListingRepository from database import engine from fastapi.middleware.cors import CORSMiddleware from ui_exporter import convert_to_geojson_feature, convert_row_to_geojson from services import listing_service, export_service, district_service, task_service from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from api.metrics import metrics_app from opentelemetry.metrics import get_meter load_dotenv() logger = logging.getLogger("uvicorn") def get_query_parameters( listing_type: ListingType, min_bedrooms: int = 1, max_bedrooms: int = 999, min_price: int = 0, max_price: int = 10_000_000, min_sqm: Optional[int] = None, last_seen_days: Optional[int] = None, let_date_available_from: Optional[datetime] = None, furnish_types: Optional[str] = None, # comma-separated list ) -> QueryParameters: """Parse query parameters into QueryParameters model.""" parsed_furnish_types = None if furnish_types: parsed_furnish_types = [FurnishType(f.strip()) for f in furnish_types.split(",")] return QueryParameters( listing_type=listing_type, min_bedrooms=min_bedrooms, max_bedrooms=max_bedrooms, min_price=min_price, max_price=max_price, min_sqm=min_sqm, last_seen_days=last_seen_days, let_date_available_from=let_date_available_from, furnish_types=parsed_furnish_types, ) app = FastAPI() app.mount("/metrics", metrics_app) meter = get_meter(__name__) request_counter = meter.create_counter( name="custom_request_count", description="Number of times /hello was called", ) hist = meter.create_histogram( name="custom_request_duration", description="Duration of /hello requests in seconds", ) # Allow CORS (for React frontend) app.add_middleware( CORSMiddleware, allow_origins=[*DEV_TIER_ORIGINS, *PROD_TIER_ORIGINS], allow_methods=["*"], allow_headers=["*"], ) @app.get("/api/status") async def get_status() -> dict[str, str]: request_counter.add(1, {"method": "GET", "path": "/status"}) hist.record(1.5, {"method": "GET", "path": "/status"}) return {"status": "OK"} @app.get("/api/listing") async def get_listing( user: Annotated[User, Depends(get_current_user)], limit: int = 5, ) -> dict[str, list]: """Get listings from the database.""" repository = ListingRepository(engine) result = await listing_service.get_listings(repository, limit=limit) logger.info(f"Fetched {result.total_count} listings for {user.email}") return {"listings": result.listings} @app.get("/api/listing_geojson") async def get_listing_geojson( user: Annotated[User, Depends(get_current_user)], query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)], limit: int = 1000, # Default limit to prevent timeout ) -> dict: """Get listings as GeoJSON for map display.""" repository = ListingRepository(engine) result = await export_service.export_to_geojson( repository, query_parameters=query_parameters, limit=limit, ) return result.data @app.get("/api/listing_geojson/stream") async def stream_listing_geojson( user: Annotated[User, Depends(get_current_user)], query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)], batch_size: int = 50, limit: int = 1000, ) -> StreamingResponse: """Stream listings as NDJSON for progressive map loading. Returns newline-delimited JSON with three message types: - metadata: Initial message with batch_size and total_expected count - batch: Array of GeoJSON features - complete: Final message with total count """ async def generate(): repository = ListingRepository(engine) # Phase 1: Fast count for progress estimation total = repository.count_listings(query_parameters) effective_total = min(limit, total) if limit else total yield json.dumps({ "type": "metadata", "batch_size": batch_size, "total_expected": effective_total, }) + "\n" # Phase 2: Stream with column projection and keyset pagination count = 0 batch = [] for row in repository.stream_listings_optimized( query_parameters, limit=limit, page_size=batch_size ): feature = convert_row_to_geojson(row, query_parameters.listing_type.value) batch.append(feature) count += 1 if len(batch) >= batch_size: yield json.dumps({"type": "batch", "features": batch}) + "\n" batch = [] # Send remaining if batch: yield json.dumps({"type": "batch", "features": batch}) + "\n" # Final message yield json.dumps({"type": "complete", "total": count}) + "\n" return StreamingResponse( generate(), media_type="application/x-ndjson", headers={ "Cache-Control": "no-cache", "X-Accel-Buffering": "no", # Disable nginx buffering } ) @app.post("/api/refresh_listings") async def refresh_listings( user: Annotated[User, Depends(get_current_user)], query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)], ) -> dict[str, str]: """Trigger a background task to refresh listings.""" await send_notification( f"{user.email} refreshing listings with query parameters {query_parameters.model_dump_json()}" ) repository = ListingRepository(engine) result = await listing_service.refresh_listings( repository, query_parameters, async_mode=True, user_email=user.email, ) # Track task for user if result.task_id: task_service.add_task_for_user(user.email, result.task_id) return {"task_id": result.task_id or "", "message": result.message} @app.get("/api/task_status") async def get_task_status( user: Annotated[User, Depends(get_current_user)], task_id: str, ) -> dict[str, str]: """Get the status of a background task.""" status = task_service.get_task_status(task_id) return { "task_id": status.task_id, "status": status.status, "result": json.dumps(status.result) if status.result else "", } @app.get("/api/tasks_for_user") async def get_tasks_for_user( user: Annotated[User, Depends(get_current_user)], ) -> list[str]: """Get all task IDs for the current user.""" return task_service.get_user_tasks(user.email) @app.post("/api/cancel_task") async def cancel_task( user: Annotated[User, Depends(get_current_user)], task_id: str = Query(..., description="The task ID to cancel"), ) -> dict[str, str | bool]: """Cancel a running task and remove it from the user's task list.""" # Verify user owns this task user_tasks = task_service.get_user_tasks(user.email) if task_id not in user_tasks: return {"success": False, "message": "Task not found or not owned by user"} try: task_service.cancel_task(task_id, user_email=user.email) logger.info(f"Task {task_id} cancelled by {user.email}") return {"success": True, "message": "Task cancelled"} except Exception as e: logger.error(f"Failed to cancel task {task_id}: {e}") return {"success": False, "message": str(e)} @app.post("/api/clear_all_tasks") async def clear_all_tasks( user: Annotated[User, Depends(get_current_user)], ) -> dict[str, str | int | bool]: """Clear all tasks for the current user.""" try: count = task_service.clear_all_tasks(user.email) logger.info(f"Cleared {count} tasks for {user.email}") return {"success": True, "count": count, "message": f"Cleared {count} tasks"} except Exception as e: logger.error(f"Failed to clear tasks for {user.email}: {e}") return {"success": False, "count": 0, "message": str(e)} @app.get("/api/get_districts") async def get_districts( user: Annotated[User, Depends(get_current_user)], ) -> dict[str, str]: """Get all available districts.""" return district_service.get_all_districts() FastAPIInstrumentor.instrument_app(app)