feat(templates): add template system — manifest, policy, loader, registries

bastion.toml manifest parser with variable validation and dependency
declarations. Declarative compliance policy schema with per-platform
check implementations. Template loader with variable substitution
(Bastion-owned files only — never touches Ansible/Terraform).
PolicyRegistry and AccordRegistry with builtin fallbacks.

BOUNDARY: loader never touches automation framework files.

Signed-off-by: Tyler King <tking@guildhouse.dev>
This commit is contained in:
Tyler J King 2026-04-14 11:09:41 -04:00
parent d62974f1b7
commit 77964e4042
10 changed files with 741 additions and 0 deletions

View file

@ -0,0 +1,2 @@
# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0

View file

@ -0,0 +1,181 @@
# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Template repository loader.
Loads a template from a local directory (already cloned/forked).
Parses the manifest, validates variables, resolves dependencies
(by reading their manifests -- actual git cloning is a separate
concern), and loads all content into the broker's registries.
IMPORTANT: The loader does NOT parse or template automation
framework files (Ansible, Terraform, Salt). It only processes
Bastion-owned files in Bastion-owned directories.
"""
from __future__ import annotations
import logging
import re
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import tomllib
from gsap_broker.templates.manifest import TemplateManifest
from gsap_broker.templates.policy import CompliancePolicy
from gsap_broker.templates.registry import AccordRegistry, PolicyRegistry
logger = logging.getLogger(__name__)
# Bastion-owned directories where variable substitution is applied.
# Files outside these directories are NEVER modified.
_BASTION_OWNED_DIRS = frozenset({"policies", "accords", "harnesses", "hbom", "dashboards"})
@dataclass
class LoadResult:
"""Result of loading a template."""
template_name: str = ""
policies_loaded: list[str] = field(default_factory=list)
accords_loaded: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
errors: list[str] = field(default_factory=list)
provenance: dict[str, Any] = field(default_factory=dict)
@property
def success(self) -> bool:
return len(self.errors) == 0
class TemplateLoader:
"""Loads bastion.toml template repos into broker registries."""
def __init__(
self,
policy_registry: PolicyRegistry,
accord_registry: AccordRegistry,
):
self._policies = policy_registry
self._accords = accord_registry
def load(self, template_dir: str | Path, variables: dict[str, Any]) -> LoadResult:
"""Load a template from a local directory.
1. Parse bastion.toml
2. Validate required variables are provided
3. Substitute variables into Bastion-owned files ONLY
4. Parse and load policies into policy registry
5. Parse and load accords into accord registry
6. Record template provenance (git commit hash if available)
Returns LoadResult with loaded items and any warnings.
"""
root = Path(template_dir)
result = LoadResult()
# 1. Parse manifest
manifest_path = root / "bastion.toml"
if not manifest_path.exists():
result.errors.append(f"No bastion.toml found in {root}")
return result
manifest = TemplateManifest.from_toml(manifest_path)
result.template_name = manifest.template.name
# 2. Validate variables
# Apply defaults first
effective_vars = {}
for name, var in manifest.variables.items():
if name in variables:
effective_vars[name] = variables[name]
elif var.default is not None:
effective_vars[name] = var.default
errors = manifest.validate_variables(effective_vars)
if errors:
result.errors.extend(errors)
return result
# 3. Provenance
result.provenance = self._compute_provenance(root)
# 4. Load policies
policies_dir = root / manifest.contents.policies
if policies_dir.is_dir():
for f in sorted(policies_dir.glob("*.toml")):
try:
content = self._substitute_variables(
f.read_text(), effective_vars, manifest.variables
)
# Parse from substituted content
data = tomllib.loads(content)
policy = CompliancePolicy.model_validate(data)
self._policies.register(policy, result.provenance)
result.policies_loaded.append(policy.name)
except Exception as e:
result.warnings.append(f"Failed to load policy {f.name}: {e}")
# 5. Load accords
accords_dir = root / manifest.contents.accords
if accords_dir.is_dir():
for f in sorted(accords_dir.glob("*.toml")):
try:
content = self._substitute_variables(
f.read_text(), effective_vars, manifest.variables
)
data = tomllib.loads(content)
name = data.get("name", f.stem)
self._accords.register(name, data, result.provenance)
result.accords_loaded.append(name)
except Exception as e:
result.warnings.append(f"Failed to load accord {f.name}: {e}")
return result
def _substitute_variables(
self, content: str, values: dict[str, Any], var_defs: dict
) -> str:
"""Replace ${variable_name} in content with values.
ONLY called on Bastion-owned TOML files.
NEVER on Ansible/Terraform/script files.
Warns on unresolved optional variables.
"""
def replacer(match: re.Match) -> str:
var_name = match.group(1)
if var_name in values:
val = values[var_name]
# Don't log sensitive values
var_def = var_defs.get(var_name)
if var_def and not var_def.sensitive:
logger.debug("Substituting ${%s} = %s", var_name, val)
else:
logger.debug("Substituting ${%s} = [REDACTED]", var_name)
return str(val)
logger.warning("Unresolved variable: ${%s}", var_name)
return match.group(0)
return re.sub(r"\$\{(\w+)\}", replacer, content)
def _compute_provenance(self, template_dir: Path) -> dict[str, Any]:
"""Compute git commit hash and repo origin for template provenance."""
provenance: dict[str, Any] = {"template_dir": str(template_dir)}
try:
result = subprocess.run(
["git", "rev-parse", "HEAD"],
cwd=template_dir, capture_output=True, text=True, timeout=5,
)
if result.returncode == 0:
provenance["git_commit"] = result.stdout.strip()
result = subprocess.run(
["git", "remote", "get-url", "origin"],
cwd=template_dir, capture_output=True, text=True, timeout=5,
)
if result.returncode == 0:
provenance["git_origin"] = result.stdout.strip()
except Exception:
pass
return provenance

View file

@ -0,0 +1,98 @@
# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Bastion template manifest schema.
A template is a Git repository with a bastion.toml at the root.
It contains policies, accords, harnesses, scripts, HBOM profiles,
and optionally framework-specific automation (Ansible, Terraform, etc).
The manifest declares:
- What the template provides (contents)
- What it depends on (dependencies)
- What the deployer must customize (variables)
- What Bastion version and connectors it requires (compatibility)
"""
from __future__ import annotations
import tomllib
from pathlib import Path
from typing import Any, Optional
from pydantic import BaseModel, Field
class TemplateMetadata(BaseModel):
"""Template identification and categorization."""
name: str
version: str
description: str = ""
authors: list[str] = []
license: str = "Apache-2.0"
repository: Optional[str] = None
vertical: Optional[str] = None
sub_vertical: Optional[str] = None
compliance_frameworks: list[str] = []
target_fleet_size: Optional[str] = None
requires_vdi: bool = False
requires_tpm: bool = False
class TemplateCompatibility(BaseModel):
"""What Bastion version and connectors this template needs."""
bastion_min: str = "0.5.0"
connectors_required: list[str] = []
connectors_optional: list[str] = []
class TemplateDependency(BaseModel):
"""A dependency on another template repository."""
git: str
tag: Optional[str] = None
branch: Optional[str] = None
class TemplateVariable(BaseModel):
"""A variable the deployer must or may customize."""
type: str = "string"
required: bool = False
default: Optional[str] = None
description: str = ""
sensitive: bool = False
class TemplateContents(BaseModel):
"""Directory mapping for template content."""
policies: str = "policies/"
accords: str = "accords/"
harnesses: str = "harnesses/"
scripts: str = "scripts/"
hbom_profiles: str = "hbom/"
dashboards: str = "dashboards/"
class TemplateManifest(BaseModel):
"""Root schema for bastion.toml."""
template: TemplateMetadata
compatibility: TemplateCompatibility = TemplateCompatibility()
dependencies: dict[str, TemplateDependency] = {}
variables: dict[str, TemplateVariable] = {}
contents: TemplateContents = TemplateContents()
@classmethod
def from_toml(cls, path: str | Path) -> TemplateManifest:
"""Parse a bastion.toml file."""
with open(path, "rb") as f:
data = tomllib.load(f)
return cls.model_validate(data)
def validate_variables(self, provided: dict[str, Any]) -> list[str]:
"""Check that all required variables are provided.
Returns list of error messages (empty = valid)."""
errors = []
for name, var in self.variables.items():
if var.required and name not in provided and var.default is None:
errors.append(f"Required variable '{name}' not provided")
return errors

