From 1ae00b7cbf56d4eacad157b0b847793bdf293590 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Mon, 23 Feb 2026 20:09:36 +0000 Subject: [PATCH] Add multi-layer caching: 24h Redis TTL, stale-while-revalidate, frontend LRU cache - Increase Redis cache TTL from 30 minutes to 24 hours - Add stale-while-revalidate: serve stale cache (>4h) immediately while repopulating in background with SETNX lock to prevent concurrent rebuilds - Add in-memory frontend LRU cache (5 entries) so repeat filter visits are instant without network requests - Invalidate frontend cache on listing refresh and task completion - Add unit tests for get_cache_age, is_cache_stale, acquire_repopulation_lock --- api/app.py | 34 +++++++ frontend/src/App.tsx | 17 ++++ frontend/src/services/listingCache.ts | 45 +++++++++ services/listing_cache.py | 46 ++++++++- tests/unit/test_listing_cache.py | 129 ++++++++++++++++++++++++++ 5 files changed, 270 insertions(+), 1 deletion(-) create mode 100644 frontend/src/services/listingCache.ts diff --git a/api/app.py b/api/app.py index c2fb880..f52d176 100644 --- a/api/app.py +++ b/api/app.py @@ -1,4 +1,5 @@ """FastAPI application for the Real Estate Crawler API.""" +import asyncio from datetime import datetime, timedelta import json import logging @@ -40,6 +41,8 @@ from services.listing_cache import ( cache_features_batch_staged, finalize_cache_population, delete_staging_key, + is_cache_stale, + acquire_repopulation_lock, ) from repositories.poi_repository import POIRepository from repositories.decision_repository import DecisionRepository @@ -278,6 +281,7 @@ async def _stream_from_cache( limit: int | None, user_email: str | None = None, decision_filter: str = "all", + stale: bool = False, ) -> AsyncGenerator[str, None]: """Stream GeoJSON features from the Redis cache (cache-hit path).""" cached_count = get_cached_count(query_parameters) @@ -288,6 +292,7 @@ async def _stream_from_cache( "batch_size": batch_size, "total_expected": effective_total, "cached": True, + "stale": stale, }) + "\n" # Resolve decision IDs (deferred to after metadata is sent) @@ -432,6 +437,30 @@ async def _stream_from_db( delete_staging_key(staging_key) +async def _repopulate_cache_background(query_parameters: QueryParameters) -> None: + """Repopulate the cache from DB in the background (fire-and-forget).""" + if not acquire_repopulation_lock(query_parameters): + logger.debug("Skipping background repopulation — already in progress") + return + try: + logger.info("Starting background cache repopulation for stale entry") + repository = ListingRepository(engine) + staging_key = begin_cache_population(query_parameters) + try: + for row in repository.stream_listings_optimized( + query_parameters, limit=None, page_size=DEFAULT_BATCH_SIZE + ): + feature = convert_row_to_geojson(row, query_parameters.listing_type.value) + cache_features_batch_staged(staging_key, [feature]) + finalize_cache_population(staging_key, query_parameters) + logger.info("Background cache repopulation completed") + except Exception: + delete_staging_key(staging_key) + raise + except Exception: + logger.exception("Background cache repopulation failed") + + @app.get("/api/listing_geojson/stream") async def stream_listing_geojson( user: Annotated[User, Depends(get_current_user)], @@ -470,10 +499,15 @@ async def stream_listing_geojson( if cached_count is not None and cached_count > 0 and not include_poi_distances: app_metrics.geojson_cache_operations.add(1, {"result": "hit"}) + stale = is_cache_stale(query_parameters) + if stale: + # Fire-and-forget background repopulation + asyncio.create_task(_repopulate_cache_background(query_parameters)) generator = _stream_from_cache( query_parameters, batch_size, limit, user_email=user.email, decision_filter=decision_filter, + stale=stale, ) else: app_metrics.geojson_cache_operations.add(1, {"result": "miss"}) diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 56aacb1..218dff9 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -18,6 +18,7 @@ import { Button } from './components/ui/button'; import { Filter, Heart } from 'lucide-react'; import type { GeoJSONFeatureCollection, PropertyProperties, PropertyFeature, POI, POITravelFilter } from '@/types'; import { refreshListings, streamListingGeoJSON, fetchUserPOIs, fetchBulkPOIDistances, type StreamingProgress } from '@/services'; +import { getCached, setCached, invalidateAll as invalidateListingCache } from '@/services/listingCache'; import { setOnUnauthorized } from '@/services/apiClient'; import { clearPasskeyUser } from './auth/passkeyService'; import { poiMetricPropertyName, injectPoiMetricProperty } from '@/utils/poiUtils'; @@ -131,6 +132,18 @@ function App() { const loadListings = useCallback(async (parameters: ParameterValues) => { if (!user) return; + // Check in-memory cache first + const cached = getCached(parameters); + if (cached) { + setQueryParameters(parameters); + setMobileFilterOpen(false); + accumulatedFeaturesRef.current = cached; + setListingData({ type: 'FeatureCollection', features: [...cached] }); + setIsLoading(false); + setStreamingProgress(null); + return; + } + // Abort any in-flight streaming request if (abortControllerRef.current) { abortControllerRef.current.abort(); @@ -183,6 +196,8 @@ function App() { } // Final flush to ensure all data is rendered flushUpdate(); + // Store successful result in frontend cache + setCached(parameters, accumulatedFeaturesRef.current); } catch (error) { // Silently ignore AbortError — it means we intentionally cancelled if (error instanceof DOMException && error.name === 'AbortError') { @@ -305,6 +320,7 @@ function App() { }, [user, loadListings]); const handleTaskCompleted = useCallback(() => { + invalidateListingCache(); if (queryParameters) { loadListings(queryParameters); } @@ -326,6 +342,7 @@ function App() { if (action === 'visualize') { loadListings(parameters); } else if (action === 'fetch-data') { + invalidateListingCache(); setQueryParameters(parameters); setMobileFilterOpen(false); setIsLoading(true); diff --git a/frontend/src/services/listingCache.ts b/frontend/src/services/listingCache.ts new file mode 100644 index 0000000..28fdae8 --- /dev/null +++ b/frontend/src/services/listingCache.ts @@ -0,0 +1,45 @@ +/** + * In-memory LRU cache for streaming listing results. + * + * Keyed by a deterministic hash of query parameters so that repeat visits + * to the same filter combination are instant (no network request). + */ +import type { PropertyFeature } from '@/types'; +import type { ParameterValues } from '@/components/FilterPanel'; + +interface CacheEntry { + features: PropertyFeature[]; + timestamp: number; +} + +const cache = new Map(); +const MAX_ENTRIES = 5; + +export function makeCacheKey(params: ParameterValues): string { + const sorted = Object.entries(params) + .filter(([, v]) => v !== undefined && v !== null && v !== '') + .sort(([a], [b]) => a.localeCompare(b)) + .map(([k, v]) => `${k}=${v instanceof Date ? v.toISOString() : v}`); + return sorted.join('&'); +} + +export function getCached(params: ParameterValues): PropertyFeature[] | null { + const key = makeCacheKey(params); + const entry = cache.get(key); + if (!entry) return null; + return entry.features; +} + +export function setCached(params: ParameterValues, features: PropertyFeature[]): void { + const key = makeCacheKey(params); + if (cache.size >= MAX_ENTRIES && !cache.has(key)) { + // Evict oldest entry (first inserted) + const oldest = cache.keys().next().value; + if (oldest) cache.delete(oldest); + } + cache.set(key, { features, timestamp: Date.now() }); +} + +export function invalidateAll(): void { + cache.clear(); +} diff --git a/services/listing_cache.py b/services/listing_cache.py index 0ef62ec..c3795a1 100644 --- a/services/listing_cache.py +++ b/services/listing_cache.py @@ -15,7 +15,9 @@ logger = logging.getLogger(__name__) CACHE_PREFIX = "listings:geojson:" STAGING_PREFIX = "listings:geojson:staging:" -CACHE_TTL_SECONDS = 30 * 60 # 30 minutes +CACHE_TTL_SECONDS = 24 * 60 * 60 # 24 hours +STALE_AFTER_SECONDS = 4 * 60 * 60 # 4 hours — serve stale, revalidate in background +REPOPULATING_PREFIX = "listings:geojson:repopulating:" STAGING_TTL_SECONDS = 5 * 60 # 5 minutes safety net for orphaned staging keys CACHE_DB = 2 @@ -153,3 +155,45 @@ def invalidate_cache() -> None: logger.info(f"Invalidated {deleted} listing cache entries") except redis.RedisError as e: logger.warning(f"Redis cache invalidation error: {e}") + + +def get_cache_age(query_params: QueryParameters) -> int | None: + """Return the age in seconds of a cache entry, or None if not cached.""" + try: + client = _get_redis_client() + key = make_cache_key(query_params) + ttl = client.ttl(key) + if ttl < 0: + # -2 = key doesn't exist, -1 = no expiry + return None + return CACHE_TTL_SECONDS - ttl + except redis.RedisError as e: + logger.warning(f"Redis cache age check error: {e}") + return None + + +def is_cache_stale(query_params: QueryParameters) -> bool: + """Return True if the cache entry exists but is older than STALE_AFTER_SECONDS.""" + age = get_cache_age(query_params) + if age is None: + return False + return age > STALE_AFTER_SECONDS + + +def acquire_repopulation_lock(query_params: QueryParameters) -> bool: + """Try to acquire a lock to prevent concurrent repopulations. + + Returns True if the lock was acquired, False if another repopulation + is already in progress for the same query. + """ + try: + client = _get_redis_client() + key = make_cache_key(query_params) + hash_suffix = key.removeprefix(CACHE_PREFIX) + lock_key = f"{REPOPULATING_PREFIX}{hash_suffix}" + # SETNX with 60-second TTL + acquired: bool = bool(client.set(lock_key, "1", nx=True, ex=60)) + return acquired + except redis.RedisError as e: + logger.warning(f"Redis repopulation lock error: {e}") + return False diff --git a/tests/unit/test_listing_cache.py b/tests/unit/test_listing_cache.py index 062c7c8..cfbd4e2 100644 --- a/tests/unit/test_listing_cache.py +++ b/tests/unit/test_listing_cache.py @@ -8,11 +8,16 @@ import redis from models.listing import ListingType, QueryParameters from services.listing_cache import ( CACHE_PREFIX, + CACHE_TTL_SECONDS, + STALE_AFTER_SECONDS, _get_redis_client, + acquire_repopulation_lock, cache_features_batch, + get_cache_age, get_cached_count, get_cached_features, invalidate_cache, + is_cache_stale, make_cache_key, ) @@ -227,3 +232,127 @@ class TestInvalidateCache: invalidate_cache() mock_client.pipeline.assert_not_called() + + +class TestCacheTTLConstants: + """Tests for cache TTL constants.""" + + def test_cache_ttl_is_24_hours(self): + """CACHE_TTL_SECONDS should be 24 hours.""" + assert CACHE_TTL_SECONDS == 24 * 60 * 60 + + def test_stale_after_is_4_hours(self): + """STALE_AFTER_SECONDS should be 4 hours.""" + assert STALE_AFTER_SECONDS == 4 * 60 * 60 + + def test_stale_after_less_than_ttl(self): + """Stale threshold must be less than the hard TTL.""" + assert STALE_AFTER_SECONDS < CACHE_TTL_SECONDS + + +class TestGetCacheAge: + """Tests for get_cache_age().""" + + @mock.patch("services.listing_cache._get_redis_client") + def test_returns_none_when_key_missing(self, mock_get_client): + """Returns None when key does not exist (ttl returns -2).""" + mock_client = mock.MagicMock() + mock_client.ttl.return_value = -2 + mock_get_client.return_value = mock_client + + result = get_cache_age(_make_query()) + assert result is None + + @mock.patch("services.listing_cache._get_redis_client") + def test_returns_none_when_no_expiry(self, mock_get_client): + """Returns None when key has no TTL set (ttl returns -1).""" + mock_client = mock.MagicMock() + mock_client.ttl.return_value = -1 + mock_get_client.return_value = mock_client + + result = get_cache_age(_make_query()) + assert result is None + + @mock.patch("services.listing_cache._get_redis_client") + def test_computes_age_from_ttl(self, mock_get_client): + """Age = CACHE_TTL_SECONDS - remaining TTL.""" + mock_client = mock.MagicMock() + remaining = CACHE_TTL_SECONDS - 3600 # 1 hour old + mock_client.ttl.return_value = remaining + mock_get_client.return_value = mock_client + + result = get_cache_age(_make_query()) + assert result == 3600 + + @mock.patch("services.listing_cache._get_redis_client") + def test_returns_none_on_redis_error(self, mock_get_client): + """Returns None when Redis raises an error.""" + mock_get_client.side_effect = redis.RedisError("connection refused") + + result = get_cache_age(_make_query()) + assert result is None + + +class TestIsCacheStale: + """Tests for is_cache_stale().""" + + @mock.patch("services.listing_cache.get_cache_age") + def test_not_stale_when_young(self, mock_age): + """Returns False when cache is younger than STALE_AFTER_SECONDS.""" + mock_age.return_value = 100 # 100 seconds old + assert is_cache_stale(_make_query()) is False + + @mock.patch("services.listing_cache.get_cache_age") + def test_stale_when_old(self, mock_age): + """Returns True when cache is older than STALE_AFTER_SECONDS.""" + mock_age.return_value = STALE_AFTER_SECONDS + 1 + assert is_cache_stale(_make_query()) is True + + @mock.patch("services.listing_cache.get_cache_age") + def test_not_stale_when_missing(self, mock_age): + """Returns False when cache does not exist.""" + mock_age.return_value = None + assert is_cache_stale(_make_query()) is False + + @mock.patch("services.listing_cache.get_cache_age") + def test_not_stale_at_exact_threshold(self, mock_age): + """Returns False when cache age equals STALE_AFTER_SECONDS exactly.""" + mock_age.return_value = STALE_AFTER_SECONDS + assert is_cache_stale(_make_query()) is False + + +class TestAcquireRepopulationLock: + """Tests for acquire_repopulation_lock().""" + + @mock.patch("services.listing_cache._get_redis_client") + def test_acquires_lock_successfully(self, mock_get_client): + """Returns True when lock is acquired (SETNX succeeds).""" + mock_client = mock.MagicMock() + mock_client.set.return_value = True + mock_get_client.return_value = mock_client + + result = acquire_repopulation_lock(_make_query()) + assert result is True + mock_client.set.assert_called_once() + # Verify nx=True and ex=60 were passed + call_kwargs = mock_client.set.call_args[1] + assert call_kwargs["nx"] is True + assert call_kwargs["ex"] == 60 + + @mock.patch("services.listing_cache._get_redis_client") + def test_returns_false_when_locked(self, mock_get_client): + """Returns False when lock already held (SETNX returns None).""" + mock_client = mock.MagicMock() + mock_client.set.return_value = None + mock_get_client.return_value = mock_client + + result = acquire_repopulation_lock(_make_query()) + assert result is False + + @mock.patch("services.listing_cache._get_redis_client") + def test_returns_false_on_redis_error(self, mock_get_client): + """Returns False when Redis raises an error.""" + mock_get_client.side_effect = redis.RedisError("connection refused") + + result = acquire_repopulation_lock(_make_query()) + assert result is False