workstation: tested roster derivation + offboarding-diff engine [ci skip]
Pure functional core (PRD ViktorBarzin/infra#9 modules #1 roster engine + #5 offboarding diff) that the bash provisioner will consume as JSON: roster parse/validate, fail-loud tier-vs-k8s_users check, sticky-port + ttyd-map + dispatch derivation, additive-only group reconcile, and the staged offboarding diff (reversible cut vs gated userdel, never auto). 27 pytest cases, ruff-clean; no host I/O in the tested path. Verified to reproduce the live dispatch.json byte-for-byte from the real roster. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
6504911a77
commit
7ab4c1e1e2
2 changed files with 538 additions and 0 deletions
276
scripts/workstation/roster_engine.py
Normal file
276
scripts/workstation/roster_engine.py
Normal file
|
|
@ -0,0 +1,276 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Pure derivation + offboarding-diff engine for the devvm Workstation roster.
|
||||
|
||||
Functional core (this module, unit-tested) / imperative shell (the bash
|
||||
provisioner that consumes the JSON this emits and performs the host mutations).
|
||||
No host I/O lives in the tested functions. See PRD ViktorBarzin/infra#9.
|
||||
|
||||
The roster (`roster.yaml`) is the single source of truth for the workstation
|
||||
lifecycle. `os_user` is the pinned key; `authentik_user` / `k8s_user` differ
|
||||
per person and are recorded explicitly (no email->username derivation).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Iterable
|
||||
|
||||
import yaml
|
||||
|
||||
BASE_PORT = 3773
|
||||
VALID_TIERS = ("admin", "power-user", "namespace-owner")
|
||||
# Tier -> supplementary groups the reconcile ENSURES (additive-only; never stripped).
|
||||
TIER_GROUPS: dict[str, tuple[str, ...]] = {
|
||||
"admin": ("code-shared", "docker", "sudo"),
|
||||
"power-user": (),
|
||||
"namespace-owner": (),
|
||||
}
|
||||
DEFAULT_SHELL = "/bin/zsh"
|
||||
_REVERSIBLE_OFFBOARD_KINDS = (
|
||||
"disable_instance",
|
||||
"unmap_dispatch",
|
||||
"remove_from_t3_group",
|
||||
"lock_login",
|
||||
"revoke_cluster_rbac",
|
||||
)
|
||||
|
||||
|
||||
class RosterError(ValueError):
|
||||
"""Raised when the roster is structurally invalid."""
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class User:
|
||||
os_user: str
|
||||
authentik_user: str
|
||||
k8s_user: str
|
||||
tier: str
|
||||
namespaces: tuple[str, ...] = ()
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Roster:
|
||||
users: dict[str, User] = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Account:
|
||||
os_user: str
|
||||
tier: str
|
||||
shell: str
|
||||
login_locked: bool
|
||||
groups: tuple[str, ...]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DesiredState:
|
||||
accounts: dict[str, Account]
|
||||
ttyd_user_map: str
|
||||
dispatch: dict[str, dict]
|
||||
ports: dict[str, int]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class OffboardAction:
|
||||
os_user: str
|
||||
kind: str
|
||||
reversible: bool
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Parsing + structural validation
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _parse_user(os_user: str, spec: dict) -> User:
|
||||
for required in ("authentik_user", "k8s_user", "tier"):
|
||||
if required not in spec:
|
||||
raise RosterError(f"user {os_user!r}: missing required field {required!r}")
|
||||
tier = spec["tier"]
|
||||
if tier not in VALID_TIERS:
|
||||
raise RosterError(
|
||||
f"user {os_user!r}: unknown tier {tier!r} (valid: {list(VALID_TIERS)})"
|
||||
)
|
||||
namespaces = tuple(spec.get("namespaces") or ())
|
||||
if tier == "namespace-owner" and not namespaces:
|
||||
raise RosterError(f"user {os_user!r}: namespace-owner requires namespaces")
|
||||
if tier != "namespace-owner" and namespaces:
|
||||
raise RosterError(f"user {os_user!r}: only namespace-owner may set namespaces")
|
||||
return User(os_user, spec["authentik_user"], spec["k8s_user"], tier, namespaces)
|
||||
|
||||
|
||||
def load_roster(text: str) -> Roster:
|
||||
data = yaml.safe_load(text) or {}
|
||||
users_raw = data.get("users") or {}
|
||||
return Roster({name: _parse_user(name, spec) for name, spec in users_raw.items()})
|
||||
|
||||
|
||||
def load_roster_file(path: str) -> Roster:
|
||||
with open(path, encoding="utf-8") as fh:
|
||||
return load_roster(fh.read())
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Tier validation against live k8s_users (fail-loud)
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
def validate_tiers(roster: Roster, k8s_user_tiers: dict[str, str]) -> list[str]:
|
||||
"""Return one error string per roster user whose tier disagrees with the
|
||||
live `k8s_users` map. Admins are exempt (cluster-admin is granted out of
|
||||
band). An empty list means the roster is consistent with the cluster."""
|
||||
errors = []
|
||||
for user in roster.users.values():
|
||||
if user.tier == "admin":
|
||||
continue
|
||||
actual = k8s_user_tiers.get(user.k8s_user)
|
||||
if actual is None:
|
||||
errors.append(
|
||||
f"{user.os_user}: tier {user.tier} but k8s_user {user.k8s_user!r} "
|
||||
f"absent from k8s_users (add the entry first)"
|
||||
)
|
||||
elif actual != user.tier:
|
||||
errors.append(
|
||||
f"{user.os_user}: roster tier {user.tier} != k8s_users tier "
|
||||
f"{actual} for {user.k8s_user!r}"
|
||||
)
|
||||
return errors
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Desired-state derivation (sticky ports, ttyd map, dispatch, accounts)
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _allocate_ports(roster: Roster, existing_ports: dict[str, int]) -> dict[str, int]:
|
||||
ports = {u: existing_ports[u] for u in roster.users if u in existing_ports}
|
||||
used = set(ports.values())
|
||||
for os_user in sorted(roster.users):
|
||||
if os_user in ports:
|
||||
continue
|
||||
candidate = BASE_PORT
|
||||
while candidate in used:
|
||||
candidate += 1
|
||||
ports[os_user] = candidate
|
||||
used.add(candidate)
|
||||
return ports
|
||||
|
||||
|
||||
_TTYD_MAP_HEADER = (
|
||||
"# Generated from roster.yaml by roster_engine.py — DO NOT EDIT BY HAND.\n"
|
||||
"# <authentik_user>=<os_user>; consumed by t3-dispatch.\n"
|
||||
)
|
||||
|
||||
|
||||
def derive_desired_state(
|
||||
roster: Roster, existing_ports: dict[str, int]
|
||||
) -> DesiredState:
|
||||
ports = _allocate_ports(roster, existing_ports)
|
||||
ordered = sorted(roster.users.values(), key=lambda u: ports[u.os_user])
|
||||
ttyd_lines = [f"{u.authentik_user}={u.os_user}" for u in ordered]
|
||||
ttyd_user_map = _TTYD_MAP_HEADER + "\n".join(ttyd_lines) + "\n"
|
||||
dispatch = {
|
||||
u.authentik_user: {"os_user": u.os_user, "port": ports[u.os_user]}
|
||||
for u in ordered
|
||||
}
|
||||
accounts = {
|
||||
u.os_user: Account(
|
||||
os_user=u.os_user,
|
||||
tier=u.tier,
|
||||
shell=DEFAULT_SHELL,
|
||||
login_locked=True,
|
||||
groups=TIER_GROUPS[u.tier],
|
||||
)
|
||||
for u in roster.users.values()
|
||||
}
|
||||
return DesiredState(accounts, ttyd_user_map, dispatch, ports)
|
||||
|
||||
|
||||
def groups_to_add(desired: Iterable[str], current: Iterable[str]) -> list[str]:
|
||||
"""Additive-only: the groups to `gpasswd -a`. Never proposes a removal, so a
|
||||
routine reconcile can't strip a pre-existing user's legacy groups."""
|
||||
return sorted(set(desired) - set(current))
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Offboarding diff (staged: reversible cut, then gated destructive removal)
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
def to_deprovision(old: Roster, new: Roster) -> list[str]:
|
||||
return sorted(set(old.users) - set(new.users))
|
||||
|
||||
|
||||
def offboard_plan(
|
||||
old: Roster, new: Roster, *, include_destructive: bool
|
||||
) -> list[OffboardAction]:
|
||||
"""Staged offboarding actions for users dropped from the roster. The
|
||||
reversible cut (disable instance, unmap, lock, revoke RBAC) is always
|
||||
returned; the irreversible `userdel_archive` is included ONLY when
|
||||
explicitly requested, so it can never be auto-applied by a reconcile."""
|
||||
plan: list[OffboardAction] = []
|
||||
for os_user in to_deprovision(old, new):
|
||||
plan.extend(
|
||||
OffboardAction(os_user, kind, True) for kind in _REVERSIBLE_OFFBOARD_KINDS
|
||||
)
|
||||
if include_destructive:
|
||||
plan.append(OffboardAction(os_user, "userdel_archive", False))
|
||||
return plan
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# CLI adapter (imperative shell entrypoint — consumed by t3-provision-users.sh)
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _desired_state_to_dict(ds: DesiredState) -> dict:
|
||||
return {
|
||||
"accounts": {
|
||||
name: {
|
||||
"os_user": a.os_user,
|
||||
"tier": a.tier,
|
||||
"shell": a.shell,
|
||||
"login_locked": a.login_locked,
|
||||
"groups": list(a.groups),
|
||||
}
|
||||
for name, a in ds.accounts.items()
|
||||
},
|
||||
"ttyd_user_map": ds.ttyd_user_map,
|
||||
"dispatch": ds.dispatch,
|
||||
"ports": ds.ports,
|
||||
}
|
||||
|
||||
|
||||
def _main(argv: list[str]) -> int:
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="Workstation roster engine")
|
||||
sub = parser.add_subparsers(dest="cmd", required=True)
|
||||
pv = sub.add_parser(
|
||||
"validate", help="exit 1 if roster tiers diverge from k8s_users"
|
||||
)
|
||||
pv.add_argument("--roster", required=True)
|
||||
pv.add_argument("--k8s-users-json", required=True, help="JSON map {k8s_user: tier}")
|
||||
pd = sub.add_parser("derive", help="emit desired state as JSON")
|
||||
pd.add_argument("--roster", required=True)
|
||||
pd.add_argument("--ports-json", required=True, help="JSON map {os_user: port}")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
roster = load_roster_file(args.roster)
|
||||
if args.cmd == "validate":
|
||||
with open(args.k8s_users_json, encoding="utf-8") as fh:
|
||||
errors = validate_tiers(roster, json.load(fh))
|
||||
for err in errors:
|
||||
print(err, file=sys.stderr)
|
||||
return 1 if errors else 0
|
||||
with open(args.ports_json, encoding="utf-8") as fh:
|
||||
desired = derive_desired_state(roster, json.load(fh))
|
||||
json.dump(_desired_state_to_dict(desired), sys.stdout, indent=2, sort_keys=True)
|
||||
sys.stdout.write("\n")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(_main(sys.argv[1:]))
|
||||
262
scripts/workstation/test_roster_engine.py
Normal file
262
scripts/workstation/test_roster_engine.py
Normal file
|
|
@ -0,0 +1,262 @@
|
|||
"""Unit tests for the pure roster derivation + offboarding-diff engine.
|
||||
|
||||
These exercise external behaviour only (parse -> validate -> derive -> diff);
|
||||
no host I/O is touched. Mirrors the pure-core pytest style used elsewhere in
|
||||
the monorepo. See PRD ViktorBarzin/infra#9 (modules #1 roster engine, #5
|
||||
offboarding diff).
|
||||
"""
|
||||
|
||||
import textwrap
|
||||
|
||||
import pytest
|
||||
|
||||
import roster_engine as eng
|
||||
|
||||
|
||||
def _roster(yaml_text: str) -> "eng.Roster":
|
||||
return eng.load_roster(textwrap.dedent(yaml_text))
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# load_roster: parsing + structural validation (module #1)
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_parses_user_fields_and_tier():
|
||||
r = _roster(
|
||||
"""
|
||||
users:
|
||||
emo: {authentik_user: emil.barzin, k8s_user: emo, tier: power-user}
|
||||
"""
|
||||
)
|
||||
u = r.users["emo"]
|
||||
assert u.os_user == "emo"
|
||||
assert u.authentik_user == "emil.barzin"
|
||||
assert u.k8s_user == "emo"
|
||||
assert u.tier == "power-user"
|
||||
assert u.namespaces == ()
|
||||
|
||||
|
||||
def test_namespace_owner_carries_namespaces():
|
||||
r = _roster(
|
||||
"""
|
||||
users:
|
||||
ancamilea: {authentik_user: ancaelena98, k8s_user: anca,
|
||||
tier: namespace-owner, namespaces: [plotting-book]}
|
||||
"""
|
||||
)
|
||||
assert r.users["ancamilea"].namespaces == ("plotting-book",)
|
||||
|
||||
|
||||
def test_admin_tier_is_accepted():
|
||||
r = _roster(
|
||||
"users: {wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}}"
|
||||
)
|
||||
assert r.users["wizard"].tier == "admin"
|
||||
|
||||
|
||||
def test_rejects_unknown_tier():
|
||||
with pytest.raises(eng.RosterError, match="tier"):
|
||||
_roster("users: {bob: {authentik_user: b, k8s_user: b, tier: wizard-king}}")
|
||||
|
||||
|
||||
def test_rejects_missing_required_field():
|
||||
with pytest.raises(eng.RosterError, match="authentik_user"):
|
||||
_roster("users: {bob: {k8s_user: b, tier: power-user}}")
|
||||
|
||||
|
||||
def test_namespace_owner_requires_namespaces():
|
||||
with pytest.raises(eng.RosterError, match="namespace"):
|
||||
_roster("users: {bob: {authentik_user: b, k8s_user: b, tier: namespace-owner}}")
|
||||
|
||||
|
||||
def test_non_namespace_owner_must_not_set_namespaces():
|
||||
with pytest.raises(eng.RosterError, match="namespace"):
|
||||
_roster(
|
||||
"users: {bob: {authentik_user: b, k8s_user: b, tier: power-user, "
|
||||
"namespaces: [x]}}"
|
||||
)
|
||||
|
||||
|
||||
def test_empty_roster_is_valid():
|
||||
assert _roster("users: {}").users == {}
|
||||
|
||||
|
||||
def test_missing_users_key_is_valid_empty():
|
||||
assert _roster("{}").users == {}
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# validate_tiers: roster tier vs live k8s_users (fail-loud, module #1)
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_validate_ok_when_tiers_match():
|
||||
r = _roster(
|
||||
"users: {ancamilea: {authentik_user: a, k8s_user: anca, "
|
||||
"tier: namespace-owner, namespaces: [plotting-book]}}"
|
||||
)
|
||||
assert eng.validate_tiers(r, {"anca": "namespace-owner"}) == []
|
||||
|
||||
|
||||
def test_validate_flags_tier_mismatch():
|
||||
r = _roster(
|
||||
"users: {ancamilea: {authentik_user: a, k8s_user: anca, tier: power-user}}"
|
||||
)
|
||||
errs = eng.validate_tiers(r, {"anca": "namespace-owner"})
|
||||
assert len(errs) == 1
|
||||
assert (
|
||||
"anca" in errs[0] and "power-user" in errs[0] and "namespace-owner" in errs[0]
|
||||
)
|
||||
|
||||
|
||||
def test_validate_flags_netnew_user_absent_from_k8s_users():
|
||||
# emo is power-user in the roster but has no k8s_users entry yet -> the OIDC
|
||||
# RBAC binding can't exist, so this must fail loud (add the entry first).
|
||||
r = _roster("users: {emo: {authentik_user: e, k8s_user: emo, tier: power-user}}")
|
||||
errs = eng.validate_tiers(r, {})
|
||||
assert len(errs) == 1
|
||||
assert "emo" in errs[0] and "k8s_users" in errs[0]
|
||||
|
||||
|
||||
def test_validate_skips_admin_tier():
|
||||
# wizard (admin) is cluster-admin via a separate mechanism, not k8s_users.
|
||||
r = _roster(
|
||||
"users: {wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}}"
|
||||
)
|
||||
assert eng.validate_tiers(r, {}) == []
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# derive_desired_state: accounts, sticky ports, ttyd map, dispatch (module #1)
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
THREE = """
|
||||
users:
|
||||
wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}
|
||||
emo: {authentik_user: emil.barzin, k8s_user: emo, tier: power-user}
|
||||
ancamilea: {authentik_user: ancaelena98, k8s_user: anca, tier: namespace-owner, namespaces: [plotting-book]}
|
||||
"""
|
||||
|
||||
LIVE_PORTS = {"wizard": 3773, "emo": 3774, "ancamilea": 3775}
|
||||
|
||||
|
||||
def test_derive_preserves_existing_sticky_ports():
|
||||
ds = eng.derive_desired_state(_roster(THREE), LIVE_PORTS)
|
||||
assert ds.ports == {"wizard": 3773, "emo": 3774, "ancamilea": 3775}
|
||||
|
||||
|
||||
def test_derive_allocates_next_free_port_for_new_user():
|
||||
ds = eng.derive_desired_state(_roster(THREE), {"wizard": 3773})
|
||||
# emo + ancamilea are new -> next free from 3773 skipping the used 3773
|
||||
assert ds.ports["wizard"] == 3773
|
||||
assert sorted([ds.ports["emo"], ds.ports["ancamilea"]]) == [3774, 3775]
|
||||
|
||||
|
||||
def test_derive_dispatch_keyed_by_authentik_user():
|
||||
ds = eng.derive_desired_state(_roster(THREE), LIVE_PORTS)
|
||||
assert ds.dispatch == {
|
||||
"vbarzin": {"os_user": "wizard", "port": 3773},
|
||||
"emil.barzin": {"os_user": "emo", "port": 3774},
|
||||
"ancaelena98": {"os_user": "ancamilea", "port": 3775},
|
||||
}
|
||||
|
||||
|
||||
def test_derive_ttyd_map_has_one_mapping_per_user():
|
||||
ds = eng.derive_desired_state(_roster(THREE), LIVE_PORTS)
|
||||
body = [
|
||||
line
|
||||
for line in ds.ttyd_user_map.splitlines()
|
||||
if line.strip() and not line.lstrip().startswith("#")
|
||||
]
|
||||
assert set(body) == {"vbarzin=wizard", "emil.barzin=emo", "ancaelena98=ancamilea"}
|
||||
|
||||
|
||||
def test_derive_accounts_assign_tier_groups_and_shell():
|
||||
ds = eng.derive_desired_state(_roster(THREE), LIVE_PORTS)
|
||||
assert ds.accounts["wizard"].groups == ("code-shared", "docker", "sudo")
|
||||
assert ds.accounts["emo"].groups == ()
|
||||
assert ds.accounts["ancamilea"].groups == ()
|
||||
assert ds.accounts["emo"].shell == "/bin/zsh"
|
||||
|
||||
|
||||
def test_derive_is_deterministic():
|
||||
r = _roster(THREE)
|
||||
assert eng.derive_desired_state(r, LIVE_PORTS) == eng.derive_desired_state(
|
||||
r, LIVE_PORTS
|
||||
)
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# groups_to_add: the additive-only invariant (module #1)
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_groups_to_add_returns_only_missing():
|
||||
assert eng.groups_to_add(("sudo", "docker", "code-shared"), ("docker",)) == [
|
||||
"code-shared",
|
||||
"sudo",
|
||||
]
|
||||
|
||||
|
||||
def test_groups_to_add_never_proposes_removal_of_extra_groups():
|
||||
# emo currently has code-shared+docker (legacy). A power-user reconcile wants
|
||||
# no groups -> must NOT strip anything (additive-only invariant).
|
||||
assert eng.groups_to_add((), ("code-shared", "docker")) == []
|
||||
|
||||
|
||||
def test_groups_to_add_idempotent_when_all_present():
|
||||
assert eng.groups_to_add(("sudo",), ("sudo", "docker")) == []
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# offboarding diff: staged plan, destructive never auto (module #5)
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_to_deprovision_is_old_minus_new():
|
||||
old = _roster(THREE)
|
||||
new = _roster(
|
||||
"""
|
||||
users:
|
||||
wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}
|
||||
emo: {authentik_user: emil.barzin, k8s_user: emo, tier: power-user}
|
||||
"""
|
||||
)
|
||||
assert eng.to_deprovision(old, new) == ["ancamilea"]
|
||||
|
||||
|
||||
def test_to_deprovision_empty_when_nothing_removed():
|
||||
r = _roster(THREE)
|
||||
assert eng.to_deprovision(r, r) == []
|
||||
|
||||
|
||||
def test_offboard_plan_reversible_cut_targets_exactly_the_removed_user():
|
||||
old = _roster(THREE)
|
||||
new = _roster(
|
||||
"users: {wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}}"
|
||||
)
|
||||
plan = eng.offboard_plan(old, new, include_destructive=False)
|
||||
cut_users = {a.os_user for a in plan}
|
||||
assert cut_users == {"emo", "ancamilea"}
|
||||
assert all(a.reversible for a in plan)
|
||||
|
||||
|
||||
def test_offboard_plan_excludes_destructive_by_default():
|
||||
old = _roster(THREE)
|
||||
new = _roster(
|
||||
"users: {wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}}"
|
||||
)
|
||||
auto = eng.offboard_plan(old, new, include_destructive=False)
|
||||
assert all(a.kind != "userdel_archive" for a in auto)
|
||||
|
||||
|
||||
def test_offboard_plan_includes_destructive_only_when_explicitly_requested():
|
||||
old = _roster(THREE)
|
||||
new = _roster(
|
||||
"users: {wizard: {authentik_user: vbarzin, k8s_user: wizard, tier: admin}}"
|
||||
)
|
||||
full = eng.offboard_plan(old, new, include_destructive=True)
|
||||
destructive = [a for a in full if a.kind == "userdel_archive"]
|
||||
assert {a.os_user for a in destructive} == {"emo", "ancamilea"}
|
||||
assert all(not a.reversible for a in destructive)
|
||||
Loading…
Add table
Add a link
Reference in a new issue