Add security regression tests for all hardening fixes

- New: test_security_headers.py — verify all headers present, HSTS conditional on HTTPS
- New: test_passkey_error_handling.py — generic vs user-facing error messages
- New: test_poi_validation.py — field length and coordinate range constraints
- Extend test_rate_limiter.py — client IP depth selection, in-memory fallback enforcement
- Extend test_models.py — sqm range validation
- Extend test_task_service.py — IDOR 404, ownership 200, traceback suppression in production
This commit is contained in:
Viktor Barzin 2026-02-08 19:42:53 +00:00
parent 727dd537ef
commit 492921424e
No known key found for this signature in database
GPG key ID: 0EB088298288D958
6 changed files with 365 additions and 0 deletions

View file

@ -304,3 +304,84 @@ class TestClearAllTasks:
# Should not raise despite revoke failure
count = clear_all_tasks("test@example.com", revoke=True)
assert count == 1
from fastapi.testclient import TestClient
class TestTaskStatusSecurity:
"""Tests for task status endpoint security (IDOR, traceback suppression)."""
def _get_client(self) -> TestClient:
"""Create test client with mocked auth."""
from api.app import app
from api.auth import get_current_user, User
mock_user = User(sub="test", email="test@example.com", name="Test")
app.dependency_overrides[get_current_user] = lambda: mock_user
client = TestClient(app, raise_server_exceptions=False)
return client
@patch("api.app.task_service")
def test_returns_404_for_unowned_task(self, mock_task_service: MagicMock) -> None:
mock_task_service.get_user_tasks.return_value = ["task-abc"]
client = self._get_client()
try:
resp = client.get("/api/task_status", params={"task_id": "task-xyz"})
assert resp.status_code == 404
finally:
from api.app import app
from api.auth import get_current_user
app.dependency_overrides.pop(get_current_user, None)
@patch("api.app.task_service")
def test_returns_200_for_owned_task(self, mock_task_service: MagicMock) -> None:
mock_status = MagicMock()
mock_status.task_id = "task-abc"
mock_status.status = "SUCCESS"
mock_status.result = None
mock_status.progress = None
mock_status.processed = None
mock_status.total = None
mock_status.message = None
mock_status.error = None
mock_status.traceback = None
mock_task_service.get_user_tasks.return_value = ["task-abc"]
mock_task_service.get_task_status.return_value = mock_status
client = self._get_client()
try:
resp = client.get("/api/task_status", params={"task_id": "task-abc"})
assert resp.status_code == 200
finally:
from api.app import app
from api.auth import get_current_user
app.dependency_overrides.pop(get_current_user, None)
@patch("api.app.APP_ENV", "production")
@patch("api.app.task_service")
def test_traceback_suppressed_in_production(self, mock_task_service: MagicMock) -> None:
mock_status = MagicMock()
mock_status.task_id = "task-abc"
mock_status.status = "FAILURE"
mock_status.result = None
mock_status.progress = None
mock_status.processed = None
mock_status.total = None
mock_status.message = None
mock_status.error = "Internal error"
mock_status.traceback = "Traceback (most recent call last)..."
mock_task_service.get_user_tasks.return_value = ["task-abc"]
mock_task_service.get_task_status.return_value = mock_status
client = self._get_client()
try:
resp = client.get("/api/task_status", params={"task_id": "task-abc"})
assert resp.status_code == 200
data = resp.json()
assert data["traceback"] is None
assert data["error"] is None
finally:
from api.app import app
from api.auth import get_current_user
app.dependency_overrides.pop(get_current_user, None)