diff --git a/main.tf b/main.tf index 68c3170d..6b5c346e 100644 --- a/main.tf +++ b/main.tf @@ -37,6 +37,7 @@ variable "dbaas_pgadmin_password" {} variable "drone_github_client_id" {} variable "drone_github_client_secret" {} variable "drone_rpc_secret" {} +variable "drone_webhook_secret" {} variable "dockerhub_registry_password" {} variable "oauth2_proxy_client_id" {} variable "oauth2_proxy_client_secret" {} @@ -79,6 +80,7 @@ variable "vaultwarden_smtp_password" {} variable "resume_database_url" {} variable "resume_database_password" {} variable "resume_redis_url" {} +variable "resume_auth_secret" { type = string } variable "frigate_valchedrym_camera_credentials" { default = "" } variable "paperless_db_password" {} variable "diun_nfty_token" {} @@ -139,6 +141,10 @@ variable "freedify_credentials" { type = map(any) } variable "mcaptcha_postgresql_password" { type = string } variable "mcaptcha_cookie_secret" { type = string } variable "mcaptcha_captcha_salt" { type = string } +variable "openrouter_api_key" { type = string } +variable "slack_bot_token" { type = string } +variable "slack_channel" { type = string } +variable "affine_postgresql_password" { type = string } provider "kubernetes" { config_path = var.prod ? "" : "~/.kube/config" @@ -437,6 +443,7 @@ module "kubernetes_cluster" { drone_github_client_id = var.drone_github_client_id drone_github_client_secret = var.drone_github_client_secret drone_rpc_secret = var.drone_rpc_secret + drone_webhook_secret = var.drone_webhook_secret # Oauth proxy oauth2_proxy_client_id = var.oauth2_proxy_client_id @@ -492,6 +499,7 @@ module "kubernetes_cluster" { resume_redis_url = var.resume_redis_url resume_database_password = var.resume_database_password resume_database_url = var.resume_database_url + resume_auth_secret = var.resume_auth_secret frigate_valchedrym_camera_credentials = var.frigate_valchedrym_camera_credentials @@ -570,6 +578,12 @@ module "kubernetes_cluster" { mcaptcha_postgresql_password = var.mcaptcha_postgresql_password mcaptcha_cookie_secret = var.mcaptcha_cookie_secret mcaptcha_captcha_salt = var.mcaptcha_captcha_salt + + openrouter_api_key = var.openrouter_api_key + slack_bot_token = var.slack_bot_token + slack_channel = var.slack_channel + + affine_postgresql_password = var.affine_postgresql_password } diff --git a/modules/kubernetes/affine/main.tf b/modules/kubernetes/affine/main.tf new file mode 100644 index 00000000..b8e8b49c --- /dev/null +++ b/modules/kubernetes/affine/main.tf @@ -0,0 +1,217 @@ +variable "tls_secret_name" {} +variable "tier" { type = string } +variable "postgresql_password" {} +variable "smtp_password" { type = string } + +resource "kubernetes_namespace" "affine" { + metadata { + name = "affine" + } +} + +module "tls_secret" { + source = "../setup_tls_secret" + namespace = kubernetes_namespace.affine.metadata[0].name + tls_secret_name = var.tls_secret_name +} + +locals { + common_env = [ + { + name = "DATABASE_URL" + value = "postgresql://affine:${var.postgresql_password}@postgresql.dbaas.svc.cluster.local:5432/affine" + }, + { + name = "REDIS_SERVER_HOST" + value = "redis.redis.svc.cluster.local" + }, + { + name = "AFFINE_INDEXER_ENABLED" + value = "false" + }, + { + name = "NODE_OPTIONS" + value = "--max-old-space-size=4096" + }, + # Server URL configuration + { + name = "AFFINE_SERVER_EXTERNAL_URL" + value = "https://affine.viktorbarzin.me" + }, + { + name = "AFFINE_SERVER_HTTPS" + value = "true" + }, + # Email/SMTP configuration + { + name = "MAILER_HOST" + value = "mailserver.viktorbarzin.me" + }, + { + 
name = "MAILER_PORT" + value = "587" + }, + { + name = "MAILER_USER" + value = "info@viktorbarzin.me" + }, + { + name = "MAILER_PASSWORD" + value = var.smtp_password + }, + { + name = "MAILER_SENDER" + value = "AFFiNE " + }, + ] +} + +resource "kubernetes_deployment" "affine" { + metadata { + name = "affine" + namespace = kubernetes_namespace.affine.metadata[0].name + labels = { + app = "affine" + tier = var.tier + } + } + spec { + replicas = 1 + selector { + match_labels = { + app = "affine" + } + } + template { + metadata { + labels = { + app = "affine" + } + } + spec { + # Init container to run database migrations + init_container { + name = "migration" + image = "ghcr.io/toeverything/affine:stable" + command = ["sh", "-c", "node ./scripts/self-host-predeploy.js"] + + dynamic "env" { + for_each = local.common_env + content { + name = env.value.name + value = env.value.value + } + } + + volume_mount { + name = "data" + mount_path = "/root/.affine/storage" + sub_path = "storage" + } + volume_mount { + name = "data" + mount_path = "/root/.affine/config" + sub_path = "config" + } + } + + container { + name = "affine" + image = "ghcr.io/toeverything/affine:stable" + + port { + container_port = 3010 + } + + dynamic "env" { + for_each = local.common_env + content { + name = env.value.name + value = env.value.value + } + } + + volume_mount { + name = "data" + mount_path = "/root/.affine/storage" + sub_path = "storage" + } + volume_mount { + name = "data" + mount_path = "/root/.affine/config" + sub_path = "config" + } + + resources { + requests = { + memory = "512Mi" + cpu = "100m" + } + limits = { + memory = "4Gi" + cpu = "2" + } + } + + liveness_probe { + http_get { + path = "/info" + port = 3010 + } + initial_delay_seconds = 120 + period_seconds = 30 + timeout_seconds = 10 + } + readiness_probe { + http_get { + path = "/info" + port = 3010 + } + initial_delay_seconds = 60 + period_seconds = 10 + timeout_seconds = 5 + } + } + volume { + name = "data" + nfs { + server = "10.0.10.15" + path = "/mnt/main/affine" + } + } + } + } + } +} + +resource "kubernetes_service" "affine" { + metadata { + name = "affine" + namespace = kubernetes_namespace.affine.metadata[0].name + labels = { + app = "affine" + } + } + + spec { + selector = { + app = "affine" + } + port { + name = "http" + port = 80 + target_port = 3010 + } + } +} + +module "ingress" { + source = "../ingress_factory" + namespace = kubernetes_namespace.affine.metadata[0].name + name = "affine" + tls_secret_name = var.tls_secret_name + max_body_size = "500m" + extra_annotations = { + "nginx.ingress.kubernetes.io/proxy-body-size" : "500m" + } +} diff --git a/modules/kubernetes/crowdsec/main.tf b/modules/kubernetes/crowdsec/main.tf index ae859c53..e914beb7 100644 --- a/modules/kubernetes/crowdsec/main.tf +++ b/modules/kubernetes/crowdsec/main.tf @@ -202,3 +202,144 @@ module "ingress" { rybbit_site_id = "d09137795ccc" } +# CronJob to import public blocklists into CrowdSec +# https://github.com/wolffcatskyy/crowdsec-blocklist-import +# Uses kubectl exec to run in an existing CrowdSec agent pod that's already registered +resource "kubernetes_cron_job_v1" "crowdsec_blocklist_import" { + metadata { + name = "crowdsec-blocklist-import" + namespace = kubernetes_namespace.crowdsec.metadata[0].name + labels = { + app = "crowdsec-blocklist-import" + tier = var.tier + } + } + + spec { + # Run daily at 4 AM + schedule = "0 4 * * *" + timezone = "Europe/London" + concurrency_policy = "Forbid" + successful_jobs_history_limit = 3 + failed_jobs_history_limit = 3 
+ + job_template { + metadata { + labels = { + app = "crowdsec-blocklist-import" + } + } + + spec { + backoff_limit = 3 + template { + metadata { + labels = { + app = "crowdsec-blocklist-import" + } + } + + spec { + service_account_name = kubernetes_service_account.blocklist_import.metadata[0].name + restart_policy = "OnFailure" + + container { + name = "blocklist-import" + image = "bitnami/kubectl:latest" + + command = ["/bin/bash", "-c"] + args = [ + <<-EOF + set -e + + echo "Finding CrowdSec agent pod..." + AGENT_POD=$(kubectl get pods -n crowdsec -l k8s-app=crowdsec,type=agent -o jsonpath='{.items[0].metadata.name}') + + if [ -z "$AGENT_POD" ]; then + echo "ERROR: Could not find CrowdSec agent pod" + exit 1 + fi + + echo "Using agent pod: $AGENT_POD" + + # Download the import script + echo "Downloading blocklist import script..." + curl -fsSL -o /tmp/import.sh \ + https://raw.githubusercontent.com/wolffcatskyy/crowdsec-blocklist-import/main/import.sh + chmod +x /tmp/import.sh + + # Copy script to agent pod and execute + echo "Copying script to agent pod and executing..." + kubectl cp /tmp/import.sh crowdsec/$AGENT_POD:/tmp/import.sh + + kubectl exec -n crowdsec "$AGENT_POD" -- /bin/bash -c ' + set -e + + # Run with native mode since we are inside the CrowdSec container + export MODE=native + export DECISION_DURATION=24h + export FETCH_TIMEOUT=60 + export LOG_LEVEL=INFO + + /tmp/import.sh + + # Cleanup + rm -f /tmp/import.sh + ' + + echo "Blocklist import completed successfully!" + EOF + ] + } + } + } + } + } + } +} + +# Service account for the blocklist import job (needs kubectl exec permissions) +resource "kubernetes_service_account" "blocklist_import" { + metadata { + name = "crowdsec-blocklist-import" + namespace = kubernetes_namespace.crowdsec.metadata[0].name + } +} + +resource "kubernetes_role" "blocklist_import" { + metadata { + name = "crowdsec-blocklist-import" + namespace = kubernetes_namespace.crowdsec.metadata[0].name + } + + rule { + api_groups = [""] + resources = ["pods"] + verbs = ["get", "list"] + } + rule { + api_groups = [""] + resources = ["pods/exec"] + verbs = ["create"] + } +} + +resource "kubernetes_role_binding" "blocklist_import" { + metadata { + name = "crowdsec-blocklist-import" + namespace = kubernetes_namespace.crowdsec.metadata[0].name + } + + role_ref { + api_group = "rbac.authorization.k8s.io" + kind = "Role" + name = kubernetes_role.blocklist_import.metadata[0].name + } + + subject { + kind = "ServiceAccount" + name = kubernetes_service_account.blocklist_import.metadata[0].name + namespace = kubernetes_namespace.crowdsec.metadata[0].name + } +} + diff --git a/modules/kubernetes/drone/main.tf b/modules/kubernetes/drone/main.tf index eb730be6..4d778e3d 100644 --- a/modules/kubernetes/drone/main.tf +++ b/modules/kubernetes/drone/main.tf @@ -3,6 +3,7 @@ variable "tier" { type = string } variable "github_client_id" {} variable "github_client_secret" {} variable "rpc_secret" {} +variable "webhook_secret" {} variable "server_host" {} variable "server_proto" {} variable "rpc_host" { @@ -98,6 +99,10 @@ resource "kubernetes_deployment" "drone_server" { name = "DRONE_RPC_SECRET" value = var.rpc_secret } + env { + name = "DRONE_WEBHOOK_SECRET" + value = var.webhook_secret + } env { name = "DRONE_SERVER_HOST" value = var.server_host diff --git a/modules/kubernetes/immich/main.tf b/modules/kubernetes/immich/main.tf index a3c2ecf3..1a8988e9 100644 --- a/modules/kubernetes/immich/main.tf +++ b/modules/kubernetes/immich/main.tf @@ -5,7 +5,7 @@ variable "homepage_token" 
{} variable "immich_version" { type = string # Change me to upgrade - default = "v2.4.1" + default = "v2.5.0" } diff --git a/modules/kubernetes/main.tf b/modules/kubernetes/main.tf index 67f989a3..9bec6813 100644 --- a/modules/kubernetes/main.tf +++ b/modules/kubernetes/main.tf @@ -22,6 +22,7 @@ variable "dbaas_pgadmin_password" {} variable "drone_github_client_id" {} variable "drone_github_client_secret" {} variable "drone_rpc_secret" {} +variable "drone_webhook_secret" {} variable "oauth2_proxy_client_id" {} variable "oauth2_proxy_client_secret" {} variable "oauth2_proxy_authenticated_emails" {} @@ -63,6 +64,7 @@ variable "vaultwarden_smtp_password" {} variable "resume_database_url" {} variable "resume_database_password" {} variable "resume_redis_url" {} +variable "resume_auth_secret" { type = string } variable "frigate_valchedrym_camera_credentials" { default = "" } variable "paperless_db_password" {} variable "diun_nfty_token" {} @@ -118,6 +120,10 @@ variable "freedify_credentials" { type = map(any) } variable "mcaptcha_postgresql_password" { type = string } variable "mcaptcha_cookie_secret" { type = string } variable "mcaptcha_captcha_salt" { type = string } +variable "openrouter_api_key" { type = string } +variable "slack_bot_token" { type = string } +variable "slack_channel" { type = string } +variable "affine_postgresql_password" { type = string } variable "defcon_level" { @@ -143,7 +149,7 @@ locals { "url", "excalidraw", "travel_blog", "dashy", "send", "ytdlp", "wealthfolio", "rybbit", "stirling-pdf", "networking-toolbox", "navidrome", "freshrss", "forgejo", "tor-proxy", "real-estate-crawler", "n8n", "changedetection", "linkwarden", "matrix", "homepage", "meshcentral", "diun", "cyberchef", "ntfy", "ollama", - "servarr", "jsoncrack", "paperless-ngx", "frigate", "audiobookshelf", "tandoor", "ebook2audiobook", "netbox", "speedtest", "resume", "freedify", "mcaptcha" + "servarr", "jsoncrack", "paperless-ngx", "frigate", "audiobookshelf", "tandoor", "ebook2audiobook", "netbox", "speedtest", "resume", "freedify", "mcaptcha", "affine" ], } active_modules = distinct(flatten([ @@ -215,6 +221,7 @@ module "drone" { github_client_id = var.drone_github_client_id github_client_secret = var.drone_github_client_secret rpc_secret = var.drone_rpc_secret + webhook_secret = var.drone_webhook_secret server_host = "drone.viktorbarzin.me" server_proto = "https" tier = local.tiers.edge @@ -539,10 +546,13 @@ module "redis" { } module "ytdlp" { - source = "./youtube_dl" - for_each = contains(local.active_modules, "ytdlp") ? { ytdlp = true } : {} - tls_secret_name = var.tls_secret_name - tier = local.tiers.aux + source = "./youtube_dl" + for_each = contains(local.active_modules, "ytdlp") ? { ytdlp = true } : {} + tls_secret_name = var.tls_secret_name + tier = local.tiers.aux + openrouter_api_key = var.openrouter_api_key + slack_bot_token = var.slack_bot_token + slack_channel = var.slack_channel depends_on = [null_resource.core_services] } @@ -583,16 +593,15 @@ module "crowdsec" { crowdsec_dash_machine_password = var.crowdsec_dash_machine_password } -# Seems like it needs S3 even if pg is local... -# module "resume" { -# source = "./resume" -# tier = local.tiers.aux -# for_each = contains(local.active_modules, "resume") ? { resume = true } : {} -# tls_secret_name = var.tls_secret_name -# redis_url = var.resume_redis_url -# database_url = var.resume_database_url -# db_password = var.resume_database_password -# } +module "resume" { + source = "./resume" + for_each = contains(local.active_modules, "resume") ? 
{ resume = true } : {} + tls_secret_name = var.tls_secret_name + tier = local.tiers.aux + database_url = var.resume_database_url + auth_secret = var.resume_auth_secret + smtp_password = var.mailserver_accounts["info@viktorbarzin.me"] +} module "uptime-kuma" { source = "./uptime-kuma" @@ -1062,3 +1071,14 @@ module "freedify" { for_each = contains(local.active_modules, "freedify") ? { freedify = true } : {} additional_credentials = var.freedify_credentials } + +module "affine" { + source = "./affine" + for_each = contains(local.active_modules, "affine") ? { affine = true } : {} + tls_secret_name = var.tls_secret_name + postgresql_password = var.affine_postgresql_password + smtp_password = var.mailserver_accounts["info@viktorbarzin.me"] + tier = local.tiers.aux + + depends_on = [null_resource.core_services] +} diff --git a/modules/kubernetes/monitoring/prometheus_chart_values.tpl b/modules/kubernetes/monitoring/prometheus_chart_values.tpl index 33374dc0..c65e9358 100755 --- a/modules/kubernetes/monitoring/prometheus_chart_values.tpl +++ b/modules/kubernetes/monitoring/prometheus_chart_values.tpl @@ -623,4 +623,9 @@ extraScrapeConfigs: | action: replace regex: '(.*)' replacement: 'nvidia_tesla_t4_$${1}' + - job_name: 'gpu-pod-memory' + static_configs: + - targets: + - "gpu-pod-exporter.nvidia.svc.cluster.local" + metrics_path: '/metrics' diff --git a/modules/kubernetes/nvidia/main.tf b/modules/kubernetes/nvidia/main.tf index 70f294cb..dc05a790 100644 --- a/modules/kubernetes/nvidia/main.tf +++ b/modules/kubernetes/nvidia/main.tf @@ -17,6 +17,18 @@ resource "kubernetes_namespace" "nvidia" { } } +# Apply GPU taint to ensure only GPU workloads run on GPU node +resource "null_resource" "gpu_node_taint" { + provisioner "local-exec" { + command = "kubectl taint nodes k8s-node1 nvidia.com/gpu=true:NoSchedule --overwrite" + } + + # Re-run if namespace changes (proxy for cluster changes) + triggers = { + namespace = kubernetes_namespace.nvidia.metadata[0].name + } +} + # [not needed anymore; part of the chart values] Apply to operator with: # kubectl patch clusterpolicies.nvidia.com/cluster-policy -n gpu-operator --type merge -p '{"spec": {"devicePlugin": {"config": {"name": "time-slicing-config", "default": "any"}}}}' @@ -36,7 +48,7 @@ resource "kubernetes_config_map" "time_slicing_config" { failRequestsGreaterThanOne: false resources: - name: nvidia.com/gpu - replicas: 10 + replicas: 20 EOF } depends_on = [kubernetes_namespace.nvidia] @@ -82,6 +94,12 @@ resource "kubernetes_deployment" "nvidia-exporter" { node_selector = { "gpu" : "true" } + toleration { + key = "nvidia.com/gpu" + operator = "Equal" + value = "true" + effect = "NoSchedule" + } container { image = "nvidia/dcgm-exporter:latest" name = "nvidia-exporter" @@ -219,3 +237,274 @@ module "ingress" { # } # depends_on = [helm_release.nvidia-gpu-operator] # } + +# GPU Pod Memory Exporter - exposes per-pod GPU memory usage as Prometheus metrics +resource "kubernetes_config_map" "gpu_pod_exporter_script" { + metadata { + name = "gpu-pod-exporter-script" + namespace = kubernetes_namespace.nvidia.metadata[0].name + } + + data = { + "exporter.py" = <<-EOF +#!/usr/bin/env python3 +"""GPU Pod Memory Exporter - Collects per-pod GPU memory usage.""" + +import subprocess +import time +import re +import os +from http.server import HTTPServer, BaseHTTPRequestHandler + +METRICS_PORT = 9401 +SCRAPE_INTERVAL = 15 + +def get_gpu_processes(): + """Run nvidia-smi to get GPU process info.""" + try: + result = subprocess.run( + ["nvidia-smi", 
"--query-compute-apps=pid,used_memory,process_name", "--format=csv,noheader,nounits"], + capture_output=True, text=True, timeout=10 + ) + if result.returncode != 0: + print(f"nvidia-smi error: {result.stderr}") + return [] + + processes = [] + for line in result.stdout.strip().split('\n'): + if not line.strip(): + continue + parts = [p.strip() for p in line.split(',')] + if len(parts) >= 3: + pid, memory_mib, process_name = parts[0], parts[1], parts[2] + processes.append({ + 'pid': pid, + 'memory_bytes': int(memory_mib) * 1024 * 1024, + 'process_name': process_name + }) + return processes + except Exception as e: + print(f"Error running nvidia-smi: {e}") + return [] + +def get_container_id(pid): + """Map PID to container ID via cgroup.""" + cgroup_path = f"/host_proc/{pid}/cgroup" + try: + with open(cgroup_path, 'r') as f: + for line in f: + # Match container ID patterns (docker, containerd, cri-o) + # e.g., /kubepods/pod.../containerid or /docker/containerid + match = re.search(r'[:/]([a-f0-9]{64})', line) + if match: + return match.group(1)[:12] # Return short container ID + # Also check for cri-containerd pattern + match = re.search(r'cri-containerd-([a-f0-9]{64})', line) + if match: + return match.group(1)[:12] + except (FileNotFoundError, PermissionError): + pass + return "host" + +# Global metrics storage +current_metrics = [] + +def collect_metrics(): + """Collect GPU memory metrics.""" + global current_metrics + metrics = [] + processes = get_gpu_processes() + + for proc in processes: + container_id = get_container_id(proc['pid']) + metrics.append({ + 'container_id': container_id, + 'pid': proc['pid'], + 'process_name': proc['process_name'], + 'memory_bytes': proc['memory_bytes'] + }) + + current_metrics = metrics + +def format_metrics(): + """Format metrics in Prometheus exposition format.""" + lines = [ + "# HELP gpu_pod_memory_used_bytes GPU memory used by container", + "# TYPE gpu_pod_memory_used_bytes gauge" + ] + + for m in current_metrics: + labels = f'container_id="{m["container_id"]}",pid="{m["pid"]}",process_name="{m["process_name"]}"' + lines.append(f"gpu_pod_memory_used_bytes{{{labels}}} {m['memory_bytes']}") + + return '\n'.join(lines) + '\n' + +class MetricsHandler(BaseHTTPRequestHandler): + def do_GET(self): + if self.path == '/metrics': + content = format_metrics() + self.send_response(200) + self.send_header('Content-Type', 'text/plain; charset=utf-8') + self.end_headers() + self.wfile.write(content.encode()) + elif self.path == '/health': + self.send_response(200) + self.end_headers() + self.wfile.write(b'ok') + else: + self.send_response(404) + self.end_headers() + + def log_message(self, format, *args): + pass # Suppress request logging + +def background_collector(): + """Background thread to collect metrics periodically.""" + import threading + def run(): + while True: + collect_metrics() + time.sleep(SCRAPE_INTERVAL) + thread = threading.Thread(target=run, daemon=True) + thread.start() + +if __name__ == '__main__': + print(f"Starting GPU Pod Memory Exporter on port {METRICS_PORT}") + collect_metrics() # Initial collection + background_collector() + + server = HTTPServer(('', METRICS_PORT), MetricsHandler) + server.serve_forever() +EOF + } +} + +resource "kubernetes_daemonset" "gpu_pod_exporter" { + metadata { + name = "gpu-pod-exporter" + namespace = kubernetes_namespace.nvidia.metadata[0].name + labels = { + app = "gpu-pod-exporter" + tier = var.tier + } + } + + spec { + selector { + match_labels = { + app = "gpu-pod-exporter" + } + } + + template { + 
metadata { + labels = { + app = "gpu-pod-exporter" + } + } + + spec { + host_pid = true + + node_selector = { + "gpu" : "true" + } + + toleration { + key = "nvidia.com/gpu" + operator = "Equal" + value = "true" + effect = "NoSchedule" + } + + container { + name = "exporter" + image = "python:3.11-slim" + + command = ["/bin/bash", "-c"] + args = [ + "python3 /scripts/exporter.py" + ] + + port { + container_port = 9401 + name = "metrics" + } + + volume_mount { + name = "scripts" + mount_path = "/scripts" + read_only = true + } + + volume_mount { + name = "host-proc" + mount_path = "/host_proc" + read_only = true + } + + resources { + requests = { + cpu = "50m" + memory = "128Mi" + } + limits = { + cpu = "200m" + memory = "256Mi" + "nvidia.com/gpu" = "1" + } + } + + liveness_probe { + http_get { + path = "/health" + port = 9401 + } + initial_delay_seconds = 30 + period_seconds = 30 + } + } + + volume { + name = "scripts" + config_map { + name = kubernetes_config_map.gpu_pod_exporter_script.metadata[0].name + default_mode = "0755" + } + } + + volume { + name = "host-proc" + host_path { + path = "/proc" + type = "Directory" + } + } + } + } + } + + depends_on = [helm_release.nvidia-gpu-operator] +} + +resource "kubernetes_service" "gpu_pod_exporter" { + metadata { + name = "gpu-pod-exporter" + namespace = kubernetes_namespace.nvidia.metadata[0].name + labels = { + app = "gpu-pod-exporter" + } + } + + spec { + selector = { + app = "gpu-pod-exporter" + } + + port { + name = "metrics" + port = 80 + target_port = 9401 + } + } +} diff --git a/modules/kubernetes/resume/main.tf b/modules/kubernetes/resume/main.tf index cf3fd0ef..4320ccb4 100644 --- a/modules/kubernetes/resume/main.tf +++ b/modules/kubernetes/resume/main.tf @@ -1,8 +1,19 @@ -variable "tls_secret_name" { type = string } +variable "tls_secret_name" {} variable "tier" { type = string } variable "database_url" { type = string } -variable "redis_url" { type = string } -variable "db_password" { type = string } +variable "auth_secret" { type = string } +variable "smtp_password" { type = string } + +locals { + namespace = "resume" + app_url = "https://resume.viktorbarzin.me" +} + +resource "kubernetes_namespace" "resume" { + metadata { + name = local.namespace + } +} module "tls_secret" { source = "../setup_tls_secret" @@ -10,17 +21,103 @@ module "tls_secret" { tls_secret_name = var.tls_secret_name } -resource "kubernetes_namespace" "resume" { +# Printer service (browserless chromium for PDF generation) +resource "kubernetes_deployment" "printer" { metadata { - name = "resume" + name = "printer" + namespace = kubernetes_namespace.resume.metadata[0].name + labels = { + app = "printer" + tier = var.tier + } + } + spec { + replicas = 1 + selector { + match_labels = { + app = "printer" + } + } + template { + metadata { + labels = { + app = "printer" + } + } + spec { + container { + name = "printer" + image = "ghcr.io/browserless/chromium:latest" + + port { + container_port = 3000 + } + + env { + name = "HEALTH" + value = "true" + } + env { + name = "CONCURRENT" + value = "20" + } + env { + name = "QUEUED" + value = "10" + } + + resources { + requests = { + memory = "256Mi" + cpu = "100m" + } + limits = { + memory = "2Gi" + cpu = "2" + } + } + + liveness_probe { + http_get { + path = "/pressure" + port = 3000 + } + initial_delay_seconds = 30 + period_seconds = 10 + timeout_seconds = 5 + } + readiness_probe { + http_get { + path = "/pressure" + port = 3000 + } + initial_delay_seconds = 10 + period_seconds = 10 + timeout_seconds = 5 + } + } + } + } } 
} -resource "random_string" "random" { - length = 32 - lower = true +resource "kubernetes_service" "printer" { + metadata { + name = "printer" + namespace = kubernetes_namespace.resume.metadata[0].name + } + spec { + selector = { + app = "printer" + } + port { + port = 3000 + target_port = 3000 + } + } } +# Reactive Resume app resource "kubernetes_deployment" "resume" { metadata { name = "resume" @@ -29,9 +126,6 @@ resource "kubernetes_deployment" "resume" { app = "resume" tier = var.tier } - annotations = { - "reloader.stakater.com/search" = "true" - } } spec { replicas = 1 @@ -48,69 +142,113 @@ resource "kubernetes_deployment" "resume" { } spec { container { - image = "amruthpillai/reactive-resume:server-latest" name = "resume" + image = "amruthpillai/reactive-resume:v5.0.3" + + port { + container_port = 3000 + } + + # Required env vars + env { + name = "APP_URL" + value = local.app_url + } env { name = "DATABASE_URL" value = var.database_url } env { - name = "REDIS_URL" - value = var.redis_url + name = "PRINTER_ENDPOINT" + value = "ws://printer.${local.namespace}.svc.cluster.local:3000" } env { - name = "PUBLIC_URL" - value = "https://resume.viktorbarzin.me" - } - env { - name = "PUBLIC_SERVER_URL" - value = "https://resume.viktorbarzin.me" - } - - env { - name = "POSTGRES_HOST" - value = "postgresql.dbaas.svc.cluster.local" - } - env { - name = "POSTGRES_DB" - value = "resume" - } - env { - name = "POSTGRES_USER" - value = "resume" - } - env { - name = "POSTGRES_PASSWORD" - value = var.db_password - } - env { - name = "JWT_SECRET" - value = random_string.random.result + name = "PRINTER_APP_URL" + value = "http://resume.${local.namespace}.svc.cluster.local" } env { name = "AUTH_SECRET" - value = random_string.random.result + value = var.auth_secret } - env { - name = "SECRET_KEY" - value = random_string.random.result - } - env { - name = "JWT_EXPIRY_TIME" - value = 604800 - } - env { - name = "STORAGE_ENDPOINT" - value = "https://resume.viktorbarzin.me" - } - // There's a tone of these... I give up... 
- // check https://github.com/AmruthPillai/Reactive-Resume/blob/main/.env.example - port { - container_port = 3000 + # Server config + env { + name = "TZ" + value = "Etc/UTC" } - port { - container_port = 3100 + + # SMTP config for password reset emails + env { + name = "SMTP_HOST" + value = "mail.viktorbarzin.me" + } + env { + name = "SMTP_PORT" + value = "587" + } + env { + name = "SMTP_USER" + value = "info@viktorbarzin.me" + } + env { + name = "SMTP_PASS" + value = var.smtp_password + } + env { + name = "SMTP_FROM" + value = "Reactive Resume " + } + env { + name = "SMTP_SECURE" + value = "false" + } + + # Feature flags + env { + name = "FLAG_DISABLE_SIGNUPS" + value = "false" # toggle me + } + + volume_mount { + name = "data" + mount_path = "/app/data" + } + + resources { + requests = { + memory = "256Mi" + cpu = "100m" + } + limits = { + memory = "1Gi" + cpu = "1" + } + } + + liveness_probe { + http_get { + path = "/api/health" + port = 3000 + } + initial_delay_seconds = 60 + period_seconds = 30 + timeout_seconds = 10 + } + readiness_probe { + http_get { + path = "/api/health" + port = 3000 + } + initial_delay_seconds = 30 + period_seconds = 10 + timeout_seconds = 5 + } + } + volume { + name = "data" + nfs { + server = "10.0.10.15" + path = "/mnt/main/resume" } } } @@ -118,22 +256,16 @@ resource "kubernetes_deployment" "resume" { } } - resource "kubernetes_service" "resume" { metadata { name = "resume" namespace = kubernetes_namespace.resume.metadata[0].name - labels = { - "app" = "resume" - } } - spec { selector = { app = "resume" } port { - name = "http" port = 80 target_port = 3000 } diff --git a/modules/kubernetes/youtube_dl/main.tf b/modules/kubernetes/youtube_dl/main.tf index 71523195..1721421c 100644 --- a/modules/kubernetes/youtube_dl/main.tf +++ b/modules/kubernetes/youtube_dl/main.tf @@ -1,5 +1,8 @@ variable "tls_secret_name" {} variable "tier" { type = string } +variable "openrouter_api_key" { type = string } +variable "slack_bot_token" { type = string } +variable "slack_channel" { type = string } resource "kubernetes_namespace" "ytdlp" { metadata { @@ -128,3 +131,198 @@ module "ingress" { "nginx.ingress.kubernetes.io/proxy-body-size" : "0", } } + +# ---------------------- +# yt-highlights service +# ---------------------- + +resource "kubernetes_secret" "openrouter" { + metadata { + name = "openrouter-credentials" + namespace = kubernetes_namespace.ytdlp.metadata[0].name + } + data = { + "api-key" = var.openrouter_api_key + } +} + +resource "kubernetes_secret" "slack" { + metadata { + name = "slack-credentials" + namespace = kubernetes_namespace.ytdlp.metadata[0].name + } + data = { + "bot-token" = var.slack_bot_token + "channel" = var.slack_channel + } +} + +resource "kubernetes_deployment" "yt_highlights" { + metadata { + name = "yt-highlights" + namespace = kubernetes_namespace.ytdlp.metadata[0].name + labels = { + app = "yt-highlights" + tier = var.tier + } + annotations = { + "diun.enable" = "true" + } + } + spec { + replicas = 1 + strategy { + type = "Recreate" + } + selector { + match_labels = { + app = "yt-highlights" + } + } + template { + metadata { + labels = { + app = "yt-highlights" + } + } + spec { + node_selector = { + "gpu" : "true" + } + container { + name = "yt-highlights" + image = "viktorbarzin/yt-highlights:v20-20260127" + image_pull_policy = "Always" + port { + container_port = 8000 + } + env { + name = "ASR_MODEL" + value = "large-v3" + } + env { + name = "ASR_DEVICE" + value = "cuda" + } + env { + name = "OPENROUTER_MODEL" + value = 
"deepseek/deepseek-r1-0528:free" + } + env { + name = "OPENROUTER_API_KEY" + value_from { + secret_key_ref { + name = kubernetes_secret.openrouter.metadata[0].name + key = "api-key" + } + } + } + env { + name = "DATA_PATH" + value = "/data" + } + env { + name = "SLACK_BOT_TOKEN" + value_from { + secret_key_ref { + name = kubernetes_secret.slack.metadata[0].name + key = "bot-token" + } + } + } + env { + name = "SLACK_CHANNEL" + value_from { + secret_key_ref { + name = kubernetes_secret.slack.metadata[0].name + key = "channel" + } + } + } + env { + name = "REDIS_URL" + value = "redis://redis.redis.svc.cluster.local:6379/0" + } + # Store model cache on NFS to avoid ephemeral storage eviction + env { + name = "HF_HOME" + value = "/data/cache/huggingface" + } + env { + name = "TORCH_HOME" + value = "/data/cache/torch" + } + # Ollama fallback for when OpenRouter models fail + env { + name = "OLLAMA_URL" + value = "http://ollama.ollama.svc.cluster.local:11434" + } + env { + name = "OLLAMA_MODEL" + value = "qwen2.5:14b" + } + volume_mount { + name = "data" + mount_path = "/data" + } + resources { + limits = { + "nvidia.com/gpu" = "1" + } + } + liveness_probe { + http_get { + path = "/health" + port = 8000 + } + initial_delay_seconds = 180 + period_seconds = 60 + timeout_seconds = 60 + failure_threshold = 10 + } + } + volume { + name = "data" + nfs { + server = "10.0.10.15" + path = "/mnt/main/ytdlp-highlights" + } + } + } + } + } +} + +resource "kubernetes_service" "yt_highlights" { + metadata { + name = "yt-highlights" + namespace = kubernetes_namespace.ytdlp.metadata[0].name + labels = { + "app" = "yt-highlights" + } + } + spec { + selector = { + app = "yt-highlights" + } + port { + name = "http" + port = 80 + target_port = 8000 + protocol = "TCP" + } + } +} + +module "highlights_ingress" { + source = "../ingress_factory" + namespace = kubernetes_namespace.ytdlp.metadata[0].name + name = "yt-highlights" + tls_secret_name = var.tls_secret_name + host = "yt-highlights" + protected = true + extra_annotations = { + "nginx.ingress.kubernetes.io/proxy-read-timeout" : "300" + "nginx.ingress.kubernetes.io/proxy-send-timeout" : "300" + } +} diff --git a/modules/kubernetes/youtube_dl/yt-highlights/Dockerfile b/modules/kubernetes/youtube_dl/yt-highlights/Dockerfile new file mode 100644 index 00000000..95e1b56f --- /dev/null +++ b/modules/kubernetes/youtube_dl/yt-highlights/Dockerfile @@ -0,0 +1,38 @@ +FROM nvidia/cuda:12.6.3-cudnn-runtime-ubuntu22.04 + +ENV DEBIAN_FRONTEND=noninteractive +ENV PYTHONUNBUFFERED=1 +ENV PATH="/root/.local/bin:$PATH" + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + python3 \ + python3-pip \ + python3-venv \ + ffmpeg \ + git \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Create app directory +WORKDIR /app + +# Install Python dependencies +COPY requirements.txt . 
+RUN pip3 install --no-cache-dir -r requirements.txt + +# Copy application code +COPY app/ ./app/ + +# Create data directories +RUN mkdir -p /data/audio /data/transcripts /data/highlights /data/config /data/state + +# Expose port +EXPOSE 8000 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \ + CMD curl -f http://localhost:8000/health || exit 1 + +# Run the application +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/modules/kubernetes/youtube_dl/yt-highlights/app/__init__.py b/modules/kubernetes/youtube_dl/yt-highlights/app/__init__.py new file mode 100644 index 00000000..c34bcade --- /dev/null +++ b/modules/kubernetes/youtube_dl/yt-highlights/app/__init__.py @@ -0,0 +1 @@ +# YouTube Highlights Extraction Service diff --git a/modules/kubernetes/youtube_dl/yt-highlights/app/main.py b/modules/kubernetes/youtube_dl/yt-highlights/app/main.py new file mode 100644 index 00000000..57526978 --- /dev/null +++ b/modules/kubernetes/youtube_dl/yt-highlights/app/main.py @@ -0,0 +1,1359 @@ +""" +YouTube Highlights Extraction Service + +Downloads YouTube videos, transcribes them using Faster-Whisper, +and extracts highlights using OpenRouter LLM. +""" +import os +import json +import uuid +import asyncio +import logging +import threading +import queue +from datetime import datetime +from pathlib import Path +from typing import Optional +from contextlib import asynccontextmanager + +import feedparser +import httpx +import redis +import yt_dlp +from faster_whisper import WhisperModel +from fastapi import FastAPI, HTTPException +from fastapi.staticfiles import StaticFiles +from fastapi.responses import FileResponse +from pydantic import BaseModel + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Configuration from environment +DATA_PATH = Path(os.getenv("DATA_PATH", "/data")) +ASR_MODEL = os.getenv("ASR_MODEL", "large-v3") +ASR_DEVICE = os.getenv("ASR_DEVICE", "cuda") +OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "") +OPENROUTER_MODEL = os.getenv("OPENROUTER_MODEL", "deepseek/deepseek-r1-0528:free") +OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions" + +# Ollama fallback configuration (used as last resort if set) +OLLAMA_URL = os.getenv("OLLAMA_URL", "") # e.g., "http://ollama.ollama.svc.cluster.local:11434" +OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "qwen2.5:3b") # Small but capable model + +# Dynamic model pool - fetched from OpenRouter API +_cached_models: list[str] = [] +_models_fetched_at: float = 0 +MODEL_CACHE_TTL = 3600 # Refresh model list every hour + + +def _fetch_free_models() -> list[str]: + """Fetch list of free models from OpenRouter API. + + Returns models sorted by preference (primary env model first if available). 
+ """ + import requests + import time + + global _cached_models, _models_fetched_at + + # Return cached list if still valid + if _cached_models and (time.time() - _models_fetched_at) < MODEL_CACHE_TTL: + return _cached_models + + try: + logger.info("Fetching available models from OpenRouter API...") + response = requests.get( + "https://openrouter.ai/api/v1/models", + headers={"Authorization": f"Bearer {OPENROUTER_API_KEY}"}, + timeout=30.0 + ) + + if response.status_code != 200: + logger.warning(f"Failed to fetch models: {response.status_code}") + # Return fallback list if API fails + return _get_fallback_models() + + data = response.json() + models = data.get("data", []) + + # Filter for free models (pricing is 0 or model ID ends with :free) + free_models = [] + for model in models: + model_id = model.get("id", "") + pricing = model.get("pricing", {}) + + # Check if model is free (prompt and completion are 0 or "0") + prompt_price = pricing.get("prompt", "1") + completion_price = pricing.get("completion", "1") + + is_free = ( + str(prompt_price) == "0" and str(completion_price) == "0" + ) or model_id.endswith(":free") + + if is_free: + free_models.append(model_id) + + logger.info(f"Found {len(free_models)} free models from OpenRouter") + + # Sort models - put preferred/primary model first if in list + sorted_models = [] + if OPENROUTER_MODEL in free_models: + sorted_models.append(OPENROUTER_MODEL) + free_models.remove(OPENROUTER_MODEL) + + # Add remaining models (could add more sophisticated ordering here) + sorted_models.extend(free_models) + + # Cache the result + _cached_models = sorted_models + _models_fetched_at = time.time() + + return sorted_models + + except Exception as e: + logger.warning(f"Error fetching models from OpenRouter: {e}") + return _get_fallback_models() + + +def _get_fallback_models() -> list[str]: + """Fallback model list if API fetch fails - only models known to work.""" + return [ + OPENROUTER_MODEL, + "deepseek/deepseek-r1-0528:free", + "google/gemini-2.0-flash-exp:free", + "meta-llama/llama-3.3-70b-instruct:free", + "mistralai/mistral-small-3.1-24b-instruct:free", + "google/gemma-3-27b-it:free", + ] + +# Slack configuration +SLACK_BOT_TOKEN = os.getenv("SLACK_BOT_TOKEN", "") +SLACK_CHANNEL = os.getenv("SLACK_CHANNEL", "automation") + +# Redis configuration +REDIS_URL = os.getenv("REDIS_URL", "redis://redis.redis.svc.cluster.local:6379/0") +REDIS_PREFIX = "yt-highlights:" + +# Paths +AUDIO_PATH = DATA_PATH / "audio" +TRANSCRIPTS_PATH = DATA_PATH / "transcripts" +HIGHLIGHTS_PATH = DATA_PATH / "highlights" +CONFIG_PATH = DATA_PATH / "config" +STATE_PATH = DATA_PATH / "state" + +# Ensure directories exist +for path in [AUDIO_PATH, TRANSCRIPTS_PATH, HIGHLIGHTS_PATH, CONFIG_PATH, STATE_PATH]: + path.mkdir(parents=True, exist_ok=True) + +# Global state +whisper_model: Optional[WhisperModel] = None +redis_client: Optional[redis.Redis] = None + +# Worker thread state +job_queue: queue.Queue = queue.Queue() +worker_thread: Optional[threading.Thread] = None +worker_running: bool = False + + +class JobStore: + """Redis-backed job storage.""" + + # Jobs older than this are auto-expired + JOB_EXPIRY_HOURS = 24 + + def __init__(self, client: redis.Redis, prefix: str = REDIS_PREFIX): + self.client = client + self.prefix = prefix + + def _key(self, job_id: str) -> str: + return f"{self.prefix}job:{job_id}" + + def set(self, job_id: str, job_data: dict): + """Store job data in Redis.""" + self.client.set(self._key(job_id), json.dumps(job_data)) + # Add to job index + 
self.client.sadd(f"{self.prefix}jobs", job_id) + + def get(self, job_id: str) -> Optional[dict]: + """Get job data from Redis.""" + data = self.client.get(self._key(job_id)) + if data: + return json.loads(data) + return None + + def update(self, job_id: str, **kwargs): + """Update specific fields in a job.""" + job = self.get(job_id) + if job: + job.update(kwargs) + self.set(job_id, job) + + def delete(self, job_id: str): + """Delete a job from Redis.""" + self.client.delete(self._key(job_id)) + self.client.srem(f"{self.prefix}jobs", job_id) + + def all(self) -> list[dict]: + """Get all jobs.""" + job_ids = self.client.smembers(f"{self.prefix}jobs") + jobs = [] + for job_id in job_ids: + job = self.get(job_id.decode() if isinstance(job_id, bytes) else job_id) + if job: + jobs.append(job) + return jobs + + def get_pending(self) -> list[dict]: + """Get jobs that need to be resumed (queued or processing).""" + pending = [] + for job in self.all(): + if job.get("status") in ("queued", "downloading", "transcribing", "analyzing"): + pending.append(job) + return pending + + def expire_old_jobs(self) -> int: + """Expire jobs older than JOB_EXPIRY_HOURS. + + Returns the number of jobs expired. + """ + from datetime import datetime, timedelta + + cutoff = datetime.utcnow() - timedelta(hours=self.JOB_EXPIRY_HOURS) + expired_count = 0 + + for job in self.all(): + # Skip already completed or failed jobs + if job.get("status") in ("completed", "failed", "expired"): + continue + + # Check job age + created_at = job.get("created_at") + if not created_at: + continue + + try: + # Parse ISO format datetime + job_time = datetime.fromisoformat(created_at.replace("Z", "+00:00").replace("+00:00", "")) + if job_time < cutoff: + job_id = job.get("job_id") + self.update( + job_id, + status="expired", + error=f"Job expired after {self.JOB_EXPIRY_HOURS} hours" + ) + expired_count += 1 + logger.info(f"Expired old job: {job_id}") + except (ValueError, TypeError) as e: + logger.warning(f"Could not parse job date: {created_at}: {e}") + + return expired_count + + +# Global job store (initialized on startup) +job_store: Optional[JobStore] = None + + +# Pydantic models +class ProcessRequest(BaseModel): + video_url: str + whisper_model: Optional[str] = None + language: Optional[str] = "en" + num_highlights: Optional[int] = 5 + + +class ChannelRequest(BaseModel): + channel_id: str + name: Optional[str] = None + + +class JobStatus(BaseModel): + job_id: str + status: str + video_url: str + video_title: Optional[str] = None + progress: Optional[str] = None + error: Optional[str] = None + created_at: str + + +class Highlight(BaseModel): + timestamp: str + timestamp_seconds: int + title: str + description: str + + +class JobResult(BaseModel): + job_id: str + status: str + video_url: str + video_title: str + duration_seconds: int + highlights: list[Highlight] + summary: str + transcript_path: str + + +def load_json(path: Path, default: dict) -> dict: + """Load JSON file or return default.""" + if path.exists(): + return json.loads(path.read_text()) + return default + + +def save_json(path: Path, data: dict): + """Save data to JSON file.""" + path.write_text(json.dumps(data, indent=2)) + + +def send_notification_sync(title: str, message: str, url: str = None): + """Send notification via Slack (synchronous).""" + import requests + + if not SLACK_BOT_TOKEN: + logger.warning("Slack bot token not configured, skipping notification") + return + + try: + # Build Slack message blocks + blocks = [ + { + "type": "header", + "text": {"type": 
"plain_text", "text": title[:150], "emoji": True} + }, + { + "type": "section", + "text": {"type": "mrkdwn", "text": message[:2900]} + } + ] + + if url: + blocks.append({ + "type": "section", + "text": {"type": "mrkdwn", "text": f"<{url}|Watch Video>"} + }) + + response = requests.post( + "https://slack.com/api/chat.postMessage", + headers={ + "Authorization": f"Bearer {SLACK_BOT_TOKEN}", + "Content-Type": "application/json", + }, + json={ + "channel": SLACK_CHANNEL, + "text": f"{title}: {message}", # Fallback text + "blocks": blocks + }, + timeout=10.0 + ) + + result = response.json() + if not result.get("ok"): + logger.warning(f"Slack API error: {result.get('error', 'unknown')}") + else: + logger.info(f"Slack notification sent: {title}") + + except Exception as e: + logger.warning(f"Failed to send Slack notification: {e}") + + +async def send_notification(title: str, message: str, url: str = None): + """Send notification via Slack (async wrapper).""" + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, send_notification_sync, title, message, url) + + +def get_channels() -> dict: + """Get subscribed channels.""" + return load_json(CONFIG_PATH / "channels.json", {"channels": []}) + + +def save_channels(data: dict): + """Save channels.""" + save_json(CONFIG_PATH / "channels.json", data) + + +def get_processed() -> dict: + """Get processed videos.""" + return load_json(STATE_PATH / "processed.json", {"processed_videos": {}}) + + +def save_processed(data: dict): + """Save processed videos.""" + save_json(STATE_PATH / "processed.json", data) + + +def cleanup_old_processed(hours: int = 24) -> int: + """Remove processed videos older than specified hours. + + Deletes both the state entry and the highlights JSON file. + Returns the number of videos cleaned up. 
+ """ + from datetime import datetime, timedelta + + cutoff = datetime.utcnow() - timedelta(hours=hours) + processed = get_processed() + videos = processed.get("processed_videos", {}) + cleaned = 0 + + to_remove = [] + for video_id, info in videos.items(): + processed_at = info.get("processed_at") + if not processed_at: + continue + + try: + # Parse ISO format datetime + video_time = datetime.fromisoformat(processed_at.replace("Z", "+00:00").replace("+00:00", "")) + if video_time < cutoff: + to_remove.append(video_id) + + # Delete highlights file if exists + highlights_path = info.get("highlights_path") + if highlights_path: + path = Path(highlights_path) + if path.exists(): + path.unlink() + logger.info(f"Deleted old highlights file: {path}") + + # Also delete transcript if exists + transcript_path = TRANSCRIPTS_PATH / f"{video_id}.json" + if transcript_path.exists(): + transcript_path.unlink() + logger.info(f"Deleted old transcript file: {transcript_path}") + + cleaned += 1 + except (ValueError, TypeError) as e: + logger.warning(f"Could not parse processed date: {processed_at}: {e}") + + # Remove from state + for video_id in to_remove: + del videos[video_id] + logger.info(f"Removed old processed video: {video_id}") + + if to_remove: + save_processed(processed) + + return cleaned + + +def extract_video_id(url: str) -> str: + """Extract video ID from YouTube URL.""" + import re + patterns = [ + r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/)([a-zA-Z0-9_-]{11})', + ] + for pattern in patterns: + match = re.search(pattern, url) + if match: + return match.group(1) + return url + + +async def download_audio(video_url: str, output_path: Path) -> dict: + """Download audio from YouTube video (async wrapper).""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, download_audio_sync, video_url, output_path) + + +def download_audio_sync(video_url: str, output_path: Path) -> dict: + """Download audio from YouTube video (synchronous).""" + ydl_opts = { + # Accept any format - FFmpeg will extract audio + 'format': 'best', + 'postprocessors': [{ + 'key': 'FFmpegExtractAudio', + 'preferredcodec': 'mp3', + 'preferredquality': '128', # Lower quality is fine for transcription + }], + 'outtmpl': str(output_path.with_suffix('')), + 'quiet': True, + 'no_warnings': True, + # Avoid 403 errors from YouTube + 'http_headers': { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', + 'Accept-Language': 'en-us,en;q=0.5', + }, + 'extractor_args': {'youtube': {'player_client': ['ios', 'android', 'web']}}, + } + + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + info = ydl.extract_info(video_url, download=True) + return { + 'title': info.get('title', 'Unknown'), + 'duration': info.get('duration', 0), + 'channel': info.get('channel', 'Unknown'), + 'upload_date': info.get('upload_date', ''), + } + + +async def transcribe_audio(audio_path: Path, language: str = "en") -> list[dict]: + """Transcribe audio using Faster-Whisper (async wrapper).""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, transcribe_audio_sync, audio_path, language) + + +def transcribe_audio_sync(audio_path: Path, language: str = "en") -> list[dict]: + """Transcribe audio using Faster-Whisper (synchronous).""" + global whisper_model + + if whisper_model is None: + logger.info(f"Loading Whisper model: {ASR_MODEL} on {ASR_DEVICE}") + 
whisper_model = WhisperModel( + ASR_MODEL, + device=ASR_DEVICE, + compute_type="float16" if ASR_DEVICE == "cuda" else "int8" + ) + + segments, info = whisper_model.transcribe( + str(audio_path), + language=language, + word_timestamps=True + ) + return [ + { + "start": segment.start, + "end": segment.end, + "text": segment.text.strip(), + } + for segment in segments + ] + + +def format_timestamp(seconds: float) -> str: + """Format seconds as MM:SS or HH:MM:SS.""" + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + secs = int(seconds % 60) + if hours > 0: + return f"{hours}:{minutes:02d}:{secs:02d}" + return f"{minutes}:{secs:02d}" + + +async def extract_highlights( + transcript: list[dict], + video_title: str, + num_highlights: int = 5 +) -> dict: + """Extract highlights using OpenRouter LLM (async wrapper).""" + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, extract_highlights_sync, transcript, video_title, num_highlights + ) + + +def _call_llm_with_retry(prompt: str) -> dict: + """Call OpenRouter LLM with limited retries, then Ollama fallback. + + Tries up to 5 OpenRouter models once each, then falls back to Ollama. + Designed to fail fast - Ollama is the reliable fallback. + """ + import requests + import time + + # Configuration - keep it fast, Ollama is reliable fallback + MAX_MODELS_TO_TRY = 5 + + # Get available free models (cached, refreshed hourly) + model_pool = _fetch_free_models()[:MAX_MODELS_TO_TRY] + + last_error = None + + for i, model in enumerate(model_pool): + try: + logger.info(f"Trying model: {model} ({i + 1}/{len(model_pool)})") + + response = requests.post( + OPENROUTER_URL, + headers={ + "Authorization": f"Bearer {OPENROUTER_API_KEY}", + "Content-Type": "application/json", + }, + json={ + "model": model, + "messages": [{"role": "user", "content": prompt}], + "temperature": 0.3, + }, + timeout=60.0 # Shorter timeout + ) + + # Non-200 responses - log and try next model + if response.status_code != 200: + logger.warning(f"Model {model} returned {response.status_code}: {response.text[:200]}") + last_error = f"Model {model} error: {response.status_code}" + continue + + result = response.json() + + # Check for API-level errors + if "error" in result: + error_msg = result.get('error', {}) + if isinstance(error_msg, dict): + error_msg = error_msg.get('message', 'unknown') + logger.warning(f"Model {model} API error: {error_msg}") + last_error = f"Model {model} API error: {error_msg}" + continue + + content = result.get("choices", [{}])[0].get("message", {}).get("content", "") + + if not content or not content.strip(): + logger.warning(f"Model {model} returned empty response") + last_error = f"Model {model} returned empty response" + continue + + # Parse and return if successful + parsed = _parse_llm_response(content, model) + if parsed: + logger.info(f"Successfully used model: {model}") + return parsed + else: + last_error = f"Model {model} returned unparseable response" + continue + + except requests.exceptions.Timeout: + logger.warning(f"Model {model} timed out") + last_error = f"Model {model} timed out" + continue + except requests.exceptions.RequestException as e: + logger.warning(f"Model {model} request failed: {e}") + last_error = f"Model {model} request error: {e}" + continue + except Exception as e: + logger.warning(f"Model {model} unexpected error: {e}") + last_error = f"Model {model} error: {e}" + continue + + # Try Ollama as last resort if configured + if OLLAMA_URL: + logger.info(f"All OpenRouter models failed, 
trying Ollama fallback: {OLLAMA_MODEL}")
+        try:
+            response = requests.post(
+                f"{OLLAMA_URL}/api/generate",
+                json={
+                    "model": OLLAMA_MODEL,
+                    "prompt": prompt,
+                    "stream": False,
+                    "options": {"temperature": 0.3}
+                },
+                timeout=300.0 # Ollama can be slow on first load
+            )
+
+            if response.status_code == 200:
+                result = response.json()
+                content = result.get("response", "")
+                if content:
+                    parsed = _parse_llm_response(content, f"ollama:{OLLAMA_MODEL}")
+                    if parsed:
+                        logger.info(f"Successfully used Ollama fallback: {OLLAMA_MODEL}")
+                        return parsed
+                    else:
+                        last_error = f"Ollama {OLLAMA_MODEL} returned unparseable response"
+                else:
+                    last_error = f"Ollama {OLLAMA_MODEL} returned empty response"
+            else:
+                last_error = f"Ollama {OLLAMA_MODEL} error: {response.status_code}"
+                logger.warning(f"Ollama fallback failed: {response.status_code} - {response.text[:200]}")
+
+        except Exception as e:
+            logger.warning(f"Ollama fallback failed: {e}")
+            last_error = f"Ollama error: {e}"
+
+    # All models failed
+    raise ValueError(f"All models failed. Last error: {last_error}")
+
+
+def _parse_llm_response(content: str, model_name: str) -> Optional[dict]:
+    """Parse LLM response content into JSON dict. Returns None if parsing fails."""
+    import re
+
+    # Strip DeepSeek R1 thinking blocks (e.g., <think>...</think>)
+    content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
+
+    # Parse JSON from response (handle markdown code blocks)
+    if "```json" in content:
+        content = content.split("```json")[1].split("```")[0]
+    elif "```" in content:
+        content = content.split("```")[1].split("```")[0]
+
+    content = content.strip()
+    if not content:
+        logger.warning(f"Model {model_name} returned no JSON content after stripping")
+        return None
+
+    try:
+        return json.loads(content)
+    except json.JSONDecodeError as e:
+        logger.warning(f"Model {model_name} returned invalid JSON: {e}")
+        return None
+
+
+def extract_highlights_sync(
+    transcript: list[dict],
+    video_title: str,
+    num_highlights: int = 5
+) -> dict:
+    """Extract highlights using OpenRouter LLM (synchronous).
+
+    For long transcripts, splits into chunks and processes each separately,
+    then combines results. Tries multiple free models, then falls back to Ollama.
+ """ + # Chunk configuration - conservative limit for free tier models + MAX_CHUNK_CHARS = 6000 # ~1500 tokens, safe for most free models + HIGHLIGHTS_PER_CHUNK = max(2, num_highlights // 2) + + # Format transcript with timestamps + formatted_segments = [ + f"[{format_timestamp(seg['start'])}] {seg['text']}" + for seg in transcript + ] + formatted_transcript = "\n".join(formatted_segments) + + # If transcript is small enough, process in one go + if len(formatted_transcript) <= MAX_CHUNK_CHARS: + logger.info(f"Processing transcript in single chunk ({len(formatted_transcript)} chars)") + return _process_single_chunk(formatted_transcript, video_title, num_highlights) + + # Split into chunks for long transcripts + chunks = [] + current_chunk = [] + current_length = 0 + + for segment in formatted_segments: + seg_len = len(segment) + 1 # +1 for newline + if current_length + seg_len > MAX_CHUNK_CHARS and current_chunk: + chunks.append("\n".join(current_chunk)) + current_chunk = [segment] + current_length = seg_len + else: + current_chunk.append(segment) + current_length += seg_len + + if current_chunk: + chunks.append("\n".join(current_chunk)) + + logger.info(f"Processing transcript in {len(chunks)} chunks ({len(formatted_transcript)} total chars)") + + # Process each chunk + all_highlights = [] + summaries = [] + + for i, chunk in enumerate(chunks): + logger.info(f"Processing chunk {i + 1}/{len(chunks)} ({len(chunk)} chars)") + try: + result = _process_single_chunk(chunk, video_title, HIGHLIGHTS_PER_CHUNK, is_partial=True, chunk_num=i+1, total_chunks=len(chunks)) + all_highlights.extend(result.get("highlights", [])) + if result.get("summary"): + summaries.append(result["summary"]) + except Exception as e: + logger.warning(f"Chunk {i + 1} failed: {e}") + # Continue with other chunks + + if not all_highlights and not summaries: + raise ValueError("All chunks failed to process") + + # Sort highlights by timestamp and take top N + all_highlights.sort(key=lambda h: h.get("timestamp_seconds", 0)) + top_highlights = all_highlights[:num_highlights] + + # Combine summaries + if len(summaries) > 1: + combined_summary = " ".join(summaries) + elif summaries: + combined_summary = summaries[0] + else: + combined_summary = "Video processed in chunks." + + return { + "highlights": top_highlights, + "summary": combined_summary + } + + +def _process_single_chunk( + formatted_transcript: str, + video_title: str, + num_highlights: int, + is_partial: bool = False, + chunk_num: int = 1, + total_chunks: int = 1 +) -> dict: + """Process a single transcript chunk to extract highlights.""" + chunk_context = "" + summary_instruction = "Provide a brief summary (2-3 sentences MAX, under 200 characters) of the main takeaway." + if is_partial: + chunk_context = f" (Part {chunk_num} of {total_chunks})" + summary_instruction = "Provide a one-sentence summary (under 100 characters) of this section's main point." + + prompt = f"""Analyze this video transcript and extract key moments. + +Video: "{video_title}"{chunk_context} + +TASK: +1. Identify exactly {num_highlights} most important/interesting moments +2. 
{summary_instruction} + +OUTPUT FORMAT (valid JSON only, no other text): +{{ + "highlights": [ + {{"timestamp": "MM:SS", "timestamp_seconds": <seconds as number>, "title": "<short title>", "description": "<1 sentence>"}} + ], + "summary": "<short summary>" +}} + +RULES: +- Timestamps MUST match exactly from transcript (format: MM:SS or H:MM:SS) +- Keep titles punchy and specific (not generic like "Important point") +- Summary must be SHORT - this is critical + +Transcript: +{formatted_transcript}""" + + return _call_llm_with_retry(prompt) + + +def process_video_sync(job_id: str, video_url: str, language: str, num_highlights: int): + """Process a video: download, transcribe, extract highlights (synchronous). + + This runs entirely in the worker thread, keeping the main event loop free. + """ + video_id = extract_video_id(video_url) + + try: + job_store.update(job_id, status="downloading", progress="Downloading audio...") + + audio_path = AUDIO_PATH / f"{video_id}.mp3" + video_info = download_audio_sync(video_url, audio_path) + + job_store.update( + job_id, + video_title=video_info["title"], + status="transcribing", + progress="Transcribing audio..." + ) + + transcript = transcribe_audio_sync(audio_path, language) + + # Save transcript + transcript_path = TRANSCRIPTS_PATH / f"{video_id}.json" + save_json(transcript_path, { + "video_id": video_id, + "video_url": video_url, + "title": video_info["title"], + "duration": video_info["duration"], + "segments": transcript + }) + + job_store.update(job_id, status="analyzing", progress="Extracting highlights...") + + highlights = extract_highlights_sync( + transcript, + video_info["title"], + num_highlights + ) + + # Save highlights + result = { + "job_id": job_id, + "video_id": video_id, + "video_url": video_url, + "video_title": video_info["title"], + "duration_seconds": video_info["duration"], + "highlights": highlights.get("highlights", []), + "summary": highlights.get("summary", ""), + "transcript_path": str(transcript_path), + "processed_at": datetime.utcnow().isoformat() + } + + highlights_path = HIGHLIGHTS_PATH / f"{video_id}.json" + save_json(highlights_path, result) + + # Update processed state + processed = get_processed() + processed["processed_videos"][video_id] = { + "processed_at": datetime.utcnow().isoformat(), + "status": "completed", + "highlights_path": str(highlights_path) + } + save_processed(processed) + + job_store.update(job_id, status="completed", progress=None, result=result) + + # Cleanup audio file + if audio_path.exists(): + audio_path.unlink() + + logger.info(f"Job {job_id} completed: {video_info['title']}") + + # Build notification message with summary and highlights + summary_text = highlights.get('summary', 'No summary') + highlight_list = highlights.get('highlights', []) + + message_parts = [f"*Summary:* {summary_text}"] + + if highlight_list: + message_parts.append("\n*Key Moments:*") + for h in highlight_list[:5]: # Limit to 5 highlights + ts = h.get('timestamp', '0:00') + title = h.get('title', 'Untitled') + message_parts.append(f"- `{ts}` {title}") + + notification_message = "\n".join(message_parts) + + # Send notification (sync version) + send_notification_sync( + title=f"Video Processed: {video_info['title'][:50]}", + message=notification_message, + url=video_url + ) + + except Exception as e: + logger.exception(f"Job {job_id} failed: {e}") + job_store.update(job_id, status="failed", error=str(e)) + + +def worker_loop(): + """Worker thread main loop - processes jobs from the queue one at a time.""" + global worker_running + logger.info("Worker thread
started") + + while worker_running: + try: + # Block for up to 1 second waiting for a job + job = job_queue.get(timeout=1.0) + except queue.Empty: + continue + + try: + job_id = job["job_id"] + video_url = job["video_url"] + language = job.get("language", "en") + num_highlights = job.get("num_highlights", 5) + + logger.info(f"Worker processing job {job_id}: {video_url}") + process_video_sync(job_id, video_url, language, num_highlights) + + except Exception as e: + logger.exception(f"Worker error processing job: {e}") + finally: + job_queue.task_done() + + logger.info("Worker thread stopped") + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application lifespan handler.""" + global redis_client, job_store, worker_thread, worker_running + + logger.info("Starting yt-highlights service...") + + # Initialize Redis connection + try: + redis_client = redis.from_url(REDIS_URL, decode_responses=False) + redis_client.ping() + job_store = JobStore(redis_client) + logger.info(f"Connected to Redis at {REDIS_URL}") + + # Expire old jobs on startup + expired = job_store.expire_old_jobs() + if expired: + logger.info(f"Expired {expired} old jobs on startup") + + # Cleanup old processed videos on startup + cleaned = cleanup_old_processed(hours=24) + if cleaned: + logger.info(f"Cleaned up {cleaned} old processed videos on startup") + + # Check for pending jobs that need to be resumed + pending = job_store.get_pending() + if pending: + logger.info(f"Found {len(pending)} pending jobs to resume") + for job in pending: + # Mark as failed with resume note - they need to be resubmitted + job_store.update( + job["job_id"], + status="failed", + error="Service restarted - please resubmit" + ) + except Exception as e: + logger.error(f"Failed to connect to Redis: {e}") + raise + + # Start worker thread + worker_running = True + worker_thread = threading.Thread(target=worker_loop, daemon=True, name="video-worker") + worker_thread.start() + logger.info("Worker thread started") + + yield + + logger.info("Shutting down yt-highlights service...") + + # Stop worker thread + worker_running = False + if worker_thread and worker_thread.is_alive(): + worker_thread.join(timeout=5.0) + logger.info("Worker thread stopped") + + if redis_client: + redis_client.close() + + +app = FastAPI( + title="YouTube Highlights Extractor", + description="Extract key moments and summaries from YouTube videos", + version="1.0.0", + lifespan=lifespan +) + + +@app.get("/health") +async def health(): + """Health check endpoint.""" + return {"status": "healthy", "model": ASR_MODEL, "device": ASR_DEVICE} + + +@app.post("/process") +async def process(request: ProcessRequest): + """Queue a video for processing.""" + job_id = str(uuid.uuid4())[:8] + + job_data = { + "job_id": job_id, + "status": "queued", + "video_url": request.video_url, + "video_title": None, + "progress": None, + "error": None, + "created_at": datetime.utcnow().isoformat(), + } + + job_store.set(job_id, job_data) + + # Add to worker queue instead of background task + job_queue.put({ + "job_id": job_id, + "video_url": request.video_url, + "language": request.language or "en", + "num_highlights": request.num_highlights or 5, + }) + + return JobStatus(**job_data) + + +@app.get("/status/{job_id}") +async def status(job_id: str): + """Get job status.""" + job = job_store.get(job_id) + if not job: + raise HTTPException(404, f"Job {job_id} not found") + return JobStatus(**job) + + +@app.get("/results/{job_id}") +async def results(job_id: str): + """Get job results.""" + job = 
job_store.get(job_id) + if not job: + raise HTTPException(404, f"Job {job_id} not found") + + if job["status"] != "completed": + raise HTTPException(400, f"Job {job_id} not completed: {job['status']}") + + return job.get("result", {}) + + +@app.delete("/jobs/{job_id}") +async def delete_job(job_id: str): + """Delete a job from the queue.""" + job = job_store.get(job_id) + if not job: + raise HTTPException(404, f"Job {job_id} not found") + + job_store.delete(job_id) + return {"status": "deleted", "job_id": job_id} + + +def resolve_channel_id(channel_input: str) -> tuple[str, str]: + """Resolve a YouTube channel handle/URL to a channel ID. + + Args: + channel_input: Can be a handle (@username), channel ID (UC...), or URL + + Returns: + Tuple of (channel_id, channel_name) + """ + # If it's already a channel ID (starts with UC and is 24 chars), return as-is + if channel_input.startswith("UC") and len(channel_input) == 24: + return channel_input, channel_input + + # Build URL from handle or use as-is if it's a URL + if channel_input.startswith("@"): + url = f"https://www.youtube.com/{channel_input}" + elif channel_input.startswith("http"): + url = channel_input + else: + url = f"https://www.youtube.com/@{channel_input}" + + try: + ydl_opts = { + 'quiet': True, + 'extract_flat': True, + 'playlist_items': '1', + 'no_warnings': True, + } + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + info = ydl.extract_info(url, download=False) + channel_id = info.get('channel_id') + channel_name = info.get('channel') or info.get('uploader') or channel_input + if channel_id: + return channel_id, channel_name + raise ValueError(f"Could not resolve channel ID for {channel_input}") + except Exception as e: + logger.error(f"Failed to resolve channel: {channel_input}: {e}") + raise ValueError(f"Could not resolve channel: {channel_input}") + + +@app.get("/channels") +async def list_channels(): + """List subscribed channels.""" + return get_channels() + + +@app.post("/channels") +async def add_channel(request: ChannelRequest): + """Add a channel subscription. + + Accepts handles (@username), channel IDs (UC...), or URLs. + Resolves to the actual channel ID for RSS feed compatibility. + """ + channels = get_channels() + + # Resolve to actual channel ID + try: + channel_id, channel_name = resolve_channel_id(request.channel_id) + except ValueError as e: + raise HTTPException(400, str(e)) + + # Check if already subscribed (check both input and resolved ID) + for ch in channels["channels"]: + if ch["id"] == channel_id: + raise HTTPException(400, f"Channel already subscribed (ID: {channel_id})") + + channels["channels"].append({ + "id": channel_id, + "name": request.name or channel_name, + "handle": request.channel_id if request.channel_id.startswith("@") else None, + "added_at": datetime.utcnow().isoformat(), + "last_checked": None, + "enabled": True + }) + + save_channels(channels) + logger.info(f"Added channel: {channel_name} (ID: {channel_id})") + return {"status": "added", "channel_id": channel_id, "name": channel_name} + + +@app.delete("/channels/{channel_id}") +async def remove_channel(channel_id: str): + """Remove a channel subscription.""" + channels = get_channels() + channels["channels"] = [ + ch for ch in channels["channels"] + if ch["id"] != channel_id + ] + save_channels(channels) + return {"status": "removed", "channel_id": channel_id} + + +@app.post("/channels/migrate") +async def migrate_channels(): + """Migrate existing channels from handles to proper channel IDs. 
+ + Fixes channels that were added with handles (@username) instead of IDs. + """ + channels = get_channels() + migrated = [] + failed = [] + + for channel in channels["channels"]: + old_id = channel["id"] + # Skip if already a proper channel ID + if old_id.startswith("UC") and len(old_id) == 24: + continue + + try: + new_id, new_name = resolve_channel_id(old_id) + channel["id"] = new_id + channel["handle"] = old_id if old_id.startswith("@") else None + channel["name"] = new_name + migrated.append({"old": old_id, "new": new_id, "name": new_name}) + logger.info(f"Migrated channel: {old_id} -> {new_id}") + except Exception as e: + failed.append({"id": old_id, "error": str(e)}) + logger.error(f"Failed to migrate channel {old_id}: {e}") + + if migrated: + save_channels(channels) + + return {"migrated": migrated, "failed": failed} + + +@app.post("/check-new") +async def check_new_videos(): + """Check all subscribed channels for new videos.""" + channels = get_channels() + processed = get_processed() + new_videos = [] + + for channel in channels["channels"]: + if not channel.get("enabled", True): + continue + + feed_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={channel['id']}" + + try: + feed = feedparser.parse(feed_url) + + for entry in feed.entries[:5]: # Check last 5 videos + video_id = entry.yt_videoid + + if video_id not in processed.get("processed_videos", {}): + new_videos.append({ + "video_id": video_id, + "video_url": entry.link, + "title": entry.title, + "channel": channel["name"], + "published": entry.published + }) + + # Update last checked + channel["last_checked"] = datetime.utcnow().isoformat() + + except Exception as e: + logger.error(f"Error checking channel {channel['id']}: {e}") + + save_channels(channels) + + return { + "channels_checked": len(channels["channels"]), + "new_videos": new_videos + } + + +@app.get("/jobs") +async def list_jobs(): + """List all jobs. Auto-expires jobs older than 24 hours.""" + # Expire old jobs before listing + job_store.expire_old_jobs() + return {"jobs": job_store.all()} + + +@app.get("/processed") +async def list_processed(): + """List all processed videos with their results. Auto-cleans videos older than 24 hours.""" + # Cleanup old processed videos before listing + cleanup_old_processed(hours=24) + + results = [] + for video_id, info in get_processed().get("processed_videos", {}).items(): + highlights_path = Path(info.get("highlights_path", "")) + if highlights_path.exists(): + try: + data = json.loads(highlights_path.read_text()) + results.append(data) + except Exception: + pass + # Sort by processed_at descending + results.sort(key=lambda x: x.get("processed_at", ""), reverse=True) + return {"videos": results} + + +@app.post("/auto-process") +async def auto_process(): + """Check for new videos and auto-queue them for processing. + + Designed to be called by n8n or other schedulers. 
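(Editor's aside: a minimal sketch of such a scheduled caller. The in-cluster hostname is an assumption, not taken from the Terraform module; n8n would issue the equivalent HTTP request on a cron trigger.)

import requests

# Assumed in-cluster address of the service; adjust to the real Service name and namespace.
BASE_URL = "http://yt-highlights.youtube-dl.svc.cluster.local"

resp = requests.post(f"{BASE_URL}/auto-process", timeout=60)
resp.raise_for_status()
data = resp.json()
print(f"Checked {data['channels_checked']} channels, queued {len(data['queued'])} videos")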
+ """ + # First check for new videos + channels = get_channels() + processed = get_processed() + new_videos = [] + + for channel in channels["channels"]: + if not channel.get("enabled", True): + continue + + feed_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={channel['id']}" + + try: + feed = feedparser.parse(feed_url) + + for entry in feed.entries[:3]: # Check last 3 videos + video_id = entry.yt_videoid + + if video_id not in processed.get("processed_videos", {}): + new_videos.append({ + "video_id": video_id, + "video_url": entry.link, + "title": entry.title, + "channel": channel["name"], + }) + + channel["last_checked"] = datetime.utcnow().isoformat() + + except Exception as e: + logger.error(f"Error checking channel {channel['id']}: {e}") + + save_channels(channels) + + # Queue new videos for processing + queued_jobs = [] + for video in new_videos: + job_id = str(uuid.uuid4())[:8] + + job_data = { + "job_id": job_id, + "status": "queued", + "video_url": video["video_url"], + "video_title": video["title"], + "progress": None, + "error": None, + "created_at": datetime.utcnow().isoformat(), + } + + job_store.set(job_id, job_data) + + # Add to worker queue instead of background task + job_queue.put({ + "job_id": job_id, + "video_url": video["video_url"], + "language": "en", + "num_highlights": 5, + }) + + queued_jobs.append({"job_id": job_id, "title": video["title"]}) + + return { + "channels_checked": len(channels["channels"]), + "new_videos_found": len(new_videos), + "queued": queued_jobs + } + + +# Serve static files for web UI +STATIC_PATH = Path(__file__).parent / "static" +if STATIC_PATH.exists(): + app.mount("/static", StaticFiles(directory=str(STATIC_PATH)), name="static") + + +@app.get("/") +async def root(): + """Serve the web UI.""" + index_path = STATIC_PATH / "index.html" + if index_path.exists(): + return FileResponse(index_path) + return {"message": "YouTube Highlights API", "docs": "/docs"} diff --git a/modules/kubernetes/youtube_dl/yt-highlights/app/static/index.html b/modules/kubernetes/youtube_dl/yt-highlights/app/static/index.html new file mode 100644 index 00000000..f954c218 --- /dev/null +++ b/modules/kubernetes/youtube_dl/yt-highlights/app/static/index.html @@ -0,0 +1,667 @@
[index.html (667 added lines): single-page web UI for the service. Only the visible text survived this capture, so the markup is summarised here: a "YouTube Highlights" header with a connection-status indicator ("Checking..."), and sections "Process a Video", "Subscribed Channels" ("No channels subscribed yet"), "Processing Queue" ("No active jobs") and "Processed Videos" ("No videos processed yet"), each backed by the API endpoints defined in main.py above.]
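Editor's aside, stepping out of the diff: the reply clean-up performed by _parse_llm_response in main.py above (strip a <think>...</think> block, unwrap a markdown code fence, then json.loads) can be exercised standalone. The sample reply below is invented for illustration:

import json
import re

# Contrived model reply: a thinking block followed by fenced JSON.
raw_reply = (
    "<think>model reasoning about the transcript...</think>\n"
    "```json\n"
    '{"highlights": [{"timestamp": "01:23", "timestamp_seconds": 83, '
    '"title": "Example moment", "description": "One sentence."}], '
    '"summary": "Example summary."}\n'
    "```"
)

cleaned = re.sub(r"<think>.*?</think>", "", raw_reply, flags=re.DOTALL)
if "```json" in cleaned:
    cleaned = cleaned.split("```json")[1].split("```")[0]

print(json.loads(cleaned.strip())["summary"])  # Example summary.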
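For completeness, the manual flow exposed by the endpoints in main.py (queue a video, poll its status, fetch the highlights) can be driven like this; the base URL and video URL are placeholders, assuming a local port-forward to the service:

import time

import requests

BASE_URL = "http://localhost:8000"  # placeholder; e.g. a kubectl port-forward to the service

# Queue a video for processing.
job = requests.post(
    f"{BASE_URL}/process",
    json={"video_url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ", "num_highlights": 5},
).json()

# Poll until the worker thread finishes the job.
while True:
    status = requests.get(f"{BASE_URL}/status/{job['job_id']}").json()
    if status["status"] in ("completed", "failed"):
        break
    time.sleep(10)

if status["status"] == "completed":
    result = requests.get(f"{BASE_URL}/results/{job['job_id']}").json()
    print(result["summary"])
    for h in result["highlights"]:
        print(h.get("timestamp"), h.get("title"))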
diff --git a/modules/kubernetes/youtube_dl/yt-highlights/requirements.txt b/modules/kubernetes/youtube_dl/yt-highlights/requirements.txt new file mode 100644 index 00000000..debfa714 --- /dev/null +++ b/modules/kubernetes/youtube_dl/yt-highlights/requirements.txt @@ -0,0 +1,10 @@ +fastapi>=0.104.0 +uvicorn>=0.24.0 +yt-dlp==2025.12.8 +faster-whisper>=1.0.0 +httpx>=0.25.0 +requests>=2.31.0 +pydantic>=2.0.0 +python-multipart>=0.0.6 +feedparser>=6.0.0 +redis>=5.0.0 diff --git a/terraform.tfstate b/terraform.tfstate index 14253b7d..a6012eb9 100644 Binary files a/terraform.tfstate and b/terraform.tfstate differ diff --git a/terraform.tfvars b/terraform.tfvars index 223edcb8..7f367da0 100644 Binary files a/terraform.tfvars and b/terraform.tfvars differ
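feedparser appears in the requirements above because /check-new and /auto-process poll YouTube's per-channel RSS feeds. A standalone sketch of that lookup (the channel ID is an arbitrary public example, not one from this repository):

import feedparser

channel_id = "UC_x5XG1OV2P6uZZ5FSM9Ttw"  # example channel ID, not from the repo
feed = feedparser.parse(f"https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}")

for entry in feed.entries[:5]:
    # yt_videoid, published and title are the same fields the service reads
    print(entry.yt_videoid, entry.published, entry.title)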