- Remove the hard-coded `limit=1000` default from the `listing_geojson` and streaming endpoints, allowing all matching results to be returned.
- Add a Redis caching service (db=2, 30-minute TTL) that stores query results as Redis Lists for fast re-queries with reduced DB load.
- Integrate the cache into the streaming endpoint: serve from cache on a hit, and populate the cache on a miss while streaming from the DB.
- Invalidate the cache after a scrape completes (both on success and when no new listings are found).
- Replace ScrollArea with react-virtuoso in ListView for virtual scrolling, keeping only ~20–30 DOM nodes regardless of list size.
- Handle the metadata streaming message so the UI shows "0 / N" progress from the start.
- Throttle frontend state updates with requestAnimationFrame to prevent UI jank from rapid re-renders while streaming cached responses.
303 lines
10 KiB
Python
303 lines
10 KiB
Python
"""FastAPI application for the Real Estate Crawler API."""
|
|
from datetime import datetime, timedelta
|
|
import json
|
|
import logging
|
|
import logging.config
|
|
from typing import Annotated, Optional
|
|
from api.auth import get_current_user
|
|
from api.config import DEV_TIER_ORIGINS, PROD_TIER_ORIGINS
|
|
from dotenv import load_dotenv
|
|
from fastapi import Depends, FastAPI, Query
|
|
from fastapi.responses import StreamingResponse
|
|
from api.auth import User
|
|
from models.listing import QueryParameters, ListingType, FurnishType
|
|
from notifications import send_notification
|
|
from repositories.listing_repository import ListingRepository
|
|
from database import engine
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from ui_exporter import convert_to_geojson_feature, convert_row_to_geojson
|
|
|
|
from services import listing_service, export_service, district_service, task_service
|
|
from services.listing_cache import (
|
|
get_cached_count,
|
|
get_cached_features,
|
|
cache_features_batch,
|
|
)
|
|
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
|
|
from api.metrics import metrics_app
|
|
from opentelemetry.metrics import get_meter
|
|
|
|
|
|
load_dotenv()
|
|
logger = logging.getLogger("uvicorn")
|
|
|
|
|
|
def get_query_parameters(
|
|
listing_type: ListingType,
|
|
min_bedrooms: int = 1,
|
|
max_bedrooms: int = 999,
|
|
min_price: int = 0,
|
|
max_price: int = 10_000_000,
|
|
min_sqm: Optional[int] = None,
|
|
last_seen_days: Optional[int] = None,
|
|
let_date_available_from: Optional[datetime] = None,
|
|
furnish_types: Optional[str] = None, # comma-separated list
|
|
) -> QueryParameters:
|
|
"""Parse query parameters into QueryParameters model."""
|
|
parsed_furnish_types = None
|
|
if furnish_types:
|
|
parsed_furnish_types = [FurnishType(f.strip()) for f in furnish_types.split(",")]
|
|
|
|
return QueryParameters(
|
|
listing_type=listing_type,
|
|
min_bedrooms=min_bedrooms,
|
|
max_bedrooms=max_bedrooms,
|
|
min_price=min_price,
|
|
max_price=max_price,
|
|
min_sqm=min_sqm,
|
|
last_seen_days=last_seen_days,
|
|
let_date_available_from=let_date_available_from,
|
|
furnish_types=parsed_furnish_types,
|
|
)
|
|
|
|
|
|
app = FastAPI()
|
|
app.mount("/metrics", metrics_app)
|
|
meter = get_meter(__name__)
|
|
request_counter = meter.create_counter(
|
|
name="custom_request_count",
|
|
description="Number of times /hello was called",
|
|
)
|
|
hist = meter.create_histogram(
|
|
name="custom_request_duration",
|
|
description="Duration of /hello requests in seconds",
|
|
)
|
|
|
|
|
|
# Allow CORS (for React frontend)
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=[*DEV_TIER_ORIGINS, *PROD_TIER_ORIGINS],
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
|
|
@app.get("/api/status")
|
|
async def get_status() -> dict[str, str]:
|
|
request_counter.add(1, {"method": "GET", "path": "/status"})
|
|
hist.record(1.5, {"method": "GET", "path": "/status"})
|
|
return {"status": "OK"}
|
|
|
|
|
|
@app.get("/api/listing")
|
|
async def get_listing(
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
limit: int = 5,
|
|
) -> dict[str, list]:
|
|
"""Get listings from the database."""
|
|
repository = ListingRepository(engine)
|
|
result = await listing_service.get_listings(repository, limit=limit)
|
|
logger.info(f"Fetched {result.total_count} listings for {user.email}")
|
|
return {"listings": result.listings}
|
|
|
|
|
|
@app.get("/api/listing_geojson")
|
|
async def get_listing_geojson(
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)],
|
|
limit: int | None = None,
|
|
) -> dict:
|
|
"""Get listings as GeoJSON for map display."""
|
|
repository = ListingRepository(engine)
|
|
result = await export_service.export_to_geojson(
|
|
repository,
|
|
query_parameters=query_parameters,
|
|
limit=limit,
|
|
)
|
|
return result.data
|
|
|
|
|
|
@app.get("/api/listing_geojson/stream")
|
|
async def stream_listing_geojson(
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)],
|
|
batch_size: int = 50,
|
|
limit: int | None = None,
|
|
) -> StreamingResponse:
|
|
"""Stream listings as NDJSON for progressive map loading.
|
|
|
|
Returns newline-delimited JSON with three message types:
|
|
- metadata: Initial message with batch_size and total_expected count
|
|
- batch: Array of GeoJSON features
|
|
- complete: Final message with total count
|
|
"""
|
|
async def generate():
|
|
# Check cache first
|
|
cached_count = get_cached_count(query_parameters)
|
|
|
|
if cached_count is not None and cached_count > 0:
|
|
# Cache HIT
|
|
effective_total = min(limit, cached_count) if limit else cached_count
|
|
|
|
yield json.dumps({
|
|
"type": "metadata",
|
|
"batch_size": batch_size,
|
|
"total_expected": effective_total,
|
|
"cached": True,
|
|
}) + "\n"
|
|
|
|
count = 0
|
|
for feature_batch in get_cached_features(query_parameters, batch_size=batch_size):
|
|
if limit and count + len(feature_batch) > limit:
|
|
feature_batch = feature_batch[:limit - count]
|
|
count += len(feature_batch)
|
|
yield json.dumps({"type": "batch", "features": feature_batch}) + "\n"
|
|
if limit and count >= limit:
|
|
break
|
|
|
|
yield json.dumps({"type": "complete", "total": count}) + "\n"
|
|
else:
|
|
# Cache MISS - query DB and populate cache
|
|
repository = ListingRepository(engine)
|
|
|
|
# Phase 1: Fast count for progress estimation
|
|
total = repository.count_listings(query_parameters)
|
|
effective_total = min(limit, total) if limit else total
|
|
|
|
yield json.dumps({
|
|
"type": "metadata",
|
|
"batch_size": batch_size,
|
|
"total_expected": effective_total,
|
|
"cached": False,
|
|
}) + "\n"
|
|
|
|
# Phase 2: Stream with column projection and keyset pagination
|
|
count = 0
|
|
batch = []
|
|
for row in repository.stream_listings_optimized(
|
|
query_parameters, limit=limit, page_size=batch_size
|
|
):
|
|
feature = convert_row_to_geojson(row, query_parameters.listing_type.value)
|
|
batch.append(feature)
|
|
count += 1
|
|
|
|
if len(batch) >= batch_size:
|
|
cache_features_batch(query_parameters, batch)
|
|
yield json.dumps({"type": "batch", "features": batch}) + "\n"
|
|
batch = []
|
|
|
|
# Send remaining
|
|
if batch:
|
|
cache_features_batch(query_parameters, batch)
|
|
yield json.dumps({"type": "batch", "features": batch}) + "\n"
|
|
|
|
# Final message
|
|
yield json.dumps({"type": "complete", "total": count}) + "\n"
|
|
|
|
return StreamingResponse(
|
|
generate(),
|
|
media_type="application/x-ndjson",
|
|
headers={
|
|
"Cache-Control": "no-cache",
|
|
"X-Accel-Buffering": "no", # Disable nginx buffering
|
|
}
|
|
)
|
|
|
|
|
|
@app.post("/api/refresh_listings")
|
|
async def refresh_listings(
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)],
|
|
) -> dict[str, str]:
|
|
"""Trigger a background task to refresh listings."""
|
|
await send_notification(
|
|
f"{user.email} refreshing listings with query parameters {query_parameters.model_dump_json()}"
|
|
)
|
|
|
|
repository = ListingRepository(engine)
|
|
result = await listing_service.refresh_listings(
|
|
repository,
|
|
query_parameters,
|
|
async_mode=True,
|
|
user_email=user.email,
|
|
)
|
|
|
|
# Track task for user
|
|
if result.task_id:
|
|
task_service.add_task_for_user(user.email, result.task_id)
|
|
|
|
return {"task_id": result.task_id or "", "message": result.message}
|
|
|
|
|
|
@app.get("/api/task_status")
|
|
async def get_task_status(
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
task_id: str,
|
|
) -> dict[str, str | int | float | None]:
|
|
"""Get the status of a background task."""
|
|
status = task_service.get_task_status(task_id)
|
|
return {
|
|
"task_id": status.task_id,
|
|
"status": status.status,
|
|
"result": json.dumps(status.result) if status.result else None,
|
|
"progress": status.progress,
|
|
"processed": status.processed,
|
|
"total": status.total,
|
|
"message": status.message,
|
|
"error": status.error,
|
|
"traceback": status.traceback,
|
|
}
|
|
|
|
|
|
@app.get("/api/tasks_for_user")
|
|
async def get_tasks_for_user(
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
) -> list[str]:
|
|
"""Get all task IDs for the current user."""
|
|
return task_service.get_user_tasks(user.email)
|
|
|
|
|
|
@app.post("/api/cancel_task")
|
|
async def cancel_task(
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
task_id: str = Query(..., description="The task ID to cancel"),
|
|
) -> dict[str, str | bool]:
|
|
"""Cancel a running task and remove it from the user's task list."""
|
|
# Verify user owns this task
|
|
user_tasks = task_service.get_user_tasks(user.email)
|
|
if task_id not in user_tasks:
|
|
return {"success": False, "message": "Task not found or not owned by user"}
|
|
|
|
try:
|
|
task_service.cancel_task(task_id, user_email=user.email)
|
|
logger.info(f"Task {task_id} cancelled by {user.email}")
|
|
return {"success": True, "message": "Task cancelled"}
|
|
except Exception as e:
|
|
logger.error(f"Failed to cancel task {task_id}: {e}")
|
|
return {"success": False, "message": str(e)}
|
|
|
|
|
|
@app.post("/api/clear_all_tasks")
|
|
async def clear_all_tasks(
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
) -> dict[str, str | int | bool]:
|
|
"""Clear all tasks for the current user."""
|
|
try:
|
|
count = task_service.clear_all_tasks(user.email)
|
|
logger.info(f"Cleared {count} tasks for {user.email}")
|
|
return {"success": True, "count": count, "message": f"Cleared {count} tasks"}
|
|
except Exception as e:
|
|
logger.error(f"Failed to clear tasks for {user.email}: {e}")
|
|
return {"success": False, "count": 0, "message": str(e)}
|
|
|
|
|
|
@app.get("/api/get_districts")
|
|
async def get_districts(
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
) -> dict[str, str]:
|
|
"""Get all available districts."""
|
|
return district_service.get_all_districts()
|
|
|
|
|
|
FastAPIInstrumentor.instrument_app(app)
|