fix: restore tree dropped by 6d224861; land stem95su gdrive-sync (10m) [ci skip]

6d224861 came from a --no-checkout worktree whose empty index made the
commit drop every file except two. This restores 05b50d2b's full tree and
correctly adds stacks/stem95su/gdrive-sync.tf + the service-catalog stem95su
entry. Forward-only (parent=6d224861, no force-push); [ci skip] since the
live infra was never applied from the broken commit.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Viktor Barzin 2026-06-09 08:45:33 +00:00
parent 6d224861c4
commit fd0f4a0365
1166 changed files with 358546 additions and 0 deletions

2
stacks/kms/files/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
__pycache__/
*.pyc

View file

@ -0,0 +1,33 @@
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
MAX = 16384
class Handler(BaseHTTPRequestHandler):
def _respond(self, code):
self.send_response(code)
self.send_header('Content-Length', '0')
self.end_headers()
def do_POST(self):
if self.path.rstrip('/') != '/diag':
self._respond(404); return
try:
n = int(self.headers.get('Content-Length', 0) or 0)
n = min(n, MAX) if n > 0 else 0
raw = self.rfile.read(n).decode('utf-8', 'replace') if n else ''
obj = json.loads(raw) if raw.strip() else {}
if not isinstance(obj, dict):
obj = {'_raw': str(obj)[:1000]}
ip = self.headers.get('X-Forwarded-For', self.client_address[0]).split(',')[0].strip()
obj['_ip'] = ip
print('KMSDIAG ' + json.dumps(obj, separators=(',', ':'))[:MAX], flush=True)
except Exception as e:
print('KMSDIAG_ERR ' + repr(e)[:500], flush=True)
self._respond(204)
def do_GET(self):
self._respond(200 if self.path.rstrip('/') in ('/healthz', '/diag') else 404)
def log_message(self, *a):
pass
if __name__ == '__main__':
HTTPServer(('0.0.0.0', 9102), Handler).serve_forever()

View file

