kms: per-connection state in notifier (vlmcsd is multi-threaded)

Bug found via E2E test against the Windows VM (VMID 300). The single
shared `state` dict in slack-notifier.py worked when vlmcsd processed
one connection at a time, but real Windows KMS activations hold the
connection open ~30 seconds (handshake + keep-alive). During that
window vlmcsd accepts other concurrent connections — most relevantly
the new kubelet TCP readiness probe every 5s — and each new OPEN line
reset the shared state, wiping the in-flight activation's
app/product/host before its CLOSE arrived. Result: real activations
were misclassified as probes (no Slack post, no metric increment).

Fix: state is now a dict keyed by `ip:port` with one sub-dict per
in-flight connection. A `__current` pointer tracks the most recent
OPEN so unkeyed detail lines (Application ID, Workstation name, etc.)
can be attributed correctly — vlmcsd writes detail lines immediately
after the OPEN and before any subsequent OPEN, so the heuristic holds.
Orphan CLOSEs (notifier started mid-conn) are now silently dropped
instead of emitting an empty probe event.

Two new regression tests:
- test_kubelet_probe_during_long_activation: 5s probe interleaved into
  a 31s activation block — exact production failure mode.
- test_orphan_close_no_event: bare CLOSE without prior OPEN.

Verified live: triggered slmgr /upk + /ipk + /skms 10.0.20.202 + /ato
on WIN10Pro-DS32. vlmcsd logged the full activation block, notifier
posted to Slack with ip=192.168.1.230 source=external
product='Windows 10 Professional' host='WIN10Pro-DS32.viktorbarzin.lan'
and kms_activations_total{product=Windows 10 Professional,
status=Licensed} 1 — real WAN client IP preserved through the
ETP=Local + dedicated MetalLB IP chain end to end.
This commit is contained in:
Viktor Barzin 2026-05-10 13:21:38 +00:00
parent 9163909fad
commit d67c3027bc
2 changed files with 81 additions and 27 deletions

View file

@ -45,8 +45,8 @@ DEDUP_WINDOW = int(os.environ.get("DEDUP_WINDOW_SECONDS", "3600"))
DEDUP_MAX = 4096 DEDUP_MAX = 4096
METRICS_PORT = int(os.environ.get("METRICS_PORT", "9101")) METRICS_PORT = int(os.environ.get("METRICS_PORT", "9101"))
OPEN_RE = re.compile(r":\s*IPv[46] connection accepted:\s*([0-9a-f.:\[\]]+):\d+") OPEN_RE = re.compile(r":\s*IPv[46] connection accepted:\s*([0-9a-f.:\[\]]+):(\d+)")
CLOSE_RE = re.compile(r":\s*IPv[46] connection closed:\s*([0-9a-f.:\[\]]+):\d+") CLOSE_RE = re.compile(r":\s*IPv[46] connection closed:\s*([0-9a-f.:\[\]]+):(\d+)")
APP_RE = re.compile(r":\s*Application ID\s*:\s*[0-9a-f-]+\s*\(([^)]+)\)") APP_RE = re.compile(r":\s*Application ID\s*:\s*[0-9a-f-]+\s*\(([^)]+)\)")
PROD_RE = re.compile(r":\s*Activation ID \(Product\)\s*:\s*[0-9a-f-]+\s*\(([^)]+)\)") PROD_RE = re.compile(r":\s*Activation ID \(Product\)\s*:\s*[0-9a-f-]+\s*\(([^)]+)\)")
HOST_RE = re.compile(r":\s*Workstation name\s*:\s*(.+?)\s*$") HOST_RE = re.compile(r":\s*Workstation name\s*:\s*(.+?)\s*$")
@ -106,40 +106,53 @@ class Event:
def process_line(line: str, state: dict): def process_line(line: str, state: dict):
"""Drive the parser one line at a time. Returns (new_state, event_or_None). """Drive the parser one line at a time. Returns (new_state, event_or_None).
On `connection accepted`, a new state dict is started. `state` tracks per-connection state keyed by `ip:port` (vlmcsd is
On `connection closed`, an Event is emitted and state is reset. multi-threaded concurrent connections interleave in the log, so a
Other lines accumulate fields into the current state. single shared dict gets clobbered). The special key `__current` points
at the most recent OPEN's key so detail lines (which lack ip:port
info) can be attributed to the right connection. Detail lines arrive
before the next OPEN under vlmcsd's processing model.
""" """
if (m := OPEN_RE.search(line)): if (m := OPEN_RE.search(line)):
return {"ip": m.group(1)}, None ip = m.group(1)
if not state: key = f"{ip}:{m.group(2)}"
state[key] = {"ip": ip}
state["__current"] = key
return state, None return state, None
if (m := APP_RE.search(line)):
state["app"] = m.group(1) if (m := CLOSE_RE.search(line)):
return state, None key = f"{m.group(1)}:{m.group(2)}"
if (m := PROD_RE.search(line)): conn = state.pop(key, None)
state["product"] = m.group(1) if state.get("__current") == key:
return state, None state.pop("__current", None)
if (m := HOST_RE.search(line)): if conn is None:
state["host"] = m.group(1) return state, None # orphan close (e.g., notifier started mid-conn)
return state, None ip = conn.get("ip", "?")
if (m := STATUS_RE.search(line)):
state["status"] = m.group(1)
return state, None
if CLOSE_RE.search(line):
ip = state.get("ip", "?")
source = classify_source(ip) source = classify_source(ip)
if is_probe(state): if is_probe(conn):
event = Event("probe", ip, source) event = Event("probe", ip, source)
else: else:
event = Event( event = Event(
"activation", ip, source, "activation", ip, source,
app=state.get("app", ""), app=conn.get("app", ""),
product=state.get("product", state.get("app", "unknown")), product=conn.get("product", conn.get("app", "unknown")),
host=state.get("host", "?"), host=conn.get("host", "?"),
status=state.get("status", "unknown"), status=conn.get("status", "unknown"),
) )
return {}, event return state, event
current = state.get("__current")
if not current or current not in state:
return state, None
conn = state[current]
if (m := APP_RE.search(line)):
conn["app"] = m.group(1)
elif (m := PROD_RE.search(line)):
conn["product"] = m.group(1)
elif (m := HOST_RE.search(line)):
conn["host"] = m.group(1)
elif (m := STATUS_RE.search(line)):
conn["status"] = m.group(1)
return state, None return state, None

