wrongmove/tests/integration/test_api_workflow.py
Viktor Barzin 41b7d221e4
Fix 7 bugs: security, memory leak, stale state, error handling
- WebSocket: verify task ownership before allowing subscribe (security)
- POI routes: replace assert with HTTPException for production safety
- cancel_task: return HTTP 404 instead of 200 for missing tasks
- routing_config: add descriptive ValueError for invalid env vars
- POIManager: show error feedback instead of silently swallowing failures
- VisualizationCard: reset POI/travel mode state on metric switch
- Map: clean up heatmap layers/sources on unmount to prevent memory leak
- Update test to expect 404 from cancel_task ownership check
2026-02-13 19:36:43 +00:00

318 lines
10 KiB
Python

"""Integration tests for API workflow endpoints."""
import json
from unittest.mock import AsyncMock
import pytest
from httpx import AsyncClient
from sqlalchemy import Engine
from repositories.listing_repository import ListingRepository
from services.task_service import TaskStatus
@pytest.fixture(autouse=True)
def patch_db_engine(in_memory_engine: Engine, monkeypatch: pytest.MonkeyPatch) -> None:
    """Swap in the in-memory engine and switch off rate limiting for each test."""
    import database
    import api.app
    import api.rate_limiter

    def _never_match(path, config):
        # Always answering None is what disables rate limiting.
        return None

    # ``database`` and ``api.app`` each expose an ``engine`` attribute; patch both.
    for module in (database, api.app):
        monkeypatch.setattr(module, "engine", in_memory_engine)
    monkeypatch.setattr(api.rate_limiter, "_match_endpoint", _never_match)
@pytest.fixture(autouse=True)
def patch_redis_for_streaming(fake_redis, monkeypatch: pytest.MonkeyPatch) -> None:
    """Make the listing cache use ``fake_redis`` so streaming never reaches a real Redis."""

    def _fake_client():
        return fake_redis

    monkeypatch.setattr("services.listing_cache._get_redis_client", _fake_client)
# ---------- Listing query tests ----------
@pytest.mark.asyncio
async def test_listing_returns_inserted_data(
    async_client: AsyncClient,
    listing_repository: ListingRepository,
    rent_listing_factory,
) -> None:
    """Three upserted listings all come back from ``/api/listing``."""
    seeded = []
    for listing_id in (1, 2, 3):
        seeded.append(rent_listing_factory(id=listing_id))
    await listing_repository.upsert_listings(seeded)

    response = await async_client.get("/api/listing?limit=10")

    assert response.status_code == 200
    payload = response.json()
    assert len(payload["listings"]) == 3
@pytest.mark.asyncio
async def test_listing_respects_limit(
    async_client: AsyncClient,
    listing_repository: ListingRepository,
    rent_listing_factory,
) -> None:
    """With five listings stored, ``limit=2`` yields exactly two results."""
    seeded = [rent_listing_factory(id=offset + 1) for offset in range(5)]
    await listing_repository.upsert_listings(seeded)

    response = await async_client.get("/api/listing?limit=2")

    assert response.status_code == 200
    returned = response.json()["listings"]
    assert len(returned) == 2
@pytest.mark.asyncio
async def test_geojson_returns_feature_collection(
    async_client: AsyncClient,
    listing_repository: ListingRepository,
    rent_listing_factory,
) -> None:
    """The GeoJSON endpoint wraps the three stored listings in a FeatureCollection."""
    seeded = []
    for listing_id in (1, 2, 3):
        seeded.append(rent_listing_factory(id=listing_id, square_meters=50.0))
    await listing_repository.upsert_listings(seeded)

    response = await async_client.get("/api/listing_geojson?listing_type=RENT")

    assert response.status_code == 200
    collection = response.json()
    assert collection["type"] == "FeatureCollection"
    assert len(collection["features"]) == 3
@pytest.mark.asyncio
async def test_geojson_features_have_properties(
    async_client: AsyncClient,
    listing_repository: ListingRepository,
    rent_listing_factory,
) -> None:
    """A single stored listing becomes one Point feature carrying the expected keys."""
    expected_keys = ("url", "total_price", "rooms", "qm", "qmprice", "agency", "last_seen")

    only_listing = rent_listing_factory(
        id=1,
        price=2500,
        number_of_bedrooms=2,
        square_meters=60.0,
    )
    await listing_repository.upsert_listings([only_listing])

    response = await async_client.get("/api/listing_geojson?listing_type=RENT")
    assert response.status_code == 200

    all_features = response.json()["features"]
    assert len(all_features) == 1

    feature = all_features[0]
    assert feature["geometry"]["type"] == "Point"

    properties = feature["properties"]
    # Check keys one by one so the failure message names the first missing key.
    for key in expected_keys:
        assert key in properties, f"Missing property: {key}"
