98 lines
2.2 KiB
Python
98 lines
2.2 KiB
Python
"""JWT utilities — token creation and verification."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
import jwt
|
|
|
|
from services.api_gateway.config import ApiGatewayConfig
|
|
|
|
|
|
def create_access_token(
    user_id: str,
    username: str,
    config: ApiGatewayConfig,
) -> str:
    """Issue a signed JWT access token for a user.

    Parameters
    ----------
    user_id:
        Identifier of the authenticated user; written to the ``sub`` claim.
    username:
        Username; written to the ``username`` claim.
    config:
        Gateway configuration supplying ``jwt_secret_key``,
        ``jwt_algorithm`` and ``access_token_expire_minutes``.

    Returns
    -------
    str
        Encoded JWT string.
    """
    # A single timestamp feeds both ``iat`` and ``exp`` so the token lifetime
    # is exactly the configured number of minutes.
    issued_at = datetime.now(tz=timezone.utc)
    lifetime = timedelta(minutes=config.access_token_expire_minutes)

    claims = {
        "sub": user_id,
        "username": username,
        "type": "access",
        "iat": issued_at,
        "exp": issued_at + lifetime,
    }
    encoded = jwt.encode(
        claims,
        config.jwt_secret_key,
        algorithm=config.jwt_algorithm,
    )
    return encoded
|
|
|
|
|
|
def create_refresh_token(
    user_id: str,
    config: ApiGatewayConfig,
) -> str:
    """Issue a signed JWT refresh token for a user.

    Parameters
    ----------
    user_id:
        Identifier of the authenticated user; written to the ``sub`` claim.
    config:
        Gateway configuration supplying ``jwt_secret_key``,
        ``jwt_algorithm`` and ``refresh_token_expire_days``.

    Returns
    -------
    str
        Encoded JWT string.
    """
    # A single timestamp feeds both ``iat`` and ``exp`` so the token lifetime
    # is exactly the configured number of days.
    issued_at = datetime.now(tz=timezone.utc)
    lifetime = timedelta(days=config.refresh_token_expire_days)

    claims = {
        "sub": user_id,
        "type": "refresh",
        "iat": issued_at,
        "exp": issued_at + lifetime,
    }
    encoded = jwt.encode(
        claims,
        config.jwt_secret_key,
        algorithm=config.jwt_algorithm,
    )
    return encoded
|
|
|
|
|
|
def decode_token(
    token: str,
    config: ApiGatewayConfig,
    *,
    expected_type: str | None = None,
) -> dict:
    """Decode and verify a JWT token.

    Parameters
    ----------
    token:
        The JWT string to decode.
    config:
        Gateway configuration with secret key and algorithm.
    expected_type:
        Optional value that the token's ``type`` claim must equal
        (``create_access_token`` writes ``"access"``,
        ``create_refresh_token`` writes ``"refresh"``). When ``None``
        (the default) the ``type`` claim is not checked, which matches
        the previous behaviour of this function.

    Returns
    -------
    dict
        The decoded payload.

    Raises
    ------
    jwt.ExpiredSignatureError
        If the token has expired.
    jwt.InvalidTokenError
        If the token is malformed, signature verification fails, or
        ``expected_type`` is given and the ``type`` claim does not match.
    """
    payload = jwt.decode(
        token,
        config.jwt_secret_key,
        # Only the configured algorithm is accepted; the token header does
        # not get to choose how it is verified.
        algorithms=[config.jwt_algorithm],
    )

    # Access and refresh tokens share one key and algorithm, so the ``type``
    # claim is the only thing that tells them apart. Letting callers enforce
    # it here stops a refresh token from being accepted as an access token.
    if expected_type is not None and payload.get("type") != expected_type:
        raise jwt.InvalidTokenError(
            f"Unexpected token type: expected {expected_type!r}"
        )

    return payload
|