Add configurable scheduling, UI health/task indicators, and auto-load map with default filters

This commit is contained in:
Viktor Barzin 2026-02-01 17:28:37 +00:00
parent 1c8c3e4657
commit c7ac448f15
18 changed files with 2287 additions and 656 deletions

View file

@@ -1,30 +1,25 @@
"""FastAPI application for the Real Estate Crawler API."""
from datetime import datetime, timedelta
import json
import logging
import logging.config
from typing import Annotated
from typing import Annotated, Optional
from api.auth import get_current_user
from api.config import DEV_TIER_ORIGINS, PROD_TIER_ORIGINS
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, Query
from fastapi.responses import StreamingResponse
from api.auth import User
from models.listing import QueryParameters
from models.listing import QueryParameters, ListingType, FurnishType
from notifications import send_notification
from rec import districts
from redis_repository import RedisRepository
from repositories.listing_repository import ListingRepository
from database import engine
from fastapi.middleware.cors import CORSMiddleware
from ui_exporter import convert_to_geojson_feature, convert_row_to_geojson
from tasks import listing_tasks
from ui_exporter import export_immoweb
from alembic import command
from alembic.config import Config
from contextlib import asynccontextmanager
from celery.exceptions import TaskRevokedError
from celery_app import app as celery_app
from services import listing_service, export_service, district_service, task_service
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from api.metrics import metrics_app # Import the Prometheus ASGI app
from api.metrics import metrics_app
from opentelemetry.metrics import get_meter
@@ -32,17 +27,35 @@ load_dotenv()
logger = logging.getLogger("uvicorn")
# @asynccontextmanager
# async def lifespan(app: FastAPI):
# alembic_cfg = Config("./alembic.ini")
# logger.info("Running alembic migrations")
# command.upgrade(alembic_cfg, "head")
# logger.info("Finished running alembic migrations")
# yield
# logger.warning("Shutting down")
def get_query_parameters(
    listing_type: ListingType,
    min_bedrooms: int = 1,
    max_bedrooms: int = 999,
    min_price: int = 0,
    max_price: int = 10_000_000,
    min_sqm: Optional[int] = None,
    last_seen_days: Optional[int] = None,
    let_date_available_from: Optional[datetime] = None,
    furnish_types: Optional[str] = None,  # comma-separated list
) -> QueryParameters:
    """Parse query parameters into a QueryParameters model.

    Used as a FastAPI dependency: each keyword argument is read from the
    request's query string.

    Args:
        listing_type: Required listing kind filter.
        min_bedrooms / max_bedrooms: Inclusive bedroom-count bounds.
        min_price / max_price: Inclusive price bounds.
        min_sqm: Optional minimum surface area.
        last_seen_days: Optional recency filter, in days.
        let_date_available_from: Optional earliest availability date.
        furnish_types: Optional comma-separated FurnishType values.

    Returns:
        A populated QueryParameters model.

    Raises:
        ValueError: If a non-empty furnish_types token is not a valid
            FurnishType value.
    """
    parsed_furnish_types = None
    if furnish_types:
        # Skip empty tokens so inputs like "furnished," or " , " don't
        # raise ValueError from FurnishType(""). An all-empty input
        # collapses back to None (no filter), matching the unset case.
        tokens = [t.strip() for t in furnish_types.split(",")]
        parsed_furnish_types = [FurnishType(t) for t in tokens if t] or None
    return QueryParameters(
        listing_type=listing_type,
        min_bedrooms=min_bedrooms,
        max_bedrooms=max_bedrooms,
        min_price=min_price,
        max_price=max_price,
        min_sqm=min_sqm,
        last_seen_days=last_seen_days,
        let_date_available_from=let_date_available_from,
        furnish_types=parsed_furnish_types,
    )