# ---------- Streaming tests ----------
def _parse_ndjson(text: str) -> list[dict]:
return [json.loads(line) for line in text.strip().split("\n") if line.strip()]
@pytest.mark.asyncio
async def test_stream_metadata_batch_complete(
    async_client: AsyncClient,
    listing_repository: ListingRepository,
    rent_listing_factory,
) -> None:
    """The stream opens with metadata, has at least one batch, and closes with the total."""
    seeded = []
    for listing_id in (1, 2, 3):
        seeded.append(rent_listing_factory(id=listing_id, square_meters=40.0))
    await listing_repository.upsert_listings(seeded)

    response = await async_client.get("/api/listing_geojson/stream?listing_type=RENT")
    assert response.status_code == 200

    messages = _parse_ndjson(response.text)
    assert messages[0]["type"] == "metadata"

    batch_messages = []
    for message in messages:
        if message["type"] == "batch":
            batch_messages.append(message)
    assert len(batch_messages) >= 1

    final_message = messages[-1]
    assert final_message["type"] == "complete"
    assert final_message["total"] == 3
@pytest.mark.asyncio
async def test_stream_respects_limit(
    async_client: AsyncClient,
    listing_repository: ListingRepository,
    rent_listing_factory,
) -> None:
    """With ten listings stored, ``limit=3`` makes the stream report a total of 3."""
    listings = [rent_listing_factory(id=i, square_meters=40.0) for i in range(1, 11)]
    await listing_repository.upsert_listings(listings)

    resp = await async_client.get("/api/listing_geojson/stream?listing_type=RENT&limit=3")
    # Check the status before parsing so an error response fails here, not
    # as a JSON decode or index error further down.
    assert resp.status_code == 200

    lines = _parse_ndjson(resp.text)
    complete = lines[-1]
    assert complete["type"] == "complete"
    assert complete["total"] == 3
@pytest.mark.asyncio
async def test_stream_empty_db(async_client: AsyncClient) -> None:
    """With no listings stored, the stream still sends metadata and a zero total."""
    resp = await async_client.get("/api/listing_geojson/stream?listing_type=RENT")
    # Check the status before parsing so an error response fails here, not
    # as a JSON decode or index error further down.
    assert resp.status_code == 200

    lines = _parse_ndjson(resp.text)
    assert lines[0]["type"] == "metadata"
    complete = lines[-1]
    assert complete["type"] == "complete"
    assert complete["total"] == 0
