CrowdSec real enforcement: edge WAF (proxied) + firewall-bouncer (direct) #2

Merged
viktor merged 6 commits from wizard/crowdsec-enforcement into master 2026-06-20 09:42:41 +00:00
6 changed files with 903 additions and 1 deletions

View file

@ -32,4 +32,8 @@ module "crowdsec" {
# Same key the traefik-stack bouncer middleware uses seeded into LAPI so the
# bouncer authenticates and pulls decisions (was unregistered 403 fail-open).
ingress_bouncer_key = data.vault_kv_secret_v2.secrets.data["ingress_crowdsec_api_key"]
# Real enforcement replacing the dead Traefik plugin: kvsync feeds the proxied
# edge Worker via Cloudflare KV; firewall is the direct-host nftables bouncer.
kvsync_bouncer_key = data.vault_kv_secret_v2.secrets.data["kvsync_bouncer_key"]
firewall_bouncer_key = data.vault_kv_secret_v2.secrets.data["firewall_bouncer_key"]
}

View file

@ -0,0 +1,254 @@
# =============================================================================
# cs-firewall-bouncer in-kernel (nftables) enforcement DaemonSet
# =============================================================================
# CrowdSec currently enforces NOTHING at the network layer: the Traefik Yaegi
# (lua) bouncer plugin is dead, so banned IPs still reach Traefik. For DIRECT
# (non-Cloudflare-proxied) hosts we drop banned source IPs IN-KERNEL via
# cs-firewall-bouncer's nftables backend the packet is dropped before it ever
# reaches Traefik, costing zero per-request hops.
#
# Topology this respects (do NOT change without re-reading docs/architecture/networking.md):
# - Calico CNI + kube-proxy in IPTABLES mode (NOT eBPF).
# - Traefik is a LoadBalancer Service at 10.0.20.203, externalTrafficPolicy=Local
# (real client IP preserved that's the whole point of the dedicated .203 IP).
# - LB traffic is DNAT'd to the Traefik POD, so the original source IP survives
# into the `forward` netfilter hook. The drop rule MUST therefore cover the
# `forward` hook, not only `input` (a pod-destined packet traverses forward,
# not input, on the node). Hence nftables_hooks: [input, forward] below.
#
# Packaging: cs-firewall-bouncer publishes NO container image. We pin the
# official release binary (v0.0.34, 2025-08-04 latest stable) and fetch it at
# runtime: an initContainer (curlimages/curl has curl + tar, alpine) downloads
# + extracts the static binary into an emptyDir; the main container
# (debian:bookworm-slim) runs it. The nftables backend talks netlink DIRECTLY
# via github.com/google/nftables (go.mod + pkg/nftables/nftables.go: no os/exec)
# and the docs confirm "mode nftables relies on github.com/google/nftables to
# create table, chain and set" — so NO `nft` userspace CLI is needed and a plain
# slim base image suffices. The binary is built CGO_ENABLED=0 / -extldflags
# -static (Makefile), so it runs on glibc (debian) or musl (alpine) alike.
#
# Source: https://github.com/crowdsecurity/cs-firewall-bouncer
# https://docs.crowdsec.net/u/bouncers/firewall/
#
# nodeSelector pins this to ONE node (k8s-node2, which runs a Traefik pod) for first validation.
# !!! REMOVING THE nodeSelector ROLLS THIS DAEMONSET CLUSTER-WIDE !!!
# Do that ONLY after the one-node validation checklist passes (see commit/PR).
locals {
# Pin a specific stable release. Bump deliberately (re-validate on one node first).
firewall_bouncer_version = "v0.0.34"
firewall_bouncer_tgz_url = "https://github.com/crowdsecurity/cs-firewall-bouncer/releases/download/${local.firewall_bouncer_version}/crowdsec-firewall-bouncer-linux-amd64.tgz"
firewall_bouncer_bin_path = "/opt/firewall-bouncer/crowdsec-firewall-bouncer"
firewall_bouncer_cfg_path = "/etc/crowdsec/bouncers/crowdsec-firewall-bouncer.yaml"
# Rendered firewall-bouncer config. Lives in a Secret (NOT a ConfigMap) because
# it embeds api_key. Key names/structure verified against the reference config
# (config/crowdsec-firewall-bouncer.yaml @ v0.0.34):
# - `set-only` uses a HYPHEN (not set_only).
# - `nftables_hooks` is a TOP-LEVEL list (sibling of `nftables:`, underscore).
# - `deny_action` values are uppercase DROP / REJECT.
# - `log_mode: stdout` sends logs to the container's stdout (default is `file`).
# - api_url carries a trailing slash (matches the reference default).
firewall_bouncer_yaml = <<-YAML
mode: nftables
update_frequency: 10s
log_mode: stdout
log_level: info
api_url: http://crowdsec-service.crowdsec.svc.cluster.local:8080/
api_key: ${var.firewall_bouncer_key}
insecure_skip_verify: false
disable_ipv6: false
deny_action: DROP
deny_log: true
nftables:
ipv4:
enabled: true
set-only: false
table: crowdsec
chain: crowdsec-chain
priority: -10
ipv6:
enabled: true
set-only: false
table: crowdsec6
chain: crowdsec6-chain
priority: -10
nftables_hooks:
- input
- forward
YAML
}
resource "kubernetes_secret" "firewall_bouncer_config" {
metadata {
name = "crowdsec-firewall-bouncer-config"
namespace = kubernetes_namespace.crowdsec.metadata[0].name
labels = {
"app.kubernetes.io/name" = "crowdsec-firewall-bouncer"
tier = var.tier
}
annotations = {
# Rotate the pods if the API key / config ever changes (the binary reads
# the config only at startup).
"reloader.stakater.com/match" = "true"
}
}
data = {
"crowdsec-firewall-bouncer.yaml" = local.firewall_bouncer_yaml
}
type = "Opaque"
}
resource "kubernetes_daemon_set_v1" "firewall_bouncer" {
metadata {
name = "crowdsec-firewall-bouncer"
namespace = kubernetes_namespace.crowdsec.metadata[0].name
labels = {
"app.kubernetes.io/name" = "crowdsec-firewall-bouncer"
tier = var.tier
}
}
spec {
selector {
match_labels = {
"app.kubernetes.io/name" = "crowdsec-firewall-bouncer"
}
}
template {
metadata {
labels = {
"app.kubernetes.io/name" = "crowdsec-firewall-bouncer"
tier = var.tier
}
annotations = {
# Bounce pods when the config Secret changes (api_key rotation etc.).
"secret.reloader.stakater.com/reload" = kubernetes_secret.firewall_bouncer_config.metadata[0].name
}
}
spec {
priority_class_name = "tier-1-cluster"
# Program the HOST's nftables ruleset (not the pod netns) the bouncer's
# drop rules must live in the host network namespace where DNAT'd LB
# traffic transits the forward hook.
host_network = true
dns_policy = "ClusterFirstWithHostNet"
# ---- FIRST-VALIDATION PIN ----------------------------------------------
# Pinned to a SINGLE node so a mistake in the nftables rules can only
# affect one node. k8s-node2 is chosen because it currently runs a Traefik
# pod required to validate the `forward`-hook drop on DNAT'd LoadBalancer
# traffic (under ETP=Local a node with no Traefik pod never sees that path,
# so the validation would be meaningless there).
# REMOVE this nodeSelector to roll the bouncer to EVERY node (the normal
# end state for a firewall bouncer) but ONLY after the one-node
# validation checklist passes.
node_selector = {
"kubernetes.io/hostname" = "k8s-node2"
}
# ------------------------------------------------------------------------
# initContainer fetches + extracts the pinned release binary into the
# shared emptyDir. curlimages/curl is alpine and ships curl + tar.
init_container {
name = "fetch-bouncer"
image = "curlimages/curl:8.10.1"
command = [
"sh", "-c",
<<-EOT
set -eu
echo "Downloading cs-firewall-bouncer ${local.firewall_bouncer_version}..."
curl -fsSL "${local.firewall_bouncer_tgz_url}" -o /tmp/fb.tgz
# Archive layout (verified @ v0.0.34): a single versioned top dir
# `crowdsec-firewall-bouncer-vX.Y.Z/` containing the binary plus
# config/, scripts/, install.sh. Strip that dir and extract ONLY the
# binary the `*/crowdsec-firewall-bouncer` glob matches one path
# segment after the top dir, so config/...yaml is NOT pulled.
tar -xzf /tmp/fb.tgz -C /opt/firewall-bouncer --strip-components=1 \
--wildcards '*/crowdsec-firewall-bouncer'
chmod +x ${local.firewall_bouncer_bin_path}
echo "Fetched: $(ls -l ${local.firewall_bouncer_bin_path})"
EOT
]
volume_mount {
name = "binary"
mount_path = "/opt/firewall-bouncer"
}
resources {
requests = {
cpu = "10m"
memory = "32Mi"
}
limits = {
memory = "64Mi"
}
}
}
container {
name = "firewall-bouncer"
image = "debian:bookworm-slim"
command = [
local.firewall_bouncer_bin_path,
"-c", local.firewall_bouncer_cfg_path,
]
# nftables backend needs NET_ADMIN to program the host ruleset. NET_RAW
# is the proven-safe companion the reference container images add. We
# deliberately AVOID full `privileged: true` these two caps are
# sufficient for the netlink nftables path (no iptables/ipset shell-out
# here). If validation shows rules are NOT being installed, the next
# thing to try is privileged:true (see checklist) but start minimal.
security_context {
capabilities {
add = ["NET_ADMIN", "NET_RAW"]
}
}
volume_mount {
name = "binary"
mount_path = "/opt/firewall-bouncer"
read_only = true
}
volume_mount {
name = "config"
mount_path = local.firewall_bouncer_cfg_path
sub_path = "crowdsec-firewall-bouncer.yaml"
read_only = true
}
# No liveness probe: the bouncer runs as PID 1, so a crash or a bad
# config that makes it exit non-zero at startup surfaces on its own as
# a pod restart / CrashLoopBackOff. This avoids coupling pod liveness to
# a periodic LAPI round-trip (a brief LAPI blip must NOT bounce the pod).
resources {
requests = {
cpu = "10m"
memory = "64Mi"
}
# crowdsec-quota enforces limits.memory on every pod in the ns.
limits = {
memory = "128Mi"
}
}
}
volume {
name = "binary"
empty_dir {}
}
volume {
name = "config"
secret {
secret_name = kubernetes_secret.firewall_bouncer_config.metadata[0].name
}
}
}
}
}
lifecycle {
# KYVERNO_LIFECYCLE_V1: Kyverno admission webhook mutates dns_config with ndots=2
ignore_changes = [spec[0].template[0].spec[0].dns_config]
}
}

