From 0ac176da01e6b0f1e31e156a59fc6579b9fedb7d Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sat, 20 Jun 2026 08:03:46 +0000 Subject: [PATCH 1/6] crowdsec: whitelist internal/LAN/tailnet CIDRs at the decision layer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Preparing for real CrowdSec enforcement (edge Cloudflare Worker for proxied hosts + cs-firewall-bouncer for direct hosts). Both enforce by dropping the real source IP, so if an internal/RFC1918 address ever ended up in a ban decision it could blackhole legitimate internal traffic. Whitelisting the cluster/LAN/tailnet ranges (10/8, 172.16/12, 192.168/16, 100.64/10) at the CrowdSec parser layer makes that structurally impossible — a trusted source can never produce a decision in the first place. Public IP already whitelisted. Co-Authored-By: Claude Opus 4.8 --- stacks/crowdsec/modules/crowdsec/main.tf | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/stacks/crowdsec/modules/crowdsec/main.tf b/stacks/crowdsec/modules/crowdsec/main.tf index a900e8c4..650aafb3 100644 --- a/stacks/crowdsec/modules/crowdsec/main.tf +++ b/stacks/crowdsec/modules/crowdsec/main.tf @@ -102,6 +102,15 @@ resource "kubernetes_config_map" "crowdsec_whitelist" { reason: "Trusted IP - never block" ip: - "176.12.22.76" + cidr: + # Never ban internal/cluster/LAN/tailnet sources. Enforcement (edge + # Worker + firewall-bouncer) drops on real source IP, so an internal + # range slipping into a decision could blackhole legit traffic — this + # makes that structurally impossible at the decision layer. 
+ - "10.0.0.0/8" # k8s nodes/pods/services + VLAN 10/20 + - "172.16.0.0/12" # RFC1918 + - "192.168.0.0/16" # LAN (192.168.1.0/24) + Sofia + - "100.64.0.0/10" # Headscale tailnet (CGNAT) --- name: viktor/immich-asset-paths-whitelist description: "Don't penalise legit Immich timeline bursts (mobile scrub, web grid)" From 4d9fdbc7f7853ae19810c91fdb3830144ea36ba6 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sat, 20 Jun 2026 08:05:11 +0000 Subject: [PATCH 2/6] rybbit: add CrowdSec LAPI -> Cloudflare KV sync script (proxied edge control plane) Pure-stdlib script (alert_digest pattern, runs on stock python:3.12-alpine) that projects CrowdSec Ip-scope ban/captcha decisions into the Workers KV namespace the edge Worker reads on each proxied request. Full-reconcile per run so an un-ban clears from the edge within one interval; fail-safe (a LAPI read error skips the run and leaves existing bans to expire by TTL = fail-open, never a stale all-block). TF wiring (KV namespace + CronJob + key registration) follows. Co-Authored-By: Claude Opus 4.8 --- stacks/rybbit/lapi_kv_sync.py | 189 ++++++++++++++++++++++++++++++++++ 1 file changed, 189 insertions(+) create mode 100644 stacks/rybbit/lapi_kv_sync.py diff --git a/stacks/rybbit/lapi_kv_sync.py b/stacks/rybbit/lapi_kv_sync.py new file mode 100644 index 00000000..abe7192d --- /dev/null +++ b/stacks/rybbit/lapi_kv_sync.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 +"""Sync CrowdSec LAPI decisions -> Cloudflare Workers KV. + +The rybbit edge Worker enforces CrowdSec bans/captchas for Cloudflare-PROXIED +hosts by reading an edge-local KV entry (`ip:` -> `ban`|`captcha`) on each +request. This job is the control plane that keeps that KV in sync with LAPI: +the request path NEVER calls LAPI (no per-request hop) — exactly the nginx +"tail logs -> inject rules" model, just projected onto Workers KV instead of +nftables. 
+ +Design notes: + * Pure Python stdlib (no pip/apk at runtime) — runs on stock python:3.12-alpine + mounted from a ConfigMap, the alert_digest pattern (avoids the disk + anti-pattern of installing packages every run). + * FULL RECONCILE each run: read the complete current decision set from LAPI, + compute the desired KV state, then upsert present keys and delete stale ones. + This makes an un-ban (cscli decisions delete) clear from the edge within one + interval instead of lingering until the original TTL — important for getting + a false-positive un-blocked fast. + * FAIL-SAFE: if LAPI can't be read, we SKIP the run (leave KV untouched) rather + than wipe every ban. Existing KV entries then simply expire by their TTL, so + a LAPI outage degrades toward fail-OPEN, never toward a stale all-block. + * Only Ip-scope ban/captcha decisions are projected. Range-scope and other + remediations are ignored (the Worker keys on a single IP). +""" +import json +import os +import re +import sys +import time +import urllib.error +import urllib.parse +import urllib.request + +LAPI_URL = os.environ.get("LAPI_URL", "http://crowdsec-service.crowdsec.svc.cluster.local:8080").rstrip("/") +LAPI_KEY = os.environ["LAPI_KEY"] # kvsync bouncer key, registered in LAPI +CF_ACCOUNT_ID = os.environ["CF_ACCOUNT_ID"] +CF_NAMESPACE_ID = os.environ["CF_KV_NAMESPACE_ID"] +CF_API_TOKEN = os.environ["CF_API_TOKEN"] # scoped: Workers KV Storage:Edit +PUSHGATEWAY = os.environ.get("PUSHGATEWAY_URL", "").rstrip("/") # optional +KEY_PREFIX = "ip:" +# CrowdSec remediation type -> KV value the Worker understands. 
+TYPE_MAP = {"ban": "ban", "captcha": "captcha"} +CF_API = "https://api.cloudflare.com/client/v4" +MIN_TTL = 60 # Cloudflare KV minimum expiration_ttl (seconds) + + +def _req(url, *, method="GET", headers=None, data=None, timeout=20): + req = urllib.request.Request(url, method=method, headers=headers or {}, data=data) + with urllib.request.urlopen(req, timeout=timeout) as resp: + body = resp.read() + return json.loads(body) if body else None + + +def parse_duration_seconds(dur): + """Parse a Go duration string (e.g. '167h59m51.5s') into whole seconds. + + LAPI returns the REMAINING duration of each decision here. We floor to the + second and clamp to Cloudflare's 60s minimum so the edge entry outlives the + next sync interval. + """ + if not dur: + return MIN_TTL + dur = dur.strip().lstrip("+") + if dur.startswith("-"): # already expired; give it the floor and move on + return MIN_TTL + total = 0.0 + for value, unit in re.findall(r"(\d+(?:\.\d+)?)(h|m|s|ms|us|µs|ns)", dur): + v = float(value) + total += {"h": 3600, "m": 60, "s": 1, "ms": 1e-3, "us": 1e-6, "µs": 1e-6, "ns": 1e-9}[unit] * v + return max(MIN_TTL, int(total)) + + +def fetch_decisions(): + """Return desired KV state {('ip:'): (value, ttl_seconds)} from LAPI. + + Raises on transport/HTTP error so the caller can SKIP the run (fail-safe). + """ + out = {} + data = _req(f"{LAPI_URL}/v1/decisions", headers={"X-Api-Key": LAPI_KEY, "Accept": "application/json"}) + for d in data or []: + if (d.get("scope") or "").lower() != "ip": + continue + value = TYPE_MAP.get((d.get("type") or "").lower()) + if not value: + continue + ip = d.get("value") + if not ip: + continue + key = KEY_PREFIX + ip + ttl = parse_duration_seconds(d.get("duration")) + # If the same IP carries multiple decisions, ban beats captcha and the + # longest TTL wins. 
+ if key in out: + prev_val, prev_ttl = out[key] + value = "ban" if "ban" in (value, prev_val) else value + ttl = max(ttl, prev_ttl) + out[key] = (value, ttl) + return out + + +def cf_list_keys(): + """List existing `ip:` keys currently in the KV namespace (paginated).""" + keys = [] + cursor = "" + while True: + url = f"{CF_API}/accounts/{CF_ACCOUNT_ID}/storage/kv/namespaces/{CF_NAMESPACE_ID}/keys?prefix={KEY_PREFIX}&limit=1000" + if cursor: + url += f"&cursor={urllib.parse.quote(cursor)}" + res = _req(url, headers={"Authorization": f"Bearer {CF_API_TOKEN}"}) + keys.extend(k["name"] for k in (res.get("result") or [])) + cursor = (res.get("result_info") or {}).get("cursor") or "" + if not cursor: + return keys + + +def cf_bulk_put(items): + """items: list of (key, value, ttl). Cloudflare bulk PUT (<=10000/call).""" + for i in range(0, len(items), 10000): + chunk = [{"key": k, "value": v, "expiration_ttl": t} for k, v, t in items[i:i + 10000]] + _req( + f"{CF_API}/accounts/{CF_ACCOUNT_ID}/storage/kv/namespaces/{CF_NAMESPACE_ID}/bulk", + method="PUT", + headers={"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}, + data=json.dumps(chunk).encode(), + ) + + +def cf_bulk_delete(keys): + for i in range(0, len(keys), 10000): + _req( + f"{CF_API}/accounts/{CF_ACCOUNT_ID}/storage/kv/namespaces/{CF_NAMESPACE_ID}/bulk/delete", + method="POST", + headers={"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}, + data=json.dumps(list(keys)).encode(), + ) + + +def push_metrics(synced, deleted, ok): + if not PUSHGATEWAY: + return + payload = ( + "# TYPE crowdsec_kv_sync_decisions gauge\n" + f"crowdsec_kv_sync_decisions {synced}\n" + "# TYPE crowdsec_kv_sync_deleted gauge\n" + f"crowdsec_kv_sync_deleted {deleted}\n" + "# TYPE crowdsec_kv_sync_success gauge\n" + f"crowdsec_kv_sync_success {1 if ok else 0}\n" + "# TYPE crowdsec_kv_sync_last_run_seconds gauge\n" + f"crowdsec_kv_sync_last_run_seconds {int(time.time())}\n" + ) + try: 
+ _req(f"{PUSHGATEWAY}/metrics/job/crowdsec-kv-sync", method="PUT", + headers={"Content-Type": "text/plain"}, data=payload.encode(), timeout=10) + except Exception as e: # metrics are best-effort, never fail the job + print(f"[warn] pushgateway: {e}", file=sys.stderr) + + +def main(): + # 1. Desired state from LAPI. Any failure here = SKIP (fail-safe, leave KV). + try: + desired = fetch_decisions() + except Exception as e: + print(f"[skip] LAPI unreadable ({e}); leaving KV untouched so existing " + f"bans expire by TTL (fail-open).", file=sys.stderr) + push_metrics(0, 0, ok=False) + return 0 + + # 2. Current edge state. + existing = set(cf_list_keys()) + desired_keys = set(desired) + + upserts = [(k, v, t) for k, (v, t) in desired.items()] + stale = existing - desired_keys + + if upserts: + cf_bulk_put(upserts) + if stale: + cf_bulk_delete(stale) + + print(f"[ok] synced {len(upserts)} decision(s) to KV, removed {len(stale)} stale; " + f"{sum(1 for _, v, _ in upserts if v == 'ban')} ban / " + f"{sum(1 for _, v, _ in upserts if v == 'captcha')} captcha") + push_metrics(len(upserts), len(stale), ok=True) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) From 38675b7922a960bbd7191dbf1b92d9724878beca Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sat, 20 Jun 2026 08:12:38 +0000 Subject: [PATCH 3/6] crowdsec: register kvsync + firewall bouncer keys in LAPI Seeds two new bouncers at LAPI startup (BOUNCER_KEY_kvsync, BOUNCER_KEY_firewall) from Vault secret/platform, mirroring the existing BOUNCER_KEY_traefik wiring. These are the two halves of the real enforcement that replaces the dead Yaegi plugin: kvsync authenticates the LAPI->Cloudflare-KV sync (proxied edge Worker), firewall authenticates the cs-firewall-bouncer DaemonSet (direct-host nftables). 
Co-Authored-By: Claude Opus 4.8 --- stacks/crowdsec/main.tf | 4 ++++ stacks/crowdsec/modules/crowdsec/main.tf | 12 +++++++++++- stacks/crowdsec/modules/crowdsec/values.yaml | 8 ++++++++ 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/stacks/crowdsec/main.tf b/stacks/crowdsec/main.tf index a4c0013f..ca768356 100644 --- a/stacks/crowdsec/main.tf +++ b/stacks/crowdsec/main.tf @@ -32,4 +32,8 @@ module "crowdsec" { # Same key the traefik-stack bouncer middleware uses — seeded into LAPI so the # bouncer authenticates and pulls decisions (was unregistered → 403 → fail-open). ingress_bouncer_key = data.vault_kv_secret_v2.secrets.data["ingress_crowdsec_api_key"] + # Real enforcement replacing the dead Traefik plugin: kvsync feeds the proxied + # edge Worker via Cloudflare KV; firewall is the direct-host nftables bouncer. + kvsync_bouncer_key = data.vault_kv_secret_v2.secrets.data["kvsync_bouncer_key"] + firewall_bouncer_key = data.vault_kv_secret_v2.secrets.data["firewall_bouncer_key"] } diff --git a/stacks/crowdsec/modules/crowdsec/main.tf b/stacks/crowdsec/modules/crowdsec/main.tf index 650aafb3..86b8c3ab 100644 --- a/stacks/crowdsec/modules/crowdsec/main.tf +++ b/stacks/crowdsec/modules/crowdsec/main.tf @@ -21,6 +21,16 @@ variable "ingress_bouncer_key" { sensitive = true description = "API key for the Traefik CrowdSec bouncer plugin. Seeded into LAPI via BOUNCER_KEY_traefik so the bouncer authenticates and pulls decisions — the same key the traefik-stack middleware presents." } +variable "kvsync_bouncer_key" { + type = string + sensitive = true + description = "API key for the LAPI->Cloudflare-KV sync job (proxied-edge control plane). Seeded into LAPI via BOUNCER_KEY_kvsync; the rybbit-stack CronJob presents the same key to pull decisions." +} +variable "firewall_bouncer_key" { + type = string + sensitive = true + description = "API key for the cs-firewall-bouncer DaemonSet (direct-host in-kernel enforcement). 
Seeded into LAPI via BOUNCER_KEY_firewall; the DaemonSet presents the same key to stream decisions." +} module "tls_secret" { source = "../../../../modules/kubernetes/setup_tls_secret" @@ -162,7 +172,7 @@ resource "helm_release" "crowdsec" { repository = "https://crowdsecurity.github.io/helm-charts" chart = "crowdsec" - values = [templatefile("${path.module}/values.yaml", { homepage_username = var.homepage_username, homepage_password = var.homepage_password, DB_PASSWORD = var.db_password, ENROLL_KEY = var.enroll_key, SLACK_WEBHOOK_URL = var.slack_webhook_url, mysql_host = var.mysql_host, postgresql_host = var.postgresql_host, INGRESS_CROWDSEC_API_KEY = var.ingress_bouncer_key })] + values = [templatefile("${path.module}/values.yaml", { homepage_username = var.homepage_username, homepage_password = var.homepage_password, DB_PASSWORD = var.db_password, ENROLL_KEY = var.enroll_key, SLACK_WEBHOOK_URL = var.slack_webhook_url, mysql_host = var.mysql_host, postgresql_host = var.postgresql_host, INGRESS_CROWDSEC_API_KEY = var.ingress_bouncer_key, KVSYNC_CROWDSEC_API_KEY = var.kvsync_bouncer_key, FIREWALL_CROWDSEC_API_KEY = var.firewall_bouncer_key })] timeout = 1200 wait = true wait_for_jobs = true diff --git a/stacks/crowdsec/modules/crowdsec/values.yaml b/stacks/crowdsec/modules/crowdsec/values.yaml index 040b44d8..f03dfb22 100644 --- a/stacks/crowdsec/modules/crowdsec/values.yaml +++ b/stacks/crowdsec/modules/crowdsec/values.yaml @@ -135,6 +135,14 @@ lapi: # the prior manual registration was lost in the MySQL→PostgreSQL migration). - name: BOUNCER_KEY_traefik value: "${INGRESS_CROWDSEC_API_KEY}" + # Real enforcement path that replaces the dead Traefik Yaegi plugin: + # kvsync -> LAPI->Cloudflare-KV sync CronJob (proxied hosts, edge Worker) + # firewall -> cs-firewall-bouncer DaemonSet (direct hosts, in-kernel nftables drop) + # Registered at LAPI startup (idempotent across the 3 replicas / restarts). 
+ - name: BOUNCER_KEY_kvsync + value: "${KVSYNC_CROWDSEC_API_KEY}" + - name: BOUNCER_KEY_firewall + value: "${FIREWALL_CROWDSEC_API_KEY}" dashboard: enabled: true env: From 7e646e1c7c1a540a0630f44c4829205b6976fd1e Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sat, 20 Jun 2026 09:11:08 +0000 Subject: [PATCH 4/6] crowdsec: add cs-firewall-bouncer DaemonSet (direct-host nftables enforcement) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drops banned source IPs in-kernel via nftables (hooks input+forward, so DNAT'd LoadBalancer traffic is caught before reaching Traefik) for DIRECT hosts — the direct-side replacement for the dead Traefik plugin, zero per-request hop. No published image exists, so an initContainer fetches the pinned official static binary (v0.0.34) onto a stock debian-slim base (nftables backend uses netlink directly, no nft CLI needed). hostNetwork + NET_ADMIN/NET_RAW (not privileged). Config (with api_key) in a Secret, Reloader-annotated. crowdsec ns is already in the Kyverno wave-1 exclude list, so the privileged/hostNetwork pod is admitted. Pinned to k8s-node2 (runs a Traefik pod) for one-node validation before the nodeSelector is removed to roll cluster-wide. Fail-open by element timeout if the bouncer stops. 
Co-Authored-By: Claude Opus 4.8 --- .../modules/crowdsec/firewall_bouncer.tf | 254 ++++++++++++++++++ 1 file changed, 254 insertions(+) create mode 100644 stacks/crowdsec/modules/crowdsec/firewall_bouncer.tf diff --git a/stacks/crowdsec/modules/crowdsec/firewall_bouncer.tf b/stacks/crowdsec/modules/crowdsec/firewall_bouncer.tf new file mode 100644 index 00000000..c161486d --- /dev/null +++ b/stacks/crowdsec/modules/crowdsec/firewall_bouncer.tf @@ -0,0 +1,254 @@ +# ============================================================================= +# cs-firewall-bouncer — in-kernel (nftables) enforcement DaemonSet +# ============================================================================= +# CrowdSec currently enforces NOTHING at the network layer: the Traefik Yaegi +# (lua) bouncer plugin is dead, so banned IPs still reach Traefik. For DIRECT +# (non-Cloudflare-proxied) hosts we drop banned source IPs IN-KERNEL via +# cs-firewall-bouncer's nftables backend — the packet is dropped before it ever +# reaches Traefik, costing zero per-request hops. +# +# Topology this respects (do NOT change without re-reading docs/architecture/networking.md): +# - Calico CNI + kube-proxy in IPTABLES mode (NOT eBPF). +# - Traefik is a LoadBalancer Service at 10.0.20.203, externalTrafficPolicy=Local +# (real client IP preserved — that's the whole point of the dedicated .203 IP). +# - LB traffic is DNAT'd to the Traefik POD, so the original source IP survives +# into the `forward` netfilter hook. The drop rule MUST therefore cover the +# `forward` hook, not only `input` (a pod-destined packet traverses forward, +# not input, on the node). Hence nftables_hooks: [input, forward] below. +# +# Packaging: cs-firewall-bouncer publishes NO container image. 
We pin the +# official release binary (v0.0.34, 2025-08-04 — latest stable) and fetch it at +# runtime: an initContainer (curlimages/curl — has curl + tar, alpine) downloads +# + extracts the static binary into an emptyDir; the main container +# (debian:bookworm-slim) runs it. The nftables backend talks netlink DIRECTLY +# via github.com/google/nftables (go.mod + pkg/nftables/nftables.go: no os/exec) +# and the docs confirm "mode nftables relies on github.com/google/nftables to +# create table, chain and set" — so NO `nft` userspace CLI is needed and a plain +# slim base image suffices. The binary is built CGO_ENABLED=0 / -extldflags +# -static (Makefile), so it runs on glibc (debian) or musl (alpine) alike. +# +# Source: https://github.com/crowdsecurity/cs-firewall-bouncer +# https://docs.crowdsec.net/u/bouncers/firewall/ +# +# nodeSelector pins this to ONE node (k8s-node2, which runs a Traefik pod) for first validation. +# !!! REMOVING THE nodeSelector ROLLS THIS DAEMONSET CLUSTER-WIDE !!! +# Do that ONLY after the one-node validation checklist passes (see commit/PR). + +locals { + # Pin a specific stable release. Bump deliberately (re-validate on one node first). + firewall_bouncer_version = "v0.0.34" + firewall_bouncer_tgz_url = "https://github.com/crowdsecurity/cs-firewall-bouncer/releases/download/${local.firewall_bouncer_version}/crowdsec-firewall-bouncer-linux-amd64.tgz" + firewall_bouncer_bin_path = "/opt/firewall-bouncer/crowdsec-firewall-bouncer" + firewall_bouncer_cfg_path = "/etc/crowdsec/bouncers/crowdsec-firewall-bouncer.yaml" + + # Rendered firewall-bouncer config. Lives in a Secret (NOT a ConfigMap) because + # it embeds api_key. Key names/structure verified against the reference config + # (config/crowdsec-firewall-bouncer.yaml @ v0.0.34): + # - `set-only` uses a HYPHEN (not set_only). + # - `nftables_hooks` is a TOP-LEVEL list (sibling of `nftables:`, underscore). + # - `deny_action` values are uppercase DROP / REJECT. 
+ # - `log_mode: stdout` sends logs to the container's stdout (default is `file`). + # - api_url carries a trailing slash (matches the reference default). + firewall_bouncer_yaml = <<-YAML + mode: nftables + update_frequency: 10s + log_mode: stdout + log_level: info + api_url: http://crowdsec-service.crowdsec.svc.cluster.local:8080/ + api_key: ${var.firewall_bouncer_key} + insecure_skip_verify: false + disable_ipv6: false + deny_action: DROP + deny_log: true + nftables: + ipv4: + enabled: true + set-only: false + table: crowdsec + chain: crowdsec-chain + priority: -10 + ipv6: + enabled: true + set-only: false + table: crowdsec6 + chain: crowdsec6-chain + priority: -10 + nftables_hooks: + - input + - forward + YAML +} + +resource "kubernetes_secret" "firewall_bouncer_config" { + metadata { + name = "crowdsec-firewall-bouncer-config" + namespace = kubernetes_namespace.crowdsec.metadata[0].name + labels = { + "app.kubernetes.io/name" = "crowdsec-firewall-bouncer" + tier = var.tier + } + annotations = { + # Rotate the pods if the API key / config ever changes (the binary reads + # the config only at startup). + "reloader.stakater.com/match" = "true" + } + } + data = { + "crowdsec-firewall-bouncer.yaml" = local.firewall_bouncer_yaml + } + type = "Opaque" +} + +resource "kubernetes_daemon_set_v1" "firewall_bouncer" { + metadata { + name = "crowdsec-firewall-bouncer" + namespace = kubernetes_namespace.crowdsec.metadata[0].name + labels = { + "app.kubernetes.io/name" = "crowdsec-firewall-bouncer" + tier = var.tier + } + } + spec { + selector { + match_labels = { + "app.kubernetes.io/name" = "crowdsec-firewall-bouncer" + } + } + template { + metadata { + labels = { + "app.kubernetes.io/name" = "crowdsec-firewall-bouncer" + tier = var.tier + } + annotations = { + # Bounce pods when the config Secret changes (api_key rotation etc.). 
+ "secret.reloader.stakater.com/reload" = kubernetes_secret.firewall_bouncer_config.metadata[0].name + } + } + spec { + priority_class_name = "tier-1-cluster" + + # Program the HOST's nftables ruleset (not the pod netns) — the bouncer's + # drop rules must live in the host network namespace where DNAT'd LB + # traffic transits the forward hook. + host_network = true + dns_policy = "ClusterFirstWithHostNet" + + # ---- FIRST-VALIDATION PIN ---------------------------------------------- + # Pinned to a SINGLE node so a mistake in the nftables rules can only + # affect one node. k8s-node2 is chosen because it currently runs a Traefik + # pod — required to validate the `forward`-hook drop on DNAT'd LoadBalancer + # traffic (under ETP=Local a node with no Traefik pod never sees that path, + # so the validation would be meaningless there). + # REMOVE this nodeSelector to roll the bouncer to EVERY node (the normal + # end state for a firewall bouncer) — but ONLY after the one-node + # validation checklist passes. + node_selector = { + "kubernetes.io/hostname" = "k8s-node2" + } + # ------------------------------------------------------------------------ + + # initContainer fetches + extracts the pinned release binary into the + # shared emptyDir. curlimages/curl is alpine and ships curl + tar. + init_container { + name = "fetch-bouncer" + image = "curlimages/curl:8.10.1" + command = [ + "sh", "-c", + <<-EOT + set -eu + echo "Downloading cs-firewall-bouncer ${local.firewall_bouncer_version}..." + curl -fsSL "${local.firewall_bouncer_tgz_url}" -o /tmp/fb.tgz + # Archive layout (verified @ v0.0.34): a single versioned top dir + # `crowdsec-firewall-bouncer-vX.Y.Z/` containing the binary plus + # config/, scripts/, install.sh. Strip that dir and extract ONLY the + # binary — the `*/crowdsec-firewall-bouncer` glob matches one path + # segment after the top dir, so config/...yaml is NOT pulled. 
+ tar -xzf /tmp/fb.tgz -C /opt/firewall-bouncer --strip-components=1 \ + --wildcards '*/crowdsec-firewall-bouncer' + chmod +x ${local.firewall_bouncer_bin_path} + echo "Fetched: $(ls -l ${local.firewall_bouncer_bin_path})" + EOT + ] + volume_mount { + name = "binary" + mount_path = "/opt/firewall-bouncer" + } + resources { + requests = { + cpu = "10m" + memory = "32Mi" + } + limits = { + memory = "64Mi" + } + } + } + + container { + name = "firewall-bouncer" + image = "debian:bookworm-slim" + command = [ + local.firewall_bouncer_bin_path, + "-c", local.firewall_bouncer_cfg_path, + ] + + # nftables backend needs NET_ADMIN to program the host ruleset. NET_RAW + # is the proven-safe companion the reference container images add. We + # deliberately AVOID full `privileged: true` — these two caps are + # sufficient for the netlink nftables path (no iptables/ipset shell-out + # here). If validation shows rules are NOT being installed, the next + # thing to try is privileged:true (see checklist) — but start minimal. + security_context { + capabilities { + add = ["NET_ADMIN", "NET_RAW"] + } + } + + volume_mount { + name = "binary" + mount_path = "/opt/firewall-bouncer" + read_only = true + } + volume_mount { + name = "config" + mount_path = local.firewall_bouncer_cfg_path + sub_path = "crowdsec-firewall-bouncer.yaml" + read_only = true + } + + # No liveness probe: the bouncer runs as PID 1, so a crash — or a bad + # config that makes it exit non-zero at startup — surfaces on its own as + # a pod restart / CrashLoopBackOff. This avoids coupling pod liveness to + # a periodic LAPI round-trip (a brief LAPI blip must NOT bounce the pod). + + resources { + requests = { + cpu = "10m" + memory = "64Mi" + } + # crowdsec-quota enforces limits.memory on every pod in the ns. 
+ limits = { + memory = "128Mi" + } + } + } + + volume { + name = "binary" + empty_dir {} + } + volume { + name = "config" + secret { + secret_name = kubernetes_secret.firewall_bouncer_config.metadata[0].name + } + } + } + } + } + lifecycle { + # KYVERNO_LIFECYCLE_V1: Kyverno admission webhook mutates dns_config with ndots=2 + ignore_changes = [spec[0].template[0].spec[0].dns_config] + } +} From cc4bfb593b97510e8b5329c3ba225daca3cc8d72 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sat, 20 Jun 2026 09:18:33 +0000 Subject: [PATCH 5/6] rybbit: proxied CrowdSec enforcement via Cloudflare IP Lists + WAF rule MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the Worker+KV approach (which only covered the ~27 routed hosts) with a zone-wide mechanism that covers ALL proxied hosts: two CF account IP Lists (crowdsec_ban, crowdsec_captcha) + one zone WAF custom rule that blocks `(ip.src in $crowdsec_ban)` and managed-challenges `(ip.src in $crowdsec_captcha)`. No per-request Worker, no cookie machinery — the rybbit Worker stays analytics-only. lapi_kv_sync.py now full-reconciles the two lists from LAPI (fail-safe: a LAPI blip skips the run and freezes the last-known-good block set; serializes CF bulk ops since CF allows one pending op per account). A least-privilege CF API token (Account Filter Lists Edit) is minted in TF. 
Co-Authored-By: Claude Opus 4.8 --- stacks/rybbit/crowdsec_edge.tf | 299 +++++++++++++++++++++++++++ stacks/rybbit/lapi_kv_sync.py | 357 ++++++++++++++++++++++----------- 2 files changed, 542 insertions(+), 114 deletions(-) create mode 100644 stacks/rybbit/crowdsec_edge.tf diff --git a/stacks/rybbit/crowdsec_edge.tf b/stacks/rybbit/crowdsec_edge.tf new file mode 100644 index 00000000..598cde28 --- /dev/null +++ b/stacks/rybbit/crowdsec_edge.tf @@ -0,0 +1,299 @@ +# ============================================================================= +# CrowdSec edge enforcement for Cloudflare-PROXIED hosts — control plane +# ============================================================================= +# Proxied hosts terminate at the Cloudflare edge, so the in-cluster CrowdSec +# bouncer (which keys on the real client IP seen by Traefik) never gets to +# decide on them. To enforce CrowdSec bans/captchas on proxied traffic we push +# the decision INTO the Cloudflare edge as account-level IP Lists + a single +# zone-scoped WAF custom rule: +# +# * Two account IP Lists — `crowdsec_ban` and `crowdsec_captcha` — hold the +# banned / captcha'd source IPs (empty in TF; populated at runtime). +# * A zone-scoped WAF ruleset in the http_request_firewall_custom phase +# blocks `(ip.src in $crowdsec_ban)` and managed-challenges +# `(ip.src in $crowdsec_captcha)`. Because it's a ZONE rule it enforces +# across ALL proxied hosts in the zone (~135), not just the handful a +# Worker would route. (The previous Worker+KV design only covered the ~27 +# hosts the rybbit Worker routed; the analytics Worker in worker/ is +# unrelated and stays.) +# +# This file is the CONTROL PLANE that keeps those lists in sync with LAPI: +# 1. the two empty IP Lists (list ITEMS are owned by the CronJob at runtime, +# NOT by Terraform — see the lifecycle ignore_changes on `item`), +# 2. 
a LEAST-PRIVILEGE Cloudflare API token (account Filter-Lists edit only, +# scoped to this account) the sync job authenticates with, +# 3. a CronJob running lapi_kv_sync.py every 2 min to full-reconcile LAPI +# decisions into the two lists (mirrors monitoring/alert_digest.tf: stock +# python:3.12-alpine + pure-stdlib script from a ConfigMap, no pip/apk at +# runtime). +# +# Cloudflare provider is pinned v4.52.7 (~> 4) — v4 schema is used throughout +# (v5 differs greatly: policy is a block here not a `policies = [...]` list; +# resources is a map not a jsonencode'd string; ruleset `rules` is a repeatable +# block; list items use `item { value { ip = ... } }`; permission groups are +# looked up via data.cloudflare_api_token_permission_groups, not a v5 *_list +# data source). context7 only indexes v5, so the v4 arguments below were +# verified against the v4.52.7 provider docs (github tag v4.52.7) — items +# FLAGGED ### VERIFY for tg-plan are noted inline. +# ============================================================================= + +data "cloudflare_accounts" "main" {} + +locals { + cf_account_id = data.cloudflare_accounts.main.accounts[0].id +} + +# ----------------------------------------------------------------------------- +# IP Lists — empty shells. The CronJob owns the items at runtime via the CF +# Rules-Lists API; TF must NOT manage items or every 2-min sync would fight the +# next `terragrunt apply` (apply would try to delete the runtime items). +# +# ### VERIFY (v4.52.7): cloudflare_list args account_id/name/kind/description; +# kind="ip" is one of {ip, redirect, hostname, asn}. The optional items +# block is named `item` (singular, Block Set) with `item { value { ip=... } +# comment=... }`. We declare NO `item` blocks (empty list) and +# ignore_changes=[item] so runtime items don't show as drift. +# NOTE: list `name` must match /^[a-zA-Z0-9_]+$/ (underscores ok, no dashes) +# — hence crowdsec_ban / crowdsec_captcha (underscore, not dash). 
+# ----------------------------------------------------------------------------- +resource "cloudflare_list" "crowdsec_ban" { + account_id = local.cf_account_id + name = "crowdsec_ban" + kind = "ip" + description = "CrowdSec ban decisions (synced from LAPI)" + + lifecycle { + # The crowdsec-cf-sync CronJob adds/removes items at runtime; TF owns only + # the empty list shell. Without this, every apply would delete live bans. + ignore_changes = [item] + } +} + +resource "cloudflare_list" "crowdsec_captcha" { + account_id = local.cf_account_id + name = "crowdsec_captcha" + kind = "ip" + description = "CrowdSec captcha decisions (synced from LAPI)" + + lifecycle { + ignore_changes = [item] + } +} + +# ----------------------------------------------------------------------------- +# Zone-scoped WAF custom ruleset — the actual enforcement. One ruleset, two +# rules, applied to EVERY proxied host in the zone. +# +# ### VERIFY (v4.52.7): cloudflare_ruleset with zone_id + kind="zone" + +# phase="http_request_firewall_custom"; `rules` is a repeatable block with +# action/expression/description/enabled. actions "block" and +# "managed_challenge" are both valid. List references in WAF expressions use +# the list NAME with a `$` prefix (NOT the list id): ($crowdsec_ban). +# Rule order matters — ban (block) is evaluated before captcha so a +# double-listed IP is blocked outright (the sync script also enforces +# ban-wins, so an IP is never in both lists, but order is belt-and-braces). +# +# zone_id is the viktorbarzin.me zone — the single zone id used repo-wide +# (default of var.cloudflare_zone_id in modules/kubernetes/ingress_factory and +# hardcoded the same in stacks/kms/main.tf; source of truth is the git-crypt'd +# config.tfvars). Hardcoded here (with the conventional marker comment) because +# the rybbit stack does not import the ingress_factory module. 
+# ----------------------------------------------------------------------------- +resource "cloudflare_ruleset" "crowdsec" { + zone_id = "fd2c5dd4efe8fe38958944e74d0ced6d" # cloudflare_zone_id (viktorbarzin.me) + name = "crowdsec-ip-enforcement" + description = "Block/challenge IPs CrowdSec flagged (synced from LAPI into CF IP Lists)" + kind = "zone" + phase = "http_request_firewall_custom" + + rules { + action = "block" + expression = "(ip.src in $crowdsec_ban)" + description = "CrowdSec: block banned IPs" + enabled = true + } + + rules { + action = "managed_challenge" + expression = "(ip.src in $crowdsec_captcha)" + description = "CrowdSec: managed-challenge captcha'd IPs" + enabled = true + } +} + +# ----------------------------------------------------------------------------- +# Least-privilege API token for the sync job: account-level Filter-Lists edit +# ONLY, scoped to this single account (no zone/DNS/Workers access). The token +# value is sensitive and lands in TF state (Tier-1 PG, encrypted at rest) and +# in the rybbit Secret below — same trust level as the CF Global API Key +# already in state. +# +# ### VERIFY (v4.52.7): cloudflare_api_token with a repeatable `policy` block +# (effect / permission_groups = Set of String / resources = Map of String); +# token secret is exposed as `.value` (sensitive). +# +# ### VERIFY — PERMISSION GROUP NAME (highest-risk item). v4.52.7 deprecates +# the flat `.permissions[...]` map ("some permissions overlap resource +# scope"); the non-deprecated lookup is the scoped `.account[...]` map. +# Cloudflare's current permissions reference calls the account list-edit +# group "Account Filter Lists Edit" (and read "Account Filter Lists Read"). +# An OLDER community gist instead shows "Account Rule Lists Read/Write" — +# Cloudflare has renamed this group over time. 
If `tg plan` errors with a +# missing key, try (in order): .account["Account Filter Lists Edit"] -> +# .account["Account Rule Lists Write"], or enumerate the live names with: +# terraform console +# > data.cloudflare_api_token_permission_groups.all.account +# Read is not strictly required for edit (Edit = full CRUDL) but the sync +# job GETs items, so we include Read too to be safe. +# ----------------------------------------------------------------------------- +data "cloudflare_api_token_permission_groups" "all" {} + +resource "cloudflare_api_token" "list_sync" { + name = "rybbit-crowdsec-list-sync" + + policy { + effect = "allow" + permission_groups = [ + data.cloudflare_api_token_permission_groups.all.account["Account Filter Lists Edit"], + data.cloudflare_api_token_permission_groups.all.account["Account Filter Lists Read"], + ] + resources = { + "com.cloudflare.api.account.${local.cf_account_id}" = "*" + } + } +} + +# ----------------------------------------------------------------------------- +# Pure-stdlib sync script, mounted into the CronJob from a ConfigMap (the +# alert_digest pattern — no per-run package installs). +# ----------------------------------------------------------------------------- +resource "kubernetes_config_map" "crowdsec_cf_sync_script" { + metadata { + name = "crowdsec-cf-sync-script" + namespace = "rybbit" + } + data = { + "lapi_kv_sync.py" = file("${path.module}/lapi_kv_sync.py") + } +} + +# Secrets consumed by the sync job: the LAPI bouncer key (registered in LAPI, +# stored in Vault secret/platform -> kvsync_bouncer_key) and the minted CF +# token value. Account id and list ids are NOT secret and are passed as plain +# env values on the CronJob. 
+resource "kubernetes_secret" "crowdsec_cf_sync" { + metadata { + name = "crowdsec-cf-sync" + namespace = "rybbit" + } + type = "Opaque" + data = { + LAPI_KEY = data.vault_kv_secret_v2.cf_platform.data["kvsync_bouncer_key"] + CF_API_TOKEN = cloudflare_api_token.list_sync.value + } +} + +resource "kubernetes_cron_job_v1" "crowdsec_cf_sync" { + metadata { + name = "crowdsec-cf-sync" + namespace = "rybbit" + labels = { + app = "crowdsec-cf-sync" + tier = local.tiers.aux + } + } + spec { + concurrency_policy = "Forbid" + failed_jobs_history_limit = 3 + successful_jobs_history_limit = 3 + schedule = "*/2 * * * *" + starting_deadline_seconds = 110 + job_template { + metadata {} + spec { + backoff_limit = 2 + ttl_seconds_after_finished = 3600 + template { + metadata { + labels = { + app = "crowdsec-cf-sync" + } + } + spec { + restart_policy = "OnFailure" + container { + name = "crowdsec-cf-sync" + image = "docker.io/library/python:3.12-alpine" + image_pull_policy = "IfNotPresent" + command = ["python3", "/scripts/lapi_kv_sync.py"] + env { + name = "LAPI_KEY" + value_from { + secret_key_ref { + name = kubernetes_secret.crowdsec_cf_sync.metadata[0].name + key = "LAPI_KEY" + } + } + } + env { + name = "CF_API_TOKEN" + value_from { + secret_key_ref { + name = kubernetes_secret.crowdsec_cf_sync.metadata[0].name + key = "CF_API_TOKEN" + } + } + } + env { + name = "CF_ACCOUNT_ID" + value = local.cf_account_id + } + env { + name = "CF_BAN_LIST_ID" + value = cloudflare_list.crowdsec_ban.id + } + env { + name = "CF_CAPTCHA_LIST_ID" + value = cloudflare_list.crowdsec_captcha.id + } + env { + name = "PUSHGATEWAY_URL" + value = "http://prometheus-prometheus-pushgateway.monitoring:9091" + } + volume_mount { + name = "script" + mount_path = "/scripts" + read_only = true + } + resources { + requests = { + cpu = "10m" + memory = "48Mi" + } + limits = { + memory = "96Mi" + } + } + } + volume { + name = "script" + config_map { + name = 
kubernetes_config_map.crowdsec_cf_sync_script.metadata[0].name + } + } + dns_config { + option { + name = "ndots" + value = "2" + } + } + } + } + } + } + } + lifecycle { + # KYVERNO_LIFECYCLE_V1: Kyverno admission webhook mutates dns_config with ndots=2 + ignore_changes = [spec[0].job_template[0].spec[0].template[0].spec[0].dns_config] + } +} diff --git a/stacks/rybbit/lapi_kv_sync.py b/stacks/rybbit/lapi_kv_sync.py index abe7192d..942912c9 100644 --- a/stacks/rybbit/lapi_kv_sync.py +++ b/stacks/rybbit/lapi_kv_sync.py @@ -1,48 +1,84 @@ #!/usr/bin/env python3 -"""Sync CrowdSec LAPI decisions -> Cloudflare Workers KV. +"""Sync CrowdSec LAPI decisions -> two Cloudflare account IP Lists. -The rybbit edge Worker enforces CrowdSec bans/captchas for Cloudflare-PROXIED -hosts by reading an edge-local KV entry (`ip:` -> `ban`|`captcha`) on each -request. This job is the control plane that keeps that KV in sync with LAPI: -the request path NEVER calls LAPI (no per-request hop) — exactly the nginx -"tail logs -> inject rules" model, just projected onto Workers KV instead of -nftables. +Cloudflare-PROXIED hosts terminate at the CF edge, so the in-cluster CrowdSec +bouncer (which keys on the client IP Traefik sees) never decides on them. We +push the decisions into the edge instead: a zone-scoped WAF custom rule blocks +`(ip.src in $crowdsec_ban)` and managed-challenges `(ip.src in $crowdsec_captcha)` +across EVERY proxied host in the zone. This job is the control plane that keeps +those two IP Lists in sync with LAPI. + +(Filename kept as lapi_kv_sync.py for path/ConfigMap continuity with the prior +Workers-KV design; it no longer touches KV — it reconciles CF Rules Lists.) Design notes: * Pure Python stdlib (no pip/apk at runtime) — runs on stock python:3.12-alpine - mounted from a ConfigMap, the alert_digest pattern (avoids the disk - anti-pattern of installing packages every run). 
- * FULL RECONCILE each run: read the complete current decision set from LAPI, - compute the desired KV state, then upsert present keys and delete stale ones. - This makes an un-ban (cscli decisions delete) clear from the edge within one - interval instead of lingering until the original TTL — important for getting - a false-positive un-blocked fast. - * FAIL-SAFE: if LAPI can't be read, we SKIP the run (leave KV untouched) rather - than wipe every ban. Existing KV entries then simply expire by their TTL, so - a LAPI outage degrades toward fail-OPEN, never toward a stale all-block. - * Only Ip-scope ban/captcha decisions are projected. Range-scope and other - remediations are ignored (the Worker keys on a single IP). + mounted from a ConfigMap, the alert_digest pattern. + * FULL RECONCILE each run: read the complete decision set from LAPI, partition + into ban / captcha desired sets, then for each list compute add (desired - + existing) and remove (existing - desired) and apply both. An IP listed for + BOTH ban and captcha is placed in BAN ONLY (ban wins; the WAF rule order + also blocks-before-challenges as belt-and-braces). A `cscli decisions + delete` therefore clears from the edge within one interval (<=2 min). + * FAIL-SAFE on LAPI: if LAPI can't be read we SKIP the run (lists untouched, + exit 0). A LAPI outage thus freezes the edge state rather than wiping every + ban — degrade toward the last-known-good block set, never toward all-block + or a thundering un-ban. (Decisions linger only until the next successful + sync, not their TTL — we reconcile to LAPI truth, we don't expire entries.) + * FAIL-LOUD on Cloudflare: any CF API error is logged and the job exits + non-zero so the failure is visible (CronJob backoff + missing success + metric + the next run retries). 
+ +Cloudflare Rules-Lists API (account-level IP list items), verified against the +official API reference (developers.cloudflare.com, 2026): + * GET /accounts/{acct}/rules/lists/{list}/items -> paginated; next page + cursor at result_info.cursors.after, passed back as ?cursor=<after>. Each + item = {"id","ip","created_on",...}. + * POST /accounts/{acct}/rules/lists/{list}/items -> body JSON ARRAY + [{"ip":"1.2.3.4"},...]. APPENDS/upserts (does NOT replace the list). + ASYNCHRONOUS: returns {"result":{"operation_id":...}}. + * DELETE /accounts/{acct}/rules/lists/{list}/items -> body {"items":[{"id": + "<item_id>"},...]} (delete by item id, not ip). ASYNCHRONOUS. + * GET /accounts/{acct}/rules/lists/bulk_operations/{op_id} -> status in + {pending,running,completed,failed} (failed carries `error`). + ASYNC HANDLING: Cloudflare allows only ONE pending bulk operation per ACCOUNT. + So we must NOT fire add+delete (or both lists) concurrently — we serialize and + poll each operation_id to a terminal state (short bounded timeout) before the + next mutation. If a poll times out we stop mutating for this run and report + partial success (the next 2-min run reconciles the rest); we never abandon an + in-flight op and immediately issue another (that would 409/reject).
""" import json import os -import re import sys import time import urllib.error import urllib.parse import urllib.request -LAPI_URL = os.environ.get("LAPI_URL", "http://crowdsec-service.crowdsec.svc.cluster.local:8080").rstrip("/") +LAPI_URL = os.environ.get( + "LAPI_URL", "http://crowdsec-service.crowdsec.svc.cluster.local:8080" +).rstrip("/") LAPI_KEY = os.environ["LAPI_KEY"] # kvsync bouncer key, registered in LAPI CF_ACCOUNT_ID = os.environ["CF_ACCOUNT_ID"] -CF_NAMESPACE_ID = os.environ["CF_KV_NAMESPACE_ID"] -CF_API_TOKEN = os.environ["CF_API_TOKEN"] # scoped: Workers KV Storage:Edit +CF_API_TOKEN = os.environ["CF_API_TOKEN"] # scoped: Account Filter Lists Edit +CF_BAN_LIST_ID = os.environ["CF_BAN_LIST_ID"] +CF_CAPTCHA_LIST_ID = os.environ["CF_CAPTCHA_LIST_ID"] PUSHGATEWAY = os.environ.get("PUSHGATEWAY_URL", "").rstrip("/") # optional -KEY_PREFIX = "ip:" -# CrowdSec remediation type -> KV value the Worker understands. -TYPE_MAP = {"ban": "ban", "captcha": "captcha"} + CF_API = "https://api.cloudflare.com/client/v4" -MIN_TTL = 60 # Cloudflare KV minimum expiration_ttl (seconds) +# Cloudflare item objects expose the ip differently between list and create +# responses; for an IP-kind list each GET item carries a top-level "ip". +# Batch sizes: no official per-request cap is documented, so keep batches +# generous but bounded (well under the global 1200 req / 5 min limit). +BATCH = 1000 +# Async op polling: 1 pending bulk op per account, so poll to terminal state. +POLL_TIMEOUT = 25 # seconds to wait for one bulk op (the run has ~110s budget) +POLL_INTERVAL = 1.0 + + +class CFError(Exception): + """Cloudflare API failure -> job should exit non-zero (fail loud).""" def _req(url, *, method="GET", headers=None, data=None, timeout=20): @@ -52,136 +88,229 @@ def _req(url, *, method="GET", headers=None, data=None, timeout=20): return json.loads(body) if body else None -def parse_duration_seconds(dur): - """Parse a Go duration string (e.g. 
'167h59m51.5s') into whole seconds. - - LAPI returns the REMAINING duration of each decision here. We floor to the - second and clamp to Cloudflare's 60s minimum so the edge entry outlives the - next sync interval. - """ - if not dur: - return MIN_TTL - dur = dur.strip().lstrip("+") - if dur.startswith("-"): # already expired; give it the floor and move on - return MIN_TTL - total = 0.0 - for value, unit in re.findall(r"(\d+(?:\.\d+)?)(h|m|s|ms|us|µs|ns)", dur): - v = float(value) - total += {"h": 3600, "m": 60, "s": 1, "ms": 1e-3, "us": 1e-6, "µs": 1e-6, "ns": 1e-9}[unit] * v - return max(MIN_TTL, int(total)) +def _cf(url, *, method="GET", payload=None, timeout=20): + """Call the CF API with the bearer token; raise CFError on any failure.""" + headers = {"Authorization": f"Bearer {CF_API_TOKEN}"} + data = None + if payload is not None: + headers["Content-Type"] = "application/json" + data = json.dumps(payload).encode() + try: + res = _req(url, method=method, headers=headers, data=data, timeout=timeout) + except urllib.error.HTTPError as e: + detail = "" + try: + detail = e.read().decode(errors="replace")[:500] + except Exception: + pass + raise CFError(f"{method} {url} -> HTTP {e.code} {detail}") from e + except urllib.error.URLError as e: + raise CFError(f"{method} {url} -> {e}") from e + if res is not None and not res.get("success", True): + raise CFError(f"{method} {url} -> not success: {res.get('errors')}") + return res +# --------------------------------------------------------------------------- # +# LAPI +# --------------------------------------------------------------------------- # def fetch_decisions(): - """Return desired KV state {('ip:'): (value, ttl_seconds)} from LAPI. + """Return (ban_set, captcha_set) of IPs from LAPI. - Raises on transport/HTTP error so the caller can SKIP the run (fail-safe). + Only scope=="ip" decisions are projected (the WAF rule keys on ip.src). An + IP appearing in both ban and captcha is placed in BAN only. 
Raises on + transport/HTTP error so the caller can SKIP the run (fail-safe). """ - out = {} - data = _req(f"{LAPI_URL}/v1/decisions", headers={"X-Api-Key": LAPI_KEY, "Accept": "application/json"}) + data = _req( + f"{LAPI_URL}/v1/decisions", + headers={"X-Api-Key": LAPI_KEY, "Accept": "application/json"}, + ) + ban, captcha = set(), set() for d in data or []: if (d.get("scope") or "").lower() != "ip": continue - value = TYPE_MAP.get((d.get("type") or "").lower()) - if not value: - continue ip = d.get("value") if not ip: continue - key = KEY_PREFIX + ip - ttl = parse_duration_seconds(d.get("duration")) - # If the same IP carries multiple decisions, ban beats captcha and the - # longest TTL wins. - if key in out: - prev_val, prev_ttl = out[key] - value = "ban" if "ban" in (value, prev_val) else value - ttl = max(ttl, prev_ttl) - out[key] = (value, ttl) - return out + dtype = (d.get("type") or "").lower() + if dtype == "ban": + ban.add(ip) + elif dtype == "captcha": + captcha.add(ip) + # other remediation types (e.g. 
throttle) are ignored + captcha -= ban # ban wins: never list the same IP in both + return ban, captcha -def cf_list_keys(): - """List existing `ip:` keys currently in the KV namespace (paginated).""" - keys = [] +# --------------------------------------------------------------------------- # +# Cloudflare list items +# --------------------------------------------------------------------------- # +def cf_list_items(list_id): + """Return {ip: item_id} for every item currently in the list (paginated).""" + out = {} cursor = "" while True: - url = f"{CF_API}/accounts/{CF_ACCOUNT_ID}/storage/kv/namespaces/{CF_NAMESPACE_ID}/keys?prefix={KEY_PREFIX}&limit=1000" + url = f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items?per_page=1000" if cursor: url += f"&cursor={urllib.parse.quote(cursor)}" - res = _req(url, headers={"Authorization": f"Bearer {CF_API_TOKEN}"}) - keys.extend(k["name"] for k in (res.get("result") or [])) - cursor = (res.get("result_info") or {}).get("cursor") or "" + res = _cf(url) + for it in (res.get("result") or []): + ip = it.get("ip") + if ip: + out[ip] = it.get("id") + cursor = (((res.get("result_info") or {}).get("cursors") or {}).get("after")) or "" if not cursor: - return keys + return out -def cf_bulk_put(items): - """items: list of (key, value, ttl). Cloudflare bulk PUT (<=10000/call).""" - for i in range(0, len(items), 10000): - chunk = [{"key": k, "value": v, "expiration_ttl": t} for k, v, t in items[i:i + 10000]] - _req( - f"{CF_API}/accounts/{CF_ACCOUNT_ID}/storage/kv/namespaces/{CF_NAMESPACE_ID}/bulk", - method="PUT", - headers={"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}, - data=json.dumps(chunk).encode(), - ) +def _wait_for_op(op_id): + """Poll a bulk operation to a terminal state. Returns True if completed, + False if it timed out (still pending/running). Raises CFError if it failed. 
+ + We must reach a terminal state before issuing the next mutation: CF allows + only one pending bulk op per account, so firing another while this is + in-flight would be rejected. + """ + if not op_id: + return True + deadline = time.time() + POLL_TIMEOUT + url = f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/bulk_operations/{op_id}" + while time.time() < deadline: + res = _cf(url) + status = ((res.get("result") or {}).get("status") or "").lower() + if status == "completed": + return True + if status == "failed": + raise CFError(f"bulk op {op_id} failed: {(res.get('result') or {}).get('error')}") + time.sleep(POLL_INTERVAL) + print(f"[warn] bulk op {op_id} still {status or 'pending'} after {POLL_TIMEOUT}s; " + f"stopping further mutations this run (next run reconciles)", file=sys.stderr) + return False -def cf_bulk_delete(keys): - for i in range(0, len(keys), 10000): - _req( - f"{CF_API}/accounts/{CF_ACCOUNT_ID}/storage/kv/namespaces/{CF_NAMESPACE_ID}/bulk/delete", +def _op_id(res): + return ((res or {}).get("result") or {}).get("operation_id") + + +def cf_add_items(list_id, ips): + """POST new IPs to the list (append). Returns the operation_id (async).""" + # If callers ever exceed one batch, each POST is a separate bulk op and the + # single-pending-op rule forces us to wait between them. + last_op = None + for i in range(0, len(ips), BATCH): + chunk = [{"ip": ip} for ip in ips[i : i + BATCH]] + res = _cf( + f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items", method="POST", - headers={"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}, - data=json.dumps(list(keys)).encode(), + payload=chunk, ) + last_op = _op_id(res) + if i + BATCH < len(ips): # more chunks coming -> serialize + if not _wait_for_op(last_op): + return last_op # timed out; bail (partial), next run continues + return last_op -def push_metrics(synced, deleted, ok): +def cf_delete_items(list_id, item_ids): + """DELETE items by id. 
Returns the operation_id (async).""" + last_op = None + for i in range(0, len(item_ids), BATCH): + chunk = {"items": [{"id": iid} for iid in item_ids[i : i + BATCH]]} + res = _cf( + f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items", + method="DELETE", + payload=chunk, + ) + last_op = _op_id(res) + if i + BATCH < len(item_ids): + if not _wait_for_op(last_op): + return last_op + return last_op + + +def reconcile(label, list_id, desired): + """Reconcile one list to `desired` (a set of IPs). + + Returns (added, removed). Serializes add->wait->delete so we respect the + one-pending-bulk-op-per-account limit. Raises CFError on hard failure. + """ + existing = cf_list_items(list_id) # {ip: item_id} + existing_ips = set(existing) + to_add = sorted(desired - existing_ips) + to_remove_ids = [existing[ip] for ip in (existing_ips - desired)] + + added = removed = 0 + if to_add: + op = cf_add_items(list_id, to_add) + if _wait_for_op(op): + added = len(to_add) + else: + # add op didn't finish; skip the delete this run to avoid stacking a + # second pending op, and report what we attempted. 
+ print(f"[warn] {label}: add op not confirmed; deferring deletes", file=sys.stderr) + return added, removed + if to_remove_ids: + op = cf_delete_items(list_id, to_remove_ids) + if _wait_for_op(op): + removed = len(to_remove_ids) + print(f"[ok] {label}: +{added} / -{removed} (desired={len(desired)}, was={len(existing_ips)})") + return added, removed + + +# --------------------------------------------------------------------------- # +# Metrics (best-effort) +# --------------------------------------------------------------------------- # +def push_metrics(ban_n, captcha_n, ok): if not PUSHGATEWAY: return payload = ( - "# TYPE crowdsec_kv_sync_decisions gauge\n" - f"crowdsec_kv_sync_decisions {synced}\n" - "# TYPE crowdsec_kv_sync_deleted gauge\n" - f"crowdsec_kv_sync_deleted {deleted}\n" - "# TYPE crowdsec_kv_sync_success gauge\n" - f"crowdsec_kv_sync_success {1 if ok else 0}\n" - "# TYPE crowdsec_kv_sync_last_run_seconds gauge\n" - f"crowdsec_kv_sync_last_run_seconds {int(time.time())}\n" + "# TYPE crowdsec_cf_list_ban_count gauge\n" + f"crowdsec_cf_list_ban_count {ban_n}\n" + "# TYPE crowdsec_cf_list_captcha_count gauge\n" + f"crowdsec_cf_list_captcha_count {captcha_n}\n" + "# TYPE crowdsec_cf_list_sync_success gauge\n" + f"crowdsec_cf_list_sync_success {1 if ok else 0}\n" + "# TYPE crowdsec_cf_list_sync_last_run_seconds gauge\n" + f"crowdsec_cf_list_sync_last_run_seconds {int(time.time())}\n" ) try: - _req(f"{PUSHGATEWAY}/metrics/job/crowdsec-kv-sync", method="PUT", - headers={"Content-Type": "text/plain"}, data=payload.encode(), timeout=10) + _req( + f"{PUSHGATEWAY}/metrics/job/crowdsec-cf-list-sync", + method="PUT", + headers={"Content-Type": "text/plain"}, + data=payload.encode(), + timeout=10, + ) except Exception as e: # metrics are best-effort, never fail the job print(f"[warn] pushgateway: {e}", file=sys.stderr) +# --------------------------------------------------------------------------- # def main(): - # 1. Desired state from LAPI. 
Any failure here = SKIP (fail-safe, leave KV). + # 1. Desired state from LAPI. Any failure here = SKIP (fail-safe). try: - desired = fetch_decisions() + ban, captcha = fetch_decisions() except Exception as e: - print(f"[skip] LAPI unreadable ({e}); leaving KV untouched so existing " - f"bans expire by TTL (fail-open).", file=sys.stderr) + print( + f"[skip] LAPI unreadable ({e}); leaving CF lists untouched " + f"(fail-safe: freeze last-known edge state).", + file=sys.stderr, + ) push_metrics(0, 0, ok=False) return 0 - # 2. Current edge state. - existing = set(cf_list_keys()) - desired_keys = set(desired) + print(f"[info] LAPI desired: {len(ban)} ban / {len(captcha)} captcha (ip-scope)") - upserts = [(k, v, t) for k, (v, t) in desired.items()] - stale = existing - desired_keys + # 2. Reconcile both lists. CF errors fail loud (non-zero exit). + try: + reconcile("ban", CF_BAN_LIST_ID, ban) + reconcile("captcha", CF_CAPTCHA_LIST_ID, captcha) + except CFError as e: + print(f"[error] Cloudflare API failure: {e}", file=sys.stderr) + push_metrics(len(ban), len(captcha), ok=False) + return 1 - if upserts: - cf_bulk_put(upserts) - if stale: - cf_bulk_delete(stale) - - print(f"[ok] synced {len(upserts)} decision(s) to KV, removed {len(stale)} stale; " - f"{sum(1 for _, v, _ in upserts if v == 'ban')} ban / " - f"{sum(1 for _, v, _ in upserts if v == 'captcha')} captcha") - push_metrics(len(upserts), len(stale), ok=True) + push_metrics(len(ban), len(captcha), ok=True) return 0 From ca8d617e72974cec7bd0a839741ae5dde65cdcfa Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sat, 20 Jun 2026 09:41:41 +0000 Subject: [PATCH 6/6] rybbit: use 'Account Rule Lists' permission group for the CF sync token (v4) tg plan verified the agent's guess 'Account Filter Lists Edit/Read' is not a key in the v4.52.7 permission-group map; the live CF API lists the correct account-scoped groups as 'Account Rule Lists Read'/'Write'. 
Co-Authored-By: Claude Opus 4.8 --- stacks/rybbit/crowdsec_edge.tf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stacks/rybbit/crowdsec_edge.tf b/stacks/rybbit/crowdsec_edge.tf index 598cde28..0c1ff5dc 100644 --- a/stacks/rybbit/crowdsec_edge.tf +++ b/stacks/rybbit/crowdsec_edge.tf @@ -154,8 +154,8 @@ resource "cloudflare_api_token" "list_sync" { policy { effect = "allow" permission_groups = [ - data.cloudflare_api_token_permission_groups.all.account["Account Filter Lists Edit"], - data.cloudflare_api_token_permission_groups.all.account["Account Filter Lists Read"], + data.cloudflare_api_token_permission_groups.all.account["Account Rule Lists Write"], + data.cloudflare_api_token_permission_groups.all.account["Account Rule Lists Read"], ] resources = { "com.cloudflare.api.account.${local.cf_account_id}" = "*"