t3-provision-users.sh now consumes roster_engine.py: derives accounts + per-tier groups + sticky ports + /etc/ttyd-user-map + dispatch.json from roster.yaml and applies them. ADDITIVE-ONLY for existing users (never strips a group, replaces a home, or re-locks an account) so the hourly timer is always safe. Best-effort tier validation vs live k8s_users: warns on a net-new absent user (emo), aborts only on a real tier conflict, skips when root has no Vault token. DRY_RUN mode for safe testing. Verified on the live host: reproduces dispatch.json content exactly, emo/anca groups + all t3-serve instances unchanged, idempotent, shellcheck-clean; deployed to /usr/local/bin (hourly timer target). Engine: validate_tiers now returns ValidationIssue(severity) — error=conflict (abort) vs warn=absent (grant pending) — + has_blocking_errors(); 28 pytest cases. setup-devvm.sh redeploys the provisioner for reproducibility. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
280 lines
9.6 KiB
Python
280 lines
9.6 KiB
Python
"""Unit tests for the pure roster derivation + offboarding-diff engine.
|
|
|
|
These exercise external behaviour only (parse -> validate -> derive -> diff);
|
|
no host I/O is touched. Mirrors the pure-core pytest style used elsewhere in
|
|
the monorepo. See PRD ViktorBarzin/infra#9 (modules #1 roster engine, #5
|
|
offboarding diff).
|
|
"""
|
|
|
|
import textwrap
|
|
|
|
import pytest
|
|
|
|
import roster_engine as eng
|
|
|
|
|
|
def _roster(yaml_text: str) -> "eng.Roster":
|
|
return eng.load_roster(textwrap.dedent(yaml_text))
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# load_roster: parsing + structural validation (module #1)
|
|
# --------------------------------------------------------------------------
|
|
|
|
|
|
def test_parses_user_fields_and_tier():
|
|
r = _roster(
|
|
"""
|
|
users:
|
|
emo: {authentik_user: emil.barzin, k8s_user: emo, tier: power-user}
|
|
"""
|
|
)
|
|
u = r.users["emo"]
|
|
assert u.os_user == "emo"
|
|
assert u.authentik_user == "emil.barzin"
|
|
assert u.k8s_user == "emo"
|
|
assert u.tier == "power-user"
|
|
assert u.namespaces == ()
|
|
|
|
|
|
def test_namespace_owner_carries_namespaces():
|
|
r = _roster(
|
|
"""
|
|
users:
|
|
ancamilea: {authentik_user: ancaelena98, k8s_user: anca,
|
|
tier: namespace-owner, namespaces: [plotting-book]}
|
|
"""
|
|
)
|
|
assert r.users["ancamilea"].namespaces == ("plotting-book",)
|
|
|
|
|
|
def test_admin_tier_is_accepted():
|
|
r = _roster(
|
|
"users: {wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}}"
|
|
)
|
|
assert r.users["wizard"].tier == "admin"
|
|
|
|
|
|
def test_rejects_unknown_tier():
|
|
with pytest.raises(eng.RosterError, match="tier"):
|
|
_roster("users: {bob: {authentik_user: b, k8s_user: b, tier: wizard-king}}")
|
|
|
|
|
|
def test_rejects_missing_required_field():
|
|
with pytest.raises(eng.RosterError, match="authentik_user"):
|
|
_roster("users: {bob: {k8s_user: b, tier: power-user}}")
|
|
|
|
|
|
def test_namespace_owner_requires_namespaces():
|
|
with pytest.raises(eng.RosterError, match="namespace"):
|
|
_roster("users: {bob: {authentik_user: b, k8s_user: b, tier: namespace-owner}}")
|
|
|
|
|
|
def test_non_namespace_owner_must_not_set_namespaces():
|
|
with pytest.raises(eng.RosterError, match="namespace"):
|
|
_roster(
|
|
"users: {bob: {authentik_user: b, k8s_user: b, tier: power-user, "
|
|
"namespaces: [x]}}"
|
|
)
|
|
|
|
|
|
def test_empty_roster_is_valid():
|
|
assert _roster("users: {}").users == {}
|
|
|
|
|
|
def test_missing_users_key_is_valid_empty():
|
|
assert _roster("{}").users == {}
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# validate_tiers: roster tier vs live k8s_users (fail-loud, module #1)
|
|
# --------------------------------------------------------------------------
|
|
|
|
|
|
def test_validate_ok_when_tiers_match():
|
|
r = _roster(
|
|
"users: {ancamilea: {authentik_user: a, k8s_user: anca, "
|
|
"tier: namespace-owner, namespaces: [plotting-book]}}"
|
|
)
|
|
assert eng.validate_tiers(r, {"anca": "namespace-owner"}) == []
|
|
|
|
|
|
def test_validate_flags_tier_mismatch_as_error():
|
|
# roster says power-user, cluster says namespace-owner -> a real conflict -> ERROR (abort).
|
|
r = _roster(
|
|
"users: {ancamilea: {authentik_user: a, k8s_user: anca, tier: power-user}}"
|
|
)
|
|
issues = eng.validate_tiers(r, {"anca": "namespace-owner"})
|
|
assert len(issues) == 1
|
|
assert issues[0].severity == "error"
|
|
assert issues[0].os_user == "ancamilea"
|
|
assert "power-user" in issues[0].message and "namespace-owner" in issues[0].message
|
|
|
|
|
|
def test_validate_flags_netnew_absent_as_warn():
|
|
# emo is power-user in the roster but has no k8s_users entry yet. Onboarding the
|
|
# workstation should still proceed; the kubectl grant is pending -> WARN, not error.
|
|
r = _roster("users: {emo: {authentik_user: e, k8s_user: emo, tier: power-user}}")
|
|
issues = eng.validate_tiers(r, {})
|
|
assert len(issues) == 1
|
|
assert issues[0].severity == "warn"
|
|
assert "emo" in issues[0].message and "k8s_users" in issues[0].message
|
|
|
|
|
|
def test_validate_skips_admin_tier():
|
|
# wizard (admin) is cluster-admin via a separate mechanism, not k8s_users.
|
|
r = _roster(
|
|
"users: {wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}}"
|
|
)
|
|
assert eng.validate_tiers(r, {}) == []
|
|
|
|
|
|
def test_has_blocking_errors_distinguishes_mismatch_from_absent():
|
|
mismatch = _roster(
|
|
"users: {ancamilea: {authentik_user: a, k8s_user: anca, tier: power-user}}"
|
|
)
|
|
absent = _roster(
|
|
"users: {emo: {authentik_user: e, k8s_user: emo, tier: power-user}}"
|
|
)
|
|
assert (
|
|
eng.has_blocking_errors(
|
|
eng.validate_tiers(mismatch, {"anca": "namespace-owner"})
|
|
)
|
|
is True
|
|
)
|
|
assert eng.has_blocking_errors(eng.validate_tiers(absent, {})) is False
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# derive_desired_state: accounts, sticky ports, ttyd map, dispatch (module #1)
|
|
# --------------------------------------------------------------------------
|
|
|
|
THREE = """
|
|
users:
|
|
wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}
|
|
emo: {authentik_user: emil.barzin, k8s_user: emo, tier: power-user}
|
|
ancamilea: {authentik_user: ancaelena98, k8s_user: anca, tier: namespace-owner, namespaces: [plotting-book]}
|
|
"""
|
|
|
|
LIVE_PORTS = {"wizard": 3773, "emo": 3774, "ancamilea": 3775}
|
|
|
|
|
|
def test_derive_preserves_existing_sticky_ports():
|
|
ds = eng.derive_desired_state(_roster(THREE), LIVE_PORTS)
|
|
assert ds.ports == {"wizard": 3773, "emo": 3774, "ancamilea": 3775}
|
|
|
|
|
|
def test_derive_allocates_next_free_port_for_new_user():
|
|
ds = eng.derive_desired_state(_roster(THREE), {"wizard": 3773})
|
|
# emo + ancamilea are new -> next free from 3773 skipping the used 3773
|
|
assert ds.ports["wizard"] == 3773
|
|
assert sorted([ds.ports["emo"], ds.ports["ancamilea"]]) == [3774, 3775]
|
|
|
|
|
|
def test_derive_dispatch_keyed_by_authentik_user():
|
|
ds = eng.derive_desired_state(_roster(THREE), LIVE_PORTS)
|
|
assert ds.dispatch == {
|
|
"vbarzin": {"os_user": "wizard", "port": 3773},
|
|
"emil.barzin": {"os_user": "emo", "port": 3774},
|
|
"ancaelena98": {"os_user": "ancamilea", "port": 3775},
|
|
}
|
|
|
|
|
|
def test_derive_ttyd_map_has_one_mapping_per_user():
|
|
ds = eng.derive_desired_state(_roster(THREE), LIVE_PORTS)
|
|
body = [
|
|
line
|
|
for line in ds.ttyd_user_map.splitlines()
|
|
if line.strip() and not line.lstrip().startswith("#")
|
|
]
|
|
assert set(body) == {"vbarzin=wizard", "emil.barzin=emo", "ancaelena98=ancamilea"}
|
|
|
|
|
|
def test_derive_accounts_assign_tier_groups_and_shell():
|
|
ds = eng.derive_desired_state(_roster(THREE), LIVE_PORTS)
|
|
assert ds.accounts["wizard"].groups == ("code-shared", "docker", "sudo")
|
|
assert ds.accounts["emo"].groups == ()
|
|
assert ds.accounts["ancamilea"].groups == ()
|
|
assert ds.accounts["emo"].shell == "/bin/zsh"
|
|
|
|
|
|
def test_derive_is_deterministic():
|
|
r = _roster(THREE)
|
|
assert eng.derive_desired_state(r, LIVE_PORTS) == eng.derive_desired_state(
|
|
r, LIVE_PORTS
|
|
)
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# groups_to_add: the additive-only invariant (module #1)
|
|
# --------------------------------------------------------------------------
|
|
|
|
|
|
def test_groups_to_add_returns_only_missing():
|
|
assert eng.groups_to_add(("sudo", "docker", "code-shared"), ("docker",)) == [
|
|
"code-shared",
|
|
"sudo",
|
|
]
|
|
|
|
|
|
def test_groups_to_add_never_proposes_removal_of_extra_groups():
|
|
# emo currently has code-shared+docker (legacy). A power-user reconcile wants
|
|
# no groups -> must NOT strip anything (additive-only invariant).
|
|
assert eng.groups_to_add((), ("code-shared", "docker")) == []
|
|
|
|
|
|
def test_groups_to_add_idempotent_when_all_present():
|
|
assert eng.groups_to_add(("sudo",), ("sudo", "docker")) == []
|
|
|
|
|
|
# --------------------------------------------------------------------------
|
|
# offboarding diff: staged plan, destructive never auto (module #5)
|
|
# --------------------------------------------------------------------------
|
|
|
|
|
|
def test_to_deprovision_is_old_minus_new():
|
|
old = _roster(THREE)
|
|
new = _roster(
|
|
"""
|
|
users:
|
|
wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}
|
|
emo: {authentik_user: emil.barzin, k8s_user: emo, tier: power-user}
|
|
"""
|
|
)
|
|
assert eng.to_deprovision(old, new) == ["ancamilea"]
|
|
|
|
|
|
def test_to_deprovision_empty_when_nothing_removed():
|
|
r = _roster(THREE)
|
|
assert eng.to_deprovision(r, r) == []
|
|
|
|
|
|
def test_offboard_plan_reversible_cut_targets_exactly_the_removed_user():
|
|
old = _roster(THREE)
|
|
new = _roster(
|
|
"users: {wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}}"
|
|
)
|
|
plan = eng.offboard_plan(old, new, include_destructive=False)
|
|
cut_users = {a.os_user for a in plan}
|
|
assert cut_users == {"emo", "ancamilea"}
|
|
assert all(a.reversible for a in plan)
|
|
|
|
|
|
def test_offboard_plan_excludes_destructive_by_default():
|
|
old = _roster(THREE)
|
|
new = _roster(
|
|
"users: {wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}}"
|
|
)
|
|
auto = eng.offboard_plan(old, new, include_destructive=False)
|
|
assert all(a.kind != "userdel_archive" for a in auto)
|
|
|
|
|
|
def test_offboard_plan_includes_destructive_only_when_explicitly_requested():
|
|
old = _roster(THREE)
|
|
new = _roster(
|
|
"users: {wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}}"
|
|
)
|
|
full = eng.offboard_plan(old, new, include_destructive=True)
|
|
destructive = [a for a in full if a.kind == "userdel_archive"]
|
|
assert {a.os_user for a in destructive} == {"emo", "ancamilea"}
|
|
assert all(not a.reversible for a in destructive)
|