wrongmove/crawler/api/app.py
Viktor Barzin 5514fa6381 Remove 1000-result limit, add Redis caching and virtual scrolling
- Remove hard-coded limit=1000 default from listing_geojson and streaming
  endpoints, allowing all matching results to be returned
- Add Redis caching service (db=2, 30min TTL) that caches query results
  as Redis Lists for fast re-queries with reduced DB load
- Integrate cache into streaming endpoint: serve from cache on hit,
  populate cache on miss during DB streaming
- Invalidate cache after scrape completes (both success and no-new-listings)
- Replace ScrollArea with react-virtuoso in ListView for virtual scrolling,
  keeping only ~20-30 DOM nodes regardless of list size
- Handle metadata streaming message to show "0 / N" progress from start
- Throttle frontend state updates with requestAnimationFrame to prevent
  UI jank from rapid re-renders during cached response streaming
2026-02-06 20:47:36 +00:00

303 lines
10 KiB
Python

"""FastAPI application for the Real Estate Crawler API."""
from datetime import datetime, timedelta
import json
import logging
import logging.config
from typing import Annotated, Optional
from api.auth import get_current_user
from api.config import DEV_TIER_ORIGINS, PROD_TIER_ORIGINS
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, Query
from fastapi.responses import StreamingResponse
from api.auth import User
from models.listing import QueryParameters, ListingType, FurnishType
from notifications import send_notification
from repositories.listing_repository import ListingRepository
from database import engine
from fastapi.middleware.cors import CORSMiddleware
from ui_exporter import convert_to_geojson_feature, convert_row_to_geojson
from services import listing_service, export_service, district_service, task_service
from services.listing_cache import (
get_cached_count,
get_cached_features,
cache_features_batch,
)
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from api.metrics import metrics_app
from opentelemetry.metrics import get_meter
load_dotenv()
logger = logging.getLogger("uvicorn")
def get_query_parameters(
listing_type: ListingType,
min_bedrooms: int = 1,
max_bedrooms: int = 999,
min_price: int = 0,
max_price: int = 10_000_000,
min_sqm: Optional[int] = None,
last_seen_days: Optional[int] = None,
let_date_available_from: Optional[datetime] = None,
furnish_types: Optional[str] = None, # comma-separated list
) -> QueryParameters:
"""Parse query parameters into QueryParameters model."""
parsed_furnish_types = None
if furnish_types:
parsed_furnish_types = [FurnishType(f.strip()) for f in furnish_types.split(",")]
return QueryParameters(
listing_type=listing_type,
min_bedrooms=min_bedrooms,
max_bedrooms=max_bedrooms,
min_price=min_price,
max_price=max_price,
min_sqm=min_sqm,
last_seen_days=last_seen_days,
let_date_available_from=let_date_available_from,
furnish_types=parsed_furnish_types,
)
app = FastAPI()
app.mount("/metrics", metrics_app)
meter = get_meter(__name__)
request_counter = meter.create_counter(
name="custom_request_count",
description="Number of times /hello was called",
)
hist = meter.create_histogram(
name="custom_request_duration",
description="Duration of /hello requests in seconds",
)
# Allow CORS (for React frontend)
app.add_middleware(
CORSMiddleware,
allow_origins=[*DEV_TIER_ORIGINS, *PROD_TIER_ORIGINS],
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/api/status")
async def get_status() -> dict[str, str]:
request_counter.add(1, {"method": "GET", "path": "/status"})
hist.record(1.5, {"method": "GET", "path": "/status"})
return {"status": "OK"}
@app.get("/api/listing")
async def get_listing(
user: Annotated[User, Depends(get_current_user)],
limit: int = 5,
) -> dict[str, list]:
"""Get listings from the database."""
repository = ListingRepository(engine)
result = await listing_service.get_listings(repository, limit=limit)
logger.info(f"Fetched {result.total_count} listings for {user.email}")
return {"listings": result.listings}
@app.get("/api/listing_geojson")
async def get_listing_geojson(
user: Annotated[User, Depends(get_current_user)],
query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)],
limit: int | None = None,
) -> dict:
"""Get listings as GeoJSON for map display."""
repository = ListingRepository(engine)
result = await export_service.export_to_geojson(
repository,
query_parameters=query_parameters,
limit=limit,
)
return result.data
@app.get("/api/listing_geojson/stream")
async def stream_listing_geojson(
user: Annotated[User, Depends(get_current_user)],
query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)],
batch_size: int = 50,
limit: int | None = None,
) -> StreamingResponse:
"""Stream listings as NDJSON for progressive map loading.
Returns newline-delimited JSON with three message types:
- metadata: Initial message with batch_size and total_expected count
- batch: Array of GeoJSON features
- complete: Final message with total count
"""
async def generate():
# Check cache first
cached_count = get_cached_count(query_parameters)
if cached_count is not None and cached_count > 0:
# Cache HIT
effective_total = min(limit, cached_count) if limit else cached_count
yield json.dumps({
"type": "metadata",
"batch_size": batch_size,
"total_expected": effective_total,
"cached": True,
}) + "\n"
count = 0
for feature_batch in get_cached_features(query_parameters, batch_size=batch_size):
if limit and count + len(feature_batch) > limit:
feature_batch = feature_batch[:limit - count]
count += len(feature_batch)
yield json.dumps({"type": "batch", "features": feature_batch}) + "\n"
if limit and count >= limit:
break
yield json.dumps({"type": "complete", "total": count}) + "\n"
else:
# Cache MISS - query DB and populate cache
repository = ListingRepository(engine)
# Phase 1: Fast count for progress estimation
total = repository.count_listings(query_parameters)
effective_total = min(limit, total) if limit else total
yield json.dumps({
"type": "metadata",
"batch_size": batch_size,
"total_expected": effective_total,
"cached": False,
}) + "\n"
# Phase 2: Stream with column projection and keyset pagination
count = 0
batch = []
for row in repository.stream_listings_optimized(
query_parameters, limit=limit, page_size=batch_size
):
feature = convert_row_to_geojson(row, query_parameters.listing_type.value)
batch.append(feature)
count += 1
if len(batch) >= batch_size:
cache_features_batch(query_parameters, batch)
yield json.dumps({"type": "batch", "features": batch}) + "\n"
batch = []
# Send remaining
if batch:
cache_features_batch(query_parameters, batch)
yield json.dumps({"type": "batch", "features": batch}) + "\n"
# Final message
yield json.dumps({"type": "complete", "total": count}) + "\n"
return StreamingResponse(
generate(),
media_type="application/x-ndjson",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # Disable nginx buffering
}
)
@app.post("/api/refresh_listings")
async def refresh_listings(
user: Annotated[User, Depends(get_current_user)],
query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)],
) -> dict[str, str]:
"""Trigger a background task to refresh listings."""
await send_notification(
f"{user.email} refreshing listings with query parameters {query_parameters.model_dump_json()}"
)
repository = ListingRepository(engine)
result = await listing_service.refresh_listings(
repository,
query_parameters,
async_mode=True,
user_email=user.email,
)
# Track task for user
if result.task_id:
task_service.add_task_for_user(user.email, result.task_id)
return {"task_id": result.task_id or "", "message": result.message}
@app.get("/api/task_status")
async def get_task_status(
user: Annotated[User, Depends(get_current_user)],
task_id: str,
) -> dict[str, str | int | float | None]:
"""Get the status of a background task."""
status = task_service.get_task_status(task_id)
return {
"task_id": status.task_id,
"status": status.status,
"result": json.dumps(status.result) if status.result else None,
"progress": status.progress,
"processed": status.processed,
"total": status.total,
"message": status.message,
"error": status.error,
"traceback": status.traceback,
}
@app.get("/api/tasks_for_user")
async def get_tasks_for_user(
user: Annotated[User, Depends(get_current_user)],
) -> list[str]:
"""Get all task IDs for the current user."""
return task_service.get_user_tasks(user.email)
@app.post("/api/cancel_task")
async def cancel_task(
user: Annotated[User, Depends(get_current_user)],
task_id: str = Query(..., description="The task ID to cancel"),
) -> dict[str, str | bool]:
"""Cancel a running task and remove it from the user's task list."""
# Verify user owns this task
user_tasks = task_service.get_user_tasks(user.email)
if task_id not in user_tasks:
return {"success": False, "message": "Task not found or not owned by user"}
try:
task_service.cancel_task(task_id, user_email=user.email)
logger.info(f"Task {task_id} cancelled by {user.email}")
return {"success": True, "message": "Task cancelled"}
except Exception as e:
logger.error(f"Failed to cancel task {task_id}: {e}")
return {"success": False, "message": str(e)}
@app.post("/api/clear_all_tasks")
async def clear_all_tasks(
user: Annotated[User, Depends(get_current_user)],
) -> dict[str, str | int | bool]:
"""Clear all tasks for the current user."""
try:
count = task_service.clear_all_tasks(user.email)
logger.info(f"Cleared {count} tasks for {user.email}")
return {"success": True, "count": count, "message": f"Cleared {count} tasks"}
except Exception as e:
logger.error(f"Failed to clear tasks for {user.email}: {e}")
return {"success": False, "count": 0, "message": str(e)}
@app.get("/api/get_districts")
async def get_districts(
user: Annotated[User, Depends(get_current_user)],
) -> dict[str, str]:
"""Get all available districts."""
return district_service.get_all_districts()
FastAPIInstrumentor.instrument_app(app)