rybbit: proxied CrowdSec enforcement via Cloudflare IP Lists + WAF rule

Replaces the Worker+KV approach (which only covered the ~27 routed hosts) with a
zone-wide mechanism that covers ALL proxied hosts: two CF account IP Lists
(crowdsec_ban, crowdsec_captcha) + one zone WAF custom rule that blocks
`(ip.src in $crowdsec_ban)` and managed-challenges `(ip.src in $crowdsec_captcha)`.
No per-request Worker, no cookie machinery — the rybbit Worker stays
analytics-only. lapi_kv_sync.py now full-reconciles the two lists from LAPI
(fail-safe: a LAPI blip skips the run and freezes the last-known-good block set;
serializes CF bulk ops since CF allows one pending op per account). A
least-privilege CF API token (Account Filter Lists Edit) is minted in TF.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Viktor Barzin 2026-06-20 09:18:33 +00:00
parent 7e646e1c7c
commit cc4bfb593b
2 changed files with 542 additions and 114 deletions
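
The full-reconcile step described in the commit message can be sketched as two set differences per list. This is a minimal illustration, not the shipped script: `partition` and `plan` are hypothetical helper names, and the decision dicts assume only the `scope` / `type` / `value` fields that the sync script reads from LAPI.

```python
def partition(decisions):
    """Split LAPI-style decision dicts into (ban, captcha) IP sets; ban wins."""
    ban, captcha = set(), set()
    for d in decisions:
        # Only ip-scope decisions are projected (the WAF rule keys on ip.src).
        if (d.get("scope") or "").lower() != "ip":
            continue
        ip = d.get("value")
        if not ip:
            continue
        kind = (d.get("type") or "").lower()
        if kind == "ban":
            ban.add(ip)
        elif kind == "captcha":
            captcha.add(ip)
        # any other remediation type is ignored
    return ban, captcha - ban  # an IP never lands in both lists


def plan(desired, existing):
    """Compute the mutations for one list.

    desired:  set of IPs that should be in the list
    existing: {ip: item_id} currently in the list
    Returns (ips_to_add, item_ids_to_remove); deletes go by item id, not ip.
    """
    existing_ips = set(existing)
    to_add = sorted(desired - existing_ips)
    to_remove = sorted(existing[ip] for ip in existing_ips - desired)
    return to_add, to_remove
```

Because the plan is recomputed from LAPI on every run, an IP removed with `cscli decisions delete` shows up in `to_remove` on the next run instead of waiting for a TTL.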


@@ -0,0 +1,299 @@
# =============================================================================
# CrowdSec edge enforcement for Cloudflare-PROXIED hosts: control plane
# =============================================================================
# Proxied hosts terminate at the Cloudflare edge, so the in-cluster CrowdSec
# bouncer (which keys on the real client IP seen by Traefik) never gets to
# decide on them. To enforce CrowdSec bans/captchas on proxied traffic we push
# the decision INTO the Cloudflare edge as account-level IP Lists + a single
# zone-scoped WAF custom rule:
#
# * Two account IP Lists `crowdsec_ban` and `crowdsec_captcha` hold the
# banned / captcha'd source IPs (empty in TF; populated at runtime).
# * A zone-scoped WAF ruleset in the http_request_firewall_custom phase
# blocks `(ip.src in $crowdsec_ban)` and managed-challenges
# `(ip.src in $crowdsec_captcha)`. Because it's a ZONE rule it enforces
# across ALL proxied hosts in the zone (~135), not just the handful a
# Worker would route. (The previous Worker+KV design only covered the ~27
# hosts the rybbit Worker routed; the analytics Worker in worker/ is
# unrelated and stays.)
#
# This file is the CONTROL PLANE that keeps those lists in sync with LAPI:
# 1. the two empty IP Lists (list ITEMS are owned by the CronJob at runtime,
# NOT by Terraform; see the lifecycle ignore_changes on `item`),
# 2. a LEAST-PRIVILEGE Cloudflare API token (account Filter-Lists edit only,
# scoped to this account) the sync job authenticates with,
# 3. a CronJob running lapi_kv_sync.py every 2 min to full-reconcile LAPI
# decisions into the two lists (mirrors monitoring/alert_digest.tf: stock
# python:3.12-alpine + pure-stdlib script from a ConfigMap, no pip/apk at
# runtime).
#
# Cloudflare provider is pinned to v4.52.7 (~> 4), so the v4 schema is used throughout
# (v5 differs greatly: policy is a block here, not a `policies = [...]` list;
# resources is a map, not a jsonencode'd string; ruleset `rules` is a repeatable
# block; list items use `item { value { ip = ... } }`; permission groups are
# looked up via data.cloudflare_api_token_permission_groups, not a v5 *_list
# data source). context7 only indexes v5, so the v4 arguments below were
# verified against the v4.52.7 provider docs (github tag v4.52.7); items
# FLAGGED ### VERIFY for tg-plan are noted inline.
# =============================================================================
data "cloudflare_accounts" "main" {}
locals {
cf_account_id = data.cloudflare_accounts.main.accounts[0].id
}
# -----------------------------------------------------------------------------
# IP Lists: empty shells. The CronJob owns the items at runtime via the CF
# Rules-Lists API; TF must NOT manage items or every 2-min sync would fight the
# next `terragrunt apply` (apply would try to delete the runtime items).
#
# ### VERIFY (v4.52.7): cloudflare_list args account_id/name/kind/description;
# kind="ip" is one of {ip, redirect, hostname, asn}. The optional items
# block is named `item` (singular, Block Set) with `item { value { ip=... }
# comment=... }`. We declare NO `item` blocks (empty list) and
# ignore_changes=[item] so runtime items don't show as drift.
# NOTE: list `name` must match /^[a-zA-Z0-9_]+$/ (underscores ok, no dashes),
# hence crowdsec_ban / crowdsec_captcha (underscore, not dash).
# -----------------------------------------------------------------------------
resource "cloudflare_list" "crowdsec_ban" {
account_id = local.cf_account_id
name = "crowdsec_ban"
kind = "ip"
description = "CrowdSec ban decisions (synced from LAPI)"
lifecycle {
# The crowdsec-cf-sync CronJob adds/removes items at runtime; TF owns only
# the empty list shell. Without this, every apply would delete live bans.
ignore_changes = [item]
}
}
resource "cloudflare_list" "crowdsec_captcha" {
account_id = local.cf_account_id
name = "crowdsec_captcha"
kind = "ip"
description = "CrowdSec captcha decisions (synced from LAPI)"
lifecycle {
ignore_changes = [item]
}
}
# -----------------------------------------------------------------------------
# Zone-scoped WAF custom ruleset: the actual enforcement. One ruleset, two
# rules, applied to EVERY proxied host in the zone.
#
# ### VERIFY (v4.52.7): cloudflare_ruleset with zone_id + kind="zone" +
# phase="http_request_firewall_custom"; `rules` is a repeatable block with
# action/expression/description/enabled. actions "block" and
# "managed_challenge" are both valid. List references in WAF expressions use
# the list NAME with a `$` prefix (NOT the list id): ($crowdsec_ban).
# Rule order matters: ban (block) is evaluated before captcha so a
# double-listed IP is blocked outright (the sync script also enforces
# ban-wins, so an IP is never in both lists, but order is belt-and-braces).
#
# zone_id is the viktorbarzin.me zone, the single zone id used repo-wide
# (default of var.cloudflare_zone_id in modules/kubernetes/ingress_factory and
# hardcoded the same in stacks/kms/main.tf; source of truth is the git-crypt'd
# config.tfvars). Hardcoded here (with the conventional marker comment) because
# the rybbit stack does not import the ingress_factory module.
# -----------------------------------------------------------------------------
resource "cloudflare_ruleset" "crowdsec" {
zone_id = "fd2c5dd4efe8fe38958944e74d0ced6d" # cloudflare_zone_id (viktorbarzin.me)
name = "crowdsec-ip-enforcement"
description = "Block/challenge IPs CrowdSec flagged (synced from LAPI into CF IP Lists)"
kind = "zone"
phase = "http_request_firewall_custom"
rules {
action = "block"
expression = "(ip.src in $crowdsec_ban)"
description = "CrowdSec: block banned IPs"
enabled = true
}
rules {
action = "managed_challenge"
expression = "(ip.src in $crowdsec_captcha)"
description = "CrowdSec: managed-challenge captcha'd IPs"
enabled = true
}
}
# -----------------------------------------------------------------------------
# Least-privilege API token for the sync job: account-level Filter-Lists edit
# ONLY, scoped to this single account (no zone/DNS/Workers access). The token
# value is sensitive and lands in TF state (Tier-1 PG, encrypted at rest) and
# in the rybbit Secret below; same trust level as the CF Global API Key
# already in state.
#
# ### VERIFY (v4.52.7): cloudflare_api_token with a repeatable `policy` block
# (effect / permission_groups = Set of String / resources = Map of String);
# token secret is exposed as `.value` (sensitive).
#
# ### VERIFY PERMISSION GROUP NAME (highest-risk item). v4.52.7 deprecates
# the flat `.permissions[...]` map ("some permissions overlap resource
# scope"); the non-deprecated lookup is the scoped `.account[...]` map.
# Cloudflare's current permissions reference calls the account list-edit
# group "Account Filter Lists Edit" (and read "Account Filter Lists Read").
# An OLDER community gist instead shows "Account Rule Lists Read/Write";
# Cloudflare has renamed this group over time. If `tg plan` errors with a
# missing key, try (in order): .account["Account Filter Lists Edit"] ->
# .account["Account Rule Lists Write"], or enumerate the live names with:
# terraform console
# > data.cloudflare_api_token_permission_groups.all.account
# Read is not strictly required for edit (Edit = full CRUDL) but the sync
# job GETs items, so we include Read too to be safe.
# -----------------------------------------------------------------------------
data "cloudflare_api_token_permission_groups" "all" {}
resource "cloudflare_api_token" "list_sync" {
name = "rybbit-crowdsec-list-sync"
policy {
effect = "allow"
permission_groups = [
data.cloudflare_api_token_permission_groups.all.account["Account Filter Lists Edit"],
data.cloudflare_api_token_permission_groups.all.account["Account Filter Lists Read"],
]
resources = {
"com.cloudflare.api.account.${local.cf_account_id}" = "*"
}
}
}
# -----------------------------------------------------------------------------
# Pure-stdlib sync script, mounted into the CronJob from a ConfigMap (the
# alert_digest pattern: no per-run package installs).
# -----------------------------------------------------------------------------
resource "kubernetes_config_map" "crowdsec_cf_sync_script" {
metadata {
name = "crowdsec-cf-sync-script"
namespace = "rybbit"
}
data = {
"lapi_kv_sync.py" = file("${path.module}/lapi_kv_sync.py")
}
}
# Secrets consumed by the sync job: the LAPI bouncer key (registered in LAPI,
# stored in Vault secret/platform -> kvsync_bouncer_key) and the minted CF
# token value. Account id and list ids are NOT secret and are passed as plain
# env values on the CronJob.
resource "kubernetes_secret" "crowdsec_cf_sync" {
metadata {
name = "crowdsec-cf-sync"
namespace = "rybbit"
}
type = "Opaque"
data = {
LAPI_KEY = data.vault_kv_secret_v2.cf_platform.data["kvsync_bouncer_key"]
CF_API_TOKEN = cloudflare_api_token.list_sync.value
}
}
resource "kubernetes_cron_job_v1" "crowdsec_cf_sync" {
metadata {
name = "crowdsec-cf-sync"
namespace = "rybbit"
labels = {
app = "crowdsec-cf-sync"
tier = local.tiers.aux
}
}
spec {
concurrency_policy = "Forbid"
failed_jobs_history_limit = 3
successful_jobs_history_limit = 3
schedule = "*/2 * * * *"
starting_deadline_seconds = 110
job_template {
metadata {}
spec {
backoff_limit = 2
ttl_seconds_after_finished = 3600
template {
metadata {
labels = {
app = "crowdsec-cf-sync"
}
}
spec {
restart_policy = "OnFailure"
container {
name = "crowdsec-cf-sync"
image = "docker.io/library/python:3.12-alpine"
image_pull_policy = "IfNotPresent"
command = ["python3", "/scripts/lapi_kv_sync.py"]
env {
name = "LAPI_KEY"
value_from {
secret_key_ref {
name = kubernetes_secret.crowdsec_cf_sync.metadata[0].name
key = "LAPI_KEY"
}
}
}
env {
name = "CF_API_TOKEN"
value_from {
secret_key_ref {
name = kubernetes_secret.crowdsec_cf_sync.metadata[0].name
key = "CF_API_TOKEN"
}
}
}
env {
name = "CF_ACCOUNT_ID"
value = local.cf_account_id
}
env {
name = "CF_BAN_LIST_ID"
value = cloudflare_list.crowdsec_ban.id
}
env {
name = "CF_CAPTCHA_LIST_ID"
value = cloudflare_list.crowdsec_captcha.id
}
env {
name = "PUSHGATEWAY_URL"
value = "http://prometheus-prometheus-pushgateway.monitoring:9091"
}
volume_mount {
name = "script"
mount_path = "/scripts"
read_only = true
}
resources {
requests = {
cpu = "10m"
memory = "48Mi"
}
limits = {
memory = "96Mi"
}
}
}
volume {
name = "script"
config_map {
name = kubernetes_config_map.crowdsec_cf_sync_script.metadata[0].name
}
}
dns_config {
option {
name = "ndots"
value = "2"
}
}
}
}
}
}
}
lifecycle {
# KYVERNO_LIFECYCLE_V1: Kyverno admission webhook mutates dns_config with ndots=2
ignore_changes = [spec[0].job_template[0].spec[0].template[0].spec[0].dns_config]
}
}
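
Two conventions from the comments above lend themselves to a small sketch: the list-name constraint and the ban-before-captcha rule order. It assumes the name pattern quoted in the NOTE (`/^[a-zA-Z0-9_]+$/`) and uses a deliberately simplified first-match model of rule evaluation. `waf_expression` and `first_action` are hypothetical helpers, not part of any Cloudflare SDK.

```python
import re

# Pattern quoted in the list-name NOTE above (underscores ok, no dashes).
LIST_NAME_RE = re.compile(r"[a-zA-Z0-9_]+")


def waf_expression(list_name):
    """Build the `(ip.src in $name)` expression, rejecting invalid list names."""
    if not LIST_NAME_RE.fullmatch(list_name):
        raise ValueError(f"invalid list name: {list_name!r}")
    # Lists are referenced by NAME with a `$` prefix, not by list id.
    return f"(ip.src in ${list_name})"


def first_action(ip, ban_list, captcha_list):
    """Simplified model: rules run in order and the first match decides."""
    rules = [
        ("block", ban_list),                 # rule 1: banned IPs
        ("managed_challenge", captcha_list), # rule 2: captcha'd IPs
    ]
    for action, members in rules:
        if ip in members:
            return action
    return None  # no CrowdSec rule matched; the request continues
```

Under this model a double-listed IP resolves to `block`, which is the belt-and-braces behaviour the rule ordering is meant to give on top of the sync script's ban-wins partitioning.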


@@ -1,48 +1,84 @@
 #!/usr/bin/env python3
-"""Sync CrowdSec LAPI decisions -> Cloudflare Workers KV.
+"""Sync CrowdSec LAPI decisions -> two Cloudflare account IP Lists.

-The rybbit edge Worker enforces CrowdSec bans/captchas for Cloudflare-PROXIED
-hosts by reading an edge-local KV entry (`ip:<addr>` -> `ban`|`captcha`) on each
-request. This job is the control plane that keeps that KV in sync with LAPI:
-the request path NEVER calls LAPI (no per-request hop); exactly the nginx
-"tail logs -> inject rules" model, just projected onto Workers KV instead of
-nftables.
+Cloudflare-PROXIED hosts terminate at the CF edge, so the in-cluster CrowdSec
+bouncer (which keys on the client IP Traefik sees) never decides on them. We
+push the decisions into the edge instead: a zone-scoped WAF custom rule blocks
+`(ip.src in $crowdsec_ban)` and managed-challenges `(ip.src in $crowdsec_captcha)`
+across EVERY proxied host in the zone. This job is the control plane that keeps
+those two IP Lists in sync with LAPI.
+
+(Filename kept as lapi_kv_sync.py for path/ConfigMap continuity with the prior
+Workers-KV design; it no longer touches KV, it reconciles CF Rules Lists.)

 Design notes:
   * Pure Python stdlib (no pip/apk at runtime); runs on stock python:3.12-alpine
-    mounted from a ConfigMap, the alert_digest pattern (avoids the disk
-    anti-pattern of installing packages every run).
-  * FULL RECONCILE each run: read the complete current decision set from LAPI,
-    compute the desired KV state, then upsert present keys and delete stale ones.
-    This makes an un-ban (cscli decisions delete) clear from the edge within one
-    interval instead of lingering until the original TTL; important for getting
-    a false-positive un-blocked fast.
-  * FAIL-SAFE: if LAPI can't be read, we SKIP the run (leave KV untouched) rather
-    than wipe every ban. Existing KV entries then simply expire by their TTL, so
-    a LAPI outage degrades toward fail-OPEN, never toward a stale all-block.
-  * Only Ip-scope ban/captcha decisions are projected. Range-scope and other
-    remediations are ignored (the Worker keys on a single IP).
+    mounted from a ConfigMap, the alert_digest pattern.
+  * FULL RECONCILE each run: read the complete decision set from LAPI, partition
+    into ban / captcha desired sets, then for each list compute add (desired -
+    existing) and remove (existing - desired) and apply both. An IP listed for
+    BOTH ban and captcha is placed in BAN ONLY (ban wins; the WAF rule order
+    also blocks-before-challenges as belt-and-braces). A `cscli decisions
+    delete` therefore clears from the edge within one interval (<=2 min).
+  * FAIL-SAFE on LAPI: if LAPI can't be read we SKIP the run (lists untouched,
+    exit 0). A LAPI outage thus freezes the edge state rather than wiping every
+    ban: degrade toward the last-known-good block set, never toward all-block
+    or a thundering un-ban. (Decisions linger only until the next successful
+    sync, not their TTL: we reconcile to LAPI truth, we don't expire entries.)
+  * FAIL-LOUD on Cloudflare: any CF API error is logged and the job exits
+    non-zero so the failure is visible (CronJob backoff + missing success
+    metric + the next run retries).
+
+Cloudflare Rules-Lists API (account-level IP list items), verified against the
+official API reference (developers.cloudflare.com, 2026):
+  * GET    /accounts/{acct}/rules/lists/{list}/items -> paginated; next page
+      cursor at result_info.cursors.after, passed back as ?cursor=. Each
+      item = {"id","ip","created_on",...}.
+  * POST   /accounts/{acct}/rules/lists/{list}/items -> body JSON ARRAY
+      [{"ip":"1.2.3.4"},...]. APPENDS/upserts (does NOT replace the list).
+      ASYNCHRONOUS: returns {"result":{"operation_id":...}}.
+  * DELETE /accounts/{acct}/rules/lists/{list}/items -> body {"items":[{"id":
+      "<item_id>"},...]} (delete by item id, not ip). ASYNCHRONOUS.
+  * GET    /accounts/{acct}/rules/lists/bulk_operations/{op_id} -> status in
+      {pending,running,completed,failed} (failed carries `error`).
+
+ASYNC HANDLING: Cloudflare allows only ONE pending bulk operation per ACCOUNT.
+So we must NOT fire add+delete (or both lists) concurrently; we serialize and
+poll each operation_id to a terminal state (short bounded timeout) before the
+next mutation. If a poll times out we stop mutating for this run and report
+partial success (the next 2-min run reconciles the rest); we never abandon an
+in-flight op and immediately issue another (that would 409/reject).
 """

 import json
 import os
-import re
 import sys
 import time
 import urllib.error
 import urllib.parse
 import urllib.request

-LAPI_URL = os.environ.get("LAPI_URL", "http://crowdsec-service.crowdsec.svc.cluster.local:8080").rstrip("/")
+LAPI_URL = os.environ.get(
+    "LAPI_URL", "http://crowdsec-service.crowdsec.svc.cluster.local:8080"
+).rstrip("/")
 LAPI_KEY = os.environ["LAPI_KEY"]  # kvsync bouncer key, registered in LAPI
 CF_ACCOUNT_ID = os.environ["CF_ACCOUNT_ID"]
-CF_NAMESPACE_ID = os.environ["CF_KV_NAMESPACE_ID"]
-CF_API_TOKEN = os.environ["CF_API_TOKEN"]  # scoped: Workers KV Storage:Edit
+CF_API_TOKEN = os.environ["CF_API_TOKEN"]  # scoped: Account Filter Lists Edit
+CF_BAN_LIST_ID = os.environ["CF_BAN_LIST_ID"]
+CF_CAPTCHA_LIST_ID = os.environ["CF_CAPTCHA_LIST_ID"]
 PUSHGATEWAY = os.environ.get("PUSHGATEWAY_URL", "").rstrip("/")  # optional

-KEY_PREFIX = "ip:"
-# CrowdSec remediation type -> KV value the Worker understands.
-TYPE_MAP = {"ban": "ban", "captcha": "captcha"}
 CF_API = "https://api.cloudflare.com/client/v4"
-MIN_TTL = 60  # Cloudflare KV minimum expiration_ttl (seconds)
+
+# Cloudflare item objects expose the ip differently between list and create
+# responses; for an IP-kind list each GET item carries a top-level "ip".
+
+# Batch sizes: no official per-request cap is documented, so keep batches
+# generous but bounded (well under the global 1200 req / 5 min limit).
+BATCH = 1000
+
+# Async op polling: 1 pending bulk op per account, so poll to terminal state.
+POLL_TIMEOUT = 25  # seconds to wait for one bulk op (the run has ~110s budget)
+POLL_INTERVAL = 1.0
+
+
+class CFError(Exception):
+    """Cloudflare API failure -> job should exit non-zero (fail loud)."""


 def _req(url, *, method="GET", headers=None, data=None, timeout=20):
@@ -52,136 +88,229 @@ def _req(url, *, method="GET", headers=None, data=None, timeout=20):
     return json.loads(body) if body else None


-def parse_duration_seconds(dur):
-    """Parse a Go duration string (e.g. '167h59m51.5s') into whole seconds.
-
-    LAPI returns the REMAINING duration of each decision here. We floor to the
-    second and clamp to Cloudflare's 60s minimum so the edge entry outlives the
-    next sync interval.
-    """
-    if not dur:
-        return MIN_TTL
-    dur = dur.strip().lstrip("+")
-    if dur.startswith("-"):  # already expired; give it the floor and move on
-        return MIN_TTL
-    total = 0.0
-    for value, unit in re.findall(r"(\d+(?:\.\d+)?)(h|m|s|ms|us|µs|ns)", dur):
-        v = float(value)
-        total += {"h": 3600, "m": 60, "s": 1, "ms": 1e-3, "us": 1e-6, "µs": 1e-6, "ns": 1e-9}[unit] * v
-    return max(MIN_TTL, int(total))
+def _cf(url, *, method="GET", payload=None, timeout=20):
+    """Call the CF API with the bearer token; raise CFError on any failure."""
+    headers = {"Authorization": f"Bearer {CF_API_TOKEN}"}
+    data = None
+    if payload is not None:
+        headers["Content-Type"] = "application/json"
+        data = json.dumps(payload).encode()
+    try:
+        res = _req(url, method=method, headers=headers, data=data, timeout=timeout)
+    except urllib.error.HTTPError as e:
+        detail = ""
+        try:
+            detail = e.read().decode(errors="replace")[:500]
+        except Exception:
+            pass
+        raise CFError(f"{method} {url} -> HTTP {e.code} {detail}") from e
+    except urllib.error.URLError as e:
+        raise CFError(f"{method} {url} -> {e}") from e
+    if res is not None and not res.get("success", True):
+        raise CFError(f"{method} {url} -> not success: {res.get('errors')}")
+    return res


+# --------------------------------------------------------------------------- #
+# LAPI
+# --------------------------------------------------------------------------- #
 def fetch_decisions():
-    """Return desired KV state {('ip:<addr>'): (value, ttl_seconds)} from LAPI.
+    """Return (ban_set, captcha_set) of IPs from LAPI.

-    Raises on transport/HTTP error so the caller can SKIP the run (fail-safe).
+    Only scope=="ip" decisions are projected (the WAF rule keys on ip.src). An
+    IP appearing in both ban and captcha is placed in BAN only. Raises on
+    transport/HTTP error so the caller can SKIP the run (fail-safe).
     """
-    out = {}
-    data = _req(f"{LAPI_URL}/v1/decisions", headers={"X-Api-Key": LAPI_KEY, "Accept": "application/json"})
+    data = _req(
+        f"{LAPI_URL}/v1/decisions",
+        headers={"X-Api-Key": LAPI_KEY, "Accept": "application/json"},
+    )
+    ban, captcha = set(), set()
     for d in data or []:
         if (d.get("scope") or "").lower() != "ip":
             continue
-        value = TYPE_MAP.get((d.get("type") or "").lower())
-        if not value:
-            continue
         ip = d.get("value")
         if not ip:
             continue
-        key = KEY_PREFIX + ip
-        ttl = parse_duration_seconds(d.get("duration"))
-        # If the same IP carries multiple decisions, ban beats captcha and the
-        # longest TTL wins.
-        if key in out:
-            prev_val, prev_ttl = out[key]
-            value = "ban" if "ban" in (value, prev_val) else value
-            ttl = max(ttl, prev_ttl)
-        out[key] = (value, ttl)
-    return out
+        dtype = (d.get("type") or "").lower()
+        if dtype == "ban":
+            ban.add(ip)
+        elif dtype == "captcha":
+            captcha.add(ip)
+        # other remediation types (e.g. throttle) are ignored
+    captcha -= ban  # ban wins: never list the same IP in both
+    return ban, captcha
+
+
+# --------------------------------------------------------------------------- #
+# Cloudflare list items
+# --------------------------------------------------------------------------- #
+def cf_list_items(list_id):
+    """Return {ip: item_id} for every item currently in the list (paginated)."""
+    out = {}
+    cursor = ""
+    while True:
+        url = f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items?per_page=1000"
+        if cursor:
+            url += f"&cursor={urllib.parse.quote(cursor)}"
+        res = _cf(url)
+        for it in (res.get("result") or []):
+            ip = it.get("ip")
+            if ip:
+                out[ip] = it.get("id")
+        cursor = (((res.get("result_info") or {}).get("cursors") or {}).get("after")) or ""
+        if not cursor:
+            return out


-def cf_list_keys():
-    """List existing `ip:` keys currently in the KV namespace (paginated)."""
-    keys = []
-    cursor = ""
-    while True:
-        url = f"{CF_API}/accounts/{CF_ACCOUNT_ID}/storage/kv/namespaces/{CF_NAMESPACE_ID}/keys?prefix={KEY_PREFIX}&limit=1000"
-        if cursor:
-            url += f"&cursor={urllib.parse.quote(cursor)}"
-        res = _req(url, headers={"Authorization": f"Bearer {CF_API_TOKEN}"})
-        keys.extend(k["name"] for k in (res.get("result") or []))
-        cursor = (res.get("result_info") or {}).get("cursor") or ""
-        if not cursor:
-            return keys
+def _wait_for_op(op_id):
+    """Poll a bulk operation to a terminal state. Returns True if completed,
+    False if it timed out (still pending/running). Raises CFError if it failed.
+
+    We must reach a terminal state before issuing the next mutation: CF allows
+    only one pending bulk op per account, so firing another while this is
+    in-flight would be rejected.
+    """
+    if not op_id:
+        return True
+    deadline = time.time() + POLL_TIMEOUT
+    url = f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/bulk_operations/{op_id}"
+    while time.time() < deadline:
+        res = _cf(url)
+        status = ((res.get("result") or {}).get("status") or "").lower()
+        if status == "completed":
+            return True
+        if status == "failed":
+            raise CFError(f"bulk op {op_id} failed: {(res.get('result') or {}).get('error')}")
+        time.sleep(POLL_INTERVAL)
+    print(f"[warn] bulk op {op_id} still {status or 'pending'} after {POLL_TIMEOUT}s; "
+          f"stopping further mutations this run (next run reconciles)", file=sys.stderr)
+    return False


-def cf_bulk_put(items):
-    """items: list of (key, value, ttl). Cloudflare bulk PUT (<=10000/call)."""
-    for i in range(0, len(items), 10000):
-        chunk = [{"key": k, "value": v, "expiration_ttl": t} for k, v, t in items[i:i + 10000]]
-        _req(
-            f"{CF_API}/accounts/{CF_ACCOUNT_ID}/storage/kv/namespaces/{CF_NAMESPACE_ID}/bulk",
-            method="PUT",
-            headers={"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"},
-            data=json.dumps(chunk).encode(),
-        )
+def _op_id(res):
+    return ((res or {}).get("result") or {}).get("operation_id")


-def cf_bulk_delete(keys):
-    for i in range(0, len(keys), 10000):
-        _req(
-            f"{CF_API}/accounts/{CF_ACCOUNT_ID}/storage/kv/namespaces/{CF_NAMESPACE_ID}/bulk/delete",
+def cf_add_items(list_id, ips):
+    """POST new IPs to the list (append). Returns the operation_id (async)."""
+    # If callers ever exceed one batch, each POST is a separate bulk op and the
+    # single-pending-op rule forces us to wait between them.
+    last_op = None
+    for i in range(0, len(ips), BATCH):
+        chunk = [{"ip": ip} for ip in ips[i : i + BATCH]]
+        res = _cf(
+            f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items",
             method="POST",
-            headers={"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"},
-            data=json.dumps(list(keys)).encode(),
+            payload=chunk,
         )
+        last_op = _op_id(res)
+        if i + BATCH < len(ips):  # more chunks coming -> serialize
+            if not _wait_for_op(last_op):
+                return last_op  # timed out; bail (partial), next run continues
+    return last_op


-def push_metrics(synced, deleted, ok):
+def cf_delete_items(list_id, item_ids):
+    """DELETE items by id. Returns the operation_id (async)."""
+    last_op = None
+    for i in range(0, len(item_ids), BATCH):
+        chunk = {"items": [{"id": iid} for iid in item_ids[i : i + BATCH]]}
+        res = _cf(
+            f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items",
+            method="DELETE",
+            payload=chunk,
+        )
+        last_op = _op_id(res)
+        if i + BATCH < len(item_ids):
+            if not _wait_for_op(last_op):
+                return last_op
+    return last_op
+
+
+def reconcile(label, list_id, desired):
+    """Reconcile one list to `desired` (a set of IPs).
+
+    Returns (added, removed). Serializes add->wait->delete so we respect the
+    one-pending-bulk-op-per-account limit. Raises CFError on hard failure.
+    """
+    existing = cf_list_items(list_id)  # {ip: item_id}
+    existing_ips = set(existing)
+    to_add = sorted(desired - existing_ips)
+    to_remove_ids = [existing[ip] for ip in (existing_ips - desired)]
+
+    added = removed = 0
+    if to_add:
+        op = cf_add_items(list_id, to_add)
+        if _wait_for_op(op):
+            added = len(to_add)
+        else:
+            # add op didn't finish; skip the delete this run to avoid stacking a
+            # second pending op, and report what we attempted.
+            print(f"[warn] {label}: add op not confirmed; deferring deletes", file=sys.stderr)
+            return added, removed
+    if to_remove_ids:
+        op = cf_delete_items(list_id, to_remove_ids)
+        if _wait_for_op(op):
+            removed = len(to_remove_ids)
+    print(f"[ok] {label}: +{added} / -{removed} (desired={len(desired)}, was={len(existing_ips)})")
+    return added, removed
+
+
+# --------------------------------------------------------------------------- #
+# Metrics (best-effort)
+# --------------------------------------------------------------------------- #
+def push_metrics(ban_n, captcha_n, ok):
     if not PUSHGATEWAY:
         return
     payload = (
-        "# TYPE crowdsec_kv_sync_decisions gauge\n"
-        f"crowdsec_kv_sync_decisions {synced}\n"
-        "# TYPE crowdsec_kv_sync_deleted gauge\n"
-        f"crowdsec_kv_sync_deleted {deleted}\n"
-        "# TYPE crowdsec_kv_sync_success gauge\n"
-        f"crowdsec_kv_sync_success {1 if ok else 0}\n"
-        "# TYPE crowdsec_kv_sync_last_run_seconds gauge\n"
-        f"crowdsec_kv_sync_last_run_seconds {int(time.time())}\n"
+        "# TYPE crowdsec_cf_list_ban_count gauge\n"
+        f"crowdsec_cf_list_ban_count {ban_n}\n"
+        "# TYPE crowdsec_cf_list_captcha_count gauge\n"
+        f"crowdsec_cf_list_captcha_count {captcha_n}\n"
+        "# TYPE crowdsec_cf_list_sync_success gauge\n"
+        f"crowdsec_cf_list_sync_success {1 if ok else 0}\n"
+        "# TYPE crowdsec_cf_list_sync_last_run_seconds gauge\n"
+        f"crowdsec_cf_list_sync_last_run_seconds {int(time.time())}\n"
     )
     try:
-        _req(f"{PUSHGATEWAY}/metrics/job/crowdsec-kv-sync", method="PUT",
-             headers={"Content-Type": "text/plain"}, data=payload.encode(), timeout=10)
+        _req(
+            f"{PUSHGATEWAY}/metrics/job/crowdsec-cf-list-sync",
+            method="PUT",
+            headers={"Content-Type": "text/plain"},
+            data=payload.encode(),
+            timeout=10,
+        )
     except Exception as e:  # metrics are best-effort, never fail the job
         print(f"[warn] pushgateway: {e}", file=sys.stderr)


+# --------------------------------------------------------------------------- #
 def main():
-    # 1. Desired state from LAPI. Any failure here = SKIP (fail-safe, leave KV).
+    # 1. Desired state from LAPI. Any failure here = SKIP (fail-safe).
     try:
-        desired = fetch_decisions()
+        ban, captcha = fetch_decisions()
     except Exception as e:
-        print(f"[skip] LAPI unreadable ({e}); leaving KV untouched so existing "
-              f"bans expire by TTL (fail-open).", file=sys.stderr)
+        print(
+            f"[skip] LAPI unreadable ({e}); leaving CF lists untouched "
+            f"(fail-safe: freeze last-known edge state).",
+            file=sys.stderr,
+        )
         push_metrics(0, 0, ok=False)
         return 0

-    # 2. Current edge state.
-    existing = set(cf_list_keys())
-    desired_keys = set(desired)
-
-    upserts = [(k, v, t) for k, (v, t) in desired.items()]
-    stale = existing - desired_keys
-
-    if upserts:
-        cf_bulk_put(upserts)
-    if stale:
-        cf_bulk_delete(stale)
-
-    print(f"[ok] synced {len(upserts)} decision(s) to KV, removed {len(stale)} stale; "
-          f"{sum(1 for _, v, _ in upserts if v == 'ban')} ban / "
-          f"{sum(1 for _, v, _ in upserts if v == 'captcha')} captcha")
-    push_metrics(len(upserts), len(stale), ok=True)
+    print(f"[info] LAPI desired: {len(ban)} ban / {len(captcha)} captcha (ip-scope)")
+
+    # 2. Reconcile both lists. CF errors fail loud (non-zero exit).
+    try:
+        reconcile("ban", CF_BAN_LIST_ID, ban)
+        reconcile("captcha", CF_CAPTCHA_LIST_ID, captcha)
+    except CFError as e:
+        print(f"[error] Cloudflare API failure: {e}", file=sys.stderr)
+        push_metrics(len(ban), len(captcha), ok=False)
+        return 1
+
+    push_metrics(len(ban), len(captcha), ok=True)
     return 0
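
The serialize-and-poll behaviour described in the ASYNC HANDLING note can be exercised without touching Cloudflare by injecting the status source and the clock. The following is a hedged sketch under that assumption: `wait_for_op` and `run_serialized` are hypothetical stand-ins for `_wait_for_op` and the add/delete sequencing in the script, and `get_status`, `sleep` and `clock` are injected callables introduced only for illustration.

```python
import time


def wait_for_op(get_status, timeout=25.0, interval=1.0,
                sleep=time.sleep, clock=time.monotonic):
    """Poll get_status() until a terminal state or the deadline.

    Returns True on "completed", False on timeout; raises on "failed".
    """
    deadline = clock() + timeout
    while clock() < deadline:
        status = get_status()
        if status == "completed":
            return True
        if status == "failed":
            raise RuntimeError("bulk operation failed")
        sleep(interval)
    return False


def run_serialized(mutations, get_status_for, **poll_kwargs):
    """Submit mutations one at a time; stop at the first unconfirmed op.

    mutations:      callables that submit a bulk op and return its op id
    get_status_for: callable(op_id) -> status string
    Returns the number of mutations confirmed as completed.
    """
    confirmed = 0
    for submit in mutations:
        op_id = submit()
        if not wait_for_op(lambda: get_status_for(op_id), **poll_kwargs):
            break  # never stack a second op behind one still in flight
        confirmed += 1
    return confirmed
```

Stopping at the first timeout leaves later mutations for the next scheduled run, which recomputes the plan from scratch, so a partially applied run does not need any carried-over state.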