Defer decision ID fetch to after metadata message
Decision IDs are now loaded inside the streaming generators after the metadata message is yielded, eliminating a blocking DB query from the pre-stream path (~100-200ms improvement to TTFB).
This commit is contained in:
parent
e99006e2f9
commit
e53b1e120a
2 changed files with 71 additions and 26 deletions
56
api/app.py
56
api/app.py
|
|
@ -273,8 +273,7 @@ async def _stream_from_cache(
|
||||||
query_parameters: QueryParameters,
|
query_parameters: QueryParameters,
|
||||||
batch_size: int,
|
batch_size: int,
|
||||||
limit: int | None,
|
limit: int | None,
|
||||||
disliked_ids: set[int] | None = None,
|
user_email: str | None = None,
|
||||||
liked_ids: set[int] | None = None,
|
|
||||||
decision_filter: str = "all",
|
decision_filter: str = "all",
|
||||||
) -> AsyncGenerator[str, None]:
|
) -> AsyncGenerator[str, None]:
|
||||||
"""Stream GeoJSON features from the Redis cache (cache-hit path)."""
|
"""Stream GeoJSON features from the Redis cache (cache-hit path)."""
|
||||||
|
|
@ -288,6 +287,19 @@ async def _stream_from_cache(
|
||||||
"cached": True,
|
"cached": True,
|
||||||
}) + "\n"
|
}) + "\n"
|
||||||
|
|
||||||
|
# Resolve decision IDs (deferred to after metadata is sent)
|
||||||
|
disliked_ids: set[int] | None = None
|
||||||
|
liked_ids: set[int] | None = None
|
||||||
|
if decision_filter != "everything" and user_email:
|
||||||
|
user_id = _get_user_id_safe(user_email)
|
||||||
|
if user_id is not None:
|
||||||
|
decision_repo = DecisionRepository(engine)
|
||||||
|
listing_type_str = query_parameters.listing_type.value
|
||||||
|
if decision_filter == "liked":
|
||||||
|
liked_ids = decision_service.get_liked_ids(decision_repo, user_id, listing_type_str)
|
||||||
|
else:
|
||||||
|
disliked_ids = decision_service.get_disliked_ids(decision_repo, user_id, listing_type_str)
|
||||||
|
|
||||||
count = 0
|
count = 0
|
||||||
is_first_batch = True
|
is_first_batch = True
|
||||||
for feature_batch in get_cached_features(query_parameters, batch_size=batch_size):
|
for feature_batch in get_cached_features(query_parameters, batch_size=batch_size):
|
||||||
|
|
@ -337,8 +349,7 @@ async def _stream_from_db(
|
||||||
limit: int | None,
|
limit: int | None,
|
||||||
poi_distances_lookup: dict[int, list[dict[str, str | int]]] | None = None,
|
poi_distances_lookup: dict[int, list[dict[str, str | int]]] | None = None,
|
||||||
skip_cache: bool = False,
|
skip_cache: bool = False,
|
||||||
disliked_ids: set[int] | None = None,
|
user_email: str | None = None,
|
||||||
liked_ids: set[int] | None = None,
|
|
||||||
decision_filter: str = "all",
|
decision_filter: str = "all",
|
||||||
) -> AsyncGenerator[str, None]:
|
) -> AsyncGenerator[str, None]:
|
||||||
"""Stream GeoJSON features from the database, populating the cache as we go."""
|
"""Stream GeoJSON features from the database, populating the cache as we go."""
|
||||||
|
|
@ -354,6 +365,19 @@ async def _stream_from_db(
|
||||||
"cached": False,
|
"cached": False,
|
||||||
}) + "\n"
|
}) + "\n"
|
||||||
|
|
||||||
|
# Resolve decision IDs (deferred to after metadata is sent)
|
||||||
|
disliked_ids: set[int] | None = None
|
||||||
|
liked_ids: set[int] | None = None
|
||||||
|
if decision_filter != "everything" and user_email:
|
||||||
|
user_id = _get_user_id_safe(user_email)
|
||||||
|
if user_id is not None:
|
||||||
|
decision_repo = DecisionRepository(engine)
|
||||||
|
listing_type_str = query_parameters.listing_type.value
|
||||||
|
if decision_filter == "liked":
|
||||||
|
liked_ids = decision_service.get_liked_ids(decision_repo, user_id, listing_type_str)
|
||||||
|
else:
|
||||||
|
disliked_ids = decision_service.get_disliked_ids(decision_repo, user_id, listing_type_str)
|
||||||
|
|
||||||
staging_key: str | None = None
|
staging_key: str | None = None
|
||||||
if not skip_cache:
|
if not skip_cache:
|
||||||
staging_key = begin_cache_population(query_parameters)
|
staging_key = begin_cache_population(query_parameters)
|
||||||
|
|
@ -435,31 +459,12 @@ async def stream_listing_geojson(
|
||||||
# Build POI distances lookup if requested
|
# Build POI distances lookup if requested
|
||||||
poi_distances_lookup = _build_poi_distances_lookup(user.email, query_parameters.listing_type) if include_poi_distances else None
|
poi_distances_lookup = _build_poi_distances_lookup(user.email, query_parameters.listing_type) if include_poi_distances else None
|
||||||
|
|
||||||
# Build decision filter sets
|
|
||||||
disliked_ids: set[int] | None = None
|
|
||||||
liked_ids: set[int] | None = None
|
|
||||||
if decision_filter != "everything":
|
|
||||||
user_id = _get_user_id_safe(user.email)
|
|
||||||
if user_id is not None:
|
|
||||||
decision_repo = DecisionRepository(engine)
|
|
||||||
listing_type_str = query_parameters.listing_type.value
|
|
||||||
if decision_filter == "liked":
|
|
||||||
liked_ids = decision_service.get_liked_ids(
|
|
||||||
decision_repo, user_id, listing_type_str
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# default "all": load disliked to exclude
|
|
||||||
disliked_ids = decision_service.get_disliked_ids(
|
|
||||||
decision_repo, user_id, listing_type_str
|
|
||||||
)
|
|
||||||
|
|
||||||
cached_count = get_cached_count(query_parameters)
|
cached_count = get_cached_count(query_parameters)
|
||||||
if cached_count is not None and cached_count > 0 and not include_poi_distances:
|
if cached_count is not None and cached_count > 0 and not include_poi_distances:
|
||||||
app_metrics.geojson_cache_operations.add(1, {"result": "hit"})
|
app_metrics.geojson_cache_operations.add(1, {"result": "hit"})
|
||||||
generator = _stream_from_cache(
|
generator = _stream_from_cache(
|
||||||
query_parameters, batch_size, limit,
|
query_parameters, batch_size, limit,
|
||||||
disliked_ids=disliked_ids,
|
user_email=user.email,
|
||||||
liked_ids=liked_ids,
|
|
||||||
decision_filter=decision_filter,
|
decision_filter=decision_filter,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
|
@ -467,8 +472,7 @@ async def stream_listing_geojson(
|
||||||
generator = _stream_from_db(
|
generator = _stream_from_db(
|
||||||
query_parameters, batch_size, limit, poi_distances_lookup,
|
query_parameters, batch_size, limit, poi_distances_lookup,
|
||||||
skip_cache=include_poi_distances,
|
skip_cache=include_poi_distances,
|
||||||
disliked_ids=disliked_ids,
|
user_email=user.email,
|
||||||
liked_ids=liked_ids,
|
|
||||||
decision_filter=decision_filter,
|
decision_filter=decision_filter,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -315,3 +315,44 @@ class TestStreamingEndpoint:
|
||||||
from api.app import FIRST_BATCH_SIZE
|
from api.app import FIRST_BATCH_SIZE
|
||||||
assert len(batch_messages[0]["features"]) <= FIRST_BATCH_SIZE
|
assert len(batch_messages[0]["features"]) <= FIRST_BATCH_SIZE
|
||||||
|
|
||||||
|
def test_streaming_still_filters_disliked(self, client):
|
||||||
|
"""Test that disliked listings are filtered even with deferred decision loading."""
|
||||||
|
with patch("api.app.get_cached_count", return_value=None), \
|
||||||
|
patch("api.app._get_user_id_safe", return_value=42), \
|
||||||
|
patch("api.app.ListingRepository") as MockRepo, \
|
||||||
|
patch("api.app.DecisionRepository"), \
|
||||||
|
patch("api.app.decision_service") as mock_ds:
|
||||||
|
|
||||||
|
mock_ds.get_disliked_ids.return_value = {2}
|
||||||
|
|
||||||
|
mock_instance = MagicMock()
|
||||||
|
mock_instance.count_listings.return_value = 2
|
||||||
|
mock_instance.stream_listings_optimized.return_value = iter([
|
||||||
|
{
|
||||||
|
'id': 1, 'price': 2000.0, 'number_of_bedrooms': 2,
|
||||||
|
'square_meters': 50.0, 'longitude': -0.1, 'latitude': 51.5,
|
||||||
|
'photo_thumbnail': None, 'last_seen': datetime.now(),
|
||||||
|
'agency': None, 'price_history_json': '[]', 'available_from': None,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'id': 2, 'price': 2500.0, 'number_of_bedrooms': 2,
|
||||||
|
'square_meters': 60.0, 'longitude': -0.12, 'latitude': 51.51,
|
||||||
|
'photo_thumbnail': None, 'last_seen': datetime.now(),
|
||||||
|
'agency': None, 'price_history_json': '[]', 'available_from': None,
|
||||||
|
},
|
||||||
|
])
|
||||||
|
MockRepo.return_value = mock_instance
|
||||||
|
|
||||||
|
response = client.get("/api/listing_geojson/stream?listing_type=RENT&limit=10")
|
||||||
|
messages = [json.loads(line) for line in response.text.strip().split("\n")]
|
||||||
|
|
||||||
|
all_features = []
|
||||||
|
for m in messages:
|
||||||
|
if m["type"] == "batch":
|
||||||
|
all_features.extend(m["features"])
|
||||||
|
|
||||||
|
feature_ids = [f["properties"]["id"] for f in all_features]
|
||||||
|
assert 2 not in feature_ids
|
||||||
|
assert 1 in feature_ids
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue