wrongmove/api/passkey_routes.py
Viktor Barzin 0a9a83507e
Harden backend security: IDOR fix, error sanitization, rate limiter fallback, security headers
- Fix task status IDOR by adding ownership check; suppress traceback/error in production
- Passkey routes: return generic error messages for internal exceptions, keep ValueError for user-facing
- JWT_SECRET and OIDC_CLIENT_ID: raise RuntimeError in production when using defaults
- Rate limiter: add in-memory fallback counter when Redis is unavailable
- Fix X-Forwarded-For IP spoofing with trusted_proxy_depth (rightmost-N selection)
- Add SecurityHeadersMiddleware (X-Content-Type-Options, X-Frame-Options, CSP, conditional HSTS)
- CORS: add PUT/DELETE methods for POI routes
- POI input validation: field length and coordinate range constraints
- QueryParameters: add min_sqm <= max_sqm validation
2026-02-08 19:42:30 +00:00

97 lines
3.4 KiB
Python

import logging
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, EmailStr
from database import engine
from repositories.user_repository import UserRepository
from services import passkey_service
# Log through the "uvicorn" logger.
# NOTE(review): presumably chosen so these messages share the server's log
# configuration -- confirm.
logger = logging.getLogger("uvicorn")
# Router for the passkey endpoints; every route below is mounted under
# the /api/passkey prefix and tagged "passkey".
passkey_router = APIRouter(prefix="/api/passkey", tags=["passkey"])
class RegisterBeginRequest(BaseModel):
    """Request body accepted by the ``/register/begin`` endpoint."""

    # Email handed to passkey_service.begin_registration; EmailStr makes
    # pydantic validate the address format.
    email: EmailStr
class RegisterBeginResponse(BaseModel):
    """Response body returned by the ``/register/begin`` endpoint."""

    # Ceremony options as returned by passkey_service.begin_registration.
    options: dict  # type: ignore[type-arg]
    # Session id returned alongside the options.
    # NOTE(review): presumably the value the client echoes back in
    # CeremonyCompleteRequest.session_id -- confirm against the service.
    session_id: str
class CeremonyCompleteRequest(BaseModel):
    """Request body shared by ``/register/complete`` and ``/login/complete``."""

    # Session id forwarded to the passkey service as its first argument.
    session_id: str
    # Credential payload forwarded to the passkey service unchanged.
    credential: dict  # type: ignore[type-arg]
class AuthTokenResponse(BaseModel):
    """Response body returned when a ceremony completes successfully."""

    # Token string produced by passkey_service.complete_registration or
    # passkey_service.complete_authentication.
    token: str
class LoginBeginResponse(BaseModel):
    """Response body returned by the ``/login/begin`` endpoint."""

    # Ceremony options as returned by passkey_service.begin_authentication.
    options: dict  # type: ignore[type-arg]
    # Session id returned alongside the options.
    # NOTE(review): presumably the value the client echoes back in
    # CeremonyCompleteRequest.session_id -- confirm against the service.
    session_id: str
@passkey_router.post("/register/begin", response_model=RegisterBeginResponse)
async def register_begin(body: RegisterBeginRequest) -> RegisterBeginResponse:
    """Start passkey registration ceremony.

    Builds a ``UserRepository`` on the shared ``engine`` and asks
    ``passkey_service.begin_registration`` for the ceremony options and a
    session id for ``body.email``.

    Args:
        body: Request payload carrying the email to register.

    Returns:
        The options and session id wrapped in a ``RegisterBeginResponse``.

    Raises:
        HTTPException: 400 carrying the ``ValueError`` message when a
            ``ValueError`` is raised; 400 with a generic message for any
            other exception, which is logged with its traceback instead of
            being shown to the client.
    """
    try:
        user_repo = UserRepository(engine)
        options, session_id = passkey_service.begin_registration(
            body.email, user_repo
        )
        return RegisterBeginResponse(options=options, session_id=session_id)
    except ValueError as exc:
        # The ValueError text is sent to the client verbatim.
        # NOTE(review): ValueError subclasses (e.g. json.JSONDecodeError,
        # UnicodeDecodeError) also land here -- confirm none can leak internals.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except Exception as exc:
        # Full details stay in the server log; the client gets a generic text.
        logger.exception("Registration begin failed")
        raise HTTPException(
            status_code=400, detail="Registration failed. Please try again."
        ) from exc