View file

@ -21,6 +21,16 @@ variable "ingress_bouncer_key" {
sensitive = true
description = "API key for the Traefik CrowdSec bouncer plugin. Seeded into LAPI via BOUNCER_KEY_traefik so the bouncer authenticates and pulls decisions — the same key the traefik-stack middleware presents."
}
variable "kvsync_bouncer_key" {
type = string
sensitive = true
description = "API key for the LAPI->Cloudflare-KV sync job (proxied-edge control plane). Seeded into LAPI via BOUNCER_KEY_kvsync; the rybbit-stack CronJob presents the same key to pull decisions."
}
variable "firewall_bouncer_key" {
type = string
sensitive = true
description = "API key for the cs-firewall-bouncer DaemonSet (direct-host in-kernel enforcement). Seeded into LAPI via BOUNCER_KEY_firewall; the DaemonSet presents the same key to stream decisions."
}
module "tls_secret" {
source = "../../../../modules/kubernetes/setup_tls_secret"
@ -102,6 +112,15 @@ resource "kubernetes_config_map" "crowdsec_whitelist" {
reason: "Trusted IP - never block"
ip:
- "176.12.22.76"
cidr:
# Never ban internal/cluster/LAN/tailnet sources. Enforcement (edge
# Worker + firewall-bouncer) drops on real source IP, so an internal
# range slipping into a decision could blackhole legit traffic this
# makes that structurally impossible at the decision layer.
- "10.0.0.0/8" # k8s nodes/pods/services + VLAN 10/20
- "172.16.0.0/12" # RFC1918
- "192.168.0.0/16" # LAN (192.168.1.0/24) + Sofia
- "100.64.0.0/10" # Headscale tailnet (CGNAT)
---
name: viktor/immich-asset-paths-whitelist
description: "Don't penalise legit Immich timeline bursts (mobile scrub, web grid)"
@ -153,7 +172,7 @@ resource "helm_release" "crowdsec" {
repository = "https://crowdsecurity.github.io/helm-charts"
chart = "crowdsec"
values = [templatefile("${path.module}/values.yaml", { homepage_username = var.homepage_username, homepage_password = var.homepage_password, DB_PASSWORD = var.db_password, ENROLL_KEY = var.enroll_key, SLACK_WEBHOOK_URL = var.slack_webhook_url, mysql_host = var.mysql_host, postgresql_host = var.postgresql_host, INGRESS_CROWDSEC_API_KEY = var.ingress_bouncer_key })]
values = [templatefile("${path.module}/values.yaml", { homepage_username = var.homepage_username, homepage_password = var.homepage_password, DB_PASSWORD = var.db_password, ENROLL_KEY = var.enroll_key, SLACK_WEBHOOK_URL = var.slack_webhook_url, mysql_host = var.mysql_host, postgresql_host = var.postgresql_host, INGRESS_CROWDSEC_API_KEY = var.ingress_bouncer_key, KVSYNC_CROWDSEC_API_KEY = var.kvsync_bouncer_key, FIREWALL_CROWDSEC_API_KEY = var.firewall_bouncer_key })]
timeout = 1200
wait = true
wait_for_jobs = true

View file

@ -135,6 +135,14 @@ lapi:
# the prior manual registration was lost in the MySQL→PostgreSQL migration).
- name: BOUNCER_KEY_traefik
value: "${INGRESS_CROWDSEC_API_KEY}"
# Real enforcement path that replaces the dead Traefik Yaegi plugin:
# kvsync -> LAPI->Cloudflare-KV sync CronJob (proxied hosts, edge Worker)
# firewall -> cs-firewall-bouncer DaemonSet (direct hosts, in-kernel nftables drop)
# Registered at LAPI startup (idempotent across the 3 replicas / restarts).
- name: BOUNCER_KEY_kvsync
value: "${KVSYNC_CROWDSEC_API_KEY}"
- name: BOUNCER_KEY_firewall
value: "${FIREWALL_CROWDSEC_API_KEY}"
dashboard:
enabled: true
env:

View file

@ -0,0 +1,299 @@
# =============================================================================
# CrowdSec edge enforcement for Cloudflare-PROXIED hosts control plane
# =============================================================================
# Proxied hosts terminate at the Cloudflare edge, so the in-cluster CrowdSec
# bouncer (which keys on the real client IP seen by Traefik) never gets to
# decide on them. To enforce CrowdSec bans/captchas on proxied traffic we push
# the decision INTO the Cloudflare edge as account-level IP Lists + a single
# zone-scoped WAF custom rule:
#
# * Two account IP Lists `crowdsec_ban` and `crowdsec_captcha` hold the
# banned / captcha'd source IPs (empty in TF; populated at runtime).
# * A zone-scoped WAF ruleset in the http_request_firewall_custom phase
# blocks `(ip.src in $crowdsec_ban)` and managed-challenges
# `(ip.src in $crowdsec_captcha)`. Because it's a ZONE rule it enforces
# across ALL proxied hosts in the zone (~135), not just the handful a
# Worker would route. (The previous Worker+KV design only covered the ~27
# hosts the rybbit Worker routed; the analytics Worker in worker/ is
# unrelated and stays.)
#
# This file is the CONTROL PLANE that keeps those lists in sync with LAPI:
# 1. the two empty IP Lists (list ITEMS are owned by the CronJob at runtime,
# NOT by Terraform see the lifecycle ignore_changes on `item`),
# 2. a LEAST-PRIVILEGE Cloudflare API token (account Filter-Lists edit only,
# scoped to this account) the sync job authenticates with,
# 3. a CronJob running lapi_kv_sync.py every 2 min to full-reconcile LAPI
# decisions into the two lists (mirrors monitoring/alert_digest.tf: stock
# python:3.12-alpine + pure-stdlib script from a ConfigMap, no pip/apk at
# runtime).
#
# Cloudflare provider is pinned v4.52.7 (~> 4) v4 schema is used throughout
# (v5 differs greatly: policy is a block here not a `policies = [...]` list;
# resources is a map not a jsonencode'd string; ruleset `rules` is a repeatable
# block; list items use `item { value { ip = ... } }`; permission groups are
# looked up via data.cloudflare_api_token_permission_groups, not a v5 *_list
# data source). context7 only indexes v5, so the v4 arguments below were
# verified against the v4.52.7 provider docs (github tag v4.52.7) items
# FLAGGED ### VERIFY for tg-plan are noted inline.
# =============================================================================
data "cloudflare_accounts" "main" {}
locals {
cf_account_id = data.cloudflare_accounts.main.accounts[0].id
}
# -----------------------------------------------------------------------------
# IP Lists empty shells. The CronJob owns the items at runtime via the CF
# Rules-Lists API; TF must NOT manage items or every 2-min sync would fight the
# next `terragrunt apply` (apply would try to delete the runtime items).
#
# ### VERIFY (v4.52.7): cloudflare_list args account_id/name/kind/description;
# kind="ip" is one of {ip, redirect, hostname, asn}. The optional items
# block is named `item` (singular, Block Set) with `item { value { ip=... }
# comment=... }`. We declare NO `item` blocks (empty list) and
# ignore_changes=[item] so runtime items don't show as drift.
# NOTE: list `name` must match /^[a-zA-Z0-9_]+$/ (underscores ok, no dashes)
# hence crowdsec_ban / crowdsec_captcha (underscore, not dash).
# -----------------------------------------------------------------------------
resource "cloudflare_list" "crowdsec_ban" {
account_id = local.cf_account_id
name = "crowdsec_ban"
kind = "ip"
description = "CrowdSec ban decisions (synced from LAPI)"
lifecycle {
# The crowdsec-cf-sync CronJob adds/removes items at runtime; TF owns only
# the empty list shell. Without this, every apply would delete live bans.
ignore_changes = [item]
}
}
resource "cloudflare_list" "crowdsec_captcha" {
account_id = local.cf_account_id
name = "crowdsec_captcha"
kind = "ip"
description = "CrowdSec captcha decisions (synced from LAPI)"
lifecycle {
ignore_changes = [item]
}
}
# -----------------------------------------------------------------------------
# Zone-scoped WAF custom ruleset the actual enforcement. One ruleset, two
# rules, applied to EVERY proxied host in the zone.
#
# ### VERIFY (v4.52.7): cloudflare_ruleset with zone_id + kind="zone" +
# phase="http_request_firewall_custom"; `rules` is a repeatable block with
# action/expression/description/enabled. actions "block" and
# "managed_challenge" are both valid. List references in WAF expressions use
# the list NAME with a `$` prefix (NOT the list id): ($crowdsec_ban).
# Rule order matters ban (block) is evaluated before captcha so a
# double-listed IP is blocked outright (the sync script also enforces
# ban-wins, so an IP is never in both lists, but order is belt-and-braces).
#
# zone_id is the viktorbarzin.me zone the single zone id used repo-wide
# (default of var.cloudflare_zone_id in modules/kubernetes/ingress_factory and
# hardcoded the same in stacks/kms/main.tf; source of truth is the git-crypt'd
# config.tfvars). Hardcoded here (with the conventional marker comment) because
# the rybbit stack does not import the ingress_factory module.
# -----------------------------------------------------------------------------
resource "cloudflare_ruleset" "crowdsec" {
zone_id = "fd2c5dd4efe8fe38958944e74d0ced6d" # cloudflare_zone_id (viktorbarzin.me)
name = "crowdsec-ip-enforcement"
description = "Block/challenge IPs CrowdSec flagged (synced from LAPI into CF IP Lists)"
kind = "zone"
phase = "http_request_firewall_custom"
rules {
action = "block"
expression = "(ip.src in $crowdsec_ban)"
description = "CrowdSec: block banned IPs"
enabled = true
}
rules {
action = "managed_challenge"
expression = "(ip.src in $crowdsec_captcha)"
description = "CrowdSec: managed-challenge captcha'd IPs"
enabled = true
}
}
# -----------------------------------------------------------------------------
# Least-privilege API token for the sync job: account-level Filter-Lists edit
# ONLY, scoped to this single account (no zone/DNS/Workers access). The token
# value is sensitive and lands in TF state (Tier-1 PG, encrypted at rest) and
# in the rybbit Secret below same trust level as the CF Global API Key
# already in state.
#
# ### VERIFY (v4.52.7): cloudflare_api_token with a repeatable `policy` block
# (effect / permission_groups = Set of String / resources = Map of String);
# token secret is exposed as `.value` (sensitive).
#
# ### VERIFY PERMISSION GROUP NAME (highest-risk item). v4.52.7 deprecates
# the flat `.permissions[...]` map ("some permissions overlap resource
# scope"); the non-deprecated lookup is the scoped `.account[...]` map.
# Cloudflare's current permissions reference calls the account list-edit
# group "Account Filter Lists Edit" (and read "Account Filter Lists Read").
# An OLDER community gist instead shows "Account Rule Lists Read/Write"
# Cloudflare has renamed this group over time. If `tg plan` errors with a
# missing key, try (in order): .account["Account Filter Lists Edit"] ->
# .account["Account Rule Lists Write"], or enumerate the live names with:
# terraform console
# > data.cloudflare_api_token_permission_groups.all.account
# Read is not strictly required for edit (Edit = full CRUDL) but the sync
# job GETs items, so we include Read too to be safe.
# -----------------------------------------------------------------------------
data "cloudflare_api_token_permission_groups" "all" {}
resource "cloudflare_api_token" "list_sync" {
name = "rybbit-crowdsec-list-sync"
policy {
effect = "allow"
permission_groups = [
data.cloudflare_api_token_permission_groups.all.account["Account Rule Lists Write"],
data.cloudflare_api_token_permission_groups.all.account["Account Rule Lists Read"],
]
resources = {
"com.cloudflare.api.account.${local.cf_account_id}" = "*"
}
}
}
# -----------------------------------------------------------------------------
# Pure-stdlib sync script, mounted into the CronJob from a ConfigMap (the
# alert_digest pattern no per-run package installs).
# -----------------------------------------------------------------------------
resource "kubernetes_config_map" "crowdsec_cf_sync_script" {
metadata {
name = "crowdsec-cf-sync-script"
namespace = "rybbit"
}
data = {
"lapi_kv_sync.py" = file("${path.module}/lapi_kv_sync.py")
}
}
# Secrets consumed by the sync job: the LAPI bouncer key (registered in LAPI,
# stored in Vault secret/platform -> kvsync_bouncer_key) and the minted CF
# token value. Account id and list ids are NOT secret and are passed as plain
# env values on the CronJob.
resource "kubernetes_secret" "crowdsec_cf_sync" {
metadata {
name = "crowdsec-cf-sync"
namespace = "rybbit"
}
type = "Opaque"
data = {
LAPI_KEY = data.vault_kv_secret_v2.cf_platform.data["kvsync_bouncer_key"]
CF_API_TOKEN = cloudflare_api_token.list_sync.value
}
}
resource "kubernetes_cron_job_v1" "crowdsec_cf_sync" {
metadata {
name = "crowdsec-cf-sync"
namespace = "rybbit"
labels = {
app = "crowdsec-cf-sync"
tier = local.tiers.aux
}
}
spec {
concurrency_policy = "Forbid"
failed_jobs_history_limit = 3
successful_jobs_history_limit = 3
schedule = "*/2 * * * *"
starting_deadline_seconds = 110
job_template {
metadata {}
spec {
backoff_limit = 2
ttl_seconds_after_finished = 3600
template {
metadata {
labels = {
app = "crowdsec-cf-sync"
}
}
spec {
restart_policy = "OnFailure"
container {
name = "crowdsec-cf-sync"
image = "docker.io/library/python:3.12-alpine"
image_pull_policy = "IfNotPresent"
command = ["python3", "/scripts/lapi_kv_sync.py"]
env {
name = "LAPI_KEY"
value_from {
secret_key_ref {
name = kubernetes_secret.crowdsec_cf_sync.metadata[0].name
key = "LAPI_KEY"
}
}
}
env {
name = "CF_API_TOKEN"
value_from {
secret_key_ref {
name = kubernetes_secret.crowdsec_cf_sync.metadata[0].name
key = "CF_API_TOKEN"
}
}
}
env {
name = "CF_ACCOUNT_ID"
value = local.cf_account_id
}
env {
name = "CF_BAN_LIST_ID"
value = cloudflare_list.crowdsec_ban.id
}
env {
name = "CF_CAPTCHA_LIST_ID"
value = cloudflare_list.crowdsec_captcha.id
}
env {
name = "PUSHGATEWAY_URL"
value = "http://prometheus-prometheus-pushgateway.monitoring:9091"
}
volume_mount {
name = "script"
mount_path = "/scripts"
read_only = true
}
resources {
requests = {
cpu = "10m"
memory = "48Mi"
}
limits = {
memory = "96Mi"
}
}
}
volume {
name = "script"
config_map {
name = kubernetes_config_map.crowdsec_cf_sync_script.metadata[0].name
}
}
dns_config {
option {
name = "ndots"
value = "2"
}
}
}
}
}
}
}
lifecycle {
# KYVERNO_LIFECYCLE_V1: Kyverno admission webhook mutates dns_config with ndots=2
ignore_changes = [spec[0].job_template[0].spec[0].template[0].spec[0].dns_config]
}
}

View file

@ -0,0 +1,318 @@
#!/usr/bin/env python3
"""Sync CrowdSec LAPI decisions -> two Cloudflare account IP Lists.
Cloudflare-PROXIED hosts terminate at the CF edge, so the in-cluster CrowdSec
bouncer (which keys on the client IP Traefik sees) never decides on them. We
push the decisions into the edge instead: a zone-scoped WAF custom rule blocks
`(ip.src in $crowdsec_ban)` and managed-challenges `(ip.src in $crowdsec_captcha)`
across EVERY proxied host in the zone. This job is the control plane that keeps
those two IP Lists in sync with LAPI.
(Filename kept as lapi_kv_sync.py for path/ConfigMap continuity with the prior
Workers-KV design; it no longer touches KV it reconciles CF Rules Lists.)
Design notes:
* Pure Python stdlib (no pip/apk at runtime) runs on stock python:3.12-alpine
mounted from a ConfigMap, the alert_digest pattern.
* FULL RECONCILE each run: read the complete decision set from LAPI, partition
into ban / captcha desired sets, then for each list compute add (desired -
existing) and remove (existing - desired) and apply both. An IP listed for
BOTH ban and captcha is placed in BAN ONLY (ban wins; the WAF rule order
also blocks-before-challenges as belt-and-braces). A `cscli decisions
delete` therefore clears from the edge within one interval (<=2 min).
* FAIL-SAFE on LAPI: if LAPI can't be read we SKIP the run (lists untouched,
exit 0). A LAPI outage thus freezes the edge state rather than wiping every
ban degrade toward the last-known-good block set, never toward all-block
or a thundering un-ban. (Decisions linger only until the next successful
sync, not their TTL we reconcile to LAPI truth, we don't expire entries.)
* FAIL-LOUD on Cloudflare: any CF API error is logged and the job exits
non-zero so the failure is visible (CronJob backoff + missing success
metric + the next run retries).
Cloudflare Rules-Lists API (account-level IP list items), verified against the
official API reference (developers.cloudflare.com, 2026):
* GET /accounts/{acct}/rules/lists/{list}/items -> paginated; next page
cursor at result_info.cursors.after, passed back as ?cursor=. Each
item = {"id","ip","created_on",...}.
* POST /accounts/{acct}/rules/lists/{list}/items -> body JSON ARRAY
[{"ip":"1.2.3.4"},...]. APPENDS/upserts (does NOT replace the list).
ASYNCHRONOUS: returns {"result":{"operation_id":...}}.
* DELETE /accounts/{acct}/rules/lists/{list}/items -> body {"items":[{"id":
"<item_id>"},...]} (delete by item id, not ip). ASYNCHRONOUS.
* GET /accounts/{acct}/rules/lists/bulk_operations/{op_id} -> status in
{pending,running,completed,failed} (failed carries `error`).
ASYNC HANDLING: Cloudflare allows only ONE pending bulk operation per ACCOUNT.
So we must NOT fire add+delete (or both lists) concurrently we serialize and
poll each operation_id to a terminal state (short bounded timeout) before the
next mutation. If a poll times out we stop mutating for this run and report
partial success (the next 2-min run reconciles the rest); we never abandon an
in-flight op and immediately issue another (that would 409/reject).
"""
import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
LAPI_URL = os.environ.get(
"LAPI_URL", "http://crowdsec-service.crowdsec.svc.cluster.local:8080"
).rstrip("/")
LAPI_KEY = os.environ["LAPI_KEY"] # kvsync bouncer key, registered in LAPI
CF_ACCOUNT_ID = os.environ["CF_ACCOUNT_ID"]
CF_API_TOKEN = os.environ["CF_API_TOKEN"] # scoped: Account Filter Lists Edit
CF_BAN_LIST_ID = os.environ["CF_BAN_LIST_ID"]
CF_CAPTCHA_LIST_ID = os.environ["CF_CAPTCHA_LIST_ID"]
PUSHGATEWAY = os.environ.get("PUSHGATEWAY_URL", "").rstrip("/") # optional
CF_API = "https://api.cloudflare.com/client/v4"
# Cloudflare item objects expose the ip differently between list and create
# responses; for an IP-kind list each GET item carries a top-level "ip".
# Batch sizes: no official per-request cap is documented, so keep batches
# generous but bounded (well under the global 1200 req / 5 min limit).
BATCH = 1000
# Async op polling: 1 pending bulk op per account, so poll to terminal state.
POLL_TIMEOUT = 25 # seconds to wait for one bulk op (the run has ~110s budget)
POLL_INTERVAL = 1.0
class CFError(Exception):
"""Cloudflare API failure -> job should exit non-zero (fail loud)."""
def _req(url, *, method="GET", headers=None, data=None, timeout=20):
req = urllib.request.Request(url, method=method, headers=headers or {}, data=data)
with urllib.request.urlopen(req, timeout=timeout) as resp:
body = resp.read()
return json.loads(body) if body else None
def _cf(url, *, method="GET", payload=None, timeout=20):
"""Call the CF API with the bearer token; raise CFError on any failure."""
headers = {"Authorization": f"Bearer {CF_API_TOKEN}"}
data = None
if payload is not None:
headers["Content-Type"] = "application/json"
data = json.dumps(payload).encode()
try:
res = _req(url, method=method, headers=headers, data=data, timeout=timeout)
except urllib.error.HTTPError as e:
detail = ""
try:
detail = e.read().decode(errors="replace")[:500]
except Exception:
pass
raise CFError(f"{method} {url} -> HTTP {e.code} {detail}") from e
except urllib.error.URLError as e:
raise CFError(f"{method} {url} -> {e}") from e
if res is not None and not res.get("success", True):
raise CFError(f"{method} {url} -> not success: {res.get('errors')}")
return res
# --------------------------------------------------------------------------- #
# LAPI
# --------------------------------------------------------------------------- #
def fetch_decisions():
"""Return (ban_set, captcha_set) of IPs from LAPI.
Only scope=="ip" decisions are projected (the WAF rule keys on ip.src). An
IP appearing in both ban and captcha is placed in BAN only. Raises on
transport/HTTP error so the caller can SKIP the run (fail-safe).
"""
data = _req(
f"{LAPI_URL}/v1/decisions",
headers={"X-Api-Key": LAPI_KEY, "Accept": "application/json"},
)
ban, captcha = set(), set()
for d in data or []:
if (d.get("scope") or "").lower() != "ip":
continue
ip = d.get("value")
if not ip:
continue
dtype = (d.get("type") or "").lower()
if dtype == "ban":
ban.add(ip)
elif dtype == "captcha":
captcha.add(ip)
# other remediation types (e.g. throttle) are ignored
captcha -= ban # ban wins: never list the same IP in both
return ban, captcha
# --------------------------------------------------------------------------- #
# Cloudflare list items
# --------------------------------------------------------------------------- #
def cf_list_items(list_id):
"""Return {ip: item_id} for every item currently in the list (paginated)."""
out = {}
cursor = ""
while True:
url = f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items?per_page=1000"
if cursor:
url += f"&cursor={urllib.parse.quote(cursor)}"
res = _cf(url)
for it in (res.get("result") or []):
ip = it.get("ip")
if ip:
out[ip] = it.get("id")
cursor = (((res.get("result_info") or {}).get("cursors") or {}).get("after")) or ""
if not cursor:
return out
def _wait_for_op(op_id):
"""Poll a bulk operation to a terminal state. Returns True if completed,
False if it timed out (still pending/running). Raises CFError if it failed.
We must reach a terminal state before issuing the next mutation: CF allows
only one pending bulk op per account, so firing another while this is
in-flight would be rejected.
"""
if not op_id:
return True
deadline = time.time() + POLL_TIMEOUT
url = f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/bulk_operations/{op_id}"
while time.time() < deadline:
res = _cf(url)
status = ((res.get("result") or {}).get("status") or "").lower()
if status == "completed":
return True
if status == "failed":
raise CFError(f"bulk op {op_id} failed: {(res.get('result') or {}).get('error')}")
time.sleep(POLL_INTERVAL)
print(f"[warn] bulk op {op_id} still {status or 'pending'} after {POLL_TIMEOUT}s; "
f"stopping further mutations this run (next run reconciles)", file=sys.stderr)
return False
def _op_id(res):
return ((res or {}).get("result") or {}).get("operation_id")
def cf_add_items(list_id, ips):
"""POST new IPs to the list (append). Returns the operation_id (async)."""
# If callers ever exceed one batch, each POST is a separate bulk op and the
# single-pending-op rule forces us to wait between them.
last_op = None
for i in range(0, len(ips), BATCH):
chunk = [{"ip": ip} for ip in ips[i : i + BATCH]]
res = _cf(
f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items",
method="POST",
payload=chunk,
)
last_op = _op_id(res)
if i + BATCH < len(ips): # more chunks coming -> serialize
if not _wait_for_op(last_op):
return last_op # timed out; bail (partial), next run continues
return last_op
def cf_delete_items(list_id, item_ids):
"""DELETE items by id. Returns the operation_id (async)."""
last_op = None
for i in range(0, len(item_ids), BATCH):
chunk = {"items": [{"id": iid} for iid in item_ids[i : i + BATCH]]}
res = _cf(
f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items",
method="DELETE",
payload=chunk,
)
last_op = _op_id(res)
if i + BATCH < len(item_ids):
if not _wait_for_op(last_op):
return last_op
return last_op
def reconcile(label, list_id, desired):
"""Reconcile one list to `desired` (a set of IPs).
Returns (added, removed). Serializes add->wait->delete so we respect the
one-pending-bulk-op-per-account limit. Raises CFError on hard failure.
"""
existing = cf_list_items(list_id) # {ip: item_id}
existing_ips = set(existing)
to_add = sorted(desired - existing_ips)
to_remove_ids = [existing[ip] for ip in (existing_ips - desired)]
added = removed = 0
if to_add:
op = cf_add_items(list_id, to_add)
if _wait_for_op(op):
added = len(to_add)
else:
# add op didn't finish; skip the delete this run to avoid stacking a
# second pending op, and report what we attempted.
print(f"[warn] {label}: add op not confirmed; deferring deletes", file=sys.stderr)
return added, removed
if to_remove_ids:
op = cf_delete_items(list_id, to_remove_ids)
if _wait_for_op(op):
removed = len(to_remove_ids)
print(f"[ok] {label}: +{added} / -{removed} (desired={len(desired)}, was={len(existing_ips)})")
return added, removed
# --------------------------------------------------------------------------- #
# Metrics (best-effort)
# --------------------------------------------------------------------------- #
def push_metrics(ban_n, captcha_n, ok):
if not PUSHGATEWAY:
return
payload = (
"# TYPE crowdsec_cf_list_ban_count gauge\n"
f"crowdsec_cf_list_ban_count {ban_n}\n"
"# TYPE crowdsec_cf_list_captcha_count gauge\n"
f"crowdsec_cf_list_captcha_count {captcha_n}\n"
"# TYPE crowdsec_cf_list_sync_success gauge\n"
f"crowdsec_cf_list_sync_success {1 if ok else 0}\n"
"# TYPE crowdsec_cf_list_sync_last_run_seconds gauge\n"
f"crowdsec_cf_list_sync_last_run_seconds {int(time.time())}\n"
)
try:
_req(
f"{PUSHGATEWAY}/metrics/job/crowdsec-cf-list-sync",
method="PUT",
headers={"Content-Type": "text/plain"},
data=payload.encode(),
timeout=10,
)
except Exception as e: # metrics are best-effort, never fail the job
print(f"[warn] pushgateway: {e}", file=sys.stderr)
# --------------------------------------------------------------------------- #
def main():
# 1. Desired state from LAPI. Any failure here = SKIP (fail-safe).
try:
ban, captcha = fetch_decisions()
except Exception as e:
print(
f"[skip] LAPI unreadable ({e}); leaving CF lists untouched "
f"(fail-safe: freeze last-known edge state).",
file=sys.stderr,
)
push_metrics(0, 0, ok=False)
return 0
print(f"[info] LAPI desired: {len(ban)} ban / {len(captcha)} captcha (ip-scope)")
# 2. Reconcile both lists. CF errors fail loud (non-zero exit).
try:
reconcile("ban", CF_BAN_LIST_ID, ban)
reconcile("captcha", CF_CAPTCHA_LIST_ID, captcha)
except CFError as e:
print(f"[error] Cloudflare API failure: {e}", file=sys.stderr)
push_metrics(len(ban), len(captcha), ok=False)
return 1
push_metrics(len(ban), len(captcha), ok=True)
return 0
if __name__ == "__main__":
sys.exit(main())