From e53b1e120a10176866121c7c1f311a2e48b5dcdb Mon Sep 17 00:00:00 2001
From: Viktor Barzin
Date: Sun, 22 Feb 2026 13:26:17 +0000
Subject: [PATCH] Defer decision ID fetch to after metadata message

Decision IDs are now loaded inside the streaming generators after the
metadata message is yielded, eliminating a blocking DB query from the
pre-stream path (~100-200ms improvement to TTFB).
---
Review notes (commentary only; text between the "---" separator and the
first "diff --git" header is not part of the commit message):

- The "Resolve decision IDs" block is added twice with identical content,
  once in _stream_from_cache and once in _stream_from_db.
- The lookups are plain synchronous calls made inside "async def"
  generators; presumably they still occupy the event loop while the query
  runs, only later in the response -- confirm.
- The lookup is now additionally skipped when user_email is falsy; the
  removed pre-stream code had no such guard.
- The new test patches get_cached_count to return None, so it exercises
  the _stream_from_db path with the default decision_filter only.

 api/app.py                    | 56 +++++++++++++++++++----------------
 tests/test_listing_geojson.py | 41 +++++++++++++++++++++++++
 2 files changed, 71 insertions(+), 26 deletions(-)

diff --git a/api/app.py b/api/app.py
index 16a99f6..5c1ead4 100644
--- a/api/app.py
+++ b/api/app.py
@@ -273,8 +273,7 @@ async def _stream_from_cache(
     query_parameters: QueryParameters,
     batch_size: int,
     limit: int | None,
-    disliked_ids: set[int] | None = None,
-    liked_ids: set[int] | None = None,
+    user_email: str | None = None,
     decision_filter: str = "all",
 ) -> AsyncGenerator[str, None]:
     """Stream GeoJSON features from the Redis cache (cache-hit path)."""
@@ -288,6 +287,19 @@ async def _stream_from_cache(
         "cached": True,
     }) + "\n"
 
+    # Resolve decision IDs (deferred to after metadata is sent)
+    disliked_ids: set[int] | None = None
+    liked_ids: set[int] | None = None
+    if decision_filter != "everything" and user_email:
+        user_id = _get_user_id_safe(user_email)
+        if user_id is not None:
+            decision_repo = DecisionRepository(engine)
+            listing_type_str = query_parameters.listing_type.value
+            if decision_filter == "liked":
+                liked_ids = decision_service.get_liked_ids(decision_repo, user_id, listing_type_str)
+            else:
+                disliked_ids = decision_service.get_disliked_ids(decision_repo, user_id, listing_type_str)
+
     count = 0
     is_first_batch = True
     for feature_batch in get_cached_features(query_parameters, batch_size=batch_size):
@@ -337,8 +349,7 @@ async def _stream_from_db(
     limit: int | None,
     poi_distances_lookup: dict[int, list[dict[str, str | int]]] | None = None,
     skip_cache: bool = False,
-    disliked_ids: set[int] | None = None,
-    liked_ids: set[int] | None = None,
+    user_email: str | None = None,
     decision_filter: str = "all",
 ) -> AsyncGenerator[str, None]:
     """Stream GeoJSON features from the database, populating the cache as we go."""
@@ -354,6 +365,19 @@ async def _stream_from_db(
         "cached": False,
     }) + "\n"
 
+    # Resolve decision IDs (deferred to after metadata is sent)
+    disliked_ids: set[int] | None = None
+    liked_ids: set[int] | None = None
+    if decision_filter != "everything" and user_email:
+        user_id = _get_user_id_safe(user_email)
+        if user_id is not None:
+            decision_repo = DecisionRepository(engine)
+            listing_type_str = query_parameters.listing_type.value
+            if decision_filter == "liked":
+                liked_ids = decision_service.get_liked_ids(decision_repo, user_id, listing_type_str)
+            else:
+                disliked_ids = decision_service.get_disliked_ids(decision_repo, user_id, listing_type_str)
+
     staging_key: str | None = None
     if not skip_cache:
         staging_key = begin_cache_population(query_parameters)
