#!/usr/bin/env python3
"""Sync CrowdSec LAPI decisions -> two Cloudflare account IP Lists.

Cloudflare-PROXIED hosts terminate at the CF edge, so the in-cluster CrowdSec
bouncer (which keys on the client IP Traefik sees) never decides on them. We
push the decisions into the edge instead: a zone-scoped WAF custom rule blocks
`(ip.src in $crowdsec_ban)` and managed-challenges
`(ip.src in $crowdsec_captcha)` across EVERY proxied host in the zone. This
job is the control plane that keeps those two IP Lists in sync with LAPI.

(Filename kept as lapi_kv_sync.py for path/ConfigMap continuity with the prior
Workers-KV design; it no longer touches KV — it reconciles CF Rules Lists.)

Design notes:
  * Pure Python stdlib (no pip/apk at runtime) — runs on stock
    python:3.12-alpine mounted from a ConfigMap, the alert_digest pattern.
  * FULL RECONCILE each run: read the complete decision set from LAPI,
    partition into ban / captcha desired sets, then for each list compute
    add (desired - existing) and remove (existing - desired) and apply both.
    An IP listed for BOTH ban and captcha is placed in BAN ONLY (ban wins;
    the WAF rule order also blocks-before-challenges as belt-and-braces).
    A `cscli decisions delete` therefore clears from the edge within one
    interval (<=2 min).
  * FAIL-SAFE on LAPI: if LAPI can't be read we SKIP the run (lists
    untouched, exit 0). A LAPI outage thus freezes the edge state rather than
    wiping every ban — degrade toward the last-known-good block set, never
    toward all-block or a thundering un-ban. (Decisions linger only until the
    next successful sync, not their TTL — we reconcile to LAPI truth, we
    don't expire entries.)
  * FAIL-LOUD on Cloudflare: any CF API error is logged and the job exits
    non-zero so the failure is visible (CronJob backoff + missing success
    metric + the next run retries).

Cloudflare Rules-Lists API (account-level IP list items), verified against the
official API reference (developers.cloudflare.com, 2026):
  * GET    /accounts/{acct}/rules/lists/{list}/items
      -> paginated; next page cursor at result_info.cursors.after, passed
         back as ?cursor=. Each item = {"id","ip","created_on",...}.
  * POST   /accounts/{acct}/rules/lists/{list}/items
      -> body JSON ARRAY [{"ip":"1.2.3.4"},...]. APPENDS/upserts (does NOT
         replace the list). ASYNCHRONOUS: returns
         {"result":{"operation_id":...}}.
  * DELETE /accounts/{acct}/rules/lists/{list}/items
      -> body {"items":[{"id": "<item-id>"},...]} (delete by item id, not
         ip). ASYNCHRONOUS.
  * GET    /accounts/{acct}/rules/lists/bulk_operations/{op_id}
      -> status in {pending,running,completed,failed} (failed carries
         `error`).

ASYNC HANDLING: Cloudflare allows only ONE pending bulk operation per ACCOUNT.
So we must NOT fire add+delete (or both lists) concurrently — we serialize and
poll each operation_id to a terminal state (short bounded timeout) before the
next mutation. If a poll times out we stop mutating for this run and report
partial success (the next 2-min run reconciles the rest); we never abandon an
in-flight op and immediately issue another (that would 409/reject).
"""

import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request

# Required settings are read with os.environ[...] on purpose: a missing
# variable aborts at start-up (KeyError) before anything is contacted.
LAPI_URL = os.environ.get(
    "LAPI_URL", "http://crowdsec-service.crowdsec.svc.cluster.local:8080"
).rstrip("/")
LAPI_KEY = os.environ["LAPI_KEY"]  # kvsync bouncer key, registered in LAPI
CF_ACCOUNT_ID = os.environ["CF_ACCOUNT_ID"]
CF_API_TOKEN = os.environ["CF_API_TOKEN"]  # scoped: Account Filter Lists Edit
CF_BAN_LIST_ID = os.environ["CF_BAN_LIST_ID"]
CF_CAPTCHA_LIST_ID = os.environ["CF_CAPTCHA_LIST_ID"]
PUSHGATEWAY = os.environ.get("PUSHGATEWAY_URL", "").rstrip("/")  # optional

