kms: per-connection state in notifier (vlmcsd is multi-threaded)
Bug found via E2E test against the Windows VM (VMID 300). The single
shared `state` dict in slack-notifier.py worked when vlmcsd processed
one connection at a time, but real Windows KMS activations hold the
connection open ~30 seconds (handshake + keep-alive). During that
window vlmcsd accepts other concurrent connections — most relevantly
the new kubelet TCP readiness probe every 5s — and each new OPEN line
reset the shared state, wiping the in-flight activation's
app/product/host before its CLOSE arrived. Result: real activations
were misclassified as probes (no Slack post, no metric increment).
Fix: state is now a dict keyed by `ip:port` with one sub-dict per
in-flight connection. A `__current` pointer tracks the most recent
OPEN so unkeyed detail lines (Application ID, Workstation name, etc.)
can be attributed correctly — vlmcsd writes detail lines immediately
after the OPEN and before any subsequent OPEN, so the heuristic holds.
Orphan CLOSEs (notifier started mid-conn) are now silently dropped
instead of emitting an empty probe event.
Two new regression tests:
- test_kubelet_probe_during_long_activation: 5s probe interleaved into
a 31s activation block — exact production failure mode.
- test_orphan_close_no_event: bare CLOSE without prior OPEN.
Verified live: triggered slmgr /upk + /ipk + /skms 10.0.20.202 + /ato
on WIN10Pro-DS32. vlmcsd logged the full activation block, notifier
posted to Slack with ip=192.168.1.230 source=external
product='Windows 10 Professional' host='WIN10Pro-DS32.viktorbarzin.lan'
and kms_activations_total{product=Windows 10 Professional,
status=Licensed} 1 — real WAN client IP preserved through the
ETP=Local + dedicated MetalLB IP chain end to end.
This commit is contained in:
parent
4a3ca572e8
commit
d85b54d89d
2 changed files with 81 additions and 27 deletions
|
|
@ -45,8 +45,8 @@ DEDUP_WINDOW = int(os.environ.get("DEDUP_WINDOW_SECONDS", "3600"))
|
|||
DEDUP_MAX = 4096
|
||||
METRICS_PORT = int(os.environ.get("METRICS_PORT", "9101"))
|
||||
|
||||
OPEN_RE = re.compile(r":\s*IPv[46] connection accepted:\s*([0-9a-f.:\[\]]+):\d+")
|
||||
CLOSE_RE = re.compile(r":\s*IPv[46] connection closed:\s*([0-9a-f.:\[\]]+):\d+")
|
||||
OPEN_RE = re.compile(r":\s*IPv[46] connection accepted:\s*([0-9a-f.:\[\]]+):(\d+)")
|
||||
CLOSE_RE = re.compile(r":\s*IPv[46] connection closed:\s*([0-9a-f.:\[\]]+):(\d+)")
|
||||
APP_RE = re.compile(r":\s*Application ID\s*:\s*[0-9a-f-]+\s*\(([^)]+)\)")
|
||||
PROD_RE = re.compile(r":\s*Activation ID \(Product\)\s*:\s*[0-9a-f-]+\s*\(([^)]+)\)")
|
||||
HOST_RE = re.compile(r":\s*Workstation name\s*:\s*(.+?)\s*$")
|
||||
|
|
@ -106,40 +106,53 @@ class Event:
|
|||
def process_line(line: str, state: dict):
|
||||
"""Drive the parser one line at a time. Returns (new_state, event_or_None).
|
||||
|
||||
On `connection accepted`, a new state dict is started.
|
||||
On `connection closed`, an Event is emitted and state is reset.
|
||||
Other lines accumulate fields into the current state.
|
||||
`state` tracks per-connection state keyed by `ip:port` (vlmcsd is
|
||||
multi-threaded — concurrent connections interleave in the log, so a
|
||||
single shared dict gets clobbered). The special key `__current` points
|
||||
at the most recent OPEN's key so detail lines (which lack ip:port
|
||||
info) can be attributed to the right connection. Detail lines arrive
|
||||
before the next OPEN under vlmcsd's processing model.
|
||||
"""
|
||||
if (m := OPEN_RE.search(line)):
|
||||
return {"ip": m.group(1)}, None
|
||||
if not state:
|
||||
ip = m.group(1)
|
||||
key = f"{ip}:{m.group(2)}"
|
||||
state[key] = {"ip": ip}
|
||||
state["__current"] = key
|
||||
return state, None
|
||||
if (m := APP_RE.search(line)):
|
||||
state["app"] = m.group(1)
|
||||
return state, None
|
||||
if (m := PROD_RE.search(line)):
|
||||
state["product"] = m.group(1)
|
||||
return state, None
|
||||
if (m := HOST_RE.search(line)):
|
||||
state["host"] = m.group(1)
|
||||
return state, None
|
||||
if (m := STATUS_RE.search(line)):
|
||||
state["status"] = m.group(1)
|
||||
return state, None
|
||||
if CLOSE_RE.search(line):
|
||||
ip = state.get("ip", "?")
|
||||
|
||||
if (m := CLOSE_RE.search(line)):
|
||||
key = f"{m.group(1)}:{m.group(2)}"
|
||||
conn = state.pop(key, None)
|
||||
if state.get("__current") == key:
|
||||
state.pop("__current", None)
|
||||
if conn is None:
|
||||
return state, None # orphan close (e.g., notifier started mid-conn)
|
||||
ip = conn.get("ip", "?")
|
||||
source = classify_source(ip)
|
||||
if is_probe(state):
|
||||
if is_probe(conn):
|
||||
event = Event("probe", ip, source)
|
||||
else:
|
||||
event = Event(
|
||||
"activation", ip, source,
|
||||
app=state.get("app", ""),
|
||||
product=state.get("product", state.get("app", "unknown")),
|
||||
host=state.get("host", "?"),
|
||||
status=state.get("status", "unknown"),
|
||||
app=conn.get("app", ""),
|
||||
product=conn.get("product", conn.get("app", "unknown")),
|
||||
host=conn.get("host", "?"),
|
||||
status=conn.get("status", "unknown"),
|
||||
)
|
||||
return {}, event
|
||||
return state, event
|
||||
|
||||
current = state.get("__current")
|
||||
if not current or current not in state:
|
||||
return state, None
|
||||
conn = state[current]
|
||||
if (m := APP_RE.search(line)):
|
||||
conn["app"] = m.group(1)
|
||||
elif (m := PROD_RE.search(line)):
|
||||
conn["product"] = m.group(1)
|
||||
elif (m := HOST_RE.search(line)):
|
||||
conn["host"] = m.group(1)
|
||||
elif (m := STATUS_RE.search(line)):
|
||||
conn["status"] = m.group(1)
|
||||
return state, None
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -105,6 +105,47 @@ class StateMachineTests(unittest.TestCase):
|
|||
kinds = [e.kind for e in events]
|
||||
self.assertEqual(kinds, ["probe", "activation"])
|
||||
|
||||
def test_kubelet_probe_during_long_activation(self):
|
||||
"""vlmcsd is multi-threaded. While a real KMS RPC's connection
|
||||
sits open (Windows holds it ~30s), kubelet's TCP readiness probe
|
||||
every 5s opens+closes its own connection. The notifier MUST NOT
|
||||
let the probe's OPEN/CLOSE wipe the in-flight activation's state.
|
||||
Reproduces the production bug seen on 2026-05-10.
|
||||
"""
|
||||
interleaved = [
|
||||
"2026-05-10 13:12:17: IPv4 connection accepted: 192.168.1.230:53140.",
|
||||
"2026-05-10 13:12:17: <<< Incoming KMS request",
|
||||
"2026-05-10 13:12:17: Licensing status : 1 (Licensed)",
|
||||
"2026-05-10 13:12:17: Application ID : 55c92734-d682-4d71-983e-d6ec3f16059f (Windows)",
|
||||
"2026-05-10 13:12:17: Activation ID (Product) : 2de67392-b7a7-462a-b1ca-108dd189f588 (Windows 10 Professional)",
|
||||
"2026-05-10 13:12:17: Workstation name : WIN10Pro-DS32.viktorbarzin.lan",
|
||||
# ── kubelet probe arrives mid-flight, MUST NOT clobber 53140's state ──
|
||||
"2026-05-10 13:12:19: IPv4 connection accepted: 10.0.20.102:46498.",
|
||||
"2026-05-10 13:12:19: IPv4 connection closed: 10.0.20.102:46498.",
|
||||
"2026-05-10 13:12:24: IPv4 connection accepted: 10.0.20.102:54454.",
|
||||
"2026-05-10 13:12:24: IPv4 connection closed: 10.0.20.102:54454.",
|
||||
# ── activation closes 31s after open ──
|
||||
"2026-05-10 13:12:48: IPv4 connection closed: 192.168.1.230:53140.",
|
||||
]
|
||||
events, _ = self._drive(interleaved)
|
||||
kinds = [e.kind for e in events]
|
||||
self.assertEqual(kinds, ["probe", "probe", "activation"])
|
||||
activation = events[-1]
|
||||
self.assertEqual(activation.ip, "192.168.1.230")
|
||||
self.assertEqual(activation.product, "Windows 10 Professional")
|
||||
self.assertEqual(activation.host, "WIN10Pro-DS32.viktorbarzin.lan")
|
||||
self.assertEqual(activation.status, "Licensed")
|
||||
|
||||
def test_orphan_close_no_event(self):
|
||||
"""If the notifier starts mid-conn, the open was missed but the
|
||||
close still fires. We MUST NOT emit an event for that — it would
|
||||
show up with empty fields and look like a probe."""
|
||||
orphan = [
|
||||
"2026-05-10 13:00:00: IPv4 connection closed: 192.168.1.230:55555.",
|
||||
]
|
||||
events, _ = self._drive(orphan)
|
||||
self.assertEqual(events, [])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue