Add navigation & usage metrics for end-user experience visibility

Instrument DB query timing (11 operations across 3 repositories),
streaming lifecycle (TTFB, duration, feature count), cache operation
latency, listing detail step breakdown, and frontend page load /
time-to-first-listing / stream download / detail load metrics.

Adds 16 new OTel instruments, extends the perf ingestion endpoint
with 4 new frontend metrics, and adds ~20 Grafana dashboard panels
across 4 new rows (DB Query Performance, Streaming Performance,
Listing Detail Breakdown, Cache Performance, Frontend Navigation).
This commit is contained in:
Viktor Barzin 2026-02-23 20:28:42 +00:00
parent 1ae00b7cbf
commit 35f1987ac1
No known key found for this signature in database
GPG key ID: 0EB088298288D958
11 changed files with 1236 additions and 26 deletions

View file

@ -440,8 +440,10 @@ async def _stream_from_db(
async def _repopulate_cache_background(query_parameters: QueryParameters) -> None:
"""Repopulate the cache from DB in the background (fire-and-forget)."""
if not acquire_repopulation_lock(query_parameters):
app_metrics.cache_repopulation_total.add(1, {"result": "skipped"})
logger.debug("Skipping background repopulation — already in progress")
return
app_metrics.cache_repopulation_total.add(1, {"result": "started"})
try:
logger.info("Starting background cache repopulation for stale entry")
repository = ListingRepository(engine)
@ -453,14 +455,46 @@ async def _repopulate_cache_background(query_parameters: QueryParameters) -> Non
feature = convert_row_to_geojson(row, query_parameters.listing_type.value)
cache_features_batch_staged(staging_key, [feature])
finalize_cache_population(staging_key, query_parameters)
app_metrics.cache_repopulation_total.add(1, {"result": "completed"})
logger.info("Background cache repopulation completed")
except Exception:
delete_staging_key(staging_key)
raise
except Exception:
app_metrics.cache_repopulation_total.add(1, {"result": "failed"})
logger.exception("Background cache repopulation failed")
async def _instrumented_stream(
inner: AsyncGenerator[str, None],
source: str,
) -> AsyncGenerator[str, None]:
"""Wrap a streaming generator to record TTFB, total duration, and feature count."""
t0 = time.monotonic()
first_yielded = False
feature_count = 0
try:
async for chunk in inner:
if not first_yielded:
app_metrics.stream_time_to_first_byte_seconds.record(
time.monotonic() - t0, {"source": source}
)
first_yielded = True
# Count features from batch messages
try:
msg = json.loads(chunk)
if msg.get("type") == "batch" and "features" in msg:
feature_count += len(msg["features"])
except (json.JSONDecodeError, TypeError):
pass
yield chunk
finally:
duration = time.monotonic() - t0
app_metrics.stream_total_duration_seconds.record(duration, {"source": source})
app_metrics.stream_features_total.add(feature_count, {"source": source})
app_metrics.stream_requests_total.add(1, {"source": source})
@app.get("/api/listing_geojson/stream")
async def stream_listing_geojson(
user: Annotated[User, Depends(get_current_user)],
@ -501,21 +535,28 @@ async def stream_listing_geojson(
app_metrics.geojson_cache_operations.add(1, {"result": "hit"})
stale = is_cache_stale(query_parameters)
if stale:
app_metrics.cache_stale_serves_total.add(1)
# Fire-and-forget background repopulation
asyncio.create_task(_repopulate_cache_background(query_parameters))
generator = _stream_from_cache(
query_parameters, batch_size, limit,
user_email=user.email,
decision_filter=decision_filter,
stale=stale,
generator = _instrumented_stream(
_stream_from_cache(
query_parameters, batch_size, limit,
user_email=user.email,
decision_filter=decision_filter,
stale=stale,
),
source="cache",
)
else:
app_metrics.geojson_cache_operations.add(1, {"result": "miss"})
generator = _stream_from_db(
query_parameters, batch_size, limit, poi_distances_lookup,
skip_cache=include_poi_distances,
user_email=user.email,
decision_filter=decision_filter,
generator = _instrumented_stream(
_stream_from_db(
query_parameters, batch_size, limit, poi_distances_lookup,
skip_cache=include_poi_distances,
user_email=user.email,
decision_filter=decision_filter,
),
source="db",
)
return StreamingResponse(
@ -660,9 +701,13 @@ async def get_listing_detail(
"""Get detailed information for a single listing."""
repository = ListingRepository(engine)
lt = ListingType(listing_type)
t_step = time.monotonic()
listings = await repository.get_listings(
only_ids=[listing_id], listing_type=lt
)
app_metrics.listing_detail_step_duration_seconds.record(
time.monotonic() - t_step, {"step": "fetch_listing"}
)
if not listings:
raise HTTPException(status_code=404, detail="Listing not found")
@ -737,6 +782,7 @@ async def get_listing_detail(
furnish_type_val = str(ft)
# Load user's decision for this listing
t_step = time.monotonic()
decision_val: str | None = None
user_id = _get_user_id_safe(user.email)
if user_id is not None:
@ -746,8 +792,12 @@ async def get_listing_detail(
if d.listing_id == listing_id and d.listing_type == listing_type:
decision_val = d.decision
break
app_metrics.listing_detail_step_duration_seconds.record(
time.monotonic() - t_step, {"step": "load_decision"}
)
# Load POI distances
t_step = time.monotonic()
poi_distances_list: list[dict] = []
if user_id is not None:
poi_repo = POIRepository(engine)
@ -765,6 +815,9 @@ async def get_listing_detail(
"duration_seconds": d.duration_seconds,
"distance_meters": d.distance_meters,
})
app_metrics.listing_detail_step_duration_seconds.record(
time.monotonic() - t_step, {"step": "load_poi_distances"}
)
return ListingDetailResponse(
id=listing.id,

View file

@ -64,6 +64,40 @@ frontend_worker_compute: Histogram
frontend_main_thread: Histogram
frontend_feature_count: Histogram
# ---------------------------------------------------------------------------
# Database query metrics
# ---------------------------------------------------------------------------
db_query_duration_seconds: Histogram
db_query_rows_returned: Histogram
# ---------------------------------------------------------------------------
# Streaming lifecycle metrics
# ---------------------------------------------------------------------------
stream_time_to_first_byte_seconds: Histogram
stream_total_duration_seconds: Histogram
stream_features_total: Counter
stream_requests_total: Counter
# ---------------------------------------------------------------------------
# Cache performance metrics
# ---------------------------------------------------------------------------
cache_operation_duration_seconds: Histogram
cache_repopulation_total: Counter
cache_stale_serves_total: Counter
# ---------------------------------------------------------------------------
# Listing detail metrics
# ---------------------------------------------------------------------------
listing_detail_step_duration_seconds: Histogram
# ---------------------------------------------------------------------------
# Frontend navigation/usage metrics
# ---------------------------------------------------------------------------
frontend_page_load: Histogram
frontend_time_to_first_listing: Histogram
frontend_stream_download: Histogram
frontend_listing_detail_load: Histogram
def init_metrics(service_name: str = "realestate-crawler") -> PrometheusMetricReader:
"""Initialise the OTel MeterProvider and define all instruments.
@ -80,6 +114,14 @@ def init_metrics(service_name: str = "realestate-crawler") -> PrometheusMetricRe
global celery_tasks_total, celery_task_duration_seconds, celery_tasks_active
global frontend_worker_roundtrip, frontend_worker_compute
global frontend_main_thread, frontend_feature_count
global db_query_duration_seconds, db_query_rows_returned
global stream_time_to_first_byte_seconds, stream_total_duration_seconds
global stream_features_total, stream_requests_total
global cache_operation_duration_seconds, cache_repopulation_total
global cache_stale_serves_total
global listing_detail_step_duration_seconds
global frontend_page_load, frontend_time_to_first_listing
global frontend_stream_download, frontend_listing_detail_load
if _reader is not None:
return _reader
@ -172,9 +214,93 @@ def init_metrics(service_name: str = "realestate-crawler") -> PrometheusMetricRe
description="Number of features per heatmap load",
)
# -- Database query timing --
db_query_duration_seconds = _meter.create_histogram(
"db_query_duration_seconds",
description="Duration of individual database queries in seconds",
)
db_query_rows_returned = _meter.create_histogram(
"db_query_rows_returned",
description="Number of rows returned per database query",
)
# -- Streaming lifecycle --
stream_time_to_first_byte_seconds = _meter.create_histogram(
"stream_time_to_first_byte_seconds",
description="Time from handler entry to first NDJSON line",
)
stream_total_duration_seconds = _meter.create_histogram(
"stream_total_duration_seconds",
description="Total wall-clock time for a streaming response",
)
stream_features_total = _meter.create_counter(
"stream_features_total",
description="Total GeoJSON features streamed to clients",
)
stream_requests_total = _meter.create_counter(
"stream_requests_total",
description="Total streaming requests served",
)
# -- Cache performance --
cache_operation_duration_seconds = _meter.create_histogram(
"cache_operation_duration_seconds",
description="Redis cache operation latency in seconds",
)
cache_repopulation_total = _meter.create_counter(
"cache_repopulation_total",
description="Cache repopulation events by result",
)
cache_stale_serves_total = _meter.create_counter(
"cache_stale_serves_total",
description="Number of times stale cache was served during repopulation",
)
# -- Listing detail --
listing_detail_step_duration_seconds = _meter.create_histogram(
"listing_detail_step_duration_seconds",
description="Per-step timing in listing detail endpoint",
)
# -- Frontend navigation/usage --
frontend_page_load = _meter.create_histogram(
"frontend_page_load_seconds",
description="Full page or filter load to data rendered",
)
frontend_time_to_first_listing = _meter.create_histogram(
"frontend_time_to_first_listing_seconds",
description="Time from load trigger to first listing batch on screen",
)
frontend_stream_download = _meter.create_histogram(
"frontend_stream_download_seconds",
description="Client-side total stream download duration",
)
frontend_listing_detail_load = _meter.create_histogram(
"frontend_listing_detail_load_seconds",
description="Time from click to listing detail data rendered",
)
return _reader
def record_db_query(
operation: str,
model: str,
duration: float,
rows: int | None = None,
) -> None:
"""Record a database query timing metric.
Safe to call even when ``init_metrics()`` has not been called (e.g.
from CLI usage) silently no-ops in that case.
"""
if _meter is None:
return
db_query_duration_seconds.record(duration, {"operation": operation, "model": model})
if rows is not None:
db_query_rows_returned.record(rows, {"operation": operation})
def get_metrics_asgi_app(): # type: ignore[no-untyped-def]
"""Return the Prometheus ASGI app for mounting at /metrics."""
return make_asgi_app()

View file

@ -6,7 +6,10 @@ from pydantic import BaseModel, Field, field_validator
import api.metrics as app_metrics
ALLOWED_METRICS = {"worker_roundtrip", "worker_compute", "main_thread", "feature_count"}
ALLOWED_METRICS = {
"worker_roundtrip", "worker_compute", "main_thread", "feature_count",
"page_load", "time_to_first_listing", "stream_download", "listing_detail_load",
}
MAX_BATCH_SIZE = 100
@ -41,5 +44,13 @@ async def record_perf(samples: list[PerfSample]) -> Response:
app_metrics.frontend_main_thread.record(s.value, attrs)
elif s.metric == "feature_count":
app_metrics.frontend_feature_count.record(s.value)
elif s.metric == "page_load":
app_metrics.frontend_page_load.record(s.value, attrs)
elif s.metric == "time_to_first_listing":
app_metrics.frontend_time_to_first_listing.record(s.value, attrs)
elif s.metric == "stream_download":
app_metrics.frontend_stream_download.record(s.value)
elif s.metric == "listing_detail_load":
app_metrics.frontend_listing_detail_load.record(s.value)
return Response(status_code=204)