diff --git a/scripts/workstation/roster_engine.py b/scripts/workstation/roster_engine.py new file mode 100644 index 00000000..ceaa5388 --- /dev/null +++ b/scripts/workstation/roster_engine.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python3 +"""Pure derivation + offboarding-diff engine for the devvm Workstation roster. + +Functional core (this module, unit-tested) / imperative shell (the bash +provisioner that consumes the JSON this emits and performs the host mutations). +No host I/O lives in the tested functions. See PRD ViktorBarzin/infra#9. + +The roster (`roster.yaml`) is the single source of truth for the workstation +lifecycle. `os_user` is the pinned key; `authentik_user` / `k8s_user` differ +per person and are recorded explicitly (no email->username derivation). +""" + +from __future__ import annotations + +import json +import sys +from dataclasses import dataclass, field +from typing import Iterable + +import yaml + +BASE_PORT = 3773 +VALID_TIERS = ("admin", "power-user", "namespace-owner") +# Tier -> supplementary groups the reconcile ENSURES (additive-only; never stripped). +TIER_GROUPS: dict[str, tuple[str, ...]] = { + "admin": ("code-shared", "docker", "sudo"), + "power-user": (), + "namespace-owner": (), +} +DEFAULT_SHELL = "/bin/zsh" +_REVERSIBLE_OFFBOARD_KINDS = ( + "disable_instance", + "unmap_dispatch", + "remove_from_t3_group", + "lock_login", + "revoke_cluster_rbac", +) + + +class RosterError(ValueError): + """Raised when the roster is structurally invalid.""" + + +@dataclass(frozen=True) +class User: + os_user: str + authentik_user: str + k8s_user: str + tier: str + namespaces: tuple[str, ...] = () + + +@dataclass(frozen=True) +class Roster: + users: dict[str, User] = field(default_factory=dict) + + +@dataclass(frozen=True) +class Account: + os_user: str + tier: str + shell: str + login_locked: bool + groups: tuple[str, ...] + + +@dataclass(frozen=True) +class DesiredState: + accounts: dict[str, Account] + ttyd_user_map: str + dispatch: dict[str, dict] + ports: dict[str, int] + + +@dataclass(frozen=True) +class OffboardAction: + os_user: str + kind: str + reversible: bool + + +# -------------------------------------------------------------------------- +# Parsing + structural validation +# -------------------------------------------------------------------------- + + +def _parse_user(os_user: str, spec: dict) -> User: + for required in ("authentik_user", "k8s_user", "tier"): + if required not in spec: + raise RosterError(f"user {os_user!r}: missing required field {required!r}") + tier = spec["tier"] + if tier not in VALID_TIERS: + raise RosterError( + f"user {os_user!r}: unknown tier {tier!r} (valid: {list(VALID_TIERS)})" + ) + namespaces = tuple(spec.get("namespaces") or ()) + if tier == "namespace-owner" and not namespaces: + raise RosterError(f"user {os_user!r}: namespace-owner requires namespaces") + if tier != "namespace-owner" and namespaces: + raise RosterError(f"user {os_user!r}: only namespace-owner may set namespaces") + return User(os_user, spec["authentik_user"], spec["k8s_user"], tier, namespaces) + + +def load_roster(text: str) -> Roster: + data = yaml.safe_load(text) or {} + users_raw = data.get("users") or {} + return Roster({name: _parse_user(name, spec) for name, spec in users_raw.items()}) + + +def load_roster_file(path: str) -> Roster: + with open(path, encoding="utf-8") as fh: + return load_roster(fh.read()) + + +# -------------------------------------------------------------------------- +# Tier validation against live k8s_users (fail-loud) +# -------------------------------------------------------------------------- + + +def validate_tiers(roster: Roster, k8s_user_tiers: dict[str, str]) -> list[str]: + """Return one error string per roster user whose tier disagrees with the + live `k8s_users` map. Admins are exempt (cluster-admin is granted out of + band). An empty list means the roster is consistent with the cluster.""" + errors = [] + for user in roster.users.values(): + if user.tier == "admin": + continue + actual = k8s_user_tiers.get(user.k8s_user) + if actual is None: + errors.append( + f"{user.os_user}: tier {user.tier} but k8s_user {user.k8s_user!r} " + f"absent from k8s_users (add the entry first)" + ) + elif actual != user.tier: + errors.append( + f"{user.os_user}: roster tier {user.tier} != k8s_users tier " + f"{actual} for {user.k8s_user!r}" + ) + return errors + + +# -------------------------------------------------------------------------- +# Desired-state derivation (sticky ports, ttyd map, dispatch, accounts) +# -------------------------------------------------------------------------- + + +def _allocate_ports(roster: Roster, existing_ports: dict[str, int]) -> dict[str, int]: + ports = {u: existing_ports[u] for u in roster.users if u in existing_ports} + used = set(ports.values()) + for os_user in sorted(roster.users): + if os_user in ports: + continue + candidate = BASE_PORT + while candidate in used: + candidate += 1 + ports[os_user] = candidate + used.add(candidate) + return ports + + +_TTYD_MAP_HEADER = ( + "# Generated from roster.yaml by roster_engine.py — DO NOT EDIT BY HAND.\n" + "# =; consumed by t3-dispatch.\n" +) + + +def derive_desired_state( + roster: Roster, existing_ports: dict[str, int] +) -> DesiredState: + ports = _allocate_ports(roster, existing_ports) + ordered = sorted(roster.users.values(), key=lambda u: ports[u.os_user]) + ttyd_lines = [f"{u.authentik_user}={u.os_user}" for u in ordered] + ttyd_user_map = _TTYD_MAP_HEADER + "\n".join(ttyd_lines) + "\n" + dispatch = { + u.authentik_user: {"os_user": u.os_user, "port": ports[u.os_user]} + for u in ordered + } + accounts = { + u.os_user: Account( + os_user=u.os_user, + tier=u.tier, + shell=DEFAULT_SHELL, + login_locked=True, + groups=TIER_GROUPS[u.tier], + ) + for u in roster.users.values() + } + return DesiredState(accounts, ttyd_user_map, dispatch, ports) + + +def groups_to_add(desired: Iterable[str], current: Iterable[str]) -> list[str]: + """Additive-only: the groups to `gpasswd -a`. Never proposes a removal, so a + routine reconcile can't strip a pre-existing user's legacy groups.""" + return sorted(set(desired) - set(current)) + + +# -------------------------------------------------------------------------- +# Offboarding diff (staged: reversible cut, then gated destructive removal) +# -------------------------------------------------------------------------- + + +def to_deprovision(old: Roster, new: Roster) -> list[str]: + return sorted(set(old.users) - set(new.users)) + + +def offboard_plan( + old: Roster, new: Roster, *, include_destructive: bool +) -> list[OffboardAction]: + """Staged offboarding actions for users dropped from the roster. The + reversible cut (disable instance, unmap, lock, revoke RBAC) is always + returned; the irreversible `userdel_archive` is included ONLY when + explicitly requested, so it can never be auto-applied by a reconcile.""" + plan: list[OffboardAction] = [] + for os_user in to_deprovision(old, new): + plan.extend( + OffboardAction(os_user, kind, True) for kind in _REVERSIBLE_OFFBOARD_KINDS + ) + if include_destructive: + plan.append(OffboardAction(os_user, "userdel_archive", False)) + return plan + + +# -------------------------------------------------------------------------- +# CLI adapter (imperative shell entrypoint — consumed by t3-provision-users.sh) +# -------------------------------------------------------------------------- + + +def _desired_state_to_dict(ds: DesiredState) -> dict: + return { + "accounts": { + name: { + "os_user": a.os_user, + "tier": a.tier, + "shell": a.shell, + "login_locked": a.login_locked, + "groups": list(a.groups), + } + for name, a in ds.accounts.items() + }, + "ttyd_user_map": ds.ttyd_user_map, + "dispatch": ds.dispatch, + "ports": ds.ports, + } + + +def _main(argv: list[str]) -> int: + import argparse + + parser = argparse.ArgumentParser(description="Workstation roster engine") + sub = parser.add_subparsers(dest="cmd", required=True) + pv = sub.add_parser( + "validate", help="exit 1 if roster tiers diverge from k8s_users" + ) + pv.add_argument("--roster", required=True) + pv.add_argument("--k8s-users-json", required=True, help="JSON map {k8s_user: tier}") + pd = sub.add_parser("derive", help="emit desired state as JSON") + pd.add_argument("--roster", required=True) + pd.add_argument("--ports-json", required=True, help="JSON map {os_user: port}") + args = parser.parse_args(argv) + + roster = load_roster_file(args.roster) + if args.cmd == "validate": + with open(args.k8s_users_json, encoding="utf-8") as fh: + errors = validate_tiers(roster, json.load(fh)) + for err in errors: + print(err, file=sys.stderr) + return 1 if errors else 0 + with open(args.ports_json, encoding="utf-8") as fh: + desired = derive_desired_state(roster, json.load(fh)) + json.dump(_desired_state_to_dict(desired), sys.stdout, indent=2, sort_keys=True) + sys.stdout.write("\n") + return 0 + + +if __name__ == "__main__": + raise SystemExit(_main(sys.argv[1:])) diff --git a/scripts/workstation/test_roster_engine.py b/scripts/workstation/test_roster_engine.py new file mode 100644 index 00000000..444371db --- /dev/null +++ b/scripts/workstation/test_roster_engine.py @@ -0,0 +1,262 @@ +"""Unit tests for the pure roster derivation + offboarding-diff engine. + +These exercise external behaviour only (parse -> validate -> derive -> diff); +no host I/O is touched. Mirrors the pure-core pytest style used elsewhere in +the monorepo. See PRD ViktorBarzin/infra#9 (modules #1 roster engine, #5 +offboarding diff). +""" + +import textwrap + +import pytest + +import roster_engine as eng + + +def _roster(yaml_text: str) -> "eng.Roster": + return eng.load_roster(textwrap.dedent(yaml_text)) + + +# -------------------------------------------------------------------------- +# load_roster: parsing + structural validation (module #1) +# -------------------------------------------------------------------------- + + +def test_parses_user_fields_and_tier(): + r = _roster( + """ + users: + emo: {authentik_user: emil.barzin, k8s_user: emo, tier: power-user} + """ + ) + u = r.users["emo"] + assert u.os_user == "emo" + assert u.authentik_user == "emil.barzin" + assert u.k8s_user == "emo" + assert u.tier == "power-user" + assert u.namespaces == () + + +def test_namespace_owner_carries_namespaces(): + r = _roster( + """ + users: + ancamilea: {authentik_user: ancaelena98, k8s_user: anca, + tier: namespace-owner, namespaces: [plotting-book]} + """ + ) + assert r.users["ancamilea"].namespaces == ("plotting-book",) + + +def test_admin_tier_is_accepted(): + r = _roster( + "users: {wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}}" + ) + assert r.users["wizard"].tier == "admin" + + +def test_rejects_unknown_tier(): + with pytest.raises(eng.RosterError, match="tier"): + _roster("users: {bob: {authentik_user: b, k8s_user: b, tier: wizard-king}}") + + +def test_rejects_missing_required_field(): + with pytest.raises(eng.RosterError, match="authentik_user"): + _roster("users: {bob: {k8s_user: b, tier: power-user}}") + + +def test_namespace_owner_requires_namespaces(): + with pytest.raises(eng.RosterError, match="namespace"): + _roster("users: {bob: {authentik_user: b, k8s_user: b, tier: namespace-owner}}") + + +def test_non_namespace_owner_must_not_set_namespaces(): + with pytest.raises(eng.RosterError, match="namespace"): + _roster( + "users: {bob: {authentik_user: b, k8s_user: b, tier: power-user, " + "namespaces: [x]}}" + ) + + +def test_empty_roster_is_valid(): + assert _roster("users: {}").users == {} + + +def test_missing_users_key_is_valid_empty(): + assert _roster("{}").users == {} + + +# -------------------------------------------------------------------------- +# validate_tiers: roster tier vs live k8s_users (fail-loud, module #1) +# -------------------------------------------------------------------------- + + +def test_validate_ok_when_tiers_match(): + r = _roster( + "users: {ancamilea: {authentik_user: a, k8s_user: anca, " + "tier: namespace-owner, namespaces: [plotting-book]}}" + ) + assert eng.validate_tiers(r, {"anca": "namespace-owner"}) == [] + + +def test_validate_flags_tier_mismatch(): + r = _roster( + "users: {ancamilea: {authentik_user: a, k8s_user: anca, tier: power-user}}" + ) + errs = eng.validate_tiers(r, {"anca": "namespace-owner"}) + assert len(errs) == 1 + assert ( + "anca" in errs[0] and "power-user" in errs[0] and "namespace-owner" in errs[0] + ) + + +def test_validate_flags_netnew_user_absent_from_k8s_users(): + # emo is power-user in the roster but has no k8s_users entry yet -> the OIDC + # RBAC binding can't exist, so this must fail loud (add the entry first). + r = _roster("users: {emo: {authentik_user: e, k8s_user: emo, tier: power-user}}") + errs = eng.validate_tiers(r, {}) + assert len(errs) == 1 + assert "emo" in errs[0] and "k8s_users" in errs[0] + + +def test_validate_skips_admin_tier(): + # wizard (admin) is cluster-admin via a separate mechanism, not k8s_users. + r = _roster( + "users: {wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}}" + ) + assert eng.validate_tiers(r, {}) == [] + + +# -------------------------------------------------------------------------- +# derive_desired_state: accounts, sticky ports, ttyd map, dispatch (module #1) +# -------------------------------------------------------------------------- + +THREE = """ + users: + wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin} + emo: {authentik_user: emil.barzin, k8s_user: emo, tier: power-user} + ancamilea: {authentik_user: ancaelena98, k8s_user: anca, tier: namespace-owner, namespaces: [plotting-book]} +""" + +LIVE_PORTS = {"wizard": 3773, "emo": 3774, "ancamilea": 3775} + + +def test_derive_preserves_existing_sticky_ports(): + ds = eng.derive_desired_state(_roster(THREE), LIVE_PORTS) + assert ds.ports == {"wizard": 3773, "emo": 3774, "ancamilea": 3775} + + +def test_derive_allocates_next_free_port_for_new_user(): + ds = eng.derive_desired_state(_roster(THREE), {"wizard": 3773}) + # emo + ancamilea are new -> next free from 3773 skipping the used 3773 + assert ds.ports["wizard"] == 3773 + assert sorted([ds.ports["emo"], ds.ports["ancamilea"]]) == [3774, 3775] + + +def test_derive_dispatch_keyed_by_authentik_user(): + ds = eng.derive_desired_state(_roster(THREE), LIVE_PORTS) + assert ds.dispatch == { + "vbarzin": {"os_user": "wizard", "port": 3773}, + "emil.barzin": {"os_user": "emo", "port": 3774}, + "ancaelena98": {"os_user": "ancamilea", "port": 3775}, + } + + +def test_derive_ttyd_map_has_one_mapping_per_user(): + ds = eng.derive_desired_state(_roster(THREE), LIVE_PORTS) + body = [ + line + for line in ds.ttyd_user_map.splitlines() + if line.strip() and not line.lstrip().startswith("#") + ] + assert set(body) == {"vbarzin=wizard", "emil.barzin=emo", "ancaelena98=ancamilea"} + + +def test_derive_accounts_assign_tier_groups_and_shell(): + ds = eng.derive_desired_state(_roster(THREE), LIVE_PORTS) + assert ds.accounts["wizard"].groups == ("code-shared", "docker", "sudo") + assert ds.accounts["emo"].groups == () + assert ds.accounts["ancamilea"].groups == () + assert ds.accounts["emo"].shell == "/bin/zsh" + + +def test_derive_is_deterministic(): + r = _roster(THREE) + assert eng.derive_desired_state(r, LIVE_PORTS) == eng.derive_desired_state( + r, LIVE_PORTS + ) + + +# -------------------------------------------------------------------------- +# groups_to_add: the additive-only invariant (module #1) +# -------------------------------------------------------------------------- + + +def test_groups_to_add_returns_only_missing(): + assert eng.groups_to_add(("sudo", "docker", "code-shared"), ("docker",)) == [ + "code-shared", + "sudo", + ] + + +def test_groups_to_add_never_proposes_removal_of_extra_groups(): + # emo currently has code-shared+docker (legacy). A power-user reconcile wants + # no groups -> must NOT strip anything (additive-only invariant). + assert eng.groups_to_add((), ("code-shared", "docker")) == [] + + +def test_groups_to_add_idempotent_when_all_present(): + assert eng.groups_to_add(("sudo",), ("sudo", "docker")) == [] + + +# -------------------------------------------------------------------------- +# offboarding diff: staged plan, destructive never auto (module #5) +# -------------------------------------------------------------------------- + + +def test_to_deprovision_is_old_minus_new(): + old = _roster(THREE) + new = _roster( + """ + users: + wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin} + emo: {authentik_user: emil.barzin, k8s_user: emo, tier: power-user} + """ + ) + assert eng.to_deprovision(old, new) == ["ancamilea"] + + +def test_to_deprovision_empty_when_nothing_removed(): + r = _roster(THREE) + assert eng.to_deprovision(r, r) == [] + + +def test_offboard_plan_reversible_cut_targets_exactly_the_removed_user(): + old = _roster(THREE) + new = _roster( + "users: {wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}}" + ) + plan = eng.offboard_plan(old, new, include_destructive=False) + cut_users = {a.os_user for a in plan} + assert cut_users == {"emo", "ancamilea"} + assert all(a.reversible for a in plan) + + +def test_offboard_plan_excludes_destructive_by_default(): + old = _roster(THREE) + new = _roster( + "users: {wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}}" + ) + auto = eng.offboard_plan(old, new, include_destructive=False) + assert all(a.kind != "userdel_archive" for a in auto) + + +def test_offboard_plan_includes_destructive_only_when_explicitly_requested(): + old = _roster(THREE) + new = _roster( + "users: {wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}}" + ) + full = eng.offboard_plan(old, new, include_destructive=True) + destructive = [a for a in full if a.kind == "userdel_archive"] + assert {a.os_user for a in destructive} == {"emo", "ancamilea"} + assert all(not a.reversible for a in destructive)