View file

@ -105,6 +105,47 @@ class StateMachineTests(unittest.TestCase):
kinds = [e.kind for e in events] kinds = [e.kind for e in events]
self.assertEqual(kinds, ["probe", "activation"]) self.assertEqual(kinds, ["probe", "activation"])
def test_kubelet_probe_during_long_activation(self):
"""vlmcsd is multi-threaded. While a real KMS RPC's connection
sits open (Windows holds it ~30s), kubelet's TCP readiness probe
every 5s opens+closes its own connection. The notifier MUST NOT
let the probe's OPEN/CLOSE wipe the in-flight activation's state.
Reproduces the production bug seen on 2026-05-10.
"""
interleaved = [
"2026-05-10 13:12:17: IPv4 connection accepted: 192.168.1.230:53140.",
"2026-05-10 13:12:17: <<< Incoming KMS request",
"2026-05-10 13:12:17: Licensing status : 1 (Licensed)",
"2026-05-10 13:12:17: Application ID : 55c92734-d682-4d71-983e-d6ec3f16059f (Windows)",
"2026-05-10 13:12:17: Activation ID (Product) : 2de67392-b7a7-462a-b1ca-108dd189f588 (Windows 10 Professional)",
"2026-05-10 13:12:17: Workstation name : WIN10Pro-DS32.viktorbarzin.lan",
# ── kubelet probe arrives mid-flight, MUST NOT clobber 53140's state ──
"2026-05-10 13:12:19: IPv4 connection accepted: 10.0.20.102:46498.",
"2026-05-10 13:12:19: IPv4 connection closed: 10.0.20.102:46498.",
"2026-05-10 13:12:24: IPv4 connection accepted: 10.0.20.102:54454.",
"2026-05-10 13:12:24: IPv4 connection closed: 10.0.20.102:54454.",
# ── activation closes 31s after open ──
"2026-05-10 13:12:48: IPv4 connection closed: 192.168.1.230:53140.",
]
events, _ = self._drive(interleaved)
kinds = [e.kind for e in events]
self.assertEqual(kinds, ["probe", "probe", "activation"])
activation = events[-1]
self.assertEqual(activation.ip, "192.168.1.230")
self.assertEqual(activation.product, "Windows 10 Professional")
self.assertEqual(activation.host, "WIN10Pro-DS32.viktorbarzin.lan")
self.assertEqual(activation.status, "Licensed")
def test_orphan_close_no_event(self):
"""If the notifier starts mid-conn, the open was missed but the
close still fires. We MUST NOT emit an event for that it would
show up with empty fields and look like a probe."""
orphan = [
"2026-05-10 13:00:00: IPv4 connection closed: 192.168.1.230:55555.",
]
events, _ = self._drive(orphan)
self.assertEqual(events, [])
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()