#!/usr/bin/env python3
"""Sync CrowdSec LAPI decisions -> ONE Cloudflare account IP List (block-only).

Cloudflare-PROXIED hosts terminate at the CF edge, so the in-cluster CrowdSec
bouncer (which keys on the client IP Traefik sees) never decides on them. We
push the decisions into the edge instead: a zone-scoped WAF custom rule blocks
`(ip.src in $crowdsec_ban)` across EVERY proxied host in the zone. This job is
the control plane that keeps that one IP List in sync with LAPI.

The CF account hard-limits to ONE Rules List, so enforcement is BLOCK-ONLY:
BOTH ban AND captcha (scope=="ip") decisions are folded into the single
crowdsec_ban list and captcha is downgraded to block at the proxied edge.

(Filename kept as lapi_kv_sync.py for path/ConfigMap continuity with the prior
Workers-KV design; it no longer touches KV — it reconciles a CF Rules List.)

Design notes:
  * Pure Python stdlib (no pip/apk at runtime) — runs on stock
    python:3.12-alpine mounted from a ConfigMap, the alert_digest pattern.
  * FULL RECONCILE each run: read the complete decision set from LAPI, take
    the UNION of ban + captcha (scope=="ip") as the single desired set, then
    compute add (desired - existing) and remove (existing - desired) against
    the one crowdsec_ban list and apply both. A `cscli decisions delete`
    therefore clears from the edge within one interval (<=2 min).
  * FAIL-SAFE on LAPI: if LAPI can't be read we SKIP the run (list untouched,
    exit 0). A LAPI outage thus freezes the edge state rather than wiping the
    block list — degrade toward the last-known-good block set, never toward
    all-block or a thundering un-ban. (Decisions linger only until the next
    successful sync, not their TTL — we reconcile to LAPI truth, we don't
    expire entries.)
  * FAIL-LOUD on Cloudflare: any CF API error is logged and the job exits
    non-zero so the failure is visible (CronJob backoff + missing success
    metric + the next run retries).

Cloudflare Rules-Lists API (account-level IP list items), verified against the
official API reference (developers.cloudflare.com, 2026):
  * GET    /accounts/{acct}/rules/lists/{list}/items
        -> paginated; next page cursor at result_info.cursors.after, passed
           back as ?cursor=. Each item = {"id","ip","created_on",...}.
  * POST   /accounts/{acct}/rules/lists/{list}/items
        -> body JSON ARRAY [{"ip":"1.2.3.4"},...]. APPENDS/upserts (does NOT
           replace the list). ASYNCHRONOUS: returns
           {"result":{"operation_id":...}}.
  * DELETE /accounts/{acct}/rules/lists/{list}/items
        -> body {"items":[{"id": "<item-id>"},...]} (delete by item id, not
           ip). ASYNCHRONOUS.
  * GET    /accounts/{acct}/rules/lists/bulk_operations/{op_id}
        -> status in {pending,running,completed,failed} (failed carries
           `error`).

ASYNC HANDLING: Cloudflare allows only ONE pending bulk operation per ACCOUNT.
So we must NOT fire add+delete concurrently — we serialize and poll each
operation_id to a terminal state (short bounded timeout) before the next
mutation. If a poll times out we stop mutating for this run and report partial
success (the next 2-min run reconciles the rest); we never abandon an in-flight
op and immediately issue another (that would 409/reject).
"""

import json
import os
import sys
import time
import urllib.error
import urllib.parse
import urllib.request

# --- Runtime configuration (environment-driven) ----------------------------
# Required settings are indexed directly so a missing variable aborts the job
# with KeyError at import time; optional ones fall back to a default.

# Base URL of the CrowdSec LAPI, normalised to carry no trailing slash.
LAPI_URL = os.getenv(
    "LAPI_URL", "http://crowdsec-service.crowdsec.svc.cluster.local:8080"
).rstrip("/")
LAPI_KEY = os.environ["LAPI_KEY"]  # kvsync bouncer key, registered in LAPI

