Coordinated fix across 31 bugs found in a parallel QA pass. Findings docs at /tmp/wrongmove-bugs/qa-round-3/qa{1,2,3,4}-*.md.
## Backend / scrape (Fix-1) — 8 bugs
- B1 [P0] Scrape totally broken on prod: pod UID 100 vs NFS dir 1000:1000 mode 775 → PermissionError on every never-seen listing. Switched Dockerfile to explicit `useradd --uid 1000 --gid 1000`; added securityContext + chown initContainer to k8s/{api,celery-beat}-deployment.yaml. Celery worker manifest lives outside this repo — Dockerfile UID change is the load-bearing fix.
- B4 [P1] Celery broker connections reaped every ~30s by the Redis HAProxy idle timeout. Added `broker_transport_options` / `result_backend_transport_options` with `socket_keepalive=True, health_check_interval=25` in celery_app.py, plus the same kwargs on every redis.from_url/Redis call across services/, utils/redis_lock.py, redis_repository.py.
- B5 [P1] dump_listings_task never published terminal FAILURE to the task_progress pub/sub channel, so the UI polled forever. Wrapped the task body in try/except that publishes FAILURE before re-raising.
- B6 [P1] _process_worker had no per-listing exception handler, so one bad listing killed the whole scrape via asyncio.gather. Wrapped the loop body in try/except Exception; CancelledError is still re-raised.
- B20 [P2] dump_listings_task gained time_limit=3600, soft_time_limit=3500, acks_late=True.
- B21 [P2] RedisRepository moved off shared db0 (was alongside paperless-ngx) to db3 via REDIS_USER_DB env var; keys prefixed `wrongmove:user:`.
- B32 [P3] redis_lock now uses uuid4() owner token + Lua compare-and-delete.
- B33 [P3] Slack notify in refresh_listings → asyncio.create_task (fire-and-forget).
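
The per-listing isolation described in B6 can be sketched roughly as follows. This is a minimal illustration and not the actual `_process_worker` code: `process_listing`, `worker`, `scrape`, and the simulated failure on listing 2 are hypothetical names and behaviour chosen for the example.

```python
import asyncio


async def process_listing(listing_id: int) -> str:
    """Hypothetical per-listing step; ID 2 simulates a malformed listing."""
    await asyncio.sleep(0)  # yield to the event loop like real I/O would
    if listing_id == 2:
        raise ValueError(f"listing {listing_id} is malformed")
    return f"listing-{listing_id}"


async def worker(queue: asyncio.Queue, done: list, failed: list) -> None:
    """Drain the queue, confining each failure to the listing that caused it."""
    while True:
        try:
            listing_id = queue.get_nowait()
        except asyncio.QueueEmpty:
            return
        try:
            done.append(await process_listing(listing_id))
        except asyncio.CancelledError:
            # Cancellation must keep propagating so shutdown still works.
            raise
        except Exception as exc:
            # Record the failure and carry on with the next listing.
            failed.append((listing_id, str(exc)))


async def scrape(listing_ids: list, concurrency: int = 2) -> tuple:
    """Run several workers under asyncio.gather over a shared queue."""
    queue: asyncio.Queue = asyncio.Queue()
    for listing_id in listing_ids:
        queue.put_nowait(listing_id)
    done: list = []
    failed: list = []
    await asyncio.gather(*(worker(queue, done, failed) for _ in range(concurrency)))
    return sorted(done), failed
```

Calling `asyncio.run(scrape([1, 2, 3, 4]))` in this sketch completes listings 1, 3 and 4 and reports listing 2 as failed, instead of the first exception aborting the whole `asyncio.gather` call.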
## Frontend filter system (Fix-2) — 7 bugs
- B2 [P0] BUY tab click triggered "Maximum update depth exceeded" → ErrorBoundary. Replaced the three mutually-triggering useEffects in FilterBar with a single one-way controlled-value flow (URL → parent state → form), guarded by previousListingTypeRef so price-defaults fires once per real transition.
- B3 [P0] Filter values never reached the URL. Wired useFilterParams.setFilterValues into FilterBar/FilterPanel onSubmit + handleRemoveChip + new handleResetAllFilters; fed parsed filterValues into both forms' defaultValues; added URL→form sync via form.reset on browser back/forward.
- B8 [P1] Chip removal now resets form state via new FilterBar onFormReady callback — More badge no longer sticks.
- B12 [P2] Desktop swipe-review FAB added next to header (mobile FAB unchanged).
- B17 [P2] "Reset all" affordance on chip strip.
- B22 [P2] formatPrice precision: 1500 → £1.5k, 2500 → £2.5k (no longer collides with £2k/£3k defaults).
- B30 [P3] last_seen_days input gained min={0}.
## Frontend render hygiene + data integrity (Fix-3) — 8 bugs
- B7 [P1] streamingService bails on first non-NDJSON chunk (HTML response = backend down) and throws StreamParseError so the existing AlertError dialog surfaces a single user-visible error instead of 18× console.error spam.
- B9 [P1] formatDuration widened to (null|undefined|number): returns "—" for non-finite or negative, caps implausibly large values.
- B10 [P1] PropertyCard / PropertyCardCompact / SwipeCard JSX leaves render "—" for null total_price/qm/qmprice (was "£0/0 m²/£0/m²" — looked like free listings).
- B13 [P2] hexgrid worker reduceAverage uses Number.isFinite filter instead of !isNaN (which incorrectly accepted null → 0, biasing per-hex averages low).
- B14 [P2] ListingDetail Overview wraps agency in "Listed by" labelled block so it can't collapse to a bare agency name.
- B15 [P2] Compact POIDistanceBadges iterates all three travel modes with "—" for missing, matching the detail-sheet Travel table.
- B24 [P3] Drawer.Description (sr-only) added to ListingDetailSheet + MobileBottomSheet to silence Radix a11y warning.
- B25 [P3] lastSeenDays clamped to ≥0 so future timestamps don't render as "-7d ago".
## Frontend map / carousel / tasks polish (Fix-4) — 8 bugs
- B11 [P2] HexgridHeatmapClient destroy race: Map.tsx adds .catch() + ref guard so post-destroy promise rejections are silent no-ops. Verified by browser smoke (24 rapid Map↔List toggles → 0 pageErrors).
- B16 [P2] PhotoCarousel + inner CardCarousel gained keyboard nav (Arrow keys).
- B18 [P2] Default map center moved from Czech Republic to London (zoom 10).
- B19+B29 [P2/P3] Mapbox token: no longer hard-coded fallback; reads env-only and shows a clear "Map unavailable — set VITE_MAPBOX_TOKEN" banner when missing.
- B23 [P3] PhotoCarousel suppresses "1/1" counter for single-photo listings; added onError fallback for broken URLs.
- B26 [P3] PhotoCarousel only enables loop when photos.length > 1.
- B27 [P3] TaskIndicator cancel/clear-all buttons gained aria-label + data-testid.
- B28 [P3] useTaskProgress strips terminal-local task IDs from the polling union — no more forever-poll on completed tasks.
## Tests
74 new vitest tests + 18 new pytest tests. Local: tsc clean, 201 vitest tests pass, 633 pytest tests pass.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
100 lines
3.3 KiB
Python
import sys
import time
from celery import Celery
from celery.signals import worker_ready, task_prerun, task_postrun
from dotenv import load_dotenv
import os

from logging_config import configure_logging

load_dotenv()

configure_logging(os.getenv("SERVICE_NAME", "celery-worker"))

app = Celery(
    "celery_app",
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"),
    include=["tasks.listing_tasks", "tasks.poi_tasks"],
)

# Keep broker / result-backend connections alive when sitting behind an
# HAProxy / load balancer that idles TCP connections (the in-cluster Redis
# HAProxy reaps idle conns after 30s). Without these options the worker
# logs a "Connection closed by server" every ~30s and progress publishes
# silently drop on the closed socket.
app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
    broker_transport_options={
        "socket_keepalive": True,
        "health_check_interval": 25,
    },
    result_backend_transport_options={
        "socket_keepalive": True,
        "health_check_interval": 25,
    },
    broker_heartbeat=10,
)

# ---------------------------------------------------------------------------
# Celery metrics via prometheus_client
# ---------------------------------------------------------------------------
CELERY_METRICS_PORT = int(os.getenv("CELERY_METRICS_PORT", "9090"))

# Track task start times for duration measurement
_task_start_times: dict[str, float] = {}

# Initialise OTel metrics at module level so prefork children inherit the
# MeterProvider and PrometheusMetricReader. The prometheus_client collectors
# are registered in the default registry before fork, so child-process
# recordings are visible to the HTTP server started in the main process.
from api.metrics import init_metrics as _init_metrics  # noqa: E402
_init_metrics(os.getenv("SERVICE_NAME", "celery-worker"))


@worker_ready.connect
def _start_metrics_server(**kwargs: object) -> None:
    """Start a lightweight Prometheus HTTP server in the main worker process."""
    from prometheus_client import start_http_server
    start_http_server(CELERY_METRICS_PORT)


@task_prerun.connect
def _on_task_prerun(task_id: str, task: object, **kwargs: object) -> None:
    import api.metrics as m
    task_name = getattr(task, "name", "unknown")
    m.celery_tasks_active.add(1, {"task_name": task_name})
    _task_start_times[task_id] = time.monotonic()


@task_postrun.connect
def _on_task_postrun(
    task_id: str, task: object, state: str | None = None, **kwargs: object
) -> None:
    import api.metrics as m
    task_name = getattr(task, "name", "unknown")
    status = state or "UNKNOWN"

    m.celery_tasks_active.add(-1, {"task_name": task_name})
    m.celery_tasks_total.add(1, {"task_name": task_name, "status": status})

    start = _task_start_times.pop(task_id, None)
    if start is not None:
        m.celery_task_duration_seconds.record(
            time.monotonic() - start, {"task_name": task_name}
        )


if __name__ == "__main__":
    try:
        with app.connection() as conn:
            conn.ensure_connection(max_retries=0)
        print("Broker connection OK")
        sys.exit(0)
    except Exception as e:
        print(f"Broker connection failed: {e}")
        sys.exit(1)