# Lifespan-based startup (alembic migrations, see the commented-out
# lifespan above) is currently disabled; re-enable by passing lifespan=.
# app = FastAPI(lifespan=lifespan)
app = FastAPI()
# Expose the Prometheus ASGI app as a sub-application under /metrics.
app.mount("/metrics", metrics_app)
# OpenTelemetry meter for this module's instruments (counter/histogram
# used by the endpoints below are presumably created from it — confirm).
meter = get_meter(__name__)
@@ -66,52 +79,121 @@ app.add_middleware(
@app.get("/api/status")
async def get_status():
async def get_status() -> dict[str, str]:
request_counter.add(1, {"method": "GET", "path": "/status"})
hist.record(1.5, {"method": "GET", "path": "/status"})
return {"status": "OK"}
@app.get("/api/listing")
async def get_listing(user: Annotated[User, Depends(get_current_user)]):
async def get_listing(
user: Annotated[User, Depends(get_current_user)],
limit: int = 5,
) -> dict[str, list]:
"""Get listings from the database."""
repository = ListingRepository(engine)
listings = await repository.get_listings(limit=5)
logger.info(f"Fetched {len(listings)} listings")
return {"listings": listings}
result = await listing_service.get_listings(repository, limit=limit)
logger.info(f"Fetched {result.total_count} listings for {user.email}")
return {"listings": result.listings}
@app.get("/api/listing_geojson")
async def get_listing_geojson(
user: Annotated[User, Depends(get_current_user)],
query_parameters: Annotated[QueryParameters, Query()],
):
query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)],
limit: int = 1000, # Default limit to prevent timeout
) -> dict:
"""Get listings as GeoJSON for map display."""
repository = ListingRepository(engine)
geojson_data = await export_immoweb(
repository, query_parameters=query_parameters, limit=None
result = await export_service.export_to_geojson(
repository,
query_parameters=query_parameters,
limit=limit,
)
return result.data
@app.get("/api/listing_geojson/stream")
async def stream_listing_geojson(
user: Annotated[User, Depends(get_current_user)],
query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)],
batch_size: int = 50,
limit: int = 1000,
) -> StreamingResponse:
"""Stream listings as NDJSON for progressive map loading.
Returns newline-delimited JSON with three message types:
- metadata: Initial message with batch_size and total_expected count
- batch: Array of GeoJSON features
- complete: Final message with total count
"""
async def generate():
repository = ListingRepository(engine)
# Phase 1: Fast count for progress estimation
total = repository.count_listings(query_parameters)
effective_total = min(limit, total) if limit else total
yield json.dumps({
"type": "metadata",
"batch_size": batch_size,
"total_expected": effective_total,
}) + "\n"
# Phase 2: Stream with column projection and keyset pagination
count = 0
batch = []
for row in repository.stream_listings_optimized(
query_parameters, limit=limit, page_size=batch_size
):
feature = convert_row_to_geojson(row)
batch.append(feature)
count += 1
if len(batch) >= batch_size:
yield json.dumps({"type": "batch", "features": batch}) + "\n"
batch = []
# Send remaining
if batch:
yield json.dumps({"type": "batch", "features": batch}) + "\n"
# Final message
yield json.dumps({"type": "complete", "total": count}) + "\n"
return StreamingResponse(
generate(),
media_type="application/x-ndjson",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # Disable nginx buffering
}
)
return geojson_data
@app.post("/api/refresh_listings")
async def refresh_listings(
user: Annotated[User, Depends(get_current_user)],
query_parameters: Annotated[QueryParameters, Query()],
query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)],
) -> dict[str, str]:
"""Trigger a background task to refresh listings."""
await send_notification(
f"{user.email} refreshing listings with query parameters {query_parameters.model_dump_json()}"
)
# await listing_tasks.async_dump_listings_task(query_parameters.model_dump_json()) # Use this for local debugging - run task in sync
# return {}
# TODO: rate limit
expiry_time = datetime.now() + timedelta(minutes=10)
task = listing_tasks.dump_listings_task.apply_async(
args=(query_parameters.model_dump_json(),),
expires=expiry_time,
repository = ListingRepository(engine)
result = await listing_service.refresh_listings(
repository,
query_parameters,
async_mode=True,
user_email=user.email,
)
redis_repository = RedisRepository.instance()
redis_repository.add_task_for_user(user, task.id)
return {"task_id": task.id}
# Track task for user
if result.task_id:
task_service.add_task_for_user(user.email, result.task_id)
return {"task_id": result.task_id or "", "message": result.message}
@app.get("/api/task_status")
@ -119,16 +201,12 @@ async def get_task_status(
user: Annotated[User, Depends(get_current_user)],
task_id: str,
) -> dict[str, str]:
task_result = listing_tasks.dump_listings_task.AsyncResult(task_id)
try:
result = json.dumps(task_result.result)
except Exception:
result = str(task_result.result)
"""Get the status of a background task."""
status = task_service.get_task_status(task_id)
return {
"task_id": task_id,
"status": task_result.status,
"result": result,
"task_id": status.task_id,
"status": status.status,
"result": json.dumps(status.result) if status.result else "",
}
@@ -136,16 +214,36 @@ async def get_task_status(
async def get_tasks_for_user(
    user: Annotated[User, Depends(get_current_user)],
) -> list[str]:
    """Get all task IDs recorded for the current user."""
    return task_service.get_user_tasks(user.email)
@app.post("/api/cancel_task")
async def cancel_task(
user: Annotated[User, Depends(get_current_user)],
task_id: str = Query(..., description="The task ID to cancel"),
) -> dict[str, str | bool]:
"""Cancel a running task."""
# Verify user owns this task
user_tasks = task_service.get_user_tasks(user.email)
if task_id not in user_tasks:
return {"success": False, "message": "Task not found or not owned by user"}
try:
task_service.cancel_task(task_id)
logger.info(f"Task {task_id} cancelled by {user.email}")
return {"success": True, "message": "Task cancelled"}
except Exception as e:
logger.error(f"Failed to cancel task {task_id}: {e}")
return {"success": False, "message": str(e)}
@app.get("/api/get_districts")
async def get_districts(
user: Annotated[User, Depends(get_current_user)],
) -> dict[str, str]:
return districts.get_districts()
"""Get all available districts."""
return district_service.get_all_districts()
# Attach OpenTelemetry auto-instrumentation to the app; must run after
# all routes above are registered so they are traced.
FastAPIInstrumentor.instrument_app(app)