CF_ACCOUNT_ID = os.environ["CF_ACCOUNT_ID"]
CF_API_TOKEN = os.environ["CF_API_TOKEN"]  # scoped: Account Filter Lists Edit
CF_BAN_LIST_ID = os.environ["CF_BAN_LIST_ID"]

# Optional: an empty value disables the metrics push.
PUSHGATEWAY = os.getenv("PUSHGATEWAY_URL", "").rstrip("/")

CF_API = "https://api.cloudflare.com/client/v4"

# Cloudflare item objects expose the ip differently between list and create
# responses; for an IP-kind list each GET item carries a top-level "ip".
# Batch sizes: no official per-request cap is documented, so keep batches
# generous but bounded (well under the global 1200 req / 5 min limit).
BATCH = 1000

# Async op polling: 1 pending bulk op per account, so poll to terminal state.
POLL_TIMEOUT = 25  # seconds to wait for one bulk op (the run has ~110s budget)
POLL_INTERVAL = 1.0


class CFError(Exception):
    """Cloudflare API failure -> job should exit non-zero (fail loud)."""


def _req(url, *, method="GET", headers=None, data=None, timeout=20):
    """Perform one HTTP request and return the decoded JSON body.

    Returns None when the response body is empty. Transport, HTTP-status and
    JSON-decoding errors are NOT handled here; they propagate to the caller.
    """
    req = urllib.request.Request(url, method=method, headers=headers or {}, data=data)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        body = resp.read()
    return json.loads(body) if body else None


def _cf(url, *, method="GET", payload=None, timeout=20):
    """Call the CF API with the bearer token; raise CFError on any failure.

    `payload`, when given, is JSON-encoded and sent as the request body.
    Returns the decoded response (None for an empty body).

    Every transport-level problem is converted to CFError, not only
    HTTPError/URLError: a read timeout, a reset connection, a truncated
    response or a non-JSON body would otherwise bypass the caller's
    `except CFError` and skip the failure metric.
    """
    # Function-scope import keeps the file's import block untouched; the
    # module is already loaded as a dependency of urllib.request.
    import http.client

    headers = {"Authorization": f"Bearer {CF_API_TOKEN}"}
    data = None
    if payload is not None:
        headers["Content-Type"] = "application/json"
        data = json.dumps(payload).encode()
    try:
        res = _req(url, method=method, headers=headers, data=data, timeout=timeout)
    except urllib.error.HTTPError as e:
        detail = ""
        try:
            detail = e.read().decode(errors="replace")[:500]
        except Exception:
            # Best-effort only: the error body is a diagnostic nicety, the
            # HTTP status below is what matters.
            pass
        raise CFError(f"{method} {url} -> HTTP {e.code} {detail}") from e
    except (OSError, ValueError, http.client.HTTPException) as e:
        # OSError covers URLError, TimeoutError and connection resets;
        # ValueError covers json.JSONDecodeError.
        raise CFError(f"{method} {url} -> {e}") from e
    if isinstance(res, dict) and not res.get("success", True):
        raise CFError(f"{method} {url} -> not success: {res.get('errors')}")
    return res


# --------------------------------------------------------------------------- #
# LAPI
# --------------------------------------------------------------------------- #
def fetch_decisions():
    """Return the single desired set of IPs to BLOCK at the edge.

    Only scope=="ip" decisions are projected (the WAF rule keys on ip.src).
    The CF account allows only ONE Rules List, so BOTH "ban" AND "captcha"
    decisions are folded into one block set (captcha is downgraded to block at
    the proxied edge).

    Raises on transport/HTTP error so the caller can SKIP the run (fail-safe).
    """
    data = _req(
        f"{LAPI_URL}/v1/decisions",
        headers={"X-Api-Key": LAPI_KEY, "Accept": "application/json"},
    )
    block = set()
    # An empty/null response body is treated as "no decisions".
    for d in data or []:
        if (d.get("scope") or "").lower() != "ip":
            continue
        ip = d.get("value")
        if not ip:
            continue
        dtype = (d.get("type") or "").lower()
        if dtype in ("ban", "captcha"):
            # NOTE(review): values are forwarded to Cloudflare verbatim;
            # confirm the list accepts (and echoes back unchanged) every form
            # LAPI can emit here, e.g. single IPv6 addresses — TODO confirm.
            block.add(ip)
        # other remediation types (e.g. throttle) are ignored
    return block


