Add time-to-first-property implementation plan

7 tasks: K8s resource limits, adaptive first batch, deferred
decision fetch, Server-Timing headers, bulk POI distances endpoint,
frontend waterfall elimination, and end-to-end verification.
This commit is contained in:
Viktor Barzin 2026-02-22 13:16:42 +00:00
parent 6d653dba63
commit 8db7b60493
No known key found for this signature in database
GPG key ID: 0EB088298288D958

View file

@ -0,0 +1,683 @@
# Time-to-First-Property Performance Optimization — Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** Reduce time-to-first-property from 5s+ to under 1s by eliminating CPU throttling, frontend waterfalls, and pre-stream blocking work.
**Architecture:** Six independent changes: (1) K8s resource limits fix, (2) adaptive first-batch sizing on the backend, (3) deferred decision-ID fetch, (4) Server-Timing headers, (5) bulk POI distances endpoint, (6) frontend waterfall elimination. Tasks 1-4 are backend-only; task 5 spans backend+frontend; task 6 is frontend-only.
**Tech Stack:** Python/FastAPI, Redis, K8s YAML, React/TypeScript, NDJSON streaming
---
### Task 1: Set Explicit API Pod Resources in K8s
The `realestate-crawler-api` deployment inherits LimitRange defaults (500m CPU / 1Gi memory). The 500m CPU limit causes kernel throttling during streaming requests (~240-460ms lost per request).
**Files:**
- Create: `k8s/api-deployment.yaml`
**Step 1: Create the K8s deployment manifest**
Extract the current deployment, then set explicit resources. Run:
```bash
KUBECONFIG=.config kubectl get deployment realestate-crawler-api -n realestate-crawler -o yaml > k8s/api-deployment.yaml
```
Then edit `k8s/api-deployment.yaml` to set these resource values on the container spec:
```yaml
resources:
requests:
cpu: "50m"
memory: "128Mi"
limits:
cpu: "2000m"
memory: "1Gi"
```
Remove server-managed fields (`managedFields`, `resourceVersion`, `uid`, `creationTimestamp`, `generation`, `status`) to make it a clean declarative manifest.
**Step 2: Also create a manifest for celery-beat**
Extract and clean up `realestate-crawler-celery-beat` the same way. It only needs minimal resources:
```yaml
resources:
requests:
cpu: "10m"
memory: "64Mi"
limits:
cpu: "200m"
memory: "256Mi"
```
**Step 3: Apply and verify**
```bash
KUBECONFIG=.config kubectl apply -f k8s/api-deployment.yaml
KUBECONFIG=.config kubectl rollout status deployment/realestate-crawler-api -n realestate-crawler
```
Verify throttling is eliminated by re-running the performance test from the design doc and checking `nr_throttled` delta is 0.
**Step 4: Commit**
```bash
git add k8s/api-deployment.yaml k8s/celery-beat-deployment.yaml
git commit -m "Set explicit resource limits for API and celery-beat pods
Overrides LimitRange defaults (500m CPU) which caused kernel CPU
throttling during streaming requests. API gets 2000m CPU limit,
celery-beat gets 200m."
```
---
### Task 2: Adaptive First Batch Size
Send a small "primer" batch (5 features) immediately, then switch to normal batch size. This reduces first-property latency by ~10x on the server side.
**Files:**
- Modify: `api/app.py:55` (add `FIRST_BATCH_SIZE` constant)
- Modify: `api/app.py:271-311` (`_stream_from_cache`)
- Modify: `api/app.py:314-383` (`_stream_from_db`)
- Test: `tests/test_listing_geojson.py`
**Step 1: Write the failing test**
Add to `tests/test_listing_geojson.py` in `TestStreamingEndpoint`:
```python
def test_first_batch_is_smaller(self, client, mock_repository):
"""Test that the first batch is smaller than subsequent batches for fast first paint."""
response = client.get("/api/listing_geojson/stream?listing_type=RENT&batch_size=50&limit=10")
lines = response.text.strip().split("\n")
messages = [json.loads(line) for line in lines]
batch_messages = [m for m in messages if m["type"] == "batch"]
assert len(batch_messages) >= 1
# First batch should contain FIRST_BATCH_SIZE (5) or fewer features
from api.app import FIRST_BATCH_SIZE
assert len(batch_messages[0]["features"]) <= FIRST_BATCH_SIZE
```
**Step 2: Run test to verify it fails**
```bash
pytest tests/test_listing_geojson.py::TestStreamingEndpoint::test_first_batch_is_smaller -v
```
Expected: FAIL — `FIRST_BATCH_SIZE` not defined, or first batch has 50 features.
**Step 3: Implement adaptive batching**
In `api/app.py`, add the constant near line 55:
```python
FIRST_BATCH_SIZE = 5
```
In `_stream_from_cache` (around line 291), replace the single-batch-size loop with adaptive logic:
```python
count = 0
is_first_batch = True
for feature_batch in get_cached_features(query_parameters, batch_size=batch_size):
# Apply decision filtering
if decision_filter != "everything":
feature_batch = [
f for f in feature_batch
if _should_include(
f.get("properties", {}).get("id", 0),
decision_filter,
disliked_ids,
liked_ids,
)
]
if limit and count + len(feature_batch) > limit:
feature_batch = feature_batch[:limit - count]
count += len(feature_batch)
if feature_batch:
if is_first_batch:
# Yield a small primer batch for fast first paint
yield json.dumps({"type": "batch", "features": feature_batch[:FIRST_BATCH_SIZE]}) + "\n"
remainder = feature_batch[FIRST_BATCH_SIZE:]
if remainder:
yield json.dumps({"type": "batch", "features": remainder}) + "\n"
is_first_batch = False
else:
yield json.dumps({"type": "batch", "features": feature_batch}) + "\n"
if limit and count >= limit:
break
```
In `_stream_from_db` (around line 358-366), replace the batch accumulation logic:
```python
count = 0
batch: list[dict] = []
is_first_batch = True
current_batch_target = FIRST_BATCH_SIZE
for row in repository.stream_listings_optimized(
query_parameters, limit=limit, page_size=batch_size
):
feature = convert_row_to_geojson(row, query_parameters.listing_type.value)
if poi_distances_lookup and row['id'] in poi_distances_lookup:
feature['properties']['poi_distances'] = poi_distances_lookup[row['id']]
if not _should_include(row['id'], decision_filter, disliked_ids, liked_ids):
if staging_key:
cache_features_batch_staged(staging_key, [feature])
continue
batch.append(feature)
count += 1
if len(batch) >= current_batch_target:
if staging_key:
cache_features_batch_staged(staging_key, batch)
yield json.dumps({"type": "batch", "features": batch}) + "\n"
batch = []
if is_first_batch:
is_first_batch = False
current_batch_target = batch_size
```
**Step 4: Run tests**
```bash
pytest tests/test_listing_geojson.py -v
```
Expected: All pass, including `test_first_batch_is_smaller`.
**Step 5: Commit**
```bash
git add api/app.py tests/test_listing_geojson.py
git commit -m "Send smaller first batch (5 features) for faster first paint
Subsequent batches use the normal batch_size (default 50). This
reduces server-side time-to-first-property by ~10x since only 5
features need to be serialized before the first yield."
```
---
### Task 3: Defer Decision ID Fetch
Move the decision ID lookup out of the blocking pre-stream path. Start streaming immediately, fetch decision IDs concurrently.
**Files:**
- Modify: `api/app.py:385-459` (`stream_listing_geojson`)
- Modify: `api/app.py:271-311` (`_stream_from_cache`)
- Modify: `api/app.py:314-383` (`_stream_from_db`)
- Test: `tests/test_listing_geojson.py`
**Step 1: Write the failing test**
Add to `tests/test_listing_geojson.py` in `TestStreamingEndpoint`:
```python
def test_streaming_still_filters_disliked(self, client):
"""Test that disliked listings are filtered even with deferred decision loading."""
with patch("api.app.get_cached_count", return_value=None), \
patch("api.app._get_user_id_safe", return_value=42), \
patch("api.app.ListingRepository") as MockRepo, \
patch("api.app.DecisionRepository"), \
patch("api.app.decision_service") as mock_ds:
mock_ds.get_disliked_ids.return_value = {2}
mock_instance = MagicMock()
mock_instance.count_listings.return_value = 3
mock_instance.stream_listings_optimized.return_value = iter([
{
'id': 1, 'price': 2000.0, 'number_of_bedrooms': 2,
'square_meters': 50.0, 'longitude': -0.1, 'latitude': 51.5,
'photo_thumbnail': None, 'last_seen': datetime.now(),
'agency': None, 'price_history_json': '[]', 'available_from': None,
},
{
'id': 2, 'price': 2500.0, 'number_of_bedrooms': 2,
'square_meters': 60.0, 'longitude': -0.12, 'latitude': 51.51,
'photo_thumbnail': None, 'last_seen': datetime.now(),
'agency': None, 'price_history_json': '[]', 'available_from': None,
},
])
MockRepo.return_value = mock_instance
response = client.get("/api/listing_geojson/stream?listing_type=RENT&limit=10")
messages = [json.loads(line) for line in response.text.strip().split("\n")]
all_features = []
for m in messages:
if m["type"] == "batch":
all_features.extend(m["features"])
# Listing 2 should be filtered out
feature_ids = [f["properties"]["id"] for f in all_features]
assert 2 not in feature_ids
assert 1 in feature_ids
```
**Step 2: Run test to verify it passes (existing behavior)**
```bash
pytest tests/test_listing_geojson.py::TestStreamingEndpoint::test_streaming_still_filters_disliked -v
```
Expected: PASS — this is a behavior-preservation test.
**Step 3: Refactor to defer decision loading**
In `stream_listing_geojson` (line ~415-431), replace the synchronous decision loading with a lazy loader pattern. Pass the user email and decision_filter to the generators, and let them fetch IDs on demand:
Change the `stream_listing_geojson` endpoint to pass `user_email` and `decision_filter` to the generators instead of pre-fetched ID sets. The generators call `_get_user_id_safe` and `decision_service` internally after yielding the metadata message.
In both `_stream_from_cache` and `_stream_from_db`, add parameters `user_email: str | None = None` and `decision_filter: str = "all"`. After yielding the metadata message, resolve the decision IDs:
```python
# Resolve decision IDs (deferred to after metadata is sent)
disliked_ids: set[int] | None = None
liked_ids: set[int] | None = None
if decision_filter != "everything" and user_email:
user_id = _get_user_id_safe(user_email)
if user_id is not None:
decision_repo = DecisionRepository(engine)
listing_type_str = query_parameters.listing_type.value
if decision_filter == "liked":
liked_ids = decision_service.get_liked_ids(
decision_repo, user_id, listing_type_str
)
else:
disliked_ids = decision_service.get_disliked_ids(
decision_repo, user_id, listing_type_str
)
```
Remove the `disliked_ids` and `liked_ids` parameters from the generator signatures and the pre-fetch from `stream_listing_geojson`.
**Step 4: Run all tests**
```bash
pytest tests/test_listing_geojson.py -v
```
Expected: All pass — decision filtering still works, but now happens after the metadata message.
**Step 5: Commit**
```bash
git add api/app.py tests/test_listing_geojson.py
git commit -m "Defer decision ID fetch to after metadata message
Decision IDs are now loaded inside the streaming generators after
the metadata message is yielded, eliminating a blocking DB query
from the pre-stream path (~100-200ms improvement to TTFB)."
```
---
### Task 4: Add Server-Timing Headers
Add `Server-Timing` headers to the streaming response so latency breakdown is visible in browser DevTools.
**Files:**
- Modify: `api/app.py:385-459` (`stream_listing_geojson`)
- Test: `tests/test_listing_geojson.py`
**Step 1: Write the failing test**
Add to `TestStreamingEndpoint`:
```python
def test_streaming_includes_server_timing_header(self, client, mock_repository):
"""Test that streaming response includes Server-Timing header."""
response = client.get("/api/listing_geojson/stream?listing_type=RENT&limit=10")
assert response.status_code == 200
assert "server-timing" in response.headers
# Should contain at least the cache_check timing
assert "cache_check" in response.headers["server-timing"]
```
**Step 2: Run test to verify it fails**
```bash
pytest tests/test_listing_geojson.py::TestStreamingEndpoint::test_streaming_includes_server_timing_header -v
```
Expected: FAIL — no `server-timing` header.
**Step 3: Implement Server-Timing**
In `stream_listing_geojson`, add timing around the cache check and build a `Server-Timing` header:
```python
import time
# ... inside stream_listing_geojson, after rate limit logic:
timings: list[str] = []
t0 = time.monotonic()
cached_count = get_cached_count(query_parameters)
timings.append(f"cache_check;dur={(time.monotonic() - t0) * 1000:.1f}")
# ... rest of the logic to choose generator ...
return StreamingResponse(
generator,
media_type="application/x-ndjson",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
"Server-Timing": ", ".join(timings),
}
)
```
**Step 4: Run tests**
```bash
pytest tests/test_listing_geojson.py -v
```
Expected: All pass.
**Step 5: Commit**
```bash
git add api/app.py tests/test_listing_geojson.py
git commit -m "Add Server-Timing header to streaming endpoint
Reports cache_check latency in the response header, visible in
browser DevTools Network tab for ongoing performance monitoring."
```
---
### Task 5: Bulk POI Distances Endpoint
Add a new endpoint that returns all POI distances for a user in one call, keyed by listing ID. This decouples POI distances from the listing stream, allowing the stream to always hit the Redis cache.
**Files:**
- Modify: `api/poi_routes.py` (add new endpoint)
- Modify: `frontend/src/services/poiService.ts` (add bulk fetch function)
- Modify: `frontend/src/services/index.ts` (re-export)
- Modify: `frontend/src/constants/index.ts` (add endpoint constant)
- Test: `tests/unit/test_poi_validation.py` or new test file
**Step 1: Write the failing test**
Create or add to a test file for the POI routes:
```python
# In tests/test_listing_geojson.py or a new tests/test_poi_distances_bulk.py
def test_bulk_poi_distances_returns_dict(client):
"""Test bulk POI distances endpoint returns listing_id -> distances mapping."""
with patch("api.poi_routes._get_user_id", return_value=42), \
patch("api.poi_routes.POIRepository") as MockRepo, \
patch("api.poi_routes.poi_service") as mock_svc:
mock_pois = [MagicMock(id=1, name="Office")]
mock_svc.get_user_pois.return_value = mock_pois
mock_svc.get_distances_for_listings.return_value = [
MagicMock(listing_id=100, poi_id=1, travel_mode="WALK",
duration_seconds=600, distance_meters=800),
]
mock_repo_instance = MagicMock()
MockRepo.return_value = mock_repo_instance
mock_repo_instance.get_listing_ids.return_value = [100, 200]
response = client.get("/api/poi/distances/bulk?listing_type=RENT")
assert response.status_code == 200
data = response.json()
assert "100" in data # JSON keys are strings
assert data["100"][0]["poi_name"] == "Office"
```
**Step 2: Run test to verify it fails**
```bash
pytest tests/test_poi_distances_bulk.py -v # or wherever you placed it
```
Expected: FAIL — endpoint doesn't exist.
**Step 3: Implement the bulk endpoint**
In `api/poi_routes.py`, add:
```python
@poi_router.get("/distances/bulk")
async def get_bulk_distances(
user: Annotated[User, Depends(get_current_user)],
listing_type: ListingType = ListingType.RENT,
) -> dict[int, list[POIDistanceResponse]]:
"""Get all POI distances for the current user, keyed by listing ID.
Used by the frontend to decouple POI distance loading from the main
listing stream, allowing the stream to always hit the Redis cache.
"""
user_id = _get_user_id(user)
repo = POIRepository(engine)
pois = {p.id: p for p in poi_service.get_user_pois(repo, user_id)}
if not pois:
return {}
from repositories.listing_repository import ListingRepository
from database import engine as db_engine
listing_repo = ListingRepository(db_engine)
all_ids = list(listing_repo.get_listing_ids(listing_type))
if not all_ids:
return {}
distances = poi_service.get_distances_for_listings(
repo, all_ids, listing_type, user_id
)
result: dict[int, list[POIDistanceResponse]] = {}
for d in distances:
poi_name = pois[d.poi_id].name if d.poi_id in pois else "Unknown"
result.setdefault(d.listing_id, []).append(
POIDistanceResponse(
poi_id=d.poi_id,
poi_name=poi_name,
travel_mode=d.travel_mode,
duration_seconds=d.duration_seconds,
distance_meters=d.distance_meters,
)
)
return result
```
Note: The `/distances/bulk` route must be registered before `/distances` (which takes query params) to avoid route conflicts. Verify the order in the file.
**Step 4: Add frontend fetch function**
In `frontend/src/services/poiService.ts`, add:
```typescript
export async function fetchBulkPOIDistances(
user: AuthUser,
listingType: 'RENT' | 'BUY' = 'RENT'
): Promise<Record<number, POIDistanceInfo[]>> {
return apiRequest<Record<number, POIDistanceInfo[]>>(user, '/api/poi/distances/bulk', {
params: { listing_type: listingType },
});
}
```
In `frontend/src/services/index.ts`, add `fetchBulkPOIDistances` to the `poiService` re-export line.
**Step 5: Run tests**
```bash
pytest tests/ -v --tb=short -k "poi_distances_bulk or streaming"
```
Expected: All pass.
**Step 6: Commit**
```bash
git add api/poi_routes.py frontend/src/services/poiService.ts frontend/src/services/index.ts
git commit -m "Add bulk POI distances endpoint for decoupled loading
New GET /api/poi/distances/bulk returns all POI distances keyed by
listing ID, allowing the frontend to fetch distances separately
from the listing stream and keep the stream on the cached path."
```
---
### Task 6: Eliminate Frontend Waterfall
Remove the sequential dependency: POI fetch → loadListings. Fire both in parallel. Merge POI distances into features after both resolve.
**Files:**
- Modify: `frontend/src/App.tsx:120-203` (auto-load, loadListings, POI merge)
- Modify: `frontend/src/services/streamingService.ts:68-73` (remove includePoiDistances)
**Step 1: Modify `loadListings` to never request POI distances inline**
In `frontend/src/App.tsx`, change the `loadListings` callback. Remove `includePoiDistances` from the streaming call — it should always be `false`:
```typescript
for await (const batch of streamListingGeoJSON(user, parameters, (progress) => {
setStreamingProgress(progress);
}, { signal: controller.signal })) {
```
Remove `includePoiDistances` from the options entirely.
**Step 2: Fire loadListings immediately on user auth, don't wait for POIs**
Change the auto-load `useEffect` (around line 259-272) to fire `loadListings` immediately. Remove `userPOIs` from the `loadListings` dependency array:
```typescript
// Auto-load data with default filters when user is authenticated
useEffect(() => {
if (!user || initialLoadTriggeredRef.current) {
return;
}
initialLoadTriggeredRef.current = true;
const defaultParams: ParameterValues = {
...DEFAULT_FILTER_VALUES,
available_from: new Date(),
};
loadListings(defaultParams);
}, [user, loadListings]);
```
The `loadListings` `useCallback` dependency array should no longer include `userPOIs`.
**Step 3: Add POI distance merging after both resolve**
Add a new `useEffect` that triggers when both `listingData` and `userPOIs` are available. It fetches bulk POI distances and merges them into the existing features:
```typescript
// Merge POI distances into listing data after both are available
useEffect(() => {
if (!user || !listingData || userPOIs.length === 0) return;
let cancelled = false;
fetchBulkPOIDistances(user, queryParameters?.listing_type as 'RENT' | 'BUY' ?? 'RENT')
.then((distanceLookup) => {
if (cancelled) return;
// Merge distances into existing features
const updatedFeatures = accumulatedFeaturesRef.current.map(feature => {
const id = feature.properties?.id;
if (id && distanceLookup[id]) {
return {
...feature,
properties: {
...feature.properties,
poi_distances: distanceLookup[id],
},
};
}
return feature;
});
accumulatedFeaturesRef.current = updatedFeatures;
setListingData({
type: 'FeatureCollection',
features: [...updatedFeatures],
});
})
.catch(() => {}); // POI distances are best-effort
return () => { cancelled = true; };
}, [user, listingData?.features.length, userPOIs.length]);
```
Import `fetchBulkPOIDistances` from `@/services`.
**Step 4: Clean up streamingService**
In `frontend/src/services/streamingService.ts`, remove the `includePoiDistances` option from the function signature and query string building (lines 75-77):
Remove:
```typescript
if (options?.includePoiDistances) {
params.include_poi_distances = 'true';
}
```
Update the options type to remove `includePoiDistances`.
**Step 5: Run frontend tests**
```bash
cd frontend && npx vitest run --reporter=verbose
```
Expected: All pass (some tests may need the `includePoiDistances` option removed from their calls).
**Step 6: Commit**
```bash
git add frontend/src/App.tsx frontend/src/services/streamingService.ts
git commit -m "Eliminate frontend POI waterfall for faster initial load
Listing stream fires immediately on auth without waiting for POI
fetch. POI distances are fetched separately and merged into features
after both resolve. This saves ~200-500ms on initial load and keeps
the stream on the cached Redis path."
```
---
### Task 7: Verify End-to-End
**Step 1: Re-run the performance test from the design doc**
Port-forward to the API pod and run the same Python timing script from the design diagnostics, comparing before/after:
```bash
KUBECONFIG=.config kubectl port-forward deployment/realestate-crawler-api -n realestate-crawler 15001:5001 &
# Run timing script...
```
Verify:
- `nr_throttled` delta is 0 during the request (Task 1)
- First batch arrives within ~100ms of connection (Tasks 2+3)
- `Server-Timing` header is present (Task 4)
**Step 2: Test in browser**
Open the app, clear cache, log in, and observe:
- First property markers appear within ~1s
- POI distances populate shortly after
- All filters work correctly
**Step 3: Commit the design doc update with results**
Update `docs/plans/2026-02-22-time-to-first-property-design.md` with measured results and commit.