@ -0,0 +1,335 @@
#!/usr/bin/env python3
"""
Tail vlmcsd verbose log; post a Slack message per activation, and expose
Prometheus metrics on /metrics for activation counts.
vlmcsd verbose output emits a multi-line block per request:
<ts>: IPv4 connection accepted: <ip>:<port>.
<ts>: <<< Incoming KMS request
<ts>: Application ID : <uuid> (<name>)
<ts>: Activation ID (Product): <uuid> (<product>)
<ts>: Workstation name : <hostname>
...
<ts>: IPv4 connection closed: <ip>:<port>.
A bare TCP open/close pair (no Application/Activation lines) is a probe
typically Uptime Kuma's port-type monitor on windows-kms.kms:1688. Probes
are counted in `kms_connection_probes_total` but never posted to Slack.
Real activations dedupe by (client_ip, product) within DEDUP_WINDOW_SECONDS
to avoid spam from Windows' default 7-day re-activation cycle.
Prometheus metrics (text format, no client_ip label cardinality):
kms_activations_total{product, status} counter
kms_activations_dedup_skipped_total{product} counter
kms_connection_probes_total{source} counter (probe-only conns)
kms_last_activation_timestamp_seconds gauge
kms_slack_notifier_up gauge (heartbeat)
"""
import ipaddress
import json
import os
import re
import sys
import threading
import time
import urllib.error
import urllib.request
from collections import OrderedDict
from http.server import BaseHTTPRequestHandler, HTTPServer
LOG_PATH = os.environ.get("VLMCSD_LOG", "/var/log/vlmcsd/vlmcsd.log")
WEBHOOK = os.environ["SLACK_WEBHOOK_URL"]
CHANNEL = os.environ.get("SLACK_CHANNEL", "#alerts")
DEDUP_WINDOW = int(os.environ.get("DEDUP_WINDOW_SECONDS", "3600"))
DEDUP_MAX = 4096
METRICS_PORT = int(os.environ.get("METRICS_PORT", "9101"))
OPEN_RE = re.compile(r":\s*IPv[46] connection accepted:\s*([0-9a-f.:\[\]]+):(\d+)")
CLOSE_RE = re.compile(r":\s*IPv[46] connection closed:\s*([0-9a-f.:\[\]]+):(\d+)")
APP_RE = re.compile(r":\s*Application ID\s*:\s*[0-9a-f-]+\s*\(([^)]+)\)")
PROD_RE = re.compile(r":\s*Activation ID \(Product\)\s*:\s*[0-9a-f-]+\s*\(([^)]+)\)")
HOST_RE = re.compile(r":\s*Workstation name\s*:\s*(.+?)\s*$")
STATUS_RE = re.compile(r":\s*Licensing status\s*:\s*\d+\s*\((.+?)\)\s*$")
# Pod CIDR (Calico, kube-proxy SNAT-free intra-cluster traffic) and cluster
# LAN (kube-proxy SNATs ETP=Cluster external traffic to a node IP). Anything
# else is a real client IP that arrived via ETP=Local or pod-to-svc routing.
POD_CIDR = ipaddress.ip_network("10.10.0.0/16")
CLUSTER_LAN_CIDR = ipaddress.ip_network("10.0.20.0/24")
_metrics_lock = threading.Lock()
_activations: dict = {}
_dedup_skipped: dict = {}
_probes: dict = {}
_last_activation_ts: float = 0.0
def _esc(value: str) -> str:
return str(value).replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
def classify_source(ip: str) -> str:
"""Return 'internal_pod', 'cluster_node', or 'external' for a vlmcsd-logged IP."""
raw = ip.strip().strip("[]")
try:
addr = ipaddress.ip_address(raw)
except ValueError:
return "external"
if isinstance(addr, ipaddress.IPv4Address):
if addr in POD_CIDR:
return "internal_pod"
if addr in CLUSTER_LAN_CIDR:
return "cluster_node"
return "external"
def is_probe(state: dict) -> bool:
"""A connection that closed without any Application/Activation block."""
return "app" not in state and "product" not in state
class Event:
__slots__ = ("kind", "ip", "source", "app", "product", "host", "status")
def __init__(self, kind: str, ip: str, source: str, *, app: str = "",
product: str = "", host: str = "", status: str = "") -> None:
self.kind = kind
self.ip = ip
self.source = source
self.app = app
self.product = product
self.host = host
self.status = status
def process_line(line: str, state: dict):
"""Drive the parser one line at a time. Returns (new_state, event_or_None).
`state` tracks per-connection state keyed by `ip:port` (vlmcsd is
multi-threaded concurrent connections interleave in the log, so a
single shared dict gets clobbered). The special key `__current` points
at the most recent OPEN's key so detail lines (which lack ip:port
info) can be attributed to the right connection. Detail lines arrive
before the next OPEN under vlmcsd's processing model.
"""
if (m := OPEN_RE.search(line)):
ip = m.group(1)
key = f"{ip}:{m.group(2)}"
state[key] = {"ip": ip}
state["__current"] = key
return state, None
if (m := CLOSE_RE.search(line)):
key = f"{m.group(1)}:{m.group(2)}"
conn = state.pop(key, None)
if state.get("__current") == key:
state.pop("__current", None)
if conn is None:
return state, None # orphan close (e.g., notifier started mid-conn)
ip = conn.get("ip", "?")
source = classify_source(ip)
if is_probe(conn):
event = Event("probe", ip, source)
else:
event = Event(
"activation", ip, source,
app=conn.get("app", ""),
product=conn.get("product", conn.get("app", "unknown")),
host=conn.get("host", "?"),
status=conn.get("status", "unknown"),
)
return state, event
current = state.get("__current")
if not current or current not in state:
return state, None
conn = state[current]
if (m := APP_RE.search(line)):
conn["app"] = m.group(1)
elif (m := PROD_RE.search(line)):
conn["product"] = m.group(1)
elif (m := HOST_RE.search(line)):
conn["host"] = m.group(1)
elif (m := STATUS_RE.search(line)):
conn["status"] = m.group(1)
return state, None
def record_activation(product: str, status: str) -> None:
global _last_activation_ts
with _metrics_lock:
key = (product, status)
_activations[key] = _activations.get(key, 0) + 1
_last_activation_ts = time.time()
def record_dedup_skip(product: str) -> None:
with _metrics_lock:
_dedup_skipped[product] = _dedup_skipped.get(product, 0) + 1
def record_probe(source: str) -> None:
with _metrics_lock:
_probes[source] = _probes.get(source, 0) + 1
def render_metrics() -> bytes:
out = []
with _metrics_lock:
activations = dict(_activations)
dedup_skipped = dict(_dedup_skipped)
probes = dict(_probes)
last_ts = _last_activation_ts
out.append("# HELP kms_activations_total KMS activation events that resulted in a Slack post.")
out.append("# TYPE kms_activations_total counter")
for (product, status), count in sorted(activations.items()):
out.append(
f'kms_activations_total{{product="{_esc(product)}",status="{_esc(status)}"}} {count}'
)
out.append("# HELP kms_activations_dedup_skipped_total KMS activation events suppressed by dedup window.")
out.append("# TYPE kms_activations_dedup_skipped_total counter")
for product, count in sorted(dedup_skipped.items()):
out.append(f'kms_activations_dedup_skipped_total{{product="{_esc(product)}"}} {count}')
out.append("# HELP kms_connection_probes_total Probe-only TCP connections (open+close, no KMS RPC).")
out.append("# TYPE kms_connection_probes_total counter")
for source, count in sorted(probes.items()):
out.append(f'kms_connection_probes_total{{source="{_esc(source)}"}} {count}')
out.append("# HELP kms_last_activation_timestamp_seconds Unix ts of the last non-deduped activation.")
out.append("# TYPE kms_last_activation_timestamp_seconds gauge")
out.append(f"kms_last_activation_timestamp_seconds {last_ts}")
out.append("# HELP kms_slack_notifier_up 1 while the notifier process is running.")
out.append("# TYPE kms_slack_notifier_up gauge")
out.append("kms_slack_notifier_up 1")
return ("\n".join(out) + "\n").encode("utf-8")
class MetricsHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/healthz":
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(b"ok\n")
return
if self.path != "/metrics":
self.send_response(404)
self.end_headers()
return
body = render_metrics()
self.send_response(200)
self.send_header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def log_message(self, *args, **kwargs):
pass
def start_metrics_server() -> None:
server = HTTPServer(("0.0.0.0", METRICS_PORT), MetricsHandler)
print(f"[slack-notifier] metrics on :{METRICS_PORT}/metrics", flush=True)
server.serve_forever()
def slack_post(text: str) -> None:
payload = json.dumps({"channel": CHANNEL, "text": text, "username": "kms.viktorbarzin.me", "icon_emoji": ":computer:"}).encode("utf-8")
req = urllib.request.Request(WEBHOOK, data=payload, headers={"Content-Type": "application/json"})
try:
urllib.request.urlopen(req, timeout=10).read()
except urllib.error.URLError as exc:
print(f"[slack] post failed: {exc}", file=sys.stderr)
class DedupCache(OrderedDict):
def should_send(self, key: str) -> bool:
now = time.time()
while self and (now - next(iter(self.values()))) > DEDUP_WINDOW:
self.popitem(last=False)
if key in self and (now - self[key]) < DEDUP_WINDOW:
return False
if len(self) >= DEDUP_MAX:
self.popitem(last=False)
self[key] = now
self.move_to_end(key)
return True
def follow(path: str):
while not os.path.exists(path):
time.sleep(1)
fh = open(path, "r", encoding="utf-8", errors="replace")
fh.seek(0, 2)
inode = os.fstat(fh.fileno()).st_ino
while True:
line = fh.readline()
if line:
yield line.rstrip("\n")
continue
time.sleep(0.5)
try:
new_inode = os.stat(path).st_ino
if new_inode != inode:
fh.close()
fh = open(path, "r", encoding="utf-8", errors="replace")
inode = new_inode
except FileNotFoundError:
time.sleep(1)
def handle_event(event: "Event", dedup: "DedupCache") -> None:
if event.kind == "probe":
record_probe(event.source)
print(
f"[slack-notifier] probe: ip={event.ip} source={event.source}",
flush=True,
)
return
key = f"{event.ip}|{event.product}"
if not dedup.should_send(key):
record_dedup_skip(event.product)
print(
f"[slack-notifier] dedup-skip: ip={event.ip} product={event.product!r}",
flush=True,
)
return
text = (
f":computer: KMS activation\n"
f"• *Client*: `{event.ip}` ({event.source})\n"
f"• *Workstation*: `{event.host}`\n"
f"• *Product*: `{event.product}`\n"
f"• *Status before*: {event.status}"
)
slack_post(text)
record_activation(event.product, event.status)
print(
f"[slack-notifier] sent: ip={event.ip} source={event.source} "
f"product={event.product!r} host={event.host!r}",
flush=True,
)
def main() -> None:
threading.Thread(target=start_metrics_server, daemon=True).start()
dedup = DedupCache()
print(f"[slack-notifier] tailing {LOG_PATH}, posting to {CHANNEL} via Slack", flush=True)
state: dict = {}
for line in follow(LOG_PATH):
state, event = process_line(line, state)
if event is not None:
handle_event(event, dedup)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,151 @@
"""Unit tests for slack_notifier classification + state machine.
Run with: cd infra/stacks/kms/files && python3 -m unittest test_slack_notifier
"""
import importlib.util
import os
import unittest
from pathlib import Path
# Load the notifier module from the dashed filename without executing main().
os.environ.setdefault("SLACK_WEBHOOK_URL", "http://example.invalid/webhook")
_spec = importlib.util.spec_from_file_location(
"slack_notifier", Path(__file__).parent / "slack-notifier.py"
)
nm = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(nm)
class ClassifySourceTests(unittest.TestCase):
def test_pod_cidr_is_internal_pod(self):
self.assertEqual(nm.classify_source("10.10.107.224"), "internal_pod")
self.assertEqual(nm.classify_source("10.10.0.1"), "internal_pod")
def test_cluster_lan_is_cluster_node(self):
self.assertEqual(nm.classify_source("10.0.20.103"), "cluster_node")
self.assertEqual(nm.classify_source("10.0.20.200"), "cluster_node")
def test_unknown_source_is_external(self):
self.assertEqual(nm.classify_source("8.8.8.8"), "external")
self.assertEqual(nm.classify_source("203.0.113.42"), "external")
def test_ipv6_external_default(self):
self.assertEqual(nm.classify_source("[2001:db8::1]"), "external")
class IsProbeTests(unittest.TestCase):
def test_open_close_only_is_probe(self):
self.assertTrue(nm.is_probe({"ip": "10.10.107.224"}))
def test_application_id_only_is_not_probe(self):
self.assertFalse(nm.is_probe({"ip": "10.0.20.103", "app": "Windows"}))
def test_product_only_is_not_probe(self):
self.assertFalse(nm.is_probe({"ip": "10.0.20.103", "product": "Office 2021"}))
def test_full_activation_is_not_probe(self):
state = {
"ip": "10.0.20.103",
"app": "Windows",
"product": "Windows 11 Pro",
"host": "DESKTOP-X",
"status": "Notification",
}
self.assertFalse(nm.is_probe(state))
class StateMachineTests(unittest.TestCase):
"""Drive the regex parser through real-shaped vlmcsd log blocks."""
PROBE_BLOCK = [
"2026-05-10 11:00:00: IPv4 connection accepted: 10.10.107.224:54321.",
"2026-05-10 11:00:00: IPv4 connection closed: 10.10.107.224:54321.",
]
ACTIVATION_BLOCK = [
"2026-05-10 11:00:01: IPv4 connection accepted: 10.0.20.103:50001.",
"2026-05-10 11:00:01: <<< Incoming KMS request",
"2026-05-10 11:00:01: Application ID : 55c92734-d682-4d71-983e-d6ec3f16059f (Windows)",
"2026-05-10 11:00:01: Activation ID (Product): 73111121-5638-40f6-bc11-f1d7b0d64300 (Windows 11 Pro)",
"2026-05-10 11:00:01: Workstation name : DESKTOP-MO2323B",
"2026-05-10 11:00:01: Licensing status : 2 (Notification)",
"2026-05-10 11:00:01: IPv4 connection closed: 10.0.20.103:50001.",
]
def _drive(self, lines):
events = []
state = {}
for line in lines:
state, event = nm.process_line(line, state)
if event is not None:
events.append(event)
return events, state
def test_probe_block_emits_probe_event(self):
events, state = self._drive(self.PROBE_BLOCK)
self.assertEqual(len(events), 1)
ev = events[0]
self.assertEqual(ev.kind, "probe")
self.assertEqual(ev.ip, "10.10.107.224")
self.assertEqual(state, {})
def test_activation_block_emits_activation_event(self):
events, state = self._drive(self.ACTIVATION_BLOCK)
self.assertEqual(len(events), 1)
ev = events[0]
self.assertEqual(ev.kind, "activation")
self.assertEqual(ev.ip, "10.0.20.103")
self.assertEqual(ev.product, "Windows 11 Pro")
self.assertEqual(ev.host, "DESKTOP-MO2323B")
self.assertEqual(ev.status, "Notification")
self.assertEqual(state, {})
def test_interleaved_probe_then_activation(self):
events, _ = self._drive(self.PROBE_BLOCK + self.ACTIVATION_BLOCK)
kinds = [e.kind for e in events]
self.assertEqual(kinds, ["probe", "activation"])
def test_kubelet_probe_during_long_activation(self):
"""vlmcsd is multi-threaded. While a real KMS RPC's connection
sits open (Windows holds it ~30s), kubelet's TCP readiness probe
every 5s opens+closes its own connection. The notifier MUST NOT
let the probe's OPEN/CLOSE wipe the in-flight activation's state.
Reproduces the production bug seen on 2026-05-10.
"""
interleaved = [
"2026-05-10 13:12:17: IPv4 connection accepted: 192.168.1.230:53140.",
"2026-05-10 13:12:17: <<< Incoming KMS request",
"2026-05-10 13:12:17: Licensing status : 1 (Licensed)",
"2026-05-10 13:12:17: Application ID : 55c92734-d682-4d71-983e-d6ec3f16059f (Windows)",
"2026-05-10 13:12:17: Activation ID (Product) : 2de67392-b7a7-462a-b1ca-108dd189f588 (Windows 10 Professional)",
"2026-05-10 13:12:17: Workstation name : WIN10Pro-DS32.viktorbarzin.lan",
# ── kubelet probe arrives mid-flight, MUST NOT clobber 53140's state ──
"2026-05-10 13:12:19: IPv4 connection accepted: 10.0.20.102:46498.",
"2026-05-10 13:12:19: IPv4 connection closed: 10.0.20.102:46498.",
"2026-05-10 13:12:24: IPv4 connection accepted: 10.0.20.102:54454.",
"2026-05-10 13:12:24: IPv4 connection closed: 10.0.20.102:54454.",
# ── activation closes 31s after open ──
"2026-05-10 13:12:48: IPv4 connection closed: 192.168.1.230:53140.",
]
events, _ = self._drive(interleaved)
kinds = [e.kind for e in events]
self.assertEqual(kinds, ["probe", "probe", "activation"])
activation = events[-1]
self.assertEqual(activation.ip, "192.168.1.230")
self.assertEqual(activation.product, "Windows 10 Professional")
self.assertEqual(activation.host, "WIN10Pro-DS32.viktorbarzin.lan")
self.assertEqual(activation.status, "Licensed")
def test_orphan_close_no_event(self):
"""If the notifier starts mid-conn, the open was missed but the
close still fires. We MUST NOT emit an event for that it would
show up with empty fields and look like a probe."""
orphan = [
"2026-05-10 13:00:00: IPv4 connection closed: 192.168.1.230:55555.",
]
events, _ = self._drive(orphan)
self.assertEqual(events, [])
if __name__ == "__main__":
unittest.main()