# --------------------------------------------------------------------------- #
# Cloudflare list items
# --------------------------------------------------------------------------- #
def cf_list_items(list_id):
    """Return {ip: item_id} for every item currently in the list (paginated).

    Follows result_info.cursors.after until no further cursor is returned.
    Raises CFError on any API failure, including a response that is not a
    JSON object (treating that as an empty list would silently distort the
    reconcile diff).
    """
    out = {}
    cursor = ""
    base = f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items?per_page=1000"
    while True:
        url = base
        if cursor:
            url += f"&cursor={urllib.parse.quote(cursor)}"
        res = _cf(url)
        if not isinstance(res, dict):
            raise CFError(f"GET {url} -> unexpected response (not a JSON object)")
        for it in (res.get("result") or []):
            ip = it.get("ip")
            if ip:
                out[ip] = it.get("id")
        cursor = (((res.get("result_info") or {}).get("cursors") or {}).get("after")) or ""
        if not cursor:
            return out


def _wait_for_op(op_id):
    """Poll a bulk operation to a terminal state.

    Returns True if completed (or if `op_id` is empty, i.e. nothing to wait
    for), False if it timed out (still pending/running). Raises CFError if it
    failed.

    We must reach a terminal state before issuing the next mutation: CF allows
    only one pending bulk op per account, so firing another while this is
    in-flight would be rejected.
    """
    if not op_id:
        return True
    # Monotonic clock: a wall-clock adjustment must not stretch or cut the wait.
    deadline = time.monotonic() + POLL_TIMEOUT
    url = f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/bulk_operations/{op_id}"
    status = ""  # stays empty if the loop never runs (non-positive timeout)
    while time.monotonic() < deadline:
        res = _cf(url)
        result = (res.get("result") if isinstance(res, dict) else None) or {}
        status = (result.get("status") or "").lower()
        if status == "completed":
            return True
        if status == "failed":
            raise CFError(f"bulk op {op_id} failed: {result.get('error')}")
        time.sleep(POLL_INTERVAL)
    print(f"[warn] bulk op {op_id} still {status or 'pending'} after {POLL_TIMEOUT}s; "
          f"stopping further mutations this run (next run reconciles)", file=sys.stderr)
    return False


def _op_id(res):
    """Extract result.operation_id from a CF response; None if absent."""
    return ((res or {}).get("result") or {}).get("operation_id")


def cf_add_items(list_id, ips):
    """POST new IPs to the list (append). Returns the operation_id (async).

    `ips` must be sliceable (a list). Returns the id of the last operation
    issued, or None when `ips` is empty. The final operation is NOT waited on
    here; the caller polls it.
    """
    # If callers ever exceed one batch, each POST is a separate bulk op and the
    # single-pending-op rule forces us to wait between them.
    last_op = None
    for i in range(0, len(ips), BATCH):
        chunk = [{"ip": ip} for ip in ips[i : i + BATCH]]
        res = _cf(
            f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items",
            method="POST",
            payload=chunk,
        )
        last_op = _op_id(res)
        if i + BATCH < len(ips):  # more chunks coming -> serialize
            if not _wait_for_op(last_op):
                return last_op  # timed out; bail (partial), next run continues
    return last_op


def cf_delete_items(list_id, item_ids):
    """DELETE items by id. Returns the operation_id (async).

    Mirrors cf_add_items: chunks of BATCH, serialized by polling between
    chunks, returning the id of the last operation issued (None when
    `item_ids` is empty).
    """
    last_op = None
    for i in range(0, len(item_ids), BATCH):
        chunk = {"items": [{"id": iid} for iid in item_ids[i : i + BATCH]]}
        res = _cf(
            f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items",
            method="DELETE",
            payload=chunk,
        )
        last_op = _op_id(res)
        if i + BATCH < len(item_ids):
            if not _wait_for_op(last_op):
                return last_op
    return last_op


