infra/modules/kubernetes/ebook2audiobook/audiblez-web/backend/api/auth.py
Viktor Barzin bcad200a23 chore: add untracked stacks, scripts, and agent configs
- New stacks: beads-server, hermes-agent
- Terragrunt tiers.tf for infra, phpipam, status-page
- Secrets symlinks for vault, phpipam, hermes-agent
- Scripts: cluster_manager, image_pull, containerd pullthrough setup
- Frigate config, audiblez-web app source, n8n workflows dir
- Claude agent: service-upgrade, reference: upgrade-config.json
- Removed: claudeception skill, excalidraw empty submodule, temp listings

[ci skip]

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 09:33:06 +00:00

104 lines
3 KiB
Python

"""
Authentication module for extracting user identity from Authentik headers.
When nginx ingress is protected with Authentik, these headers are forwarded:
- X-Authentik-Username: The user's username
- X-Authentik-Uid: Unique user ID (used for directory separation)
- X-Authentik-Email: User's email
- X-Authentik-Name: User's display name
- X-Authentik-Groups: Comma-separated group list
"""
from dataclasses import dataclass
from fastapi import Request, HTTPException
from typing import Optional
import re
@dataclass
class User:
"""Represents an authenticated user from Authentik."""
uid: str
username: str
email: Optional[str] = None
name: Optional[str] = None
groups: list[str] = None
def __post_init__(self):
if self.groups is None:
self.groups = []
def sanitize_user_id(uid: str) -> str:
"""
Sanitize user ID for use as a directory name.
Only allows alphanumeric, hyphens, and underscores.
"""
if not uid:
raise ValueError("User ID cannot be empty")
# Only allow safe characters for filesystem
safe_uid = re.sub(r'[^a-zA-Z0-9\-_]', '', uid)
if not safe_uid:
raise ValueError("User ID contains no valid characters")
# Limit length to prevent path issues
if len(safe_uid) > 64:
safe_uid = safe_uid[:64]
return safe_uid
async def get_current_user(request: Request) -> User:
"""
Extract user information from Authentik headers.
This is a FastAPI dependency that should be used on protected endpoints.
Raises 401 if user headers are not present (not authenticated).
"""
# Header names are case-insensitive, but commonly forwarded as:
uid = request.headers.get("X-Authentik-Uid")
username = request.headers.get("X-Authentik-Username")
email = request.headers.get("X-Authentik-Email")
name = request.headers.get("X-Authentik-Name")
groups_str = request.headers.get("X-Authentik-Groups", "")
# For development/testing, check for alternative header names
if not uid:
uid = request.headers.get("X-Authentik-Userid")
if not uid:
uid = request.headers.get("Remote-User")
if not uid or not username:
raise HTTPException(
status_code=401,
detail="Authentication required. Authentik headers not found."
)
try:
safe_uid = sanitize_user_id(uid)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# Parse groups (comma-separated)
groups = [g.strip() for g in groups_str.split(",") if g.strip()]
return User(
uid=safe_uid,
username=username,
email=email,
name=name,
groups=groups
)
async def get_optional_user(request: Request) -> Optional[User]:
"""
Extract user information if available, or return None.
Use this for endpoints that work with or without authentication.
"""
try:
return await get_current_user(request)
except HTTPException:
return None