"""JWT utilities — token creation and verification."""

from __future__ import annotations

from datetime import datetime, timedelta, timezone

import jwt

from services.api_gateway.config import ApiGatewayConfig


def _sign(claims: dict, config: ApiGatewayConfig) -> str:
    """Encode *claims* as a JWT signed with the configured key and algorithm."""
    return jwt.encode(claims, config.jwt_secret_key, algorithm=config.jwt_algorithm)


def create_access_token(
    user_id: str,
    username: str,
    config: ApiGatewayConfig,
) -> str:
    """Issue a signed access token for an authenticated user.

    The token carries ``sub``, ``username``, ``type`` (always ``"access"``),
    ``iat`` and ``exp`` claims. Timestamps are timezone-aware UTC values.

    Parameters
    ----------
    user_id:
        Identifier of the authenticated user; written to the ``sub`` claim.
    username:
        Username of the authenticated user; written to the ``username`` claim.
    config:
        Gateway configuration supplying ``jwt_secret_key``, ``jwt_algorithm``
        and ``access_token_expire_minutes``.

    Returns
    -------
    str
        Encoded JWT string.
    """
    issued_at = datetime.now(timezone.utc)
    lifetime = timedelta(minutes=config.access_token_expire_minutes)
    # Key order is kept stable so the serialized token layout does not change.
    claims = {
        "sub": user_id,
        "username": username,
        "type": "access",
        "iat": issued_at,
        "exp": issued_at + lifetime,
    }
    return _sign(claims, config)


def create_refresh_token(
    user_id: str,
    config: ApiGatewayConfig,
) -> str:
    """Issue a signed refresh token for an authenticated user.

    The token carries ``sub``, ``type`` (always ``"refresh"``), ``iat`` and
    ``exp`` claims. Timestamps are timezone-aware UTC values.

    Parameters
    ----------
    user_id:
        Identifier of the authenticated user; written to the ``sub`` claim.
    config:
        Gateway configuration supplying ``jwt_secret_key``, ``jwt_algorithm``
        and ``refresh_token_expire_days``.

    Returns
    -------
    str
        Encoded JWT string.
    """
    issued_at = datetime.now(timezone.utc)
    lifetime = timedelta(days=config.refresh_token_expire_days)
    # Key order is kept stable so the serialized token layout does not change.
    claims = {
        "sub": user_id,
        "type": "refresh",
        "iat": issued_at,
        "exp": issued_at + lifetime,
    }
    return _sign(claims, config)


def decode_token(token: str, config: ApiGatewayConfig) -> dict:
    """Verify a JWT and return its payload.

    Only the single algorithm named in ``config.jwt_algorithm`` is accepted.
    The ``type`` claim is not inspected here, so callers that need to tell
    access tokens from refresh tokens must check it themselves.

    Parameters
    ----------
    token:
        The encoded JWT string to verify.
    config:
        Gateway configuration supplying ``jwt_secret_key`` and
        ``jwt_algorithm``.

    Returns
    -------
    dict
        The decoded payload.

    Raises
    ------
    jwt.ExpiredSignatureError
        If the token has expired.
    jwt.InvalidTokenError
        If the token is malformed or signature verification fails.
    """
    verification_key = config.jwt_secret_key
    accepted_algorithms = [config.jwt_algorithm]
    return jwt.decode(token, verification_key, algorithms=accepted_algorithms)