View file

@ -0,0 +1,88 @@
# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Declarative compliance policy schema.
A policy defines conditions that managed devices must satisfy.
Each condition maps to a PostureConditionKind evaluated by
Bastion's compliance engine. Conditions can have platform-specific
check implementations (Intune field, script, Keylime attestation).
Policies are framework-agnostic. They describe WHAT must be true,
not HOW to achieve it. Playbooks/scripts achieve compliance;
policies evaluate it.
"""
from __future__ import annotations
import tomllib
from pathlib import Path
from typing import Any, Optional
from pydantic import BaseModel
class PlatformCheck(BaseModel):
"""Platform-specific check implementation."""
intune_field: Optional[str] = None
intune_policy: Optional[str] = None
expect: Optional[Any] = None
script: Optional[str] = None
keylime_check: Optional[str] = None
class PolicyCondition(BaseModel):
"""A single compliance condition."""
id: str
kind: str
description: str = ""
framework_ref: Optional[str] = None
severity: str = "medium"
optional: bool = False
linux: Optional[PlatformCheck] = None
windows: Optional[PlatformCheck] = None
macos: Optional[PlatformCheck] = None
class BreachResponseConfig(BaseModel):
"""What happens when conditions at each severity level fail."""
critical: str = "suspend_access"
high: str = "alert_msp"
medium: str = "log_only"
low: str = "log_only"
class EvaluationSchedule(BaseModel):
"""How often to evaluate compliance."""
interval_seconds: int = 300
full_evaluation_hours: int = 24
class CompliancePolicy(BaseModel):
"""A complete compliance policy definition."""
name: str
description: str = ""
version: str = "1.0.0"
framework: Optional[str] = None
framework_controls: list[str] = []
conditions: list[PolicyCondition] = []
breach_response: BreachResponseConfig = BreachResponseConfig()
schedule: EvaluationSchedule = EvaluationSchedule()
@classmethod
def from_toml(cls, path: str | Path) -> CompliancePolicy:
"""Parse a policy TOML file."""
with open(path, "rb") as f:
data = tomllib.load(f)
return cls.model_validate(data)
def conditions_for_platform(self, platform: str) -> list[PolicyCondition]:
"""Return conditions applicable to a specific platform."""
result = []
for c in self.conditions:
check = getattr(c, platform, None)
if check is not None:
result.append(c)
return result

View file

@ -0,0 +1,112 @@
# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Registries for loaded template content.
Policies, accords, and harnesses are loaded from templates
and stored in memory. The broker's authorization flow and
compliance evaluator query these registries.
"""
from __future__ import annotations
import logging
from typing import Any, Optional
from gsap_broker.templates.policy import CompliancePolicy
logger = logging.getLogger(__name__)
class PolicyRegistry:
"""Stores loaded compliance policies."""
def __init__(self) -> None:
self._policies: dict[str, tuple[CompliancePolicy, dict]] = {}
def register(self, policy: CompliancePolicy, provenance: dict | None = None) -> None:
self._policies[policy.name] = (policy, provenance or {})
logger.info("Policy registered: %s v%s", policy.name, policy.version)
def get(self, name: str) -> Optional[CompliancePolicy]:
entry = self._policies.get(name)
return entry[0] if entry else None
def list(self) -> list[CompliancePolicy]:
return [p for p, _ in self._policies.values()]
def for_framework(self, framework: str) -> list[CompliancePolicy]:
return [p for p, _ in self._policies.values() if p.framework == framework]
class AccordRegistry:
"""Stores loaded accord templates.
Replaces the hardcoded dict in mcp.py and authorize.py.
Falls back to built-in defaults for known accords if no
template has been loaded.
"""
# Built-in defaults (from the original hardcoded dicts)
_BUILTINS: dict[str, dict[str, Any]] = {
"shell-exec": {
"name": "shell-exec",
"capability_ceiling": "CAP_MUTATE",
"session_ttl_minutes": 30,
"mfa_required": False,
"device_compliance_required": False,
},
"dev-operations": {
"name": "dev-operations",
"capability_ceiling": "CAP_MUTATE",
"session_ttl_minutes": 60,
"mfa_required": False,
"device_compliance_required": False,
},
"network-mutate": {
"name": "network-mutate",
"capability_ceiling": "CAP_GOVERN",
"session_ttl_minutes": 15,
"mfa_required": True,
"ceremony_gate": "network-admin-elevated",
"device_compliance_required": False,
},
"ai-delegation-standard": {
"name": "ai-delegation-standard",
"capability_ceiling": "CAP_MUTATE",
"session_ttl_minutes": 60,
"ceremony_required_for": ["delete", "destroy", "drop"],
"max_commands": 500,
"device_compliance_required": False,
},
"infrastructure-operations": {
"name": "infrastructure-operations",
"capability_ceiling": "CAP_MUTATE",
"session_ttl_minutes": 30,
"device_compliance_required": True,
},
"device-management": {
"name": "device-management",
"capability_ceiling": "CAP_MUTATE",
"session_ttl_minutes": 30,
"device_compliance_required": True,
},
}
def __init__(self) -> None:
self._accords: dict[str, tuple[dict, dict]] = {}
def register(self, name: str, accord: dict, provenance: dict | None = None) -> None:
self._accords[name] = (accord, provenance or {})
logger.info("Accord registered: %s", name)
def get(self, name: str) -> Optional[dict]:
entry = self._accords.get(name)
if entry:
return entry[0]
return self._BUILTINS.get(name)
def list(self) -> list[dict]:
all_accords = dict(self._BUILTINS)
all_accords.update({k: v for k, (v, _) in self._accords.items()})
return list(all_accords.values())

