"""End-to-end tests for full API workflows.""" import json from unittest.mock import AsyncMock import pytest from httpx import AsyncClient from sqlalchemy import Engine from repositories.listing_repository import ListingRepository pytestmark = pytest.mark.e2e @pytest.fixture(autouse=True) def patch_db_engine(in_memory_engine: Engine, monkeypatch: pytest.MonkeyPatch) -> None: import database import api.app import api.rate_limiter monkeypatch.setattr(database, "engine", in_memory_engine) monkeypatch.setattr(api.app, "engine", in_memory_engine) # Disable rate limiting for E2E tests monkeypatch.setattr(api.rate_limiter, "_match_endpoint", lambda path, config: None) @pytest.fixture(autouse=True) def patch_redis_client(fake_redis, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr("services.listing_cache._get_redis_client", lambda: fake_redis) def _parse_ndjson(text: str) -> list[dict]: return [json.loads(line) for line in text.strip().split("\n") if line.strip()] # ---------- Streaming with filters ---------- @pytest.mark.asyncio async def test_seed_and_stream_with_filter( async_client: AsyncClient, listing_repository: ListingRepository, rent_listing_factory, ) -> None: listings = [ rent_listing_factory(id=i, price=1000 + i * 300, square_meters=40.0) for i in range(1, 21) ] await listing_repository.upsert_listings(listings) resp = await async_client.get( "/api/listing_geojson/stream?listing_type=RENT&max_price=3000" ) lines = _parse_ndjson(resp.text) batches = [l for l in lines if l["type"] == "batch"] all_features = [f for b in batches for f in b["features"]] complete = lines[-1] for feat in all_features: assert feat["properties"]["total_price"] <= 3000 assert complete["type"] == "complete" assert complete["total"] == len(all_features) @pytest.mark.asyncio async def test_large_batch_streaming( async_client: AsyncClient, listing_repository: ListingRepository, rent_listing_factory, ) -> None: listings = [ rent_listing_factory(id=i, square_meters=40.0) for i in range(1, 201) ] await listing_repository.upsert_listings(listings) resp = await async_client.get( "/api/listing_geojson/stream?listing_type=RENT&batch_size=50" ) lines = _parse_ndjson(resp.text) batches = [l for l in lines if l["type"] == "batch"] complete = lines[-1] assert len(batches) == 5 # first batch is smaller (FIRST_BATCH_SIZE=5), then 4x50 assert complete["total"] == 200 @pytest.mark.asyncio async def test_empty_result_set(async_client: AsyncClient) -> None: resp = await async_client.get( "/api/listing_geojson/stream?listing_type=RENT" ) lines = _parse_ndjson(resp.text) assert lines[0]["type"] == "metadata" complete = lines[-1] assert complete["type"] == "complete" assert complete["total"] == 0 @pytest.mark.asyncio async def test_refresh_creates_task( async_client: AsyncClient, monkeypatch: pytest.MonkeyPatch, ) -> None: from services.listing_service import RefreshResult async def fake_refresh(*args, **kwargs): return RefreshResult(task_id="e2e-task-1", new_listings_count=0, message="started") monkeypatch.setattr("services.listing_service.refresh_listings", fake_refresh) monkeypatch.setattr("services.task_service.add_task_for_user", lambda email, tid: None) monkeypatch.setattr("notifications.send_notification", AsyncMock(return_value=None)) resp = await async_client.post("/api/refresh_listings?listing_type=RENT") assert resp.status_code == 200 assert resp.json()["task_id"] == "e2e-task-1" @pytest.mark.asyncio async def test_cache_populated_on_first_stream( async_client: AsyncClient, listing_repository: ListingRepository, rent_listing_factory, ) -> None: listings = [rent_listing_factory(id=i, square_meters=40.0) for i in range(1, 6)] await listing_repository.upsert_listings(listings) # First stream — cache miss, populated from DB resp1 = await async_client.get("/api/listing_geojson/stream?listing_type=RENT") lines1 = _parse_ndjson(resp1.text) assert lines1[0]["cached"] is False # Second stream — should hit cache resp2 = await async_client.get("/api/listing_geojson/stream?listing_type=RENT") lines2 = _parse_ndjson(resp2.text) assert lines2[0]["cached"] is True @pytest.mark.asyncio async def test_stream_filter_price( async_client: AsyncClient, listing_repository: ListingRepository, rent_listing_factory, ) -> None: listings = [ rent_listing_factory(id=i, price=500 * i, square_meters=40.0) for i in range(1, 11) ] await listing_repository.upsert_listings(listings) resp = await async_client.get( "/api/listing_geojson/stream?listing_type=RENT&min_price=1500&max_price=3000" ) lines = _parse_ndjson(resp.text) batches = [l for l in lines if l["type"] == "batch"] all_features = [f for b in batches for f in b["features"]] for feat in all_features: assert 1500 <= feat["properties"]["total_price"] <= 3000 @pytest.mark.asyncio async def test_stream_filter_bedrooms( async_client: AsyncClient, listing_repository: ListingRepository, rent_listing_factory, ) -> None: listings = [ rent_listing_factory(id=i, number_of_bedrooms=(i % 4) + 1, square_meters=40.0) for i in range(1, 21) ] await listing_repository.upsert_listings(listings) resp = await async_client.get( "/api/listing_geojson/stream?listing_type=RENT&min_bedrooms=2&max_bedrooms=3" ) lines = _parse_ndjson(resp.text) batches = [l for l in lines if l["type"] == "batch"] all_features = [f for b in batches for f in b["features"]] for feat in all_features: assert 2 <= feat["properties"]["rooms"] <= 3 assert len(all_features) > 0 @pytest.mark.asyncio async def test_complete_total_matches_actual( async_client: AsyncClient, listing_repository: ListingRepository, rent_listing_factory, ) -> None: listings = [rent_listing_factory(id=i, square_meters=40.0) for i in range(1, 16)] await listing_repository.upsert_listings(listings) resp = await async_client.get("/api/listing_geojson/stream?listing_type=RENT") lines = _parse_ndjson(resp.text) batches = [l for l in lines if l["type"] == "batch"] total_features = sum(len(b["features"]) for b in batches) complete = lines[-1] assert complete["type"] == "complete" assert complete["total"] == total_features assert complete["total"] == 15