def reconcile(label, list_id, desired):
    """Reconcile one list to `desired` (a set of IPs). Returns (added, removed).

    Serializes add->wait->delete so we respect the
    one-pending-bulk-op-per-account limit. A count is reported only once the
    corresponding operation was confirmed completed; otherwise it stays 0.
    Raises CFError on hard failure.
    """
    existing = cf_list_items(list_id)  # {ip: item_id}
    existing_ips = set(existing)

    to_add = sorted(desired - existing_ips)
    to_remove_ids = [existing[ip] for ip in (existing_ips - desired)]

    added = removed = 0
    if to_add:
        op = cf_add_items(list_id, to_add)
        # If a multi-chunk add bailed early, `op` is the operation it was
        # already waiting on; it is polled once more here.
        if _wait_for_op(op):
            added = len(to_add)
        else:
            # add op didn't finish; skip the delete this run to avoid stacking a
            # second pending op, and report what we attempted.
            print(f"[warn] {label}: add op not confirmed; deferring deletes", file=sys.stderr)
            return added, removed
    if to_remove_ids:
        op = cf_delete_items(list_id, to_remove_ids)
        if _wait_for_op(op):
            removed = len(to_remove_ids)
    print(f"[ok] {label}: +{added} / -{removed} (desired={len(desired)}, was={len(existing_ips)})")
    return added, removed


# --------------------------------------------------------------------------- #
# Metrics (best-effort)
# --------------------------------------------------------------------------- #
def push_metrics(block_n, ok):
    """PUT the run's gauges to the Pushgateway; no-op when none is configured.

    Emits the desired block count, a 1/0 success flag and the current Unix
    time. Any failure is logged as a warning and swallowed.
    """
    if not PUSHGATEWAY:
        return
    payload = (
        "# TYPE crowdsec_cf_list_ban_count gauge\n"
        f"crowdsec_cf_list_ban_count {block_n}\n"
        "# TYPE crowdsec_cf_list_sync_success gauge\n"
        f"crowdsec_cf_list_sync_success {1 if ok else 0}\n"
        "# TYPE crowdsec_cf_list_sync_last_run_seconds gauge\n"
        f"crowdsec_cf_list_sync_last_run_seconds {int(time.time())}\n"
    )
    try:
        _req(
            f"{PUSHGATEWAY}/metrics/job/crowdsec-cf-list-sync",
            method="PUT",
            headers={"Content-Type": "text/plain"},
            data=payload.encode(),
            timeout=10,
        )
    except Exception as e:  # metrics are best-effort, never fail the job
        print(f"[warn] pushgateway: {e}", file=sys.stderr)


# --------------------------------------------------------------------------- #
def main():
    """Run one sync pass and return the process exit code.

    Returns 0 on success and also when LAPI is unreadable (the run is skipped
    and the list left untouched); returns 1 on a Cloudflare API failure.
    """
    # 1. Desired state from LAPI. Any failure here = SKIP (fail-safe).
    try:
        block = fetch_decisions()
    except Exception as e:
        print(
            f"[skip] LAPI unreadable ({e}); leaving the CF list untouched "
            f"(fail-safe: freeze last-known edge state).",
            file=sys.stderr,
        )
        push_metrics(0, ok=False)
        return 0

    print(f"[info] LAPI desired: {len(block)} block (ban+captcha, ip-scope)")

    # 2. Reconcile the single block list. CF errors fail loud (non-zero exit).
    try:
        reconcile("block", CF_BAN_LIST_ID, block)
    except CFError as e:
        print(f"[error] Cloudflare API failure: {e}", file=sys.stderr)
        push_metrics(len(block), ok=False)
        return 1

    push_metrics(len(block), ok=True)
    return 0


if __name__ == "__main__":
    sys.exit(main())