wrongmove/services/listing_cache.py
Viktor Barzin a42944a756 wrongmove: round-3 fix sweep — scrape pipeline, BUY tab, filter URL state, render hygiene, map polish
Coordinated fix across 31 bugs found in a parallel QA pass. Findings docs at /tmp/wrongmove-bugs/qa-round-3/qa{1,2,3,4}-*.md.

## Backend / scrape (Fix-1) — 8 bugs

- B1 [P0] Scrape totally broken on prod: pod UID 100 vs NFS dir 1000:1000 mode 775 → PermissionError on every never-seen listing. Switched Dockerfile to explicit `useradd --uid 1000 --gid 1000`; added securityContext + chown initContainer to k8s/{api,celery-beat}-deployment.yaml. Celery worker manifest lives outside this repo — Dockerfile UID change is the load-bearing fix.
- B4 [P1] Celery broker reaped every ~30s by Redis HAProxy idle timeout. Added `broker_transport_options` / `result_backend_transport_options` with `socket_keepalive=True, health_check_interval=25` in celery_app.py + same kwargs on every redis.from_url/Redis call across services/, utils/redis_lock.py, redis_repository.py.
- B5 [P1] dump_listings_task never published terminal FAILURE to the task_progress pub/sub channel — UI polled forever. Wrap body in try/except that publishes FAILURE before re-raising.
- B6 [P1] _process_worker had no per-listing exception handler — one bad listing killed the whole scrape via asyncio.gather. Wrap loop body in try/except Exception (re-raises CancelledError).
- B20 [P2] dump_listings_task gained time_limit=3600, soft_time_limit=3500, acks_late=True.
- B21 [P2] RedisRepository moved off shared db0 (was alongside paperless-ngx) to db3 via REDIS_USER_DB env var; keys prefixed `wrongmove:user:`.
- B32 [P3] redis_lock now uses uuid4() owner token + Lua compare-and-delete.
- B33 [P3] Slack notify in refresh_listings → asyncio.create_task (fire-and-forget).

## Frontend filter system (Fix-2) — 7 bugs

- B2 [P0] BUY tab click triggered "Maximum update depth exceeded" → ErrorBoundary. Replaced the three mutually-triggering useEffects in FilterBar with a single one-way controlled-value flow (URL → parent state → form), guarded by previousListingTypeRef so price-defaults fires once per real transition.
- B3 [P0] Filter values never reached the URL. Wired useFilterParams.setFilterValues into FilterBar/FilterPanel onSubmit + handleRemoveChip + new handleResetAllFilters; fed parsed filterValues into both forms' defaultValues; added URL→form sync via form.reset on browser back/forward.
- B8 [P1] Chip removal now resets form state via new FilterBar onFormReady callback — More badge no longer sticks.
- B12 [P2] Desktop swipe-review FAB added next to header (mobile FAB unchanged).
- B17 [P2] "Reset all" affordance on chip strip.
- B22 [P2] formatPrice precision: 1500 → £1.5k, 2500 → £2.5k (no longer collides with £2k/£3k defaults).
- B30 [P3] last_seen_days input gained min={0}.

## Frontend render hygiene + data integrity (Fix-3) — 8 bugs

- B7 [P1] streamingService bails on first non-NDJSON chunk (HTML response = backend down) and throws StreamParseError so the existing AlertError dialog surfaces a single user-visible error instead of 18× console.error spam.
- B9 [P1] formatDuration widened to (null|undefined|number): returns "—" for non-finite or negative, caps implausibly large values.
- B10 [P1] PropertyCard / PropertyCardCompact / SwipeCard JSX leaves render "—" for null total_price/qm/qmprice (was "£0/0 m²/£0/m²" — looked like free listings).
- B13 [P2] hexgrid worker reduceAverage uses Number.isFinite filter instead of !isNaN (which incorrectly accepted null → 0, biasing per-hex averages low).
- B14 [P2] ListingDetail Overview wraps agency in "Listed by" labelled block so it can't collapse to a bare agency name.
- B15 [P2] Compact POIDistanceBadges iterates all three travel modes with "—" for missing, matching the detail-sheet Travel table.
- B24 [P3] Drawer.Description (sr-only) added to ListingDetailSheet + MobileBottomSheet to silence Radix a11y warning.
- B25 [P3] lastSeenDays clamped to ≥0 so future timestamps don't render as "-7d ago".