@passkey_router.post("/register/complete", response_model=AuthTokenResponse)
async def register_complete(body: CeremonyCompleteRequest) -> AuthTokenResponse:
    """Complete passkey registration ceremony.

    Builds a ``UserRepository`` on the shared ``engine`` and passes the
    session id and credential from ``body`` to
    ``passkey_service.complete_registration``.

    Args:
        body: Request payload carrying the session id and credential.

    Returns:
        The token returned by the service, wrapped in an ``AuthTokenResponse``.

    Raises:
        HTTPException: 400 carrying the ``ValueError`` message when a
            ``ValueError`` is raised; 400 with a generic message for any
            other exception, which is logged with its traceback instead of
            being shown to the client.
    """
    try:
        user_repo = UserRepository(engine)
        token = passkey_service.complete_registration(
            body.session_id, body.credential, user_repo
        )
        return AuthTokenResponse(token=token)
    except ValueError as exc:
        # The ValueError text is sent to the client verbatim.
        # NOTE(review): ValueError subclasses (e.g. json.JSONDecodeError,
        # UnicodeDecodeError) also land here -- confirm none can leak internals.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except Exception as exc:
        # Full details stay in the server log; the client gets a generic text.
        logger.exception("Registration complete failed")
        raise HTTPException(
            status_code=400, detail="Registration could not be completed."
        ) from exc
@passkey_router.post("/login/begin", response_model=LoginBeginResponse)
async def login_begin() -> LoginBeginResponse:
    """Start passkey authentication ceremony.

    Builds a ``UserRepository`` on the shared ``engine`` and asks
    ``passkey_service.begin_authentication`` for the ceremony options and a
    session id. The endpoint takes no request body.

    Returns:
        The options and session id wrapped in a ``LoginBeginResponse``.

    Raises:
        HTTPException: 400 carrying the ``ValueError`` message when a
            ``ValueError`` is raised; 400 with a generic message for any
            other exception, which is logged with its traceback instead of
            being shown to the client.
    """
    try:
        user_repo = UserRepository(engine)
        options, session_id = passkey_service.begin_authentication(user_repo)
        return LoginBeginResponse(options=options, session_id=session_id)
    except ValueError as exc:
        # The ValueError text is sent to the client verbatim.
        # NOTE(review): ValueError subclasses (e.g. json.JSONDecodeError,
        # UnicodeDecodeError) also land here -- confirm none can leak internals.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except Exception as exc:
        # Full details stay in the server log; the client gets a generic text.
        logger.exception("Login begin failed")
        raise HTTPException(
            status_code=400, detail="Login initiation failed. Please try again."
        ) from exc
@passkey_router.post("/login/complete", response_model=AuthTokenResponse)
async def login_complete(body: CeremonyCompleteRequest) -> AuthTokenResponse:
    """Complete passkey authentication ceremony.

    Builds a ``UserRepository`` on the shared ``engine`` and passes the
    session id and credential from ``body`` to
    ``passkey_service.complete_authentication``.

    Args:
        body: Request payload carrying the session id and credential.

    Returns:
        The token returned by the service, wrapped in an ``AuthTokenResponse``.

    Raises:
        HTTPException: 400 carrying the ``ValueError`` message when a
            ``ValueError`` is raised; 400 with a generic message for any
            other exception, which is logged with its traceback instead of
            being shown to the client.
    """
    try:
        user_repo = UserRepository(engine)
        token = passkey_service.complete_authentication(
            body.session_id, body.credential, user_repo
        )
        return AuthTokenResponse(token=token)
    except ValueError as exc:
        # The ValueError text is sent to the client verbatim.
        # NOTE(review): ValueError subclasses (e.g. json.JSONDecodeError,
        # UnicodeDecodeError) also land here -- confirm none can leak internals.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    except Exception as exc:
        # Full details stay in the server log; the client gets a generic text.
        logger.exception("Login complete failed")
        raise HTTPException(
            status_code=400, detail="Login could not be completed."
        ) from exc