guildhouse-spire-plugins/cmd/gsap-attestor/attestor_test.go
Tyler J King fe5e2cf3c6 feat(spire): gsap-attestor WorkloadAttestor plugin
SPIRE WorkloadAttestor that reads governance env vars from /proc/{pid}/environ
(walking up the process tree to find gsh) and emits gsap: selectors on workload
SVIDs. Maps BASCULE_* vars set by bascule-shell and future GSH_* vars to the
11-selector vocabulary defined in gsap-types/src/selectors.rs.

- pkg/gsap/selectors.go: shared Go constants mirroring Rust vocabulary
- cmd/gsap-attestor/: plugin implementation with /proc reading, process tree
  walking, capability ceiling translation, and fail-open for non-governed processes
- 28 tests covering selector extraction, proc parsing, tree walking, and depth limits
- Makefile, Dockerfile, deploy config updated

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-13 03:59:08 -04:00

399 lines
10 KiB
Go

package main
import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"testing"
"github.com/guildhouse-cooperative/guildhouse-spire-plugins/pkg/gsap"
)
// writeMockProc creates a fake /proc/{pid} directory with environ and status files.
func writeMockProc(t *testing.T, root string, pid int32, env map[string]string, ppid int32) {
t.Helper()
dir := filepath.Join(root, fmt.Sprintf("%d", pid))
if err := os.MkdirAll(dir, 0755); err != nil {
t.Fatal(err)
}
var buf bytes.Buffer
for k, v := range env {
buf.WriteString(k + "=" + v)
buf.WriteByte(0)
}
if err := os.WriteFile(filepath.Join(dir, "environ"), buf.Bytes(), 0644); err != nil {
t.Fatal(err)
}
status := fmt.Sprintf("Name:\tmockproc\nPPid:\t%d\n", ppid)
if err := os.WriteFile(filepath.Join(dir, "status"), []byte(status), 0644); err != nil {
t.Fatal(err)
}
}
// --- extractSelectors unit tests (pure, no /proc) ---
func TestExtractSelectors_FullGoverned(t *testing.T) {
env := map[string]string{
"BASCULE_PRINCIPAL": "did:web:example.com/tyler",
"BASCULE_AUTH_METHOD": "oidc-entra",
"BASCULE_POSTURE_LEVEL": "5",
"BASCULE_CAPABILITY_CEILING": "CAP_MUTATE",
"BASCULE_SESSION_ID": "sess-123",
"BASCULE_CORPUS_CID": "sha256:dev-guildhouse-cli",
}
selectors := extractSelectors(env)
smap := selectorMap(selectors)
assertSelector(t, smap, "principal_did", "did:web:example.com/tyler")
assertSelector(t, smap, "driver_id", "oidc-entra")
assertSelector(t, smap, "posture_level", "5")
assertSelector(t, smap, "capability_mask", "0x07")
assertSelector(t, smap, "context_id", "sess-123")
assertSelector(t, smap, "corpus_cid", "sha256:dev-guildhouse-cli")
if len(selectors) != 6 {
t.Errorf("expected 6 selectors, got %d: %v", len(selectors), selectors)
}
}
func TestExtractSelectors_GSH_DID_Override(t *testing.T) {
env := map[string]string{
"GSH_DID": "did:web:override.com/alice",
"BASCULE_PRINCIPAL": "did:web:example.com/tyler",
"BASCULE_AUTH_METHOD": "ssh-key",
}
selectors := extractSelectors(env)
smap := selectorMap(selectors)
assertSelector(t, smap, "principal_did", "did:web:override.com/alice")
}
func TestExtractSelectors_CapabilityCeiling_AllValues(t *testing.T) {
tests := []struct {
ceiling string
wantHex string
}{
{"CAP_NONE", "0x00"},
{"CAP_READ", "0x01"},
{"CAP_PROPOSE", "0x03"},
{"CAP_MUTATE", "0x07"},
{"CAP_GOVERN", "0x0f"},
}
for _, tt := range tests {
t.Run(tt.ceiling, func(t *testing.T) {
env := map[string]string{
"BASCULE_PRINCIPAL": "did:test",
"BASCULE_CAPABILITY_CEILING": tt.ceiling,
}
smap := selectorMap(extractSelectors(env))
assertSelector(t, smap, "capability_mask", tt.wantHex)
})
}
}
func TestExtractSelectors_GSH_CAPABILITY_MASK_Override(t *testing.T) {
env := map[string]string{
"BASCULE_PRINCIPAL": "did:test",
"GSH_CAPABILITY_MASK": "0x0f",
"BASCULE_CAPABILITY_CEILING": "CAP_READ",
}
smap := selectorMap(extractSelectors(env))
assertSelector(t, smap, "capability_mask", "0x0f")
}
func TestExtractSelectors_PostureLevel_Fallback(t *testing.T) {
env := map[string]string{
"BASCULE_PRINCIPAL": "did:test",
"BASCULE_DEFCON_LEVEL": "3",
}
smap := selectorMap(extractSelectors(env))
assertSelector(t, smap, "posture_level", "3")
}
func TestExtractSelectors_PostureLevel_PrimaryWins(t *testing.T) {
env := map[string]string{
"BASCULE_PRINCIPAL": "did:test",
"BASCULE_POSTURE_LEVEL": "5",
"BASCULE_DEFCON_LEVEL": "3",
}
smap := selectorMap(extractSelectors(env))
assertSelector(t, smap, "posture_level", "5")
}
func TestExtractSelectors_UnknownCeiling(t *testing.T) {
env := map[string]string{
"BASCULE_PRINCIPAL": "did:test",
"BASCULE_CAPABILITY_CEILING": "CAP_UNKNOWN",
}
smap := selectorMap(extractSelectors(env))
if _, ok := smap["capability_mask"]; ok {
t.Error("expected no capability_mask selector for unknown ceiling")
}
}
func TestExtractSelectors_EmptyEnv(t *testing.T) {
selectors := extractSelectors(map[string]string{})
if len(selectors) != 0 {
t.Errorf("expected no selectors for empty env, got %v", selectors)
}
}
func TestExtractSelectors_FutureVars(t *testing.T) {
env := map[string]string{
"BASCULE_PRINCIPAL": "did:test",
"GSH_ACCORD_TEMPLATE": "m365-governance",
"GSH_PLAYBOOK": "m365:groups:create",
"GSH_PARAMETERS_CID": "sha256:params",
"GSH_SESSION_MODE": "true",
"GSH_SHELL_CLASS": "Human",
}
smap := selectorMap(extractSelectors(env))
assertSelector(t, smap, "accord_template", "m365-governance")
assertSelector(t, smap, "playbook", "m365:groups:create")
assertSelector(t, smap, "parameters_cid", "sha256:params")
assertSelector(t, smap, "session_mode", "true")
assertSelector(t, smap, "shell_class", "Human")
}
func TestExtractSelectors_Format(t *testing.T) {
env := map[string]string{
"BASCULE_PRINCIPAL": "did:test",
"BASCULE_AUTH_METHOD": "ssh-key",
"BASCULE_POSTURE_LEVEL": "4",
}
for _, sel := range extractSelectors(env) {
if !strings.HasPrefix(sel, "gsap:") {
t.Errorf("selector %q does not start with gsap:", sel)
}
parts := strings.SplitN(sel, ":", 3)
if len(parts) != 3 {
t.Errorf("selector %q does not have 3 colon-separated parts", sel)
}
}
}
// --- proc reading tests ---
func TestReadProcEnviron_Simple(t *testing.T) {
root := t.TempDir()
writeMockProc(t, root, 42, map[string]string{"FOO": "bar", "BAZ": "qux"}, 1)
env, err := readProcEnviron(root, 42)
if err != nil {
t.Fatal(err)
}
if env["FOO"] != "bar" || env["BAZ"] != "qux" {
t.Errorf("unexpected env: %v", env)
}
}
func TestReadProcEnviron_EmptyFile(t *testing.T) {
root := t.TempDir()
dir := filepath.Join(root, "42")
os.MkdirAll(dir, 0755)
os.WriteFile(filepath.Join(dir, "environ"), []byte{}, 0644)
env, err := readProcEnviron(root, 42)
if err != nil {
t.Fatal(err)
}
if len(env) != 0 {
t.Errorf("expected empty map, got %v", env)
}
}
func TestReadProcEnviron_Missing(t *testing.T) {
root := t.TempDir()
_, err := readProcEnviron(root, 9999)
if err == nil {
t.Error("expected error for missing PID")
}
}
func TestGetParentPid_Normal(t *testing.T) {
root := t.TempDir()
writeMockProc(t, root, 100, map[string]string{}, 42)
ppid, err := getParentPid(root, 100)
if err != nil {
t.Fatal(err)
}
if ppid != 42 {
t.Errorf("expected ppid 42, got %d", ppid)
}
}
func TestGetParentPid_Init(t *testing.T) {
root := t.TempDir()
writeMockProc(t, root, 1, map[string]string{}, 0)
ppid, err := getParentPid(root, 1)
if err != nil {
t.Fatal(err)
}
if ppid != 0 {
t.Errorf("expected ppid 0, got %d", ppid)
}
}
// --- Attest integration tests ---
func newTestAttestor(root string) *GsapAttestor {
a := &GsapAttestor{}
a.Configure(GsapAttestorConfig{ProcRoot: root, MaxDepth: 10})
return a
}
func TestAttest_DirectProcess(t *testing.T) {
root := t.TempDir()
writeMockProc(t, root, 100, map[string]string{
"BASCULE_PRINCIPAL": "did:web:test/tyler",
"BASCULE_AUTH_METHOD": "oidc-entra",
"BASCULE_POSTURE_LEVEL": "5",
}, 1)
a := newTestAttestor(root)
selectors, err := a.Attest(context.Background(), 100)
if err != nil {
t.Fatal(err)
}
if len(selectors) == 0 {
t.Fatal("expected selectors for governed process")
}
smap := selectorMap(selectors)
assertSelector(t, smap, "principal_did", "did:web:test/tyler")
assertSelector(t, smap, "driver_id", "oidc-entra")
assertSelector(t, smap, "posture_level", "5")
}
func TestAttest_WalkToParent(t *testing.T) {
root := t.TempDir()
// Parent (gsh) has governance vars
writeMockProc(t, root, 50, map[string]string{
"BASCULE_PRINCIPAL": "did:web:test/tyler",
"BASCULE_AUTH_METHOD": "ssh-key",
}, 1)
// Child process has no governance vars
writeMockProc(t, root, 100, map[string]string{
"HOME": "/home/tyler",
"PATH": "/usr/bin",
}, 50)
a := newTestAttestor(root)
selectors, err := a.Attest(context.Background(), 100)
if err != nil {
t.Fatal(err)
}
if len(selectors) == 0 {
t.Fatal("expected selectors from parent")
}
smap := selectorMap(selectors)
assertSelector(t, smap, "principal_did", "did:web:test/tyler")
}
func TestAttest_DepthLimit(t *testing.T) {
root := t.TempDir()
// Create a chain of 15 processes with no governance vars
for i := int32(2); i <= 16; i++ {
writeMockProc(t, root, i, map[string]string{"HOME": "/tmp"}, i-1)
}
writeMockProc(t, root, 1, map[string]string{
"BASCULE_PRINCIPAL": "did:web:test/unreachable",
}, 0)
a := &GsapAttestor{}
a.Configure(GsapAttestorConfig{ProcRoot: root, MaxDepth: 5})
selectors, err := a.Attest(context.Background(), 16)
if err != nil {
t.Fatal(err)
}
if len(selectors) != 0 {
t.Errorf("expected no selectors beyond depth limit, got %v", selectors)
}
}
func TestAttest_NonGovernedProcess(t *testing.T) {
root := t.TempDir()
writeMockProc(t, root, 100, map[string]string{
"HOME": "/home/user",
"PATH": "/usr/bin",
}, 1)
writeMockProc(t, root, 1, map[string]string{}, 0)
a := newTestAttestor(root)
selectors, err := a.Attest(context.Background(), 100)
if err != nil {
t.Fatal(err)
}
if len(selectors) != 0 {
t.Errorf("expected no selectors for non-governed process, got %v", selectors)
}
}
func TestAttest_MissingProcEntry(t *testing.T) {
root := t.TempDir()
a := newTestAttestor(root)
selectors, err := a.Attest(context.Background(), 9999)
if err != nil {
t.Errorf("expected nil error for missing PID, got %v", err)
}
if len(selectors) != 0 {
t.Errorf("expected no selectors for missing PID, got %v", selectors)
}
}
func TestAttest_EndToEnd_CapabilityMask(t *testing.T) {
root := t.TempDir()
writeMockProc(t, root, 100, map[string]string{
"BASCULE_PRINCIPAL": "did:web:test/tyler",
"BASCULE_CAPABILITY_CEILING": "CAP_GOVERN",
}, 1)
a := newTestAttestor(root)
selectors, err := a.Attest(context.Background(), 100)
if err != nil {
t.Fatal(err)
}
smap := selectorMap(selectors)
assertSelector(t, smap, "capability_mask", "0x0f")
}
// --- test helpers ---
func selectorMap(selectors []string) map[string]string {
m := make(map[string]string)
for _, sel := range selectors {
parts := strings.SplitN(sel, ":", 3)
if len(parts) == 3 && parts[0] == gsap.SelectorType {
m[parts[1]] = parts[2]
}
}
return m
}
func assertSelector(t *testing.T, smap map[string]string, key, want string) {
t.Helper()
got, ok := smap[key]
if !ok {
keys := make([]string, 0, len(smap))
for k := range smap {
keys = append(keys, k)
}
sort.Strings(keys)
t.Errorf("selector %q not found; present keys: %v", key, keys)
return
}
if got != want {
t.Errorf("selector %q = %q, want %q", key, got, want)
}
}