Remove 1000-result limit, add Redis caching and virtual scrolling

- Remove hard-coded limit=1000 default from listing_geojson and streaming
  endpoints, allowing all matching results to be returned
- Add Redis caching service (db=2, 30min TTL) that caches query results
  as Redis Lists for fast re-queries with reduced DB load
- Integrate cache into streaming endpoint: serve from cache on hit,
  populate cache on miss during DB streaming
- Invalidate cache after scrape completes (both success and no-new-listings)
- Replace ScrollArea with react-virtuoso in ListView for virtual scrolling,
  keeping only ~20-30 DOM nodes regardless of list size
- Handle metadata streaming message to show "0 / N" progress from start
- Throttle frontend state updates with requestAnimationFrame to prevent
  UI jank from rapid re-renders during cached response streaming
This commit is contained in:
Viktor Barzin 2026-02-06 20:34:50 +00:00 committed by Viktor Barzin
parent c4b11ccfe9
commit 5514fa6381
8 changed files with 695 additions and 78 deletions

View file

@ -18,6 +18,11 @@ from fastapi.middleware.cors import CORSMiddleware
from ui_exporter import convert_to_geojson_feature, convert_row_to_geojson
from services import listing_service, export_service, district_service, task_service
from services.listing_cache import (
get_cached_count,
get_cached_features,
cache_features_batch,
)
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from api.metrics import metrics_app
from opentelemetry.metrics import get_meter
@ -101,7 +106,7 @@ async def get_listing(
async def get_listing_geojson(
user: Annotated[User, Depends(get_current_user)],
query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)],
limit: int = 1000, # Default limit to prevent timeout
limit: int | None = None,
) -> dict:
"""Get listings as GeoJSON for map display."""
repository = ListingRepository(engine)
@ -118,7 +123,7 @@ async def stream_listing_geojson(
user: Annotated[User, Depends(get_current_user)],
query_parameters: Annotated[QueryParameters, Depends(get_query_parameters)],
batch_size: int = 50,
limit: int = 1000,
limit: int | None = None,
) -> StreamingResponse:
"""Stream listings as NDJSON for progressive map loading.
@ -128,38 +133,67 @@ async def stream_listing_geojson(
- complete: Final message with total count
"""
async def generate():
repository = ListingRepository(engine)
# Check cache first
cached_count = get_cached_count(query_parameters)
# Phase 1: Fast count for progress estimation
total = repository.count_listings(query_parameters)
effective_total = min(limit, total) if limit else total
if cached_count is not None and cached_count > 0:
# Cache HIT
effective_total = min(limit, cached_count) if limit else cached_count
yield json.dumps({
"type": "metadata",
"batch_size": batch_size,
"total_expected": effective_total,
}) + "\n"
yield json.dumps({
"type": "metadata",
"batch_size": batch_size,
"total_expected": effective_total,
"cached": True,
}) + "\n"
# Phase 2: Stream with column projection and keyset pagination
count = 0
batch = []
for row in repository.stream_listings_optimized(
query_parameters, limit=limit, page_size=batch_size
):
feature = convert_row_to_geojson(row, query_parameters.listing_type.value)
batch.append(feature)
count += 1
count = 0
for feature_batch in get_cached_features(query_parameters, batch_size=batch_size):
if limit and count + len(feature_batch) > limit:
feature_batch = feature_batch[:limit - count]
count += len(feature_batch)
yield json.dumps({"type": "batch", "features": feature_batch}) + "\n"
if limit and count >= limit:
break
if len(batch) >= batch_size:
yield json.dumps({"type": "complete", "total": count}) + "\n"
else:
# Cache MISS - query DB and populate cache
repository = ListingRepository(engine)
# Phase 1: Fast count for progress estimation
total = repository.count_listings(query_parameters)
effective_total = min(limit, total) if limit else total
yield json.dumps({
"type": "metadata",
"batch_size": batch_size,
"total_expected": effective_total,
"cached": False,
}) + "\n"
# Phase 2: Stream with column projection and keyset pagination
count = 0
batch = []
for row in repository.stream_listings_optimized(
query_parameters, limit=limit, page_size=batch_size
):
feature = convert_row_to_geojson(row, query_parameters.listing_type.value)
batch.append(feature)
count += 1
if len(batch) >= batch_size:
cache_features_batch(query_parameters, batch)
yield json.dumps({"type": "batch", "features": batch}) + "\n"
batch = []
# Send remaining
if batch:
cache_features_batch(query_parameters, batch)
yield json.dumps({"type": "batch", "features": batch}) + "\n"
batch = []
# Send remaining
if batch:
yield json.dumps({"type": "batch", "features": batch}) + "\n"
# Final message
yield json.dumps({"type": "complete", "total": count}) + "\n"
# Final message
yield json.dumps({"type": "complete", "total": count}) + "\n"
return StreamingResponse(
generate(),
@ -200,13 +234,19 @@ async def refresh_listings(
async def get_task_status(
    user: Annotated[User, Depends(get_current_user)],
    task_id: str,
) -> dict[str, str | int | float | None]:
    """Get the status of a background task.

    Looks up ``task_id`` through ``task_service.get_task_status`` and flattens
    the returned status object into a plain dict.

    Args:
        user: Resolved by the ``get_current_user`` dependency; not otherwise
            read in this function.
        task_id: Identifier passed to ``task_service.get_task_status``.

    Returns:
        A dict with ``task_id`` and ``status``, the task ``result`` serialized
        as a JSON string (``None`` when the result is falsy), and the
        ``progress``, ``processed``, ``total``, ``message``, ``error`` and
        ``traceback`` attributes copied from the status object.
    """
    status = task_service.get_task_status(task_id)
    return {
        "task_id": status.task_id,
        "status": status.status,
        # The result is JSON-encoded into a string; a falsy result (None,
        # empty container, 0, ...) is reported as None, not as "".
        "result": json.dumps(status.result) if status.result else None,
        "progress": status.progress,
        "processed": status.processed,
        "total": status.total,
        "message": status.message,
        "error": status.error,
        "traceback": status.traceback,
    }