From d85b54d89d261b7c038b73d5afc7137a3fe0eb64 Mon Sep 17 00:00:00 2001 From: Viktor Barzin Date: Sun, 10 May 2026 13:21:38 +0000 Subject: [PATCH] kms: per-connection state in notifier (vlmcsd is multi-threaded) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug found via E2E test against the Windows VM (VMID 300). The single shared `state` dict in slack-notifier.py worked when vlmcsd processed one connection at a time, but real Windows KMS activations hold the connection open ~30 seconds (handshake + keep-alive). During that window vlmcsd accepts other concurrent connections — most relevantly the new kubelet TCP readiness probe every 5s — and each new OPEN line reset the shared state, wiping the in-flight activation's app/product/host before its CLOSE arrived. Result: real activations were misclassified as probes (no Slack post, no metric increment). Fix: state is now a dict keyed by `ip:port` with one sub-dict per in-flight connection. A `__current` pointer tracks the most recent OPEN so unkeyed detail lines (Application ID, Workstation name, etc.) can be attributed correctly — vlmcsd writes detail lines immediately after the OPEN and before any subsequent OPEN, so the heuristic holds. Orphan CLOSEs (notifier started mid-conn) are now silently dropped instead of emitting an empty probe event. Two new regression tests: - test_kubelet_probe_during_long_activation: 5s probe interleaved into a 31s activation block — exact production failure mode. - test_orphan_close_no_event: bare CLOSE without prior OPEN. Verified live: triggered slmgr /upk + /ipk + /skms 10.0.20.202 + /ato on WIN10Pro-DS32. vlmcsd logged the full activation block, notifier posted to Slack with ip=192.168.1.230 source=external product='Windows 10 Professional' host='WIN10Pro-DS32.viktorbarzin.lan' and kms_activations_total{product=Windows 10 Professional, status=Licensed} 1 — real WAN client IP preserved through the ETP=Local + dedicated MetalLB IP chain end to end. --- stacks/kms/files/slack-notifier.py | 67 +++++++++++++++---------- stacks/kms/files/test_slack_notifier.py | 41 +++++++++++++++ 2 files changed, 81 insertions(+), 27 deletions(-) diff --git a/stacks/kms/files/slack-notifier.py b/stacks/kms/files/slack-notifier.py index 22c75017..48e98ecd 100644 --- a/stacks/kms/files/slack-notifier.py +++ b/stacks/kms/files/slack-notifier.py @@ -45,8 +45,8 @@ DEDUP_WINDOW = int(os.environ.get("DEDUP_WINDOW_SECONDS", "3600")) DEDUP_MAX = 4096 METRICS_PORT = int(os.environ.get("METRICS_PORT", "9101")) -OPEN_RE = re.compile(r":\s*IPv[46] connection accepted:\s*([0-9a-f.:\[\]]+):\d+") -CLOSE_RE = re.compile(r":\s*IPv[46] connection closed:\s*([0-9a-f.:\[\]]+):\d+") +OPEN_RE = re.compile(r":\s*IPv[46] connection accepted:\s*([0-9a-f.:\[\]]+):(\d+)") +CLOSE_RE = re.compile(r":\s*IPv[46] connection closed:\s*([0-9a-f.:\[\]]+):(\d+)") APP_RE = re.compile(r":\s*Application ID\s*:\s*[0-9a-f-]+\s*\(([^)]+)\)") PROD_RE = re.compile(r":\s*Activation ID \(Product\)\s*:\s*[0-9a-f-]+\s*\(([^)]+)\)") HOST_RE = re.compile(r":\s*Workstation name\s*:\s*(.+?)\s*$") @@ -106,40 +106,53 @@ class Event: def process_line(line: str, state: dict): """Drive the parser one line at a time. Returns (new_state, event_or_None). - On `connection accepted`, a new state dict is started. - On `connection closed`, an Event is emitted and state is reset. - Other lines accumulate fields into the current state. + `state` tracks per-connection state keyed by `ip:port` (vlmcsd is + multi-threaded — concurrent connections interleave in the log, so a + single shared dict gets clobbered). The special key `__current` points + at the most recent OPEN's key so detail lines (which lack ip:port + info) can be attributed to the right connection. Detail lines arrive + before the next OPEN under vlmcsd's processing model. """ if (m := OPEN_RE.search(line)): - return {"ip": m.group(1)}, None - if not state: + ip = m.group(1) + key = f"{ip}:{m.group(2)}" + state[key] = {"ip": ip} + state["__current"] = key return state, None - if (m := APP_RE.search(line)): - state["app"] = m.group(1) - return state, None - if (m := PROD_RE.search(line)): - state["product"] = m.group(1) - return state, None - if (m := HOST_RE.search(line)): - state["host"] = m.group(1) - return state, None - if (m := STATUS_RE.search(line)): - state["status"] = m.group(1) - return state, None - if CLOSE_RE.search(line): - ip = state.get("ip", "?") + + if (m := CLOSE_RE.search(line)): + key = f"{m.group(1)}:{m.group(2)}" + conn = state.pop(key, None) + if state.get("__current") == key: + state.pop("__current", None) + if conn is None: + return state, None # orphan close (e.g., notifier started mid-conn) + ip = conn.get("ip", "?") source = classify_source(ip) - if is_probe(state): + if is_probe(conn): event = Event("probe", ip, source) else: event = Event( "activation", ip, source, - app=state.get("app", ""), - product=state.get("product", state.get("app", "unknown")), - host=state.get("host", "?"), - status=state.get("status", "unknown"), + app=conn.get("app", ""), + product=conn.get("product", conn.get("app", "unknown")), + host=conn.get("host", "?"), + status=conn.get("status", "unknown"), ) - return {}, event + return state, event + + current = state.get("__current") + if not current or current not in state: + return state, None + conn = state[current] + if (m := APP_RE.search(line)): + conn["app"] = m.group(1) + elif (m := PROD_RE.search(line)): + conn["product"] = m.group(1) + elif (m := HOST_RE.search(line)): + conn["host"] = m.group(1) + elif (m := STATUS_RE.search(line)): + conn["status"] = m.group(1) return state, None diff --git a/stacks/kms/files/test_slack_notifier.py b/stacks/kms/files/test_slack_notifier.py index e541a79a..d873211b 100644 --- a/stacks/kms/files/test_slack_notifier.py +++ b/stacks/kms/files/test_slack_notifier.py @@ -105,6 +105,47 @@ class StateMachineTests(unittest.TestCase): kinds = [e.kind for e in events] self.assertEqual(kinds, ["probe", "activation"]) + def test_kubelet_probe_during_long_activation(self): + """vlmcsd is multi-threaded. While a real KMS RPC's connection + sits open (Windows holds it ~30s), kubelet's TCP readiness probe + every 5s opens+closes its own connection. The notifier MUST NOT + let the probe's OPEN/CLOSE wipe the in-flight activation's state. + Reproduces the production bug seen on 2026-05-10. + """ + interleaved = [ + "2026-05-10 13:12:17: IPv4 connection accepted: 192.168.1.230:53140.", + "2026-05-10 13:12:17: <<< Incoming KMS request", + "2026-05-10 13:12:17: Licensing status : 1 (Licensed)", + "2026-05-10 13:12:17: Application ID : 55c92734-d682-4d71-983e-d6ec3f16059f (Windows)", + "2026-05-10 13:12:17: Activation ID (Product) : 2de67392-b7a7-462a-b1ca-108dd189f588 (Windows 10 Professional)", + "2026-05-10 13:12:17: Workstation name : WIN10Pro-DS32.viktorbarzin.lan", + # ── kubelet probe arrives mid-flight, MUST NOT clobber 53140's state ── + "2026-05-10 13:12:19: IPv4 connection accepted: 10.0.20.102:46498.", + "2026-05-10 13:12:19: IPv4 connection closed: 10.0.20.102:46498.", + "2026-05-10 13:12:24: IPv4 connection accepted: 10.0.20.102:54454.", + "2026-05-10 13:12:24: IPv4 connection closed: 10.0.20.102:54454.", + # ── activation closes 31s after open ── + "2026-05-10 13:12:48: IPv4 connection closed: 192.168.1.230:53140.", + ] + events, _ = self._drive(interleaved) + kinds = [e.kind for e in events] + self.assertEqual(kinds, ["probe", "probe", "activation"]) + activation = events[-1] + self.assertEqual(activation.ip, "192.168.1.230") + self.assertEqual(activation.product, "Windows 10 Professional") + self.assertEqual(activation.host, "WIN10Pro-DS32.viktorbarzin.lan") + self.assertEqual(activation.status, "Licensed") + + def test_orphan_close_no_event(self): + """If the notifier starts mid-conn, the open was missed but the + close still fires. We MUST NOT emit an event for that — it would + show up with empty fields and look like a probe.""" + orphan = [ + "2026-05-10 13:00:00: IPv4 connection closed: 192.168.1.230:55555.", + ] + events, _ = self._drive(orphan) + self.assertEqual(events, []) + if __name__ == "__main__": unittest.main()