# Source: trading/services/api_gateway/auth/jwt.py (listing header: 98 lines, 2.2 KiB, Python)

"""JWT utilities — token creation and verification."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
import jwt
from services.api_gateway.config import ApiGatewayConfig
def create_access_token(
    user_id: str,
    username: str,
    config: ApiGatewayConfig,
) -> str:
    """Issue a signed, short-lived JWT access token for a user.

    Parameters
    ----------
    user_id:
        Identifier of the authenticated user; written to the ``sub`` claim.
    username:
        Username; written to the ``username`` claim.
    config:
        Gateway configuration. ``jwt_secret_key`` and ``jwt_algorithm`` are
        used for signing, and ``access_token_expire_minutes`` sets the
        token lifetime.

    Returns
    -------
    str
        The encoded token as produced by ``jwt.encode``.
    """
    issued_at = datetime.now(tz=timezone.utc)
    lifetime = timedelta(minutes=config.access_token_expire_minutes)

    # ``type`` marks this token as an access token (refresh tokens use
    # "refresh"); ``exp`` is the issue time plus the configured lifetime.
    claims = {
        "sub": user_id,
        "username": username,
        "type": "access",
        "iat": issued_at,
        "exp": issued_at + lifetime,
    }
    return jwt.encode(
        claims,
        config.jwt_secret_key,
        algorithm=config.jwt_algorithm,
    )
def create_refresh_token(
    user_id: str,
    config: ApiGatewayConfig,
) -> str:
    """Issue a signed JWT refresh token whose lifetime is measured in days.

    Parameters
    ----------
    user_id:
        Identifier of the authenticated user; written to the ``sub`` claim.
    config:
        Gateway configuration. ``jwt_secret_key`` and ``jwt_algorithm`` are
        used for signing, and ``refresh_token_expire_days`` sets the token
        lifetime.

    Returns
    -------
    str
        The encoded token as produced by ``jwt.encode``.
    """
    issued_at = datetime.now(tz=timezone.utc)
    lifetime = timedelta(days=config.refresh_token_expire_days)

    # ``type`` marks this token as a refresh token; unlike the access token
    # it carries no ``username`` claim.
    claims = {
        "sub": user_id,
        "type": "refresh",
        "iat": issued_at,
        "exp": issued_at + lifetime,
    }
    return jwt.encode(
        claims,
        config.jwt_secret_key,
        algorithm=config.jwt_algorithm,
    )
def decode_token(token: str, config: ApiGatewayConfig) -> dict:
    """Verify a JWT and return its claims.

    Parameters
    ----------
    token:
        The encoded JWT string to verify.
    config:
        Gateway configuration supplying ``jwt_secret_key`` and
        ``jwt_algorithm``.

    Returns
    -------
    dict
        The decoded payload as returned by ``jwt.decode``.

    Raises
    ------
    jwt.ExpiredSignatureError
        If the token has expired.
    jwt.InvalidTokenError
        If the token is malformed or signature verification fails.

    Notes
    -----
    The ``type`` claim is not inspected here, so access and refresh tokens
    are both accepted; callers that need one kind must check it themselves.
    """
    # Only the configured algorithm is passed to ``jwt.decode``.
    accepted_algorithms = [config.jwt_algorithm]
    claims = jwt.decode(
        token,
        config.jwt_secret_key,
        algorithms=accepted_algorithms,
    )
    return claims