# ---------- Task management tests ----------
@pytest.mark.asyncio
async def test_refresh_returns_task_id(
    async_client: AsyncClient,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``/api/refresh_listings`` returns the task ID produced by the refresh service."""
    from services.listing_service import RefreshResult
    import services.listing_service

    async def fake_refresh(*args, **kwargs):
        # Stand-in for the real refresh; it only returns a fixed result.
        return RefreshResult(task_id="test-123", new_listings_count=0, message="Task started")

    def _ignore_task(email, tid):
        return None

    silent_notifier = AsyncMock(return_value=None)

    monkeypatch.setattr(services.listing_service, "refresh_listings", fake_refresh)
    monkeypatch.setattr("services.task_service.add_task_for_user", _ignore_task)
    monkeypatch.setattr("notifications.send_notification", silent_notifier)

    response = await async_client.post("/api/refresh_listings?listing_type=RENT")

    assert response.status_code == 200
    payload = response.json()
    assert payload["task_id"] == "test-123"
@pytest.mark.asyncio
async def test_task_tracked_for_user(
    async_client: AsyncClient,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Refreshing listings registers the new task ID against the requesting user.

    ``add_task_for_user`` is replaced by a recorder, and the test expects
    exactly one call with ``("test@example.com", "task-abc")``. The email
    presumably comes from the authenticated user set up by ``async_client``;
    confirm against that fixture.
    """
    from services.listing_service import RefreshResult

    async def fake_refresh(*args, **kwargs):
        # Stand-in for the real refresh; it only returns a fixed result.
        return RefreshResult(task_id="task-abc", new_listings_count=0, message="ok")

    import services.listing_service
    monkeypatch.setattr(services.listing_service, "refresh_listings", fake_refresh)
    monkeypatch.setattr("notifications.send_notification", AsyncMock(return_value=None))

    calls: list[tuple[str, str]] = []
    import services.task_service
    monkeypatch.setattr(
        services.task_service,
        "add_task_for_user",
        lambda email, tid: calls.append((email, tid)),
    )

    resp = await async_client.post("/api/refresh_listings?listing_type=RENT")

    # Without this check a rejected request would only show up as
    # "no calls recorded", which hides the real cause.
    assert resp.status_code == 200
    # One comparison covers both the call count and the arguments.
    assert calls == [("test@example.com", "task-abc")]
@pytest.mark.asyncio
async def test_task_status_works(
    async_client: AsyncClient,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``/api/task_status`` returns the stubbed status for a task listed for the user."""

    def _owned_tasks(email):
        return ["test-123"]

    def _finished_status(task_id):
        return TaskStatus(
            task_id="test-123",
            status="SUCCESS",
            result=None,
            progress=1.0,
            processed=10,
            total=10,
            message="Done",
            error=None,
            traceback=None,
        )

    monkeypatch.setattr("services.task_service.get_user_tasks", _owned_tasks)
    monkeypatch.setattr("services.task_service.get_task_status", _finished_status)

    response = await async_client.get("/api/task_status?task_id=test-123")

    assert response.status_code == 200
    payload = response.json()
    expected_fields = {"task_id": "test-123", "status": "SUCCESS", "progress": 1.0}
    for field, expected in expected_fields.items():
        assert payload[field] == expected
@pytest.mark.asyncio
async def test_task_not_found_returns_404(
    async_client: AsyncClient,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A status request for a task missing from the user's task list returns 404."""

    def _no_tasks(email):
        return []

    monkeypatch.setattr("services.task_service.get_user_tasks", _no_tasks)

    response = await async_client.get("/api/task_status?task_id=unknown")

    assert response.status_code == 404
@pytest.mark.asyncio
async def test_cancel_task(
    async_client: AsyncClient,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Cancelling a task listed for the user returns 200 with ``success`` set to true."""

    def _owned_tasks(email):
        return ["test-123"]

    def _cancel_ok(task_id, user_email=None):
        return True

    monkeypatch.setattr("services.task_service.get_user_tasks", _owned_tasks)
    monkeypatch.setattr("services.task_service.cancel_task", _cancel_ok)

    response = await async_client.post("/api/cancel_task?task_id=test-123")

    assert response.status_code == 200
    payload = response.json()
    assert payload["success"] is True
@pytest.mark.asyncio
async def test_clear_all_tasks(
    async_client: AsyncClient,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``/api/clear_all_tasks`` returns the stubbed count together with a success flag."""

    def _cleared_three(email):
        return 3

    monkeypatch.setattr("services.task_service.clear_all_tasks", _cleared_three)

    response = await async_client.post("/api/clear_all_tasks")

    assert response.status_code == 200
    payload = response.json()
    assert payload["count"] == 3
    assert payload["success"] is True
# ---------- Additional edge cases ----------
@pytest.mark.asyncio
async def test_listing_empty_db_returns_empty(async_client: AsyncClient) -> None:
    """With nothing stored, ``/api/listing`` returns 200 and an empty list."""
    response = await async_client.get("/api/listing?limit=10")

    assert response.status_code == 200
    returned = response.json()["listings"]
    assert returned == []
@pytest.mark.asyncio
async def test_geojson_empty_db_returns_empty_collection(async_client: AsyncClient) -> None:
    """With nothing stored, the GeoJSON endpoint returns a FeatureCollection with no features."""
    response = await async_client.get("/api/listing_geojson?listing_type=RENT")

    assert response.status_code == 200
    collection = response.json()
    assert collection["type"] == "FeatureCollection"
    feature_count = len(collection["features"])
    assert feature_count == 0
@pytest.mark.asyncio
async def test_cancel_task_not_owned(
    async_client: AsyncClient,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Cancelling a task missing from the user's task list returns 404."""

    def _no_tasks(email):
        return []

    monkeypatch.setattr("services.task_service.get_user_tasks", _no_tasks)

    response = await async_client.post("/api/cancel_task?task_id=not-mine")

    assert response.status_code == 404
@pytest.mark.asyncio
async def test_tasks_for_user(
    async_client: AsyncClient,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``/api/tasks_for_user`` returns the stubbed task ID list unchanged."""

    def _three_tasks(email):
        return ["a", "b", "c"]

    monkeypatch.setattr("services.task_service.get_user_tasks", _three_tasks)

    response = await async_client.get("/api/tasks_for_user")

    assert response.status_code == 200
    task_ids = response.json()
    assert task_ids == ["a", "b", "c"]
@pytest.mark.asyncio
async def test_status_endpoint(async_client: AsyncClient) -> None:
    """``/api/status`` returns 200 and a ``status`` of ``"OK"``."""
    response = await async_client.get("/api/status")

    assert response.status_code == 200
    payload = response.json()
    assert payload["status"] == "OK"