"""HTTP routes for the passkey registration and login ceremonies.

Each ceremony has a ``begin`` and a ``complete`` endpoint. The handlers are
thin adapters: they build a ``UserRepository`` on the module-level ``engine``,
delegate to ``passkey_service`` and translate any failure into an HTTP 400
response.
"""

import logging

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, EmailStr

from database import engine
from repositories.user_repository import UserRepository
from services import passkey_service

# Logs under the "uvicorn" logger name rather than this module's own name.
logger = logging.getLogger("uvicorn")

passkey_router = APIRouter(prefix="/api/passkey", tags=["passkey"])


class RegisterBeginRequest(BaseModel):
    """Request body for ``POST /register/begin``."""

    # Validated as an e-mail address by pydantic before the handler runs.
    email: EmailStr


class RegisterBeginResponse(BaseModel):
    """Response body for ``POST /register/begin``."""

    # Options dict exactly as returned by passkey_service.begin_registration.
    options: dict  # type: ignore[type-arg]
    # Session identifier returned alongside the options.
    session_id: str


class CeremonyCompleteRequest(BaseModel):
    """Request body shared by both ``complete`` endpoints."""

    # Presumably the session_id issued by the matching ``begin`` endpoint --
    # TODO confirm against passkey_service.
    session_id: str
    # Credential payload forwarded to passkey_service unchanged.
    credential: dict  # type: ignore[type-arg]


class AuthTokenResponse(BaseModel):
    """Response body carrying the token returned by a completed ceremony."""

    token: str


class LoginBeginResponse(BaseModel):
    """Response body for ``POST /login/begin``."""

    # Options dict exactly as returned by passkey_service.begin_authentication.
    options: dict  # type: ignore[type-arg]
    # Session identifier returned alongside the options.
    session_id: str


# NOTE(review): every handler below maps *all* exceptions -- including
# unexpected ones -- to HTTP 400 and echoes ``str(e)`` to the client. That
# contract is preserved here because clients may rely on it, but consider
# returning a generic 500 for non-validation failures so internal error text
# is not exposed.


@passkey_router.post("/register/begin", response_model=RegisterBeginResponse)
async def register_begin(body: RegisterBeginRequest) -> RegisterBeginResponse:
    """Start passkey registration ceremony.

    Args:
        body: Request carrying the e-mail address to register.

    Returns:
        The options and session id produced by
        ``passkey_service.begin_registration``.

    Raises:
        HTTPException: 400 with the exception message as ``detail`` when
            anything in the handler fails.
    """
    try:
        user_repo = UserRepository(engine)
        options, session_id = passkey_service.begin_registration(
            body.email, user_repo
        )
        return RegisterBeginResponse(options=options, session_id=session_id)
    except Exception as e:
        # logger.exception keeps the traceback that logger.error discarded.
        logger.exception("Registration begin failed: %s", e)
        raise HTTPException(status_code=400, detail=str(e)) from e


@passkey_router.post("/register/complete", response_model=AuthTokenResponse)
async def register_complete(body: CeremonyCompleteRequest) -> AuthTokenResponse:
    """Complete passkey registration ceremony.

    Args:
        body: Session id and credential payload for the ceremony.

    Returns:
        The token returned by ``passkey_service.complete_registration``.

    Raises:
        HTTPException: 400 with the exception message as ``detail``.
            ``ValueError`` is passed through without logging; any other
            exception is logged first.
    """
    try:
        user_repo = UserRepository(engine)
        token = passkey_service.complete_registration(
            body.session_id, body.credential, user_repo
        )
        return AuthTokenResponse(token=token)
    except ValueError as e:
        # Not logged, unlike the generic branch below.
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        logger.exception("Registration complete failed: %s", e)
        raise HTTPException(status_code=400, detail=str(e)) from e


@passkey_router.post("/login/begin", response_model=LoginBeginResponse)
async def login_begin() -> LoginBeginResponse:
    """Start passkey authentication ceremony.

    Takes no request body.

    Returns:
        The options and session id produced by
        ``passkey_service.begin_authentication``.

    Raises:
        HTTPException: 400 with the exception message as ``detail`` when
            anything in the handler fails.
    """
    try:
        user_repo = UserRepository(engine)
        options, session_id = passkey_service.begin_authentication(user_repo)
        return LoginBeginResponse(options=options, session_id=session_id)
    except Exception as e:
        logger.exception("Login begin failed: %s", e)
        raise HTTPException(status_code=400, detail=str(e)) from e


@passkey_router.post("/login/complete", response_model=AuthTokenResponse)
async def login_complete(body: CeremonyCompleteRequest) -> AuthTokenResponse:
    """Complete passkey authentication ceremony.

    Args:
        body: Session id and credential payload for the ceremony.

    Returns:
        The token returned by ``passkey_service.complete_authentication``.

    Raises:
        HTTPException: 400 with the exception message as ``detail``.
            ``ValueError`` is passed through without logging; any other
            exception is logged first.
    """
    try:
        user_repo = UserRepository(engine)
        token = passkey_service.complete_authentication(
            body.session_id, body.credential, user_repo
        )
        return AuthTokenResponse(token=token)
    except ValueError as e:
        # Not logged, unlike the generic branch below.
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        logger.exception("Login complete failed: %s", e)
        raise HTTPException(status_code=400, detail=str(e)) from e