diff --git a/stacks/rybbit/crowdsec_edge.tf b/stacks/rybbit/crowdsec_edge.tf new file mode 100644 index 00000000..598cde28 --- /dev/null +++ b/stacks/rybbit/crowdsec_edge.tf @@ -0,0 +1,299 @@ +# ============================================================================= +# CrowdSec edge enforcement for Cloudflare-PROXIED hosts — control plane +# ============================================================================= +# Proxied hosts terminate at the Cloudflare edge, so the in-cluster CrowdSec +# bouncer (which keys on the real client IP seen by Traefik) never gets to +# decide on them. To enforce CrowdSec bans/captchas on proxied traffic we push +# the decision INTO the Cloudflare edge as account-level IP Lists + a single +# zone-scoped WAF custom rule: +# +# * Two account IP Lists — `crowdsec_ban` and `crowdsec_captcha` — hold the +# banned / captcha'd source IPs (empty in TF; populated at runtime). +# * A zone-scoped WAF ruleset in the http_request_firewall_custom phase +# blocks `(ip.src in $crowdsec_ban)` and managed-challenges +# `(ip.src in $crowdsec_captcha)`. Because it's a ZONE rule it enforces +# across ALL proxied hosts in the zone (~135), not just the handful a +# Worker would route. (The previous Worker+KV design only covered the ~27 +# hosts the rybbit Worker routed; the analytics Worker in worker/ is +# unrelated and stays.) +# +# This file is the CONTROL PLANE that keeps those lists in sync with LAPI: +# 1. the two empty IP Lists (list ITEMS are owned by the CronJob at runtime, +# NOT by Terraform — see the lifecycle ignore_changes on `item`), +# 2. a LEAST-PRIVILEGE Cloudflare API token (account Filter-Lists edit only, +# scoped to this account) the sync job authenticates with, +# 3. a CronJob running lapi_kv_sync.py every 2 min to full-reconcile LAPI +# decisions into the two lists (mirrors monitoring/alert_digest.tf: stock +# python:3.12-alpine + pure-stdlib script from a ConfigMap, no pip/apk at +# runtime). 
+# +# Cloudflare provider is pinned v4.52.7 (~> 4) — v4 schema is used throughout +# (v5 differs greatly: policy is a block here not a `policies = [...]` list; +# resources is a map not a jsonencode'd string; ruleset `rules` is a repeatable +# block; list items use `item { value { ip = ... } }`; permission groups are +# looked up via data.cloudflare_api_token_permission_groups, not a v5 *_list +# data source). context7 only indexes v5, so the v4 arguments below were +# verified against the v4.52.7 provider docs (github tag v4.52.7) — items +# FLAGGED ### VERIFY for tg-plan are noted inline. NOTE(review): local.cf_account_id below takes accounts[0], which presumes these credentials see exactly one account — confirm. +# ============================================================================= + +data "cloudflare_accounts" "main" {} + +locals { + cf_account_id = data.cloudflare_accounts.main.accounts[0].id +} + +# ----------------------------------------------------------------------------- +# IP Lists — empty shells. The CronJob owns the items at runtime via the CF +# Rules-Lists API; TF must NOT manage items or every 2-min sync would fight the +# next `terragrunt apply` (apply would try to delete the runtime items). +# +# ### VERIFY (v4.52.7): cloudflare_list args account_id/name/kind/description; +# kind="ip" is one of {ip, redirect, hostname, asn}. The optional items +# block is named `item` (singular, Block Set) with `item { value { ip=... } +# comment=... }`. We declare NO `item` blocks (empty list) and +# ignore_changes=[item] so runtime items don't show as drift. +# NOTE: list `name` must match /^[a-zA-Z0-9_]+$/ (underscores ok, no dashes) +# — hence crowdsec_ban / crowdsec_captcha (underscore, not dash). +# ----------------------------------------------------------------------------- +resource "cloudflare_list" "crowdsec_ban" { + account_id = local.cf_account_id + name = "crowdsec_ban" + kind = "ip" + description = "CrowdSec ban decisions (synced from LAPI)" + + lifecycle { + # The crowdsec-cf-sync CronJob adds/removes items at runtime; TF owns only + # the empty list shell. 
Without this, every apply would delete live bans. + ignore_changes = [item] + } +} + +resource "cloudflare_list" "crowdsec_captcha" { + account_id = local.cf_account_id + name = "crowdsec_captcha" + kind = "ip" + description = "CrowdSec captcha decisions (synced from LAPI)" + + lifecycle { + ignore_changes = [item] + } +} + +# ----------------------------------------------------------------------------- +# Zone-scoped WAF custom ruleset — the actual enforcement. One ruleset, two +# rules, applied to EVERY proxied host in the zone. +# +# ### VERIFY (v4.52.7): cloudflare_ruleset with zone_id + kind="zone" + +# phase="http_request_firewall_custom"; `rules` is a repeatable block with +# action/expression/description/enabled. actions "block" and +# "managed_challenge" are both valid. List references in WAF expressions use +# the list NAME with a `$` prefix (NOT the list id): ($crowdsec_ban). +# Rule order matters — ban (block) is evaluated before captcha so a +# double-listed IP is blocked outright (the sync script also enforces +# ban-wins, so an IP is never in both lists, but order is belt-and-braces). +# +# zone_id is the viktorbarzin.me zone — the single zone id used repo-wide +# (default of var.cloudflare_zone_id in modules/kubernetes/ingress_factory and +# hardcoded the same in stacks/kms/main.tf; source of truth is the git-crypt'd +# config.tfvars). Hardcoded here (with the conventional marker comment) because +# the rybbit stack does not import the ingress_factory module. NOTE(review): presumably a zone has a single entry-point ruleset per phase — confirm no other http_request_firewall_custom ruleset already exists for this zone before applying. 
+# ----------------------------------------------------------------------------- +resource "cloudflare_ruleset" "crowdsec" { + zone_id = "fd2c5dd4efe8fe38958944e74d0ced6d" # cloudflare_zone_id (viktorbarzin.me) + name = "crowdsec-ip-enforcement" + description = "Block/challenge IPs CrowdSec flagged (synced from LAPI into CF IP Lists)" + kind = "zone" + phase = "http_request_firewall_custom" + + rules { + action = "block" + expression = "(ip.src in $crowdsec_ban)" + description = "CrowdSec: block banned IPs" + enabled = true + } + + rules { + action = "managed_challenge" + expression = "(ip.src in $crowdsec_captcha)" + description = "CrowdSec: managed-challenge captcha'd IPs" + enabled = true + } +} + +# ----------------------------------------------------------------------------- +# Least-privilege API token for the sync job: account-level Filter-Lists edit +# ONLY, scoped to this single account (no zone/DNS/Workers access). The token +# value is sensitive and lands in TF state (Tier-1 PG, encrypted at rest) and +# in the rybbit Secret below — same trust level as the CF Global API Key +# already in state. +# +# ### VERIFY (v4.52.7): cloudflare_api_token with a repeatable `policy` block +# (effect / permission_groups = Set of String / resources = Map of String); +# token secret is exposed as `.value` (sensitive). +# +# ### VERIFY — PERMISSION GROUP NAME (highest-risk item). v4.52.7 deprecates +# the flat `.permissions[...]` map ("some permissions overlap resource +# scope"); the non-deprecated lookup is the scoped `.account[...]` map. +# Cloudflare's current permissions reference calls the account list-edit +# group "Account Filter Lists Edit" (and read "Account Filter Lists Read"). +# An OLDER community gist instead shows "Account Rule Lists Read/Write" — +# Cloudflare has renamed this group over time. 
If `tg plan` errors with a +# missing key, try (in order): .account["Account Filter Lists Edit"] -> +# .account["Account Rule Lists Write"], or enumerate the live names with: +# terraform console +# > data.cloudflare_api_token_permission_groups.all.account +# Read is not strictly required for edit (Edit = full CRUDL) but the sync +# job GETs items, so we include Read too to be safe. +# ----------------------------------------------------------------------------- +data "cloudflare_api_token_permission_groups" "all" {} + +resource "cloudflare_api_token" "list_sync" { + name = "rybbit-crowdsec-list-sync" + + policy { + effect = "allow" + permission_groups = [ + data.cloudflare_api_token_permission_groups.all.account["Account Filter Lists Edit"], + data.cloudflare_api_token_permission_groups.all.account["Account Filter Lists Read"], + ] + resources = { + "com.cloudflare.api.account.${local.cf_account_id}" = "*" + } + } +} + +# ----------------------------------------------------------------------------- +# Pure-stdlib sync script, mounted into the CronJob from a ConfigMap (the +# alert_digest pattern — no per-run package installs). +# ----------------------------------------------------------------------------- +resource "kubernetes_config_map" "crowdsec_cf_sync_script" { + metadata { + name = "crowdsec-cf-sync-script" + namespace = "rybbit" + } + data = { + "lapi_kv_sync.py" = file("${path.module}/lapi_kv_sync.py") + } +} + +# Secrets consumed by the sync job: the LAPI bouncer key (registered in LAPI, +# stored in Vault secret/platform -> kvsync_bouncer_key) and the minted CF +# token value. Account id and list ids are NOT secret and are passed as plain +# env values on the CronJob. NOTE(review): data.vault_kv_secret_v2.cf_platform is not declared in this file — presumably defined elsewhere in the stack; confirm. 
+resource "kubernetes_secret" "crowdsec_cf_sync" { + metadata { + name = "crowdsec-cf-sync" + namespace = "rybbit" + } + type = "Opaque" + data = { + LAPI_KEY = data.vault_kv_secret_v2.cf_platform.data["kvsync_bouncer_key"] + CF_API_TOKEN = cloudflare_api_token.list_sync.value + } +} + +resource "kubernetes_cron_job_v1" "crowdsec_cf_sync" { + metadata { + name = "crowdsec-cf-sync" + namespace = "rybbit" + labels = { + app = "crowdsec-cf-sync" + tier = local.tiers.aux + } + } + spec { + concurrency_policy = "Forbid" + failed_jobs_history_limit = 3 + successful_jobs_history_limit = 3 + schedule = "*/2 * * * *" + starting_deadline_seconds = 110 + job_template { + metadata {} + spec { + backoff_limit = 2 + ttl_seconds_after_finished = 3600 + template { + metadata { + labels = { + app = "crowdsec-cf-sync" + } + } + spec { + restart_policy = "OnFailure" + container { + name = "crowdsec-cf-sync" + image = "docker.io/library/python:3.12-alpine" + image_pull_policy = "IfNotPresent" + command = ["python3", "/scripts/lapi_kv_sync.py"] + env { + name = "LAPI_KEY" + value_from { + secret_key_ref { + name = kubernetes_secret.crowdsec_cf_sync.metadata[0].name + key = "LAPI_KEY" + } + } + } + env { + name = "CF_API_TOKEN" + value_from { + secret_key_ref { + name = kubernetes_secret.crowdsec_cf_sync.metadata[0].name + key = "CF_API_TOKEN" + } + } + } + env { + name = "CF_ACCOUNT_ID" + value = local.cf_account_id + } + env { + name = "CF_BAN_LIST_ID" + value = cloudflare_list.crowdsec_ban.id + } + env { + name = "CF_CAPTCHA_LIST_ID" + value = cloudflare_list.crowdsec_captcha.id + } + env { + name = "PUSHGATEWAY_URL" + value = "http://prometheus-prometheus-pushgateway.monitoring:9091" + } + volume_mount { + name = "script" + mount_path = "/scripts" + read_only = true + } + resources { + requests = { + cpu = "10m" + memory = "48Mi" + } + limits = { + memory = "96Mi" + } + } + } + volume { + name = "script" + config_map { + name = 
kubernetes_config_map.crowdsec_cf_sync_script.metadata[0].name + } + } + dns_config { + option { + name = "ndots" + value = "2" + } + } + } + } + } + } + } + lifecycle { + # KYVERNO_LIFECYCLE_V1: Kyverno admission webhook mutates dns_config with ndots=2 + ignore_changes = [spec[0].job_template[0].spec[0].template[0].spec[0].dns_config] + } +} diff --git a/stacks/rybbit/lapi_kv_sync.py b/stacks/rybbit/lapi_kv_sync.py index abe7192d..942912c9 100644 --- a/stacks/rybbit/lapi_kv_sync.py +++ b/stacks/rybbit/lapi_kv_sync.py @@ -1,48 +1,84 @@ #!/usr/bin/env python3 -"""Sync CrowdSec LAPI decisions -> Cloudflare Workers KV. +"""Sync CrowdSec LAPI decisions -> two Cloudflare account IP Lists. -The rybbit edge Worker enforces CrowdSec bans/captchas for Cloudflare-PROXIED -hosts by reading an edge-local KV entry (`ip:` -> `ban`|`captcha`) on each -request. This job is the control plane that keeps that KV in sync with LAPI: -the request path NEVER calls LAPI (no per-request hop) — exactly the nginx -"tail logs -> inject rules" model, just projected onto Workers KV instead of -nftables. +Cloudflare-PROXIED hosts terminate at the CF edge, so the in-cluster CrowdSec +bouncer (which keys on the client IP Traefik sees) never decides on them. We +push the decisions into the edge instead: a zone-scoped WAF custom rule blocks +`(ip.src in $crowdsec_ban)` and managed-challenges `(ip.src in $crowdsec_captcha)` +across EVERY proxied host in the zone. This job is the control plane that keeps +those two IP Lists in sync with LAPI. + +(Filename kept as lapi_kv_sync.py for path/ConfigMap continuity with the prior +Workers-KV design; it no longer touches KV — it reconciles CF Rules Lists.) Design notes: * Pure Python stdlib (no pip/apk at runtime) — runs on stock python:3.12-alpine - mounted from a ConfigMap, the alert_digest pattern (avoids the disk - anti-pattern of installing packages every run). 
- * FULL RECONCILE each run: read the complete current decision set from LAPI, - compute the desired KV state, then upsert present keys and delete stale ones. - This makes an un-ban (cscli decisions delete) clear from the edge within one - interval instead of lingering until the original TTL — important for getting - a false-positive un-blocked fast. - * FAIL-SAFE: if LAPI can't be read, we SKIP the run (leave KV untouched) rather - than wipe every ban. Existing KV entries then simply expire by their TTL, so - a LAPI outage degrades toward fail-OPEN, never toward a stale all-block. - * Only Ip-scope ban/captcha decisions are projected. Range-scope and other - remediations are ignored (the Worker keys on a single IP). + mounted from a ConfigMap, the alert_digest pattern. + * FULL RECONCILE each run: read the complete decision set from LAPI, partition + into ban / captcha desired sets, then for each list compute add (desired - + existing) and remove (existing - desired) and apply both. An IP listed for + BOTH ban and captcha is placed in BAN ONLY (ban wins; the WAF rule order + also blocks-before-challenges as belt-and-braces). A `cscli decisions + delete` therefore clears from the edge within one interval (<=2 min). + * FAIL-SAFE on LAPI: if LAPI can't be read we SKIP the run (lists untouched, + exit 0). A LAPI outage thus freezes the edge state rather than wiping every + ban — degrade toward the last-known-good block set, never toward all-block + or a thundering un-ban. (Decisions linger only until the next successful + sync, not their TTL — we reconcile to LAPI truth, we don't expire entries.) + * FAIL-LOUD on Cloudflare: any CF API error is logged and the job exits + non-zero so the failure is visible (CronJob backoff, success metric pushed + as 0, and the next run retries). 
+ +Cloudflare Rules-Lists API (account-level IP list items), verified against the +official API reference (developers.cloudflare.com, 2026): + * GET /accounts/{acct}/rules/lists/{list}/items -> paginated (per_page <= 500); next page + cursor at result_info.cursors.after, passed back as ?cursor={after}. Each + item = {"id","ip","created_on",...}. + * POST /accounts/{acct}/rules/lists/{list}/items -> body JSON ARRAY + [{"ip":"1.2.3.4"},...]. APPENDS/upserts (does NOT replace the list). + ASYNCHRONOUS: returns {"result":{"operation_id":...}}. + * DELETE /accounts/{acct}/rules/lists/{list}/items -> body {"items":[{"id": + "{item_id}"},...]} (delete by item id, not ip). ASYNCHRONOUS. + * GET /accounts/{acct}/rules/lists/bulk_operations/{op_id} -> status in + {pending,running,completed,failed} (failed carries `error`). + ASYNC HANDLING: Cloudflare allows only ONE pending bulk operation per ACCOUNT. + So we must NOT fire add+delete (or both lists) concurrently — we serialize and + poll each operation_id to a terminal state (short bounded timeout) before the + next mutation. If a poll times out we stop mutating BOTH lists for this run, + push success=0 and exit 0 (the next 2-min run reconciles the rest); we never abandon an + in-flight op and immediately issue another (that would 409/reject). 
""" import json import os -import re import sys import time import urllib.error import urllib.parse import urllib.request -LAPI_URL = os.environ.get("LAPI_URL", "http://crowdsec-service.crowdsec.svc.cluster.local:8080").rstrip("/") +LAPI_URL = os.environ.get( + "LAPI_URL", "http://crowdsec-service.crowdsec.svc.cluster.local:8080" +).rstrip("/") LAPI_KEY = os.environ["LAPI_KEY"] # kvsync bouncer key, registered in LAPI CF_ACCOUNT_ID = os.environ["CF_ACCOUNT_ID"] -CF_NAMESPACE_ID = os.environ["CF_KV_NAMESPACE_ID"] -CF_API_TOKEN = os.environ["CF_API_TOKEN"] # scoped: Workers KV Storage:Edit +CF_API_TOKEN = os.environ["CF_API_TOKEN"] # scoped: Account Filter Lists Edit +CF_BAN_LIST_ID = os.environ["CF_BAN_LIST_ID"] +CF_CAPTCHA_LIST_ID = os.environ["CF_CAPTCHA_LIST_ID"] PUSHGATEWAY = os.environ.get("PUSHGATEWAY_URL", "").rstrip("/") # optional -KEY_PREFIX = "ip:" -# CrowdSec remediation type -> KV value the Worker understands. -TYPE_MAP = {"ban": "ban", "captcha": "captcha"} + CF_API = "https://api.cloudflare.com/client/v4" -MIN_TTL = 60 # Cloudflare KV minimum expiration_ttl (seconds) +# Cloudflare item objects expose the ip differently between list and create +# responses; for an IP-kind list each GET item carries a top-level "ip". +# Batch sizes: no official per-request cap is documented, so keep batches +# generous but bounded (well under the global 1200 req / 5 min limit). +BATCH = 1000 +# Async op polling: 1 pending bulk op per account, so poll to terminal state. +POLL_TIMEOUT = 25 # seconds to wait for one bulk op (the run has ~110s budget) +POLL_INTERVAL = 1.0 + + +class CFError(Exception): + """Cloudflare API failure -> job should exit non-zero (fail loud).""" def _req(url, *, method="GET", headers=None, data=None, timeout=20): @@ -52,136 +88,241 @@ def _req(url, *, method="GET", headers=None, data=None, timeout=20): return json.loads(body) if body else None -def parse_duration_seconds(dur): - """Parse a Go duration string (e.g. 
'167h59m51.5s') into whole seconds. - - LAPI returns the REMAINING duration of each decision here. We floor to the - second and clamp to Cloudflare's 60s minimum so the edge entry outlives the - next sync interval. - """ - if not dur: - return MIN_TTL - dur = dur.strip().lstrip("+") - if dur.startswith("-"): # already expired; give it the floor and move on - return MIN_TTL - total = 0.0 - for value, unit in re.findall(r"(\d+(?:\.\d+)?)(h|m|s|ms|us|µs|ns)", dur): - v = float(value) - total += {"h": 3600, "m": 60, "s": 1, "ms": 1e-3, "us": 1e-6, "µs": 1e-6, "ns": 1e-9}[unit] * v - return max(MIN_TTL, int(total)) +def _cf(url, *, method="GET", payload=None, timeout=20): + """Call the CF API with the bearer token; raise CFError on any failure.""" + headers = {"Authorization": f"Bearer {CF_API_TOKEN}"} + data = None + if payload is not None: + headers["Content-Type"] = "application/json" + data = json.dumps(payload).encode() + try: + res = _req(url, method=method, headers=headers, data=data, timeout=timeout) + except urllib.error.HTTPError as e: + detail = "" + try: + detail = e.read().decode(errors="replace")[:500] + except Exception: + pass + raise CFError(f"{method} {url} -> HTTP {e.code} {detail}") from e + except (OSError, ValueError) as e: # URLError (an OSError), socket read timeouts, malformed JSON + raise CFError(f"{method} {url} -> {e}") from e + if res is not None and not res.get("success", True): + raise CFError(f"{method} {url} -> not success: {res.get('errors')}") + return res +# --------------------------------------------------------------------------- # +# LAPI +# --------------------------------------------------------------------------- # def fetch_decisions(): - """Return desired KV state {('ip:'): (value, ttl_seconds)} from LAPI. + """Return (ban_set, captcha_set) of IPs from LAPI. - Raises on transport/HTTP error so the caller can SKIP the run (fail-safe). + Only scope=="ip" decisions are projected (the WAF rule keys on ip.src). An + IP appearing in both ban and captcha is placed in BAN only. 
Raises on + transport/HTTP error so the caller can SKIP the run (fail-safe). """ - out = {} - data = _req(f"{LAPI_URL}/v1/decisions", headers={"X-Api-Key": LAPI_KEY, "Accept": "application/json"}) + data = _req( + f"{LAPI_URL}/v1/decisions", + headers={"X-Api-Key": LAPI_KEY, "Accept": "application/json"}, + ) + ban, captcha = set(), set() for d in data or []: if (d.get("scope") or "").lower() != "ip": continue - value = TYPE_MAP.get((d.get("type") or "").lower()) - if not value: - continue ip = d.get("value") if not ip: continue - key = KEY_PREFIX + ip - ttl = parse_duration_seconds(d.get("duration")) - # If the same IP carries multiple decisions, ban beats captcha and the - # longest TTL wins. - if key in out: - prev_val, prev_ttl = out[key] - value = "ban" if "ban" in (value, prev_val) else value - ttl = max(ttl, prev_ttl) - out[key] = (value, ttl) - return out + dtype = (d.get("type") or "").lower() + if dtype == "ban": + ban.add(ip) + elif dtype == "captcha": + captcha.add(ip) + # other remediation types (e.g. 
throttle) are ignored + captcha -= ban # ban wins: never list the same IP in both + return ban, captcha -def cf_list_keys(): - """List existing `ip:` keys currently in the KV namespace (paginated).""" - keys = [] +# --------------------------------------------------------------------------- # +# Cloudflare list items +# --------------------------------------------------------------------------- # +def cf_list_items(list_id): + """Return {ip: item_id} for every item currently in the list (paginated).""" + out = {} cursor = "" while True: - url = f"{CF_API}/accounts/{CF_ACCOUNT_ID}/storage/kv/namespaces/{CF_NAMESPACE_ID}/keys?prefix={KEY_PREFIX}&limit=1000" + url = f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items?per_page=500" if cursor: url += f"&cursor={urllib.parse.quote(cursor)}" - res = _req(url, headers={"Authorization": f"Bearer {CF_API_TOKEN}"}) - keys.extend(k["name"] for k in (res.get("result") or [])) - cursor = (res.get("result_info") or {}).get("cursor") or "" + res = _cf(url) + for it in (res.get("result") or []): + ip = it.get("ip") + if ip: + out[ip] = it.get("id") + cursor = (((res.get("result_info") or {}).get("cursors") or {}).get("after")) or "" if not cursor: - return keys + return out -def cf_bulk_put(items): - """items: list of (key, value, ttl). Cloudflare bulk PUT (<=10000/call).""" - for i in range(0, len(items), 10000): - chunk = [{"key": k, "value": v, "expiration_ttl": t} for k, v, t in items[i:i + 10000]] - _req( - f"{CF_API}/accounts/{CF_ACCOUNT_ID}/storage/kv/namespaces/{CF_NAMESPACE_ID}/bulk", - method="PUT", - headers={"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}, - data=json.dumps(chunk).encode(), - ) +def _wait_for_op(op_id): + """Poll a bulk operation to a terminal state. Returns True if completed, + False if it timed out (still pending/running). Raises CFError if it failed. 
+ + We must reach a terminal state before issuing the next mutation: CF allows + only one pending bulk op per account, so firing another while this is + in-flight would be rejected. + """ + if not op_id: + return True + deadline = time.monotonic() + POLL_TIMEOUT + url = f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/bulk_operations/{op_id}" + while time.monotonic() < deadline: + res = _cf(url) + status = ((res.get("result") or {}).get("status") or "").lower() + if status == "completed": + return True + if status == "failed": + raise CFError(f"bulk op {op_id} failed: {(res.get('result') or {}).get('error')}") + time.sleep(POLL_INTERVAL) + print(f"[warn] bulk op {op_id} still {status or 'pending'} after {POLL_TIMEOUT}s; " + f"stopping further mutations this run (next run reconciles)", file=sys.stderr) + return False -def cf_bulk_delete(keys): - for i in range(0, len(keys), 10000): - _req( - f"{CF_API}/accounts/{CF_ACCOUNT_ID}/storage/kv/namespaces/{CF_NAMESPACE_ID}/bulk/delete", +def _op_id(res): + return ((res or {}).get("result") or {}).get("operation_id") + + +def cf_add_items(list_id, ips): + """POST new IPs to the list (append). Returns the operation_id (async).""" + # If callers ever exceed one batch, each POST is a separate bulk op and the + # single-pending-op rule forces us to wait between them. + last_op = None + for i in range(0, len(ips), BATCH): + chunk = [{"ip": ip} for ip in ips[i : i + BATCH]] + res = _cf( + f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items", method="POST", - headers={"Authorization": f"Bearer {CF_API_TOKEN}", "Content-Type": "application/json"}, - data=json.dumps(list(keys)).encode(), + payload=chunk, ) + last_op = _op_id(res) + if i + BATCH < len(ips): # more chunks coming -> serialize + if not _wait_for_op(last_op): + return last_op # timed out; bail (partial), next run continues + return last_op -def push_metrics(synced, deleted, ok): +def cf_delete_items(list_id, item_ids): + """DELETE items by id. 
Returns the operation_id (async).""" + last_op = None + for i in range(0, len(item_ids), BATCH): + chunk = {"items": [{"id": iid} for iid in item_ids[i : i + BATCH]]} + res = _cf( + f"{CF_API}/accounts/{CF_ACCOUNT_ID}/rules/lists/{list_id}/items", + method="DELETE", + payload=chunk, + ) + last_op = _op_id(res) + if i + BATCH < len(item_ids): + if not _wait_for_op(last_op): + return last_op + return last_op + + +class CFOpPending(Exception): + """A bulk op outlived POLL_TIMEOUT: stop mutating this run (not a failure).""" + + +def reconcile(label, list_id, desired): + """Reconcile one list to `desired` (a set of IPs). + + Returns (added, removed). Serializes add->wait->delete so we respect the + one-pending-bulk-op-per-account limit. Raises CFError on hard failure and + CFOpPending when an op is still in flight after POLL_TIMEOUT, so the caller + issues no further mutation (on any list) while that op is pending. + """ + existing = cf_list_items(list_id) # {ip: item_id} + existing_ips = set(existing) + to_add = sorted(desired - existing_ips) + to_remove_ids = [existing[ip] for ip in (existing_ips - desired)] + + added = removed = 0 + if to_add: + op = cf_add_items(list_id, to_add) + if not _wait_for_op(op): + # add op didn't finish; skip the delete this run to avoid stacking a + # second pending op, and tell the caller to stop mutating. 
+ raise CFOpPending(f"{label}: add op not confirmed; deferring deletes") + added = len(to_add) + if to_remove_ids: + op = cf_delete_items(list_id, to_remove_ids) + if not _wait_for_op(op): + raise CFOpPending(f"{label}: delete op not confirmed") + removed = len(to_remove_ids) + print(f"[ok] {label}: +{added} / -{removed} (desired={len(desired)}, was={len(existing_ips)})") + return added, removed + + +# --------------------------------------------------------------------------- # +# Metrics (best-effort) +# --------------------------------------------------------------------------- # +def push_metrics(ban_n, captcha_n, ok): if not PUSHGATEWAY: return payload = ( - "# TYPE crowdsec_kv_sync_decisions gauge\n" - f"crowdsec_kv_sync_decisions {synced}\n" - "# TYPE crowdsec_kv_sync_deleted gauge\n" - f"crowdsec_kv_sync_deleted {deleted}\n" - "# TYPE crowdsec_kv_sync_success gauge\n" - f"crowdsec_kv_sync_success {1 if ok else 0}\n" - "# TYPE crowdsec_kv_sync_last_run_seconds gauge\n" - f"crowdsec_kv_sync_last_run_seconds {int(time.time())}\n" + "# TYPE crowdsec_cf_list_ban_count gauge\n" + f"crowdsec_cf_list_ban_count {ban_n}\n" + "# TYPE crowdsec_cf_list_captcha_count gauge\n" + f"crowdsec_cf_list_captcha_count {captcha_n}\n" + "# TYPE crowdsec_cf_list_sync_success gauge\n" + f"crowdsec_cf_list_sync_success {1 if ok else 0}\n" + "# TYPE crowdsec_cf_list_sync_last_run_seconds gauge\n" + f"crowdsec_cf_list_sync_last_run_seconds {int(time.time())}\n" ) try: - _req(f"{PUSHGATEWAY}/metrics/job/crowdsec-kv-sync", method="PUT", - headers={"Content-Type": "text/plain"}, data=payload.encode(), timeout=10) + _req( + f"{PUSHGATEWAY}/metrics/job/crowdsec-cf-list-sync", + method="PUT", + headers={"Content-Type": "text/plain"}, + data=payload.encode(), + timeout=10, + ) except Exception as e: # metrics are best-effort, never fail the job print(f"[warn] pushgateway: {e}", file=sys.stderr) +# --------------------------------------------------------------------------- # def main(): - # 1. Desired state from LAPI. 
Any failure here = SKIP (fail-safe, leave KV). + # 1. Desired state from LAPI. Any failure here = SKIP (fail-safe). try: - desired = fetch_decisions() + ban, captcha = fetch_decisions() except Exception as e: - print(f"[skip] LAPI unreadable ({e}); leaving KV untouched so existing " - f"bans expire by TTL (fail-open).", file=sys.stderr) + print( + f"[skip] LAPI unreadable ({e}); leaving CF lists untouched " + f"(fail-safe: freeze last-known edge state).", + file=sys.stderr, + ) push_metrics(0, 0, ok=False) return 0 - # 2. Current edge state. - existing = set(cf_list_keys()) - desired_keys = set(desired) + print(f"[info] LAPI desired: {len(ban)} ban / {len(captcha)} captcha (ip-scope)") - upserts = [(k, v, t) for k, (v, t) in desired.items()] - stale = existing - desired_keys + # 2. Reconcile both lists. CF errors fail loud (non-zero exit). + try: + reconcile("ban", CF_BAN_LIST_ID, ban) + reconcile("captcha", CF_CAPTCHA_LIST_ID, captcha) + except CFOpPending as e: + # One bulk op is still in flight (CF allows one per account): issue no + # further mutation. Exit 0 so the Job is not retried straight into the + # pending op; success=0 keeps the incomplete sync visible. + print(f"[warn] {e}; remaining changes deferred to next run", file=sys.stderr) + push_metrics(len(ban), len(captcha), ok=False) + return 0 + except CFError as e: + print(f"[error] Cloudflare API failure: {e}", file=sys.stderr) + push_metrics(len(ban), len(captcha), ok=False) + return 1 - if upserts: - cf_bulk_put(upserts) - if stale: - cf_bulk_delete(stale) - - print(f"[ok] synced {len(upserts)} decision(s) to KV, removed {len(stale)} stale; " - f"{sum(1 for _, v, _ in upserts if v == 'ban')} ban / " - f"{sum(1 for _, v, _ in upserts if v == 'captcha')} captcha") - push_metrics(len(upserts), len(stale), ok=True) + push_metrics(len(ban), len(captcha), ok=True) return 0