@@ -435,31 +459,12 @@ async def stream_listing_geojson(
     # Build POI distances lookup if requested
     poi_distances_lookup = _build_poi_distances_lookup(user.email, query_parameters.listing_type) if include_poi_distances else None
 
-    # Build decision filter sets
-    disliked_ids: set[int] | None = None
-    liked_ids: set[int] | None = None
-    if decision_filter != "everything":
-        user_id = _get_user_id_safe(user.email)
-        if user_id is not None:
-            decision_repo = DecisionRepository(engine)
-            listing_type_str = query_parameters.listing_type.value
-            if decision_filter == "liked":
-                liked_ids = decision_service.get_liked_ids(
-                    decision_repo, user_id, listing_type_str
-                )
-            else:
-                # default "all": load disliked to exclude
-                disliked_ids = decision_service.get_disliked_ids(
-                    decision_repo, user_id, listing_type_str
-                )
-
     cached_count = get_cached_count(query_parameters)
     if cached_count is not None and cached_count > 0 and not include_poi_distances:
         app_metrics.geojson_cache_operations.add(1, {"result": "hit"})
         generator = _stream_from_cache(
             query_parameters, batch_size, limit,
-            disliked_ids=disliked_ids,
-            liked_ids=liked_ids,
+            user_email=user.email,
             decision_filter=decision_filter,
         )
     else:
@@ -467,8 +472,7 @@ async def stream_listing_geojson(
         generator = _stream_from_db(
             query_parameters, batch_size, limit, poi_distances_lookup,
             skip_cache=include_poi_distances,
-            disliked_ids=disliked_ids,
-            liked_ids=liked_ids,
+            user_email=user.email,
             decision_filter=decision_filter,
         )
 
diff --git a/tests/test_listing_geojson.py b/tests/test_listing_geojson.py
index 8735b7a..4b2cb20 100644
--- a/tests/test_listing_geojson.py
+++ b/tests/test_listing_geojson.py
@@ -315,3 +315,44 @@ class TestStreamingEndpoint:
         from api.app import FIRST_BATCH_SIZE
         assert len(batch_messages[0]["features"]) <= FIRST_BATCH_SIZE
 
+    def test_streaming_still_filters_disliked(self, client):
+        """Test that disliked listings are filtered even with deferred decision loading."""
+        with patch("api.app.get_cached_count", return_value=None), \
+             patch("api.app._get_user_id_safe", return_value=42), \
+             patch("api.app.ListingRepository") as MockRepo, \
+             patch("api.app.DecisionRepository"), \
+             patch("api.app.decision_service") as mock_ds:
+
+            mock_ds.get_disliked_ids.return_value = {2}
+
+            mock_instance = MagicMock()
+            mock_instance.count_listings.return_value = 2
+            mock_instance.stream_listings_optimized.return_value = iter([
+                {
+                    'id': 1, 'price': 2000.0, 'number_of_bedrooms': 2,
+                    'square_meters': 50.0, 'longitude': -0.1, 'latitude': 51.5,
+                    'photo_thumbnail': None, 'last_seen': datetime.now(),
+                    'agency': None, 'price_history_json': '[]', 'available_from': None,
+                },
+                {
+                    'id': 2, 'price': 2500.0, 'number_of_bedrooms': 2,
+                    'square_meters': 60.0, 'longitude': -0.12, 'latitude': 51.51,
+                    'photo_thumbnail': None, 'last_seen': datetime.now(),
+                    'agency': None, 'price_history_json': '[]', 'available_from': None,
+                },
+            ])
+            MockRepo.return_value = mock_instance
+
+            response = client.get("/api/listing_geojson/stream?listing_type=RENT&limit=10")
+            messages = [json.loads(line) for line in response.text.strip().split("\n")]
+
+            all_features = []
+            for m in messages:
+                if m["type"] == "batch":
+                    all_features.extend(m["features"])
+
+            feature_ids = [f["properties"]["id"] for f in all_features]
+            assert 2 not in feature_ids
+            assert 1 in feature_ids
+
+