wrongmove/tests/integration/test_repository_advanced.py

123 lines
3.9 KiB
Python
Raw Permalink Normal View History

"""Advanced integration tests for ListingRepository."""
import asyncio
import pytest
from sqlalchemy import Engine
from models.listing import FurnishType, ListingType, QueryParameters
from repositories.listing_repository import ListingRepository
# ---------- Count and basic queries ----------
@pytest.mark.asyncio
async def test_count_matches_get_listings(seeded_repository: ListingRepository) -> None:
    """Check that ``count_listings`` agrees with the size of ``get_listings``.

    Both calls receive the same rent-only ``QueryParameters``.
    """
    rent_query = QueryParameters(listing_type=ListingType.RENT)

    reported_total = seeded_repository.count_listings(rent_query)
    fetched = await seeded_repository.get_listings(query_parameters=rent_query)

    assert reported_total == len(fetched)
@pytest.mark.asyncio
async def test_stream_with_small_page_size(seeded_repository: ListingRepository) -> None:
    """Stream rent listings with ``page_size=3`` and expect 10 rows in total.

    NOTE(review): the expected total of 10 presumably mirrors the number of
    rent listings in the ``seeded_repository`` fixture, which is defined
    outside this file -- confirm against the fixture.
    """
    rent_query = QueryParameters(listing_type=ListingType.RENT)
    stream = seeded_repository.stream_listings_optimized(rent_query, page_size=3)

    # Drain the stream completely so every page is requested.
    streamed_rows = list(stream)

    assert len(streamed_rows) == 10
# ---------- Filter tests ----------
@pytest.mark.asyncio
async def test_filter_by_bedrooms(seeded_repository: ListingRepository) -> None:
    """Query with min and max bedrooms both set to 2; every hit must have 2 bedrooms."""
    two_bed_query = QueryParameters(
        listing_type=ListingType.RENT,
        min_bedrooms=2,
        max_bedrooms=2,
    )

    results = await seeded_repository.get_listings(query_parameters=two_bed_query)

    for result in results:
        assert result.number_of_bedrooms == 2

    # Guard against the loop above passing vacuously on an empty result.
    assert len(results) > 0
@pytest.mark.asyncio
async def test_filter_by_price_range(seeded_repository: ListingRepository) -> None:
    """Query rent listings priced 1500..2500 and check each hit lies in that range (inclusive)."""
    price_query = QueryParameters(
        listing_type=ListingType.RENT,
        min_price=1500,
        max_price=2500,
    )

    results = await seeded_repository.get_listings(query_parameters=price_query)

    for result in results:
        price = result.price
        assert 1500 <= price <= 2500

    # Guard against the loop above passing vacuously on an empty result.
    assert len(results) > 0
@pytest.mark.asyncio
async def test_filter_by_max_sqm(seeded_repository: ListingRepository) -> None:
    """Query rent listings with ``max_sqm=50``; each hit must report an area of at most 50."""
    area_query = QueryParameters(listing_type=ListingType.RENT, max_sqm=50)

    results = await seeded_repository.get_listings(query_parameters=area_query)

    for result in results:
        area = result.square_meters
        # A listing without a recorded area must not be returned by this filter.
        assert area is not None
        assert area <= 50

    # Guard against the loop above passing vacuously on an empty result.
    assert len(results) > 0
@pytest.mark.asyncio
async def test_filter_by_furnish_type(
    in_memory_engine: Engine,
    rent_listing_factory,
) -> None:
    """Store one furnished and one unfurnished listing; filtering on FURNISHED returns only the former."""
    repository = ListingRepository(engine=in_memory_engine)

    # One listing per furnish type, built in the same order as they are stored.
    seed_listings = [
        rent_listing_factory(id=1, furnish_type=FurnishType.FURNISHED),
        rent_listing_factory(id=2, furnish_type=FurnishType.UNFURNISHED),
    ]
    await repository.upsert_listings(seed_listings)

    furnished_only = QueryParameters(
        listing_type=ListingType.RENT,
        furnish_types=[FurnishType.FURNISHED],
    )
    results = await repository.get_listings(query_parameters=furnished_only)

    assert len(results) == 1
    assert results[0].furnish_type == FurnishType.FURNISHED
# ---------- Concurrency ----------
@pytest.mark.asyncio
async def test_concurrent_upserts(
    in_memory_engine: Engine,
    rent_listing_factory,
) -> None:
    """Run five upsert batches of five listings each via ``asyncio.gather``; expect 25 rows counted."""
    repository = ListingRepository(engine=in_memory_engine)

    async def upsert_batch(start_id: int) -> None:
        """Upsert five listings with consecutive ids beginning at ``start_id``."""
        batch = [
            rent_listing_factory(id=listing_id)
            for listing_id in range(start_id, start_id + 5)
        ]
        await repository.upsert_listings(batch)

    # Batches start at 1000, 2000, ..., 5000, so their id ranges never overlap.
    pending = [upsert_batch(first_id) for first_id in range(1000, 6000, 1000)]
    await asyncio.gather(*pending)

    rent_query = QueryParameters(listing_type=ListingType.RENT)
    stored_total = repository.count_listings(rent_query)

    assert stored_total == 25
# ---------- Streaming ----------
@pytest.mark.asyncio
async def test_stream_optimized_returns_dicts(seeded_repository: ListingRepository) -> None:
    """Each row from ``stream_listings_optimized`` must be a dict carrying the expected keys."""
    expected_keys = (
        "id",
        "price",
        "number_of_bedrooms",
        "square_meters",
        "longitude",
        "latitude",
    )
    rent_query = QueryParameters(listing_type=ListingType.RENT)

    streamed_rows = list(seeded_repository.stream_listings_optimized(rent_query))

    # Fail early on an empty stream so the per-row checks cannot pass vacuously.
    assert len(streamed_rows) > 0

    for streamed_row in streamed_rows:
        assert isinstance(streamed_row, dict)
        for key in expected_keys:
            assert key in streamed_row