CF_API = "https://api.cloudflare.com/client/v4"

# Cloudflare item objects expose the ip differently between list and create
# responses; for an IP-kind list each GET item carries a top-level "ip".

# Batch sizes: no official per-request cap is documented, so keep batches
# generous but bounded (well under the global 1200 req / 5 min limit).
BATCH = 1000

# Async op polling: 1 pending bulk op per account, so poll to terminal state.
POLL_TIMEOUT = 25 # seconds to wait for one bulk op (the run has ~110s budget) POLL_INTERVAL = 1.0 class CFError(Exception): """Cloudflare API failure -> job should exit non-zero (fail loud).""" def _req(url, *, method="GET", headers=None, data=None, timeout=20): req = urllib.request.Request(url, method=method, headers=headers or {}, data=data) with urllib.request.urlopen(req, timeout=timeout) as resp: body = resp.read() return json.loads(body) if body else None def _cf(url, *, method="GET", payload=None, timeout=20): """Call the CF API with the bearer token; raise CFError on any failure.""" headers = {"Authorization": f"Bearer {CF_API_TOKEN}"} data = None if payload is not None: headers["Content-Type"] = "application/json" data = json.dumps(payload).encode() try: res = _req(url, method=method, headers=headers, data=data, timeout=timeout) except urllib.error.HTTPError as e: detail = "" try: detail = e.read().decode(errors="replace")[:500] except Exception: pass raise CFError(f"{method} {url} -> HTTP {e.code} {detail}") from e except urllib.error.URLError as e: raise CFError(f"{method} {url} -> {e}") from e if res is not None and not res.get("success", True): raise CFError(f"{method} {url} -> not success: {res.get('errors')}") return res # --------------------------------------------------------------------------- # # LAPI # --------------------------------------------------------------------------- # def fetch_decisions(): """Return (ban_set, captcha_set) of IPs from LAPI. Only scope=="ip" decisions are projected (the WAF rule keys on ip.src). An IP appearing in both ban and captcha is placed in BAN only. Raises on transport/HTTP error so the caller can SKIP the run (fail-safe). 
""" data = _req( f"{LAPI_URL}/v1/decisions", headers={"X-Api-Key": LAPI_KEY, "Accept": "application/json"}, ) ban, captcha = set(), set() for d in data or []: if (d.get("scope") or "").lower() != "ip": continue ip = d.get("value") if not ip: continue dtype = (d.get("type") or "").lower() if dtype == "ban": ban.add(ip) elif dtype == "captcha": captcha.add(ip) # other remediation types (e.g. throttle) are ignored captcha -= ban # ban wins: never list the same IP in both return ban, captcha # --------------------------------------------------------------------------- # # Cloudflare list items # --------------------------------------------------------------------------- # def cf_list_items(list_id): """Return {ip: item_id} for every item currently in the list (paginated).""" out = {} cursor = "" while True: url = f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items?per_page=1000" if cursor: url += f"&cursor={urllib.parse.quote(cursor)}" res = _cf(url) for it in (res.get("result") or []): ip = it.get("ip") if ip: out[ip] = it.get("id") cursor = (((res.get("result_info") or {}).get("cursors") or {}).get("after")) or "" if not cursor: return out def _wait_for_op(op_id): """Poll a bulk operation to a terminal state. Returns True if completed, False if it timed out (still pending/running). Raises CFError if it failed. We must reach a terminal state before issuing the next mutation: CF allows only one pending bulk op per account, so firing another while this is in-flight would be rejected. 
""" if not op_id: return True deadline = time.time() + POLL_TIMEOUT url = f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/bulk_operations/{op_id}" while time.time() < deadline: res = _cf(url) status = ((res.get("result") or {}).get("status") or "").lower() if status == "completed": return True if status == "failed": raise CFError(f"bulk op {op_id} failed: {(res.get('result') or {}).get('error')}") time.sleep(POLL_INTERVAL) print(f"[warn] bulk op {op_id} still {status or 'pending'} after {POLL_TIMEOUT}s; " f"stopping further mutations this run (next run reconciles)", file=sys.stderr) return False def _op_id(res): return ((res or {}).get("result") or {}).get("operation_id") def cf_add_items(list_id, ips): """POST new IPs to the list (append). Returns the operation_id (async).""" # If callers ever exceed one batch, each POST is a separate bulk op and the # single-pending-op rule forces us to wait between them. last_op = None for i in range(0, len(ips), BATCH): chunk = [{"ip": ip} for ip in ips[i : i + BATCH]] res = _cf( f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items", method="POST", payload=chunk, ) last_op = _op_id(res) if i + BATCH < len(ips): # more chunks coming -> serialize if not _wait_for_op(last_op): return last_op # timed out; bail (partial), next run continues return last_op def cf_delete_items(list_id, item_ids): """DELETE items by id. Returns the operation_id (async).""" last_op = None for i in range(0, len(item_ids), BATCH): chunk = {"items": [{"id": iid} for iid in item_ids[i : i + BATCH]]} res = _cf( f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items", method="DELETE", payload=chunk, ) last_op = _op_id(res) if i + BATCH < len(item_ids): if not _wait_for_op(last_op): return last_op return last_op def reconcile(label, list_id, desired): """Reconcile one list to `desired` (a set of IPs). Returns (added, removed). Serializes add->wait->delete so we respect the one-pending-bulk-op-per-account limit. 
Raises CFError on hard failure. """ existing = cf_list_items(list_id) # {ip: item_id} existing_ips = set(existing) to_add = sorted(desired - existing_ips) to_remove_ids = [existing[ip] for ip in (existing_ips - desired)] added = removed = 0 if to_add: op = cf_add_items(list_id, to_add) if _wait_for_op(op): added = len(to_add) else: # add op didn't finish; skip the delete this run to avoid stacking a # second pending op, and report what we attempted. print(f"[warn] {label}: add op not confirmed; deferring deletes", file=sys.stderr) return added, removed if to_remove_ids: op = cf_delete_items(list_id, to_remove_ids) if _wait_for_op(op): removed = len(to_remove_ids) print(f"[ok] {label}: +{added} / -{removed} (desired={len(desired)}, was={len(existing_ips)})") return added, removed # --------------------------------------------------------------------------- # # Metrics (best-effort) # --------------------------------------------------------------------------- # def push_metrics(ban_n, captcha_n, ok): if not PUSHGATEWAY: return payload = ( "# TYPE crowdsec_cf_list_ban_count gauge\n" f"crowdsec_cf_list_ban_count {ban_n}\n" "# TYPE crowdsec_cf_list_captcha_count gauge\n" f"crowdsec_cf_list_captcha_count {captcha_n}\n" "# TYPE crowdsec_cf_list_sync_success gauge\n" f"crowdsec_cf_list_sync_success {1 if ok else 0}\n" "# TYPE crowdsec_cf_list_sync_last_run_seconds gauge\n" f"crowdsec_cf_list_sync_last_run_seconds {int(time.time())}\n" ) try: _req( f"{PUSHGATEWAY}/metrics/job/crowdsec-cf-list-sync", method="PUT", headers={"Content-Type": "text/plain"}, data=payload.encode(), timeout=10, ) except Exception as e: # metrics are best-effort, never fail the job print(f"[warn] pushgateway: {e}", file=sys.stderr) # --------------------------------------------------------------------------- # def main(): # 1. Desired state from LAPI. Any failure here = SKIP (fail-safe). 
try: ban, captcha = fetch_decisions() except Exception as e: print( f"[skip] LAPI unreadable ({e}); leaving CF lists untouched " f"(fail-safe: freeze last-known edge state).", file=sys.stderr, ) push_metrics(0, 0, ok=False) return 0 print(f"[info] LAPI desired: {len(ban)} ban / {len(captcha)} captcha (ip-scope)") # 2. Reconcile both lists. CF errors fail loud (non-zero exit). try: reconcile("ban", CF_BAN_LIST_ID, ban) reconcile("captcha", CF_CAPTCHA_LIST_ID, captcha) except CFError as e: print(f"[error] Cloudflare API failure: {e}", file=sys.stderr) push_metrics(len(ban), len(captcha), ok=False) return 1 push_metrics(len(ban), len(captcha), ok=True) return 0 if __name__ == "__main__": sys.exit(main())