View file

@ -0,0 +1,6 @@
name = "standard-operations"
capability_ceiling = "CAP_MUTATE"
session_ttl_minutes = 30
mfa_required = false
device_compliance_required = false
description = "Standard operations accord for ${org_name}"

View file

@ -0,0 +1,8 @@
# This is a plain Ansible playbook. Bastion variable substitution
# MUST NOT modify this file. ${org_name} should remain as-is.
---
- name: Test playbook for ${org_name}
hosts: all
tasks:
- name: Ping
ansible.builtin.ping:

View file

@ -0,0 +1,29 @@
[template]
name = "test-baseline"
version = "0.1.0"
description = "Test template for Bastion loader tests"
authors = ["Test Author"]
vertical = "testing"
compliance_frameworks = ["test-framework"]
[compatibility]
bastion_min = "0.3.0"
connectors_required = ["intune"]
[variables.org_name]
type = "string"
required = true
description = "Organization name"
[variables.admin_email]
type = "string"
required = false
default = "admin@example.com"
description = "Admin email"
[variables.api_key]
type = "string"
required = false
default = "test-key"
description = "API key"
sensitive = true

View file

@ -0,0 +1,37 @@
name = "test-workstation-policy"
description = "Test workstation compliance for ${org_name}"
version = "1.0.0"
framework = "test-framework"
framework_controls = ["TC-001", "TC-002"]
[[conditions]]
id = "disk-encryption"
kind = "DiskEncryption"
description = "Full disk encryption required"
framework_ref = "TC-001"
severity = "critical"
[conditions.linux]
script = "scripts/linux/check-encryption.sh"
expect = "encrypted"
[conditions.windows]
intune_field = "isEncrypted"
expect = true
[[conditions]]
id = "antivirus-active"
kind = "AntivirusActive"
description = "Antivirus must be running"
severity = "high"
[conditions.windows]
intune_field = "antiVirusStatus"
expect = "active"
[breach_response]
critical = "suspend_access"
high = "alert_msp"
[schedule]
interval_seconds = 300

