diff --git a/docs/runbooks/kms-public-exposure.md b/docs/runbooks/kms-public-exposure.md index 9a6a4a6f..2cb13c44 100644 --- a/docs/runbooks/kms-public-exposure.md +++ b/docs/runbooks/kms-public-exposure.md @@ -9,15 +9,22 @@ how to tune the rate limit, how to revoke if abused. ## Architecture -- **K8s service**: `windows-kms` in namespace `kms`, MetalLB shared LB IP - `10.0.20.200:1688`. ETP=Cluster, so client IPs in vlmcsd logs are SNAT'd - k8s node IPs (not real-world client IPs). Trade-off accepted — - preserving real client IPs would require a dedicated MetalLB IP with - ETP=Local or a PROXY-protocol bounce; vlmcsd doesn't speak PROXY-v2. -- **pfSense WAN forward**: `WAN TCP/1688 → k8s_shared_lb:1688` - (alias = `10.0.20.200`). Description: `KMS public — kms.viktorbarzin.me`. -- **Filter rule** on the WAN interface, TCP/1688, with state-table - per-source caps: +- **K8s service**: `windows-kms` in namespace `kms`, MetalLB **dedicated** + LB IP `10.0.20.202:1688`. ETP=Local, so vlmcsd sees real WAN client IPs + in its log (pfSense WAN forwards do DNAT-only, no SNAT; ETP=Local skips + the kube-proxy SNAT too). Same pattern mailserver used pre-2026-04-19. + Sharing `10.0.20.200` isn't an option — all 10 services there are + ETP=Cluster and MetalLB requires a single ETP per shared IP. +- **Pod fluidity**: deployment has `replicas=1` (notifier dedup state is + per-pod) with no node affinity. TCP readiness/liveness probes on 1688 + gate Pod Ready on the listener actually being up, so MetalLB only + advertises `10.0.20.202` from a node where vlmcsd is serving. +- **pfSense WAN forward**: `WAN TCP/1688 → k8s_kms_lb:1688` + (alias = `10.0.20.202`, dedicated to KMS). Description: `KMS public — + kms.viktorbarzin.me`. Other forwards using `k8s_shared_lb` (WireGuard, + HTTPS, shadowsocks, smtps, etc.) are unaffected. +- **Filter rule** on the WAN interface, TCP/1688 destination + ``, with state-table per-source caps: - `max-src-conn 50` — concurrent connections per source IP - `max-src-conn-rate 10/60` — 10 new connections per 60 seconds per source @@ -26,6 +33,13 @@ how to tune the rate limit, how to revoke if abused. flushed. (`virusprot` is the only table pfSense's filter generator targets for `overload`; see `/etc/inc/filter.inc`. Don't try to point it at a custom table — the schema doesn't expose that knob.) +- **Probe filter in slack-notifier**: a bare TCP open/close (no + Application/Activation block from vlmcsd) is treated as a probe — Uptime + Kuma's port-type monitor on `windows-kms.kms.svc:1688` and the kubelet + readiness/liveness probes both hit this path. Probes increment + `kms_connection_probes_total{source}` (`source` ∈ `internal_pod`, + `cluster_node`, `external`) and log to stdout, but never post to Slack. + Real activations still post. ## Where the logs are @@ -39,8 +53,11 @@ kubectl logs -n kms -l app=kms-service -c windows-kms --tail=50 -f kubectl logs -n kms -l app=kms-service -c windows-kms | grep "Incoming KMS request" ``` -Source IPs in this log are the SNAT'd node IPs because the LB Service uses -ETP=Cluster on a shared MetalLB IP. Don't expect real WAN client IPs here. +Source IPs from the WAN are real client IPs (pfSense DNAT-only + ETP=Local +preserve them through the chain). LAN clients hitting the LB IP directly +appear as their own IP. Pod-source probes (Uptime Kuma) appear as a Calico +pod IP in `10.10.0.0/16`. Kubelet readiness/liveness probes appear as the +hosting node IP in `10.0.20.0/24`. ### Slack notifier (kms namespace, k8s) @@ -53,6 +70,17 @@ also increment the Prometheus counter `kms_activations_total{product,status}` exposed on the same pod at `:9101/metrics` (scraped by the cluster-wide `kubernetes-pods` job; query via Prometheus or Grafana directly). +Probe-only TCP connections (open+close, no KMS RPC) are silently filtered +out of Slack and counted in `kms_connection_probes_total{source}`. Useful +queries: +```promql +# Probe rate by source +rate(kms_connection_probes_total[5m]) +# Probes from the public WAN (a non-zero rate here means real port-scans +# are reaching us, not just internal monitoring) +rate(kms_connection_probes_total{source="external"}[5m]) +``` + ### pfSense — virusprot table and filter hits ```bash @@ -93,18 +121,19 @@ The `overload` table entry survives pf reloads. Running If the activation surface needs to come down (abuse, legal, audit): 1. **pfSense web UI** → `Firewall → NAT → Port Forward` → find - `WAN TCP/1688 → k8s_shared_lb` → **delete** (or disable). Apply. + `WAN TCP/1688 → k8s_kms_lb` → **delete** (or disable). Apply. 2. **pfSense web UI** → `Firewall → Rules → WAN` → find `KMS public — kms.viktorbarzin.me` → **delete** (or disable). Apply. 3. Verify externally: from a phone tether, `nc -zw3 kms.viktorbarzin.me 1688` should now fail. The k8s service stays reachable on the LAN -(`10.0.20.200:1688` and the internal `kms.viktorbarzin.lan` ingress for -the webpage) — only the WAN port-forward is removed. +(`10.0.20.202:1688` directly, and the website at `kms.viktorbarzin.lan` +via Traefik on `10.0.20.200:443`) — only the WAN port-forward is removed. -To put it back, recreate the NAT rule (target alias `k8s_shared_lb`, -port `1688`) and the filter rule with the same per-source caps. +To put it back, recreate the NAT rule (target alias `k8s_kms_lb`, +port `1688`) and the filter rule with the same per-source caps. The alias +itself is independent of any forward and persists across delete/restore. ## Related diff --git a/stacks/kms/files/.gitignore b/stacks/kms/files/.gitignore new file mode 100644 index 00000000..7a60b85e --- /dev/null +++ b/stacks/kms/files/.gitignore @@ -0,0 +1,2 @@ +__pycache__/ +*.pyc diff --git a/stacks/kms/files/slack-notifier.py b/stacks/kms/files/slack-notifier.py index e6f20df2..22c75017 100644 --- a/stacks/kms/files/slack-notifier.py +++ b/stacks/kms/files/slack-notifier.py @@ -12,16 +12,21 @@ vlmcsd verbose output emits a multi-line block per request: ... : IPv4 connection closed: :. -We accumulate per-connection state and emit on close. Dedupes by -(client_ip, product) within DEDUP_WINDOW_SECONDS to avoid spam from -Windows' default 7-day re-activation cycle hitting us repeatedly. +A bare TCP open/close pair (no Application/Activation lines) is a probe — +typically Uptime Kuma's port-type monitor on windows-kms.kms:1688. Probes +are counted in `kms_connection_probes_total` but never posted to Slack. + +Real activations dedupe by (client_ip, product) within DEDUP_WINDOW_SECONDS +to avoid spam from Windows' default 7-day re-activation cycle. Prometheus metrics (text format, no client_ip label — cardinality): kms_activations_total{product, status} counter kms_activations_dedup_skipped_total{product} counter + kms_connection_probes_total{source} counter (probe-only conns) kms_last_activation_timestamp_seconds gauge kms_slack_notifier_up gauge (heartbeat) """ +import ipaddress import json import os import re @@ -47,9 +52,16 @@ PROD_RE = re.compile(r":\s*Activation ID \(Product\)\s*:\s*[0-9a-f-]+\s*\(([^)]+ HOST_RE = re.compile(r":\s*Workstation name\s*:\s*(.+?)\s*$") STATUS_RE = re.compile(r":\s*Licensing status\s*:\s*\d+\s*\((.+?)\)\s*$") +# Pod CIDR (Calico, kube-proxy SNAT-free intra-cluster traffic) and cluster +# LAN (kube-proxy SNATs ETP=Cluster external traffic to a node IP). Anything +# else is a real client IP that arrived via ETP=Local or pod-to-svc routing. +POD_CIDR = ipaddress.ip_network("10.10.0.0/16") +CLUSTER_LAN_CIDR = ipaddress.ip_network("10.0.20.0/24") + _metrics_lock = threading.Lock() _activations: dict = {} _dedup_skipped: dict = {} +_probes: dict = {} _last_activation_ts: float = 0.0 @@ -57,6 +69,80 @@ def _esc(value: str) -> str: return str(value).replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n") +def classify_source(ip: str) -> str: + """Return 'internal_pod', 'cluster_node', or 'external' for a vlmcsd-logged IP.""" + raw = ip.strip().strip("[]") + try: + addr = ipaddress.ip_address(raw) + except ValueError: + return "external" + if isinstance(addr, ipaddress.IPv4Address): + if addr in POD_CIDR: + return "internal_pod" + if addr in CLUSTER_LAN_CIDR: + return "cluster_node" + return "external" + + +def is_probe(state: dict) -> bool: + """A connection that closed without any Application/Activation block.""" + return "app" not in state and "product" not in state + + +class Event: + __slots__ = ("kind", "ip", "source", "app", "product", "host", "status") + + def __init__(self, kind: str, ip: str, source: str, *, app: str = "", + product: str = "", host: str = "", status: str = "") -> None: + self.kind = kind + self.ip = ip + self.source = source + self.app = app + self.product = product + self.host = host + self.status = status + + +def process_line(line: str, state: dict): + """Drive the parser one line at a time. Returns (new_state, event_or_None). + + On `connection accepted`, a new state dict is started. + On `connection closed`, an Event is emitted and state is reset. + Other lines accumulate fields into the current state. + """ + if (m := OPEN_RE.search(line)): + return {"ip": m.group(1)}, None + if not state: + return state, None + if (m := APP_RE.search(line)): + state["app"] = m.group(1) + return state, None + if (m := PROD_RE.search(line)): + state["product"] = m.group(1) + return state, None + if (m := HOST_RE.search(line)): + state["host"] = m.group(1) + return state, None + if (m := STATUS_RE.search(line)): + state["status"] = m.group(1) + return state, None + if CLOSE_RE.search(line): + ip = state.get("ip", "?") + source = classify_source(ip) + if is_probe(state): + event = Event("probe", ip, source) + else: + event = Event( + "activation", ip, source, + app=state.get("app", ""), + product=state.get("product", state.get("app", "unknown")), + host=state.get("host", "?"), + status=state.get("status", "unknown"), + ) + return {}, event + return state, None + + def record_activation(product: str, status: str) -> None: global _last_activation_ts with _metrics_lock: @@ -70,11 +156,17 @@ def record_dedup_skip(product: str) -> None: _dedup_skipped[product] = _dedup_skipped.get(product, 0) + 1 +def record_probe(source: str) -> None: + with _metrics_lock: + _probes[source] = _probes.get(source, 0) + 1 + + def render_metrics() -> bytes: out = [] with _metrics_lock: activations = dict(_activations) dedup_skipped = dict(_dedup_skipped) + probes = dict(_probes) last_ts = _last_activation_ts out.append("# HELP kms_activations_total KMS activation events that resulted in a Slack post.") @@ -89,6 +181,11 @@ def render_metrics() -> bytes: for product, count in sorted(dedup_skipped.items()): out.append(f'kms_activations_dedup_skipped_total{{product="{_esc(product)}"}} {count}') + out.append("# HELP kms_connection_probes_total Probe-only TCP connections (open+close, no KMS RPC).") + out.append("# TYPE kms_connection_probes_total counter") + for source, count in sorted(probes.items()): + out.append(f'kms_connection_probes_total{{source="{_esc(source)}"}} {count}') + out.append("# HELP kms_last_activation_timestamp_seconds Unix ts of the last non-deduped activation.") out.append("# TYPE kms_last_activation_timestamp_seconds gauge") out.append(f"kms_last_activation_timestamp_seconds {last_ts}") @@ -174,6 +271,40 @@ def follow(path: str): time.sleep(1) +def handle_event(event: "Event", dedup: "DedupCache") -> None: + if event.kind == "probe": + record_probe(event.source) + print( + f"[slack-notifier] probe: ip={event.ip} source={event.source}", + flush=True, + ) + return + + key = f"{event.ip}|{event.product}" + if not dedup.should_send(key): + record_dedup_skip(event.product) + print( + f"[slack-notifier] dedup-skip: ip={event.ip} product={event.product!r}", + flush=True, + ) + return + + text = ( + f":computer: KMS activation\n" + f"• *Client*: `{event.ip}` ({event.source})\n" + f"• *Workstation*: `{event.host}`\n" + f"• *Product*: `{event.product}`\n" + f"• *Status before*: {event.status}" + ) + slack_post(text) + record_activation(event.product, event.status) + print( + f"[slack-notifier] sent: ip={event.ip} source={event.source} " + f"product={event.product!r} host={event.host!r}", + flush=True, + ) + + def main() -> None: threading.Thread(target=start_metrics_server, daemon=True).start() @@ -182,40 +313,9 @@ def main() -> None: state: dict = {} for line in follow(LOG_PATH): - if (m := OPEN_RE.search(line)): - state = {"ip": m.group(1)} - continue - if not state: - continue - if (m := APP_RE.search(line)): - state["app"] = m.group(1) - elif (m := PROD_RE.search(line)): - state["product"] = m.group(1) - elif (m := HOST_RE.search(line)): - state["host"] = m.group(1) - elif (m := STATUS_RE.search(line)): - state["status"] = m.group(1) - elif CLOSE_RE.search(line): - ip = state.get("ip", "?") - product = state.get("product", state.get("app", "unknown")) - host = state.get("host", "?") - status = state.get("status", "unknown") - key = f"{ip}|{product}" - if dedup.should_send(key): - text = ( - f":computer: KMS activation\n" - f"• *Client*: `{ip}`\n" - f"• *Workstation*: `{host}`\n" - f"• *Product*: `{product}`\n" - f"• *Status before*: {status}" - ) - slack_post(text) - record_activation(product, status) - print(f"[slack-notifier] sent: ip={ip} product={product!r} host={host!r}", flush=True) - else: - record_dedup_skip(product) - print(f"[slack-notifier] dedup-skip: ip={ip} product={product!r}", flush=True) - state = {} + state, event = process_line(line, state) + if event is not None: + handle_event(event, dedup) if __name__ == "__main__": diff --git a/stacks/kms/files/test_slack_notifier.py b/stacks/kms/files/test_slack_notifier.py new file mode 100644 index 00000000..e541a79a --- /dev/null +++ b/stacks/kms/files/test_slack_notifier.py @@ -0,0 +1,110 @@ +"""Unit tests for slack_notifier classification + state machine. + +Run with: cd infra/stacks/kms/files && python3 -m unittest test_slack_notifier +""" +import importlib.util +import os +import unittest +from pathlib import Path + +# Load the notifier module from the dashed filename without executing main(). +os.environ.setdefault("SLACK_WEBHOOK_URL", "http://example.invalid/webhook") +_spec = importlib.util.spec_from_file_location( + "slack_notifier", Path(__file__).parent / "slack-notifier.py" +) +nm = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(nm) + + +class ClassifySourceTests(unittest.TestCase): + def test_pod_cidr_is_internal_pod(self): + self.assertEqual(nm.classify_source("10.10.107.224"), "internal_pod") + self.assertEqual(nm.classify_source("10.10.0.1"), "internal_pod") + + def test_cluster_lan_is_cluster_node(self): + self.assertEqual(nm.classify_source("10.0.20.103"), "cluster_node") + self.assertEqual(nm.classify_source("10.0.20.200"), "cluster_node") + + def test_unknown_source_is_external(self): + self.assertEqual(nm.classify_source("8.8.8.8"), "external") + self.assertEqual(nm.classify_source("203.0.113.42"), "external") + + def test_ipv6_external_default(self): + self.assertEqual(nm.classify_source("[2001:db8::1]"), "external") + + +class IsProbeTests(unittest.TestCase): + def test_open_close_only_is_probe(self): + self.assertTrue(nm.is_probe({"ip": "10.10.107.224"})) + + def test_application_id_only_is_not_probe(self): + self.assertFalse(nm.is_probe({"ip": "10.0.20.103", "app": "Windows"})) + + def test_product_only_is_not_probe(self): + self.assertFalse(nm.is_probe({"ip": "10.0.20.103", "product": "Office 2021"})) + + def test_full_activation_is_not_probe(self): + state = { + "ip": "10.0.20.103", + "app": "Windows", + "product": "Windows 11 Pro", + "host": "DESKTOP-X", + "status": "Notification", + } + self.assertFalse(nm.is_probe(state)) + + +class StateMachineTests(unittest.TestCase): + """Drive the regex parser through real-shaped vlmcsd log blocks.""" + + PROBE_BLOCK = [ + "2026-05-10 11:00:00: IPv4 connection accepted: 10.10.107.224:54321.", + "2026-05-10 11:00:00: IPv4 connection closed: 10.10.107.224:54321.", + ] + + ACTIVATION_BLOCK = [ + "2026-05-10 11:00:01: IPv4 connection accepted: 10.0.20.103:50001.", + "2026-05-10 11:00:01: <<< Incoming KMS request", + "2026-05-10 11:00:01: Application ID : 55c92734-d682-4d71-983e-d6ec3f16059f (Windows)", + "2026-05-10 11:00:01: Activation ID (Product): 73111121-5638-40f6-bc11-f1d7b0d64300 (Windows 11 Pro)", + "2026-05-10 11:00:01: Workstation name : DESKTOP-MO2323B", + "2026-05-10 11:00:01: Licensing status : 2 (Notification)", + "2026-05-10 11:00:01: IPv4 connection closed: 10.0.20.103:50001.", + ] + + def _drive(self, lines): + events = [] + state = {} + for line in lines: + state, event = nm.process_line(line, state) + if event is not None: + events.append(event) + return events, state + + def test_probe_block_emits_probe_event(self): + events, state = self._drive(self.PROBE_BLOCK) + self.assertEqual(len(events), 1) + ev = events[0] + self.assertEqual(ev.kind, "probe") + self.assertEqual(ev.ip, "10.10.107.224") + self.assertEqual(state, {}) + + def test_activation_block_emits_activation_event(self): + events, state = self._drive(self.ACTIVATION_BLOCK) + self.assertEqual(len(events), 1) + ev = events[0] + self.assertEqual(ev.kind, "activation") + self.assertEqual(ev.ip, "10.0.20.103") + self.assertEqual(ev.product, "Windows 11 Pro") + self.assertEqual(ev.host, "DESKTOP-MO2323B") + self.assertEqual(ev.status, "Notification") + self.assertEqual(state, {}) + + def test_interleaved_probe_then_activation(self): + events, _ = self._drive(self.PROBE_BLOCK + self.ACTIVATION_BLOCK) + kinds = [e.kind for e in events] + self.assertEqual(kinds, ["probe", "activation"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/stacks/kms/main.tf b/stacks/kms/main.tf index 127568a2..2408f523 100644 --- a/stacks/kms/main.tf +++ b/stacks/kms/main.tf @@ -228,6 +228,23 @@ resource "kubernetes_deployment" "windows_kms" { port { container_port = 1688 } + # Gate Pod Ready on the listener actually being up. Required for + # ETP=Local: MetalLB only advertises 10.0.20.202 from a node where + # the backing pod is Ready, so without this the pod is "Ready" + # before vlmcsd has bound 1688 and ARP can briefly point at a node + # that drops connections during pod start. + readiness_probe { + tcp_socket { port = 1688 } + initial_delay_seconds = 1 + period_seconds = 5 + failure_threshold = 3 + } + liveness_probe { + tcp_socket { port = 1688 } + initial_delay_seconds = 5 + period_seconds = 30 + failure_threshold = 3 + } volume_mount { name = "vlmcsd-log" mount_path = "/var/log/vlmcsd" @@ -300,14 +317,17 @@ resource "kubernetes_service" "windows_kms" { app = "kms-service" } annotations = { - "metallb.io/loadBalancerIPs" = "10.0.20.200" - "metallb.io/allow-shared-ip" = "shared" + # Dedicated MetalLB IP (not shared) so ETP=Local can preserve real + # client IPs in the vlmcsd log. Sharing 10.0.20.200 isn't an option: + # all 10 services there are ETP=Cluster and MetalLB requires a single + # ETP per shared IP. + "metallb.io/loadBalancerIPs" = "10.0.20.202" } } spec { type = "LoadBalancer" - external_traffic_policy = "Cluster" + external_traffic_policy = "Local" selector = { app = "kms-service" }