bastion.toml manifest parser with variable validation and dependency declarations. Declarative compliance policy schema with per-platform check implementations. Template loader with variable substitution (Bastion-owned files only — never touches Ansible/Terraform). PolicyRegistry and AccordRegistry with builtin fallbacks. BOUNDARY: loader never touches automation framework files. Signed-off-by: Tyler King <tking@guildhouse.dev>
112 lines
3.7 KiB
Python
112 lines
3.7 KiB
Python
# Copyright 2026 Guildhouse Dev
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
|
|
"""Registries for loaded template content.
|
|
|
|
Policies, accords, and harnesses are loaded from templates
|
|
and stored in memory. The broker's authorization flow and
|
|
compliance evaluator query these registries.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Any, Optional
|
|
|
|
from gsap_broker.templates.policy import CompliancePolicy
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PolicyRegistry:
|
|
"""Stores loaded compliance policies."""
|
|
|
|
def __init__(self) -> None:
|
|
self._policies: dict[str, tuple[CompliancePolicy, dict]] = {}
|
|
|
|
def register(self, policy: CompliancePolicy, provenance: dict | None = None) -> None:
|
|
self._policies[policy.name] = (policy, provenance or {})
|
|
logger.info("Policy registered: %s v%s", policy.name, policy.version)
|
|
|
|
def get(self, name: str) -> Optional[CompliancePolicy]:
|
|
entry = self._policies.get(name)
|
|
return entry[0] if entry else None
|
|
|
|
def list(self) -> list[CompliancePolicy]:
|
|
return [p for p, _ in self._policies.values()]
|
|
|
|
def for_framework(self, framework: str) -> list[CompliancePolicy]:
|
|
return [p for p, _ in self._policies.values() if p.framework == framework]
|
|
|
|
|
|
class AccordRegistry:
|
|
"""Stores loaded accord templates.
|
|
|
|
Replaces the hardcoded dict in mcp.py and authorize.py.
|
|
Falls back to built-in defaults for known accords if no
|
|
template has been loaded.
|
|
"""
|
|
|
|
# Built-in defaults (from the original hardcoded dicts)
|
|
_BUILTINS: dict[str, dict[str, Any]] = {
|
|
"shell-exec": {
|
|
"name": "shell-exec",
|
|
"capability_ceiling": "CAP_MUTATE",
|
|
"session_ttl_minutes": 30,
|
|
"mfa_required": False,
|
|
"device_compliance_required": False,
|
|
},
|
|
"dev-operations": {
|
|
"name": "dev-operations",
|
|
"capability_ceiling": "CAP_MUTATE",
|
|
"session_ttl_minutes": 60,
|
|
"mfa_required": False,
|
|
"device_compliance_required": False,
|
|
},
|
|
"network-mutate": {
|
|
"name": "network-mutate",
|
|
"capability_ceiling": "CAP_GOVERN",
|
|
"session_ttl_minutes": 15,
|
|
"mfa_required": True,
|
|
"ceremony_gate": "network-admin-elevated",
|
|
"device_compliance_required": False,
|
|
},
|
|
"ai-delegation-standard": {
|
|
"name": "ai-delegation-standard",
|
|
"capability_ceiling": "CAP_MUTATE",
|
|
"session_ttl_minutes": 60,
|
|
"ceremony_required_for": ["delete", "destroy", "drop"],
|
|
"max_commands": 500,
|
|
"device_compliance_required": False,
|
|
},
|
|
"infrastructure-operations": {
|
|
"name": "infrastructure-operations",
|
|
"capability_ceiling": "CAP_MUTATE",
|
|
"session_ttl_minutes": 30,
|
|
"device_compliance_required": True,
|
|
},
|
|
"device-management": {
|
|
"name": "device-management",
|
|
"capability_ceiling": "CAP_MUTATE",
|
|
"session_ttl_minutes": 30,
|
|
"device_compliance_required": True,
|
|
},
|
|
}
|
|
|
|
def __init__(self) -> None:
|
|
self._accords: dict[str, tuple[dict, dict]] = {}
|
|
|
|
def register(self, name: str, accord: dict, provenance: dict | None = None) -> None:
|
|
self._accords[name] = (accord, provenance or {})
|
|
logger.info("Accord registered: %s", name)
|
|
|
|
def get(self, name: str) -> Optional[dict]:
|
|
entry = self._accords.get(name)
|
|
if entry:
|
|
return entry[0]
|
|
return self._BUILTINS.get(name)
|
|
|
|
def list(self) -> list[dict]:
|
|
all_accords = dict(self._BUILTINS)
|
|
all_accords.update({k: v for k, (v, _) in self._accords.items()})
|
|
return list(all_accords.values())
|