180
tests/test_templates.py Normal file
View file

@ -0,0 +1,180 @@
# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Tests for the template system — manifest, policy, loader, registry."""
import os
import pytest
from pathlib import Path
from gsap_broker.templates.manifest import TemplateManifest
from gsap_broker.templates.policy import CompliancePolicy
from gsap_broker.templates.loader import TemplateLoader, LoadResult
from gsap_broker.templates.registry import AccordRegistry, PolicyRegistry
FIXTURE_DIR = Path(__file__).parent / "fixtures" / "sample-template"
# ── Manifest parsing ─────────────────────────────────────────────
def test_parse_full_manifest():
"""TEST 2: Full bastion.toml parsed correctly."""
m = TemplateManifest.from_toml(FIXTURE_DIR / "bastion.toml")
assert m.template.name == "test-baseline"
assert m.template.version == "0.1.0"
assert m.template.vertical == "testing"
assert "test-framework" in m.template.compliance_frameworks
assert m.compatibility.bastion_min == "0.3.0"
assert "intune" in m.compatibility.connectors_required
assert "org_name" in m.variables
assert m.variables["org_name"].required is True
assert m.variables["api_key"].sensitive is True
def test_parse_minimal_manifest(tmp_path):
"""TEST 3: Minimal bastion.toml with only required fields."""
toml = tmp_path / "bastion.toml"
toml.write_text('[template]\nname = "minimal"\nversion = "0.1.0"\n')
m = TemplateManifest.from_toml(toml)
assert m.template.name == "minimal"
assert m.compatibility.bastion_min == "0.5.0" # default
assert m.contents.policies == "policies/" # default
def test_validate_variables_missing_required():
"""TEST 4: Missing required variable produces error."""
m = TemplateManifest.from_toml(FIXTURE_DIR / "bastion.toml")
errors = m.validate_variables({}) # org_name missing
assert any("org_name" in e for e in errors)
def test_validate_variables_all_provided():
"""TEST 5: All required variables provided — no errors."""
m = TemplateManifest.from_toml(FIXTURE_DIR / "bastion.toml")
errors = m.validate_variables({"org_name": "TestCorp"})
assert errors == []
# ── Policy parsing ───────────────────────────────────────────────
def test_parse_policy():
"""TEST 6: Multi-condition policy parsed correctly."""
p = CompliancePolicy.from_toml(FIXTURE_DIR / "policies" / "test-workstation.toml")
assert p.name == "test-workstation-policy"
assert p.framework == "test-framework"
assert len(p.conditions) == 2
assert p.conditions[0].id == "disk-encryption"
assert p.conditions[0].severity == "critical"
assert p.breach_response.critical == "suspend_access"
assert p.schedule.interval_seconds == 300
def test_policy_platform_filtering():
"""TEST 7: conditions_for_platform filters correctly."""
p = CompliancePolicy.from_toml(FIXTURE_DIR / "policies" / "test-workstation.toml")
linux_conds = p.conditions_for_platform("linux")
windows_conds = p.conditions_for_platform("windows")
# disk-encryption has linux check, antivirus doesn't
assert len(linux_conds) == 1
assert linux_conds[0].id == "disk-encryption"
# Both conditions have windows checks
assert len(windows_conds) == 2
# ── Template loader ──────────────────────────────────────────────
def test_template_loader_full():
"""TEST 8: Full template load — policies and accords registered."""
policies = PolicyRegistry()
accords = AccordRegistry()
loader = TemplateLoader(policies, accords)
result = loader.load(FIXTURE_DIR, {"org_name": "TestCorp"})
assert result.success
assert "test-workstation-policy" in result.policies_loaded
assert "standard-operations" in result.accords_loaded
assert policies.get("test-workstation-policy") is not None
assert accords.get("standard-operations") is not None
def test_variable_substitution_bastion_files_only():
"""TEST 9: Variable substitution applies to Bastion files, not Ansible files."""
policies = PolicyRegistry()
accords = AccordRegistry()
loader = TemplateLoader(policies, accords)
loader.load(FIXTURE_DIR, {"org_name": "AcmeCorp"})
# Policy file should have substitution applied
policy = policies.get("test-workstation-policy")
assert policy is not None
assert "AcmeCorp" in policy.description
# Ansible playbook should be UNTOUCHED
playbook_path = FIXTURE_DIR / "ansible" / "playbooks" / "test-playbook.yml"
content = playbook_path.read_text()
assert "${org_name}" in content # NOT substituted
def test_variable_substitution_sensitive_not_logged(caplog):
"""TEST 10: Sensitive variable values not in log output."""
import logging
policies = PolicyRegistry()
accords = AccordRegistry()
loader = TemplateLoader(policies, accords)
with caplog.at_level(logging.DEBUG):
loader.load(FIXTURE_DIR, {"org_name": "TestCorp", "api_key": "SECRET_VALUE_123"})
# The sensitive value should not appear in logs
assert "SECRET_VALUE_123" not in caplog.text
# Non-sensitive org_name value DOES appear (confirms substitution ran)
assert "TestCorp" in caplog.text
def test_missing_bastion_toml(tmp_path):
"""Missing bastion.toml produces error, not crash."""
policies = PolicyRegistry()
accords = AccordRegistry()
loader = TemplateLoader(policies, accords)
result = loader.load(tmp_path, {})
assert not result.success
assert any("bastion.toml" in e for e in result.errors)
# ── Registries ───────────────────────────────────────────────────
def test_accord_registry_builtin_fallback():
"""TEST 11: AccordRegistry falls back to builtins for known accords."""
reg = AccordRegistry()
# shell-exec is a builtin
assert reg.get("shell-exec") is not None
assert reg.get("shell-exec")["capability_ceiling"] == "CAP_MUTATE"
# unknown returns None
assert reg.get("nonexistent") is None
def test_accord_registry_template_overrides_builtin():
"""Template-loaded accord overrides builtin with same name."""
reg = AccordRegistry()
reg.register("shell-exec", {"name": "shell-exec", "custom": True})
assert reg.get("shell-exec")["custom"] is True
def test_policy_registry_by_framework():
"""PolicyRegistry.for_framework filters correctly."""
reg = PolicyRegistry()
p1 = CompliancePolicy(name="p1", framework="hipaa")
p2 = CompliancePolicy(name="p2", framework="pci")
p3 = CompliancePolicy(name="p3", framework="hipaa")
reg.register(p1)
reg.register(p2)
reg.register(p3)
hipaa = reg.for_framework("hipaa")
assert len(hipaa) == 2
assert all(p.framework == "hipaa" for p in hipaa)