fastapi-gsap/gsap_broker/templates/registry.py
Tyler J King 77964e4042 feat(templates): add template system — manifest, policy, loader, registries
bastion.toml manifest parser with variable validation and dependency
declarations. Declarative compliance policy schema with per-platform
check implementations. Template loader with variable substitution
(Bastion-owned files only — never touches Ansible/Terraform).
PolicyRegistry and AccordRegistry with builtin fallbacks.

BOUNDARY: loader never touches automation framework files.

Signed-off-by: Tyler King <tking@guildhouse.dev>
2026-04-14 11:09:41 -04:00

112 lines
3.7 KiB
Python

# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Registries for loaded template content.
Policies, accords, and harnesses are loaded from templates
and stored in memory. The broker's authorization flow and
compliance evaluator query these registries.
"""
from __future__ import annotations

import copy
import logging
from typing import Any, Optional

from gsap_broker.templates.policy import CompliancePolicy
# Module-level logger named after this module; the registries below use it
# to emit an INFO record each time a policy or accord is registered.
logger = logging.getLogger(__name__)
class PolicyRegistry:
    """In-memory store of compliance policies, keyed by policy name.

    Every entry pairs a policy with the provenance dict supplied when it
    was registered. Registering another policy under an existing name
    replaces the earlier entry.
    """

    def __init__(self) -> None:
        """Create an empty registry."""
        # Maps policy name -> (policy, provenance).
        self._policies: dict[str, tuple[CompliancePolicy, dict]] = {}

    def register(self, policy: CompliancePolicy, provenance: dict | None = None) -> None:
        """Store *policy* under ``policy.name`` and log the registration.

        Args:
            policy: The policy to store; its ``name`` is the lookup key and
                its ``version`` is included in the log record.
            provenance: Optional metadata kept next to the policy. An empty
                dict is stored when it is omitted or falsy.
        """
        record = (policy, provenance if provenance else {})
        self._policies[policy.name] = record
        logger.info("Policy registered: %s v%s", policy.name, policy.version)

    def get(self, name: str) -> Optional[CompliancePolicy]:
        """Return the policy registered as *name*, or ``None`` if absent."""
        record = self._policies.get(name)
        if not record:
            return None
        return record[0]

    def list(self) -> list[CompliancePolicy]:
        """Return all registered policies in registration order."""
        return [record[0] for record in self._policies.values()]

    def for_framework(self, framework: str) -> list[CompliancePolicy]:
        """Return the policies whose ``framework`` equals *framework*."""
        candidates = (record[0] for record in self._policies.values())
        return [policy for policy in candidates if policy.framework == framework]
class AccordRegistry:
    """Stores loaded accord templates.

    Replaces the hardcoded dict in mcp.py and authorize.py.
    Falls back to built-in defaults for known accords if no
    template has been loaded.

    Built-in defaults are class-level state shared by every registry
    instance, so they are always handed out as deep copies: a caller that
    mutates a returned accord cannot alter the defaults seen by anyone
    else. Accords added through ``register`` are returned by reference.
    """

    # Built-in defaults (from the original hardcoded dicts)
    _BUILTINS: dict[str, dict[str, Any]] = {
        "shell-exec": {
            "name": "shell-exec",
            "capability_ceiling": "CAP_MUTATE",
            "session_ttl_minutes": 30,
            "mfa_required": False,
            "device_compliance_required": False,
        },
        "dev-operations": {
            "name": "dev-operations",
            "capability_ceiling": "CAP_MUTATE",
            "session_ttl_minutes": 60,
            "mfa_required": False,
            "device_compliance_required": False,
        },
        "network-mutate": {
            "name": "network-mutate",
            "capability_ceiling": "CAP_GOVERN",
            "session_ttl_minutes": 15,
            "mfa_required": True,
            "ceremony_gate": "network-admin-elevated",
            "device_compliance_required": False,
        },
        "ai-delegation-standard": {
            "name": "ai-delegation-standard",
            "capability_ceiling": "CAP_MUTATE",
            "session_ttl_minutes": 60,
            "ceremony_required_for": ["delete", "destroy", "drop"],
            "max_commands": 500,
            "device_compliance_required": False,
        },
        "infrastructure-operations": {
            "name": "infrastructure-operations",
            "capability_ceiling": "CAP_MUTATE",
            "session_ttl_minutes": 30,
            "device_compliance_required": True,
        },
        "device-management": {
            "name": "device-management",
            "capability_ceiling": "CAP_MUTATE",
            "session_ttl_minutes": 30,
            "device_compliance_required": True,
        },
    }

    def __init__(self) -> None:
        """Create a registry with no loaded accords (built-ins only)."""
        # Maps accord name -> (accord, provenance).
        self._accords: dict[str, tuple[dict, dict]] = {}

    def register(self, name: str, accord: dict, provenance: dict | None = None) -> None:
        """Store *accord* under *name* and log the registration.

        A registered accord takes precedence over a built-in default of
        the same name, and replaces any accord registered earlier under
        that name.

        Args:
            name: Lookup key for the accord.
            accord: The accord definition; stored by reference.
            provenance: Optional metadata kept next to the accord. An empty
                dict is stored when it is omitted or falsy.
        """
        self._accords[name] = (accord, provenance or {})
        logger.info("Accord registered: %s", name)

    def get(self, name: str) -> Optional[dict]:
        """Return the accord called *name*, or ``None`` if it is unknown.

        A registered accord is returned as stored. When none is
        registered, a deep copy of the built-in default is returned so
        that the shared class-level defaults cannot be mutated through
        the result.
        """
        entry = self._accords.get(name)
        if entry:
            return entry[0]
        builtin = self._BUILTINS.get(name)
        if builtin is None:
            return None
        # Deep copy: some built-ins hold nested lists (ceremony_required_for).
        return copy.deepcopy(builtin)

    def list(self) -> list[dict]:
        """Return every known accord, with registered ones overriding built-ins.

        Built-in names come first in their declared order (an overridden
        built-in keeps its position), followed by registered accords
        whose names are not built-in. Built-in entries are deep copies.
        """
        # Copy first so the shared defaults never leave this class by reference.
        all_accords = copy.deepcopy(self._BUILTINS)
        all_accords.update({k: v for k, (v, _) in self._accords.items()})
        return list(all_accords.values())