## Frontend map / carousel / tasks polish (Fix-4) — 8 bugs

- B11 [P2] HexgridHeatmapClient destroy race: Map.tsx adds .catch() + ref guard so post-destroy promise rejections are silent no-ops. Verified by browser smoke (24 rapid Map↔List toggles → 0 pageErrors).
- B16 [P2] PhotoCarousel + inner CardCarousel gained keyboard nav (Arrow keys).
- B18 [P2] Default map center moved from Czech Republic to London (zoom 10).
- B19+B29 [P2/P3] Mapbox token: no longer hard-coded fallback; reads env-only and shows a clear "Map unavailable — set VITE_MAPBOX_TOKEN" banner when missing.
- B23 [P3] PhotoCarousel suppresses "1/1" counter for single-photo listings; added onError fallback for broken URLs.
- B26 [P3] PhotoCarousel only enables loop when photos.length > 1.
- B27 [P3] TaskIndicator cancel/clear-all buttons gained aria-label + data-testid.
- B28 [P3] useTaskProgress strips terminal-local task IDs from the polling union — no more forever-poll on completed tasks.

## Tests

74 new vitest tests + 18 new pytest tests. Local: tsc clean, 201 vitest tests pass, 633 pytest tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-10 22:27:29 +00:00

225 lines
7.9 KiB
Python

"""Redis-based caching for listing GeoJSON query results."""
import hashlib
import json
import logging
import os
import time
import uuid
from typing import Generator
from urllib.parse import urlparse, urlunparse
import redis
import api.metrics as app_metrics
from models.listing import QueryParameters
logger = logging.getLogger(__name__)
def _record_cache_op(operation: str, duration: float) -> None:
"""Record a cache operation timing metric, no-op if metrics aren't initialized."""
if app_metrics._meter is None:
return
app_metrics.cache_operation_duration_seconds.record(duration, {"operation": operation})
CACHE_PREFIX = "listings:geojson:"
STAGING_PREFIX = "listings:geojson:staging:"
CACHE_TTL_SECONDS = 24 * 60 * 60 # 24 hours
STALE_AFTER_SECONDS = 4 * 60 * 60 # 4 hours — serve stale, revalidate in background
REPOPULATING_PREFIX = "listings:geojson:repopulating:"
STAGING_TTL_SECONDS = 5 * 60 # 5 minutes safety net for orphaned staging keys
CACHE_DB = 2
def _get_redis_client() -> redis.Redis:
"""Get Redis client using Celery broker URL but overriding to db=2."""
broker_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
parsed = urlparse(broker_url)
cache_url = urlunparse(parsed._replace(path=f"/{CACHE_DB}"))
# socket_keepalive + health_check_interval keep the connection alive
# across the Redis HAProxy 30s idle timeout (see celery_app.py).
return redis.from_url(
cache_url,
decode_responses=True,
socket_keepalive=True,
health_check_interval=25,
)
def make_cache_key(query_params: QueryParameters) -> str:
"""Generate a cache key from query parameters."""
params_json = query_params.model_dump_json()
hash_suffix = hashlib.sha256(params_json.encode()).hexdigest()[:16]
return f"{CACHE_PREFIX}{hash_suffix}"
def get_cached_count(query_params: QueryParameters) -> int | None:
"""Return the number of cached features for a query, or None if not cached."""
try:
t0 = time.monotonic()
client = _get_redis_client()
key = make_cache_key(query_params)
if not client.exists(key):
_record_cache_op("check", time.monotonic() - t0)
return None
count = client.llen(key)
_record_cache_op("check", time.monotonic() - t0)
return count
except redis.RedisError as e:
logger.warning(f"Redis cache read error: {e}")
return None
def get_cached_features(
query_params: QueryParameters, batch_size: int = 50
) -> Generator[list[dict], None, None]:
"""Yield batches of cached GeoJSON features."""
try:
client = _get_redis_client()
key = make_cache_key(query_params)
total = client.llen(key)
for start in range(0, total, batch_size):
end = start + batch_size - 1
t0 = time.monotonic()
items = client.lrange(key, start, end)
_record_cache_op("read_batch", time.monotonic() - t0)
batch = [json.loads(item) for item in items]
if batch:
yield batch
except redis.RedisError as e:
logger.warning(f"Redis cache read error during streaming: {e}")
def cache_features_batch(query_params: QueryParameters, features: list[dict]) -> None:
"""Append a batch of features to the cache list."""
if not features:
return
try:
client = _get_redis_client()
key = make_cache_key(query_params)
pipeline = client.pipeline()
for feature in features:
pipeline.rpush(key, json.dumps(feature))
# Set/refresh TTL
pipeline.expire(key, CACHE_TTL_SECONDS)
pipeline.execute()
except redis.RedisError as e:
logger.warning(f"Redis cache write error: {e}")
def begin_cache_population(query_params: QueryParameters) -> str:
"""Begin staged cache population. Returns a unique staging key.
The staging key gets its TTL set by cache_features_batch_staged on the
first rpush, so no pre-creation is needed here.
"""
return f"{STAGING_PREFIX}{uuid.uuid4().hex}"
def cache_features_batch_staged(staging_key: str, features: list[dict]) -> None:
"""Append a batch of features to a staging key."""
if not features:
return
try:
t0 = time.monotonic()
client = _get_redis_client()
pipeline = client.pipeline()
for feature in features:
pipeline.rpush(staging_key, json.dumps(feature))
pipeline.expire(staging_key, STAGING_TTL_SECONDS)
pipeline.execute()
_record_cache_op("write_batch", time.monotonic() - t0)
except redis.RedisError as e:
logger.warning(f"Redis staged cache write error: {e}")
def finalize_cache_population(staging_key: str, query_params: QueryParameters) -> None:
"""Atomically rename the staging key to the live cache key and set TTL."""
try:
t0 = time.monotonic()
client = _get_redis_client()
live_key = make_cache_key(query_params)
# RENAME is atomic — replaces the live key in one operation
client.rename(staging_key, live_key)
client.expire(live_key, CACHE_TTL_SECONDS)
_record_cache_op("finalize", time.monotonic() - t0)
logger.debug(f"Finalized cache population for {live_key}")
except redis.RedisError as e:
logger.warning(f"Redis cache finalize error: {e}")
def delete_staging_key(staging_key: str) -> None:
"""Delete an orphaned staging key (used in error cleanup)."""
try:
client = _get_redis_client()
client.delete(staging_key)
except redis.RedisError as e:
logger.warning(f"Redis staging key cleanup error: {e}")
def invalidate_cache() -> None:
"""Delete all listing GeoJSON cache entries, including staging keys."""
try:
client = _get_redis_client()
cursor = 0
deleted = 0
# Clean both live cache keys and staging keys
for pattern in [f"{CACHE_PREFIX}*", f"{STAGING_PREFIX}*"]:
cursor = 0
while True:
cursor, keys = client.scan(cursor, match=pattern, count=100)
if keys:
pipeline = client.pipeline()
for key in keys:
pipeline.delete(key)
pipeline.execute()
deleted += len(keys)
if cursor == 0:
break
if deleted:
logger.info(f"Invalidated {deleted} listing cache entries")
except redis.RedisError as e:
logger.warning(f"Redis cache invalidation error: {e}")
def get_cache_age(query_params: QueryParameters) -> int | None:
"""Return the age in seconds of a cache entry, or None if not cached."""
try:
client = _get_redis_client()
key = make_cache_key(query_params)
ttl = client.ttl(key)
if ttl < 0:
# -2 = key doesn't exist, -1 = no expiry
return None
return CACHE_TTL_SECONDS - ttl
except redis.RedisError as e:
logger.warning(f"Redis cache age check error: {e}")
return None
def is_cache_stale(query_params: QueryParameters) -> bool:
"""Return True if the cache entry exists but is older than STALE_AFTER_SECONDS."""
age = get_cache_age(query_params)
if age is None:
return False
return age > STALE_AFTER_SECONDS
def acquire_repopulation_lock(query_params: QueryParameters) -> bool:
"""Try to acquire a lock to prevent concurrent repopulations.
Returns True if the lock was acquired, False if another repopulation
is already in progress for the same query.
"""
try:
client = _get_redis_client()
key = make_cache_key(query_params)
hash_suffix = key.removeprefix(CACHE_PREFIX)
lock_key = f"{REPOPULATING_PREFIX}{hash_suffix}"
# SETNX with 60-second TTL
acquired: bool = bool(client.set(lock_key, "1", nx=True, ex=60))
return acquired
except redis.RedisError as e:
logger.warning(f"Redis repopulation lock error: {e}")
return False