diff --git a/stacks/kms/files/slack-notifier.py b/stacks/kms/files/slack-notifier.py index 22c75017..48e98ecd 100644 --- a/stacks/kms/files/slack-notifier.py +++ b/stacks/kms/files/slack-notifier.py @@ -45,8 +45,8 @@ DEDUP_WINDOW = int(os.environ.get("DEDUP_WINDOW_SECONDS", "3600")) DEDUP_MAX = 4096 METRICS_PORT = int(os.environ.get("METRICS_PORT", "9101")) -OPEN_RE = re.compile(r":\s*IPv[46] connection accepted:\s*([0-9a-f.:\[\]]+):\d+") -CLOSE_RE = re.compile(r":\s*IPv[46] connection closed:\s*([0-9a-f.:\[\]]+):\d+") +OPEN_RE = re.compile(r":\s*IPv[46] connection accepted:\s*([0-9a-f.:\[\]]+):(\d+)") +CLOSE_RE = re.compile(r":\s*IPv[46] connection closed:\s*([0-9a-f.:\[\]]+):(\d+)") APP_RE = re.compile(r":\s*Application ID\s*:\s*[0-9a-f-]+\s*\(([^)]+)\)") PROD_RE = re.compile(r":\s*Activation ID \(Product\)\s*:\s*[0-9a-f-]+\s*\(([^)]+)\)") HOST_RE = re.compile(r":\s*Workstation name\s*:\s*(.+?)\s*$") @@ -106,40 +106,53 @@ class Event: def process_line(line: str, state: dict): """Drive the parser one line at a time. Returns (new_state, event_or_None). - On `connection accepted`, a new state dict is started. - On `connection closed`, an Event is emitted and state is reset. - Other lines accumulate fields into the current state. + `state` tracks per-connection state keyed by `ip:port` (vlmcsd is + multi-threaded — concurrent connections interleave in the log, so a + single shared dict gets clobbered). The special key `__current` points + at the most recent OPEN's key so detail lines (which lack ip:port + info) can be attributed to the right connection. Detail lines arrive + before the next OPEN under vlmcsd's processing model. """ if (m := OPEN_RE.search(line)): - return {"ip": m.group(1)}, None - if not state: + ip = m.group(1) + key = f"{ip}:{m.group(2)}" + state[key] = {"ip": ip} + state["__current"] = key return state, None - if (m := APP_RE.search(line)): - state["app"] = m.group(1) - return state, None - if (m := PROD_RE.search(line)): - state["product"] = m.group(1) - return state, None - if (m := HOST_RE.search(line)): - state["host"] = m.group(1) - return state, None - if (m := STATUS_RE.search(line)): - state["status"] = m.group(1) - return state, None - if CLOSE_RE.search(line): - ip = state.get("ip", "?") + + if (m := CLOSE_RE.search(line)): + key = f"{m.group(1)}:{m.group(2)}" + conn = state.pop(key, None) + if state.get("__current") == key: + state.pop("__current", None) + if conn is None: + return state, None # orphan close (e.g., notifier started mid-conn) + ip = conn.get("ip", "?") source = classify_source(ip) - if is_probe(state): + if is_probe(conn): event = Event("probe", ip, source) else: event = Event( "activation", ip, source, - app=state.get("app", ""), - product=state.get("product", state.get("app", "unknown")), - host=state.get("host", "?"), - status=state.get("status", "unknown"), + app=conn.get("app", ""), + product=conn.get("product", conn.get("app", "unknown")), + host=conn.get("host", "?"), + status=conn.get("status", "unknown"), ) - return {}, event + return state, event + + current = state.get("__current") + if not current or current not in state: + return state, None + conn = state[current] + if (m := APP_RE.search(line)): + conn["app"] = m.group(1) + elif (m := PROD_RE.search(line)): + conn["product"] = m.group(1) + elif (m := HOST_RE.search(line)): + conn["host"] = m.group(1) + elif (m := STATUS_RE.search(line)): + conn["status"] = m.group(1) return state, None diff --git a/stacks/kms/files/test_slack_notifier.py b/stacks/kms/files/test_slack_notifier.py index e541a79a..d873211b 100644 --- a/stacks/kms/files/test_slack_notifier.py +++ b/stacks/kms/files/test_slack_notifier.py @@ -105,6 +105,47 @@ class StateMachineTests(unittest.TestCase): kinds = [e.kind for e in events] self.assertEqual(kinds, ["probe", "activation"]) + def test_kubelet_probe_during_long_activation(self): + """vlmcsd is multi-threaded. While a real KMS RPC's connection + sits open (Windows holds it ~30s), kubelet's TCP readiness probe + every 5s opens+closes its own connection. The notifier MUST NOT + let the probe's OPEN/CLOSE wipe the in-flight activation's state. + Reproduces the production bug seen on 2026-05-10. + """ + interleaved = [ + "2026-05-10 13:12:17: IPv4 connection accepted: 192.168.1.230:53140.", + "2026-05-10 13:12:17: <<< Incoming KMS request", + "2026-05-10 13:12:17: Licensing status : 1 (Licensed)", + "2026-05-10 13:12:17: Application ID : 55c92734-d682-4d71-983e-d6ec3f16059f (Windows)", + "2026-05-10 13:12:17: Activation ID (Product) : 2de67392-b7a7-462a-b1ca-108dd189f588 (Windows 10 Professional)", + "2026-05-10 13:12:17: Workstation name : WIN10Pro-DS32.viktorbarzin.lan", + # ── kubelet probe arrives mid-flight, MUST NOT clobber 53140's state ── + "2026-05-10 13:12:19: IPv4 connection accepted: 10.0.20.102:46498.", + "2026-05-10 13:12:19: IPv4 connection closed: 10.0.20.102:46498.", + "2026-05-10 13:12:24: IPv4 connection accepted: 10.0.20.102:54454.", + "2026-05-10 13:12:24: IPv4 connection closed: 10.0.20.102:54454.", + # ── activation closes 31s after open ── + "2026-05-10 13:12:48: IPv4 connection closed: 192.168.1.230:53140.", + ] + events, _ = self._drive(interleaved) + kinds = [e.kind for e in events] + self.assertEqual(kinds, ["probe", "probe", "activation"]) + activation = events[-1] + self.assertEqual(activation.ip, "192.168.1.230") + self.assertEqual(activation.product, "Windows 10 Professional") + self.assertEqual(activation.host, "WIN10Pro-DS32.viktorbarzin.lan") + self.assertEqual(activation.status, "Licensed") + + def test_orphan_close_no_event(self): + """If the notifier starts mid-conn, the open was missed but the + close still fires. We MUST NOT emit an event for that — it would + show up with empty fields and look like a probe.""" + orphan = [ + "2026-05-10 13:00:00: IPv4 connection closed: 192.168.1.230:55555.", + ] + events, _ = self._drive(orphan) + self.assertEqual(events, []) + if __name__ == "__main__": unittest.main()