# Copyright 2026 Guildhouse Dev # SPDX-License-Identifier: Apache-2.0 """Template repository loader. Loads a template from a local directory (already cloned/forked). Parses the manifest, validates variables, resolves dependencies (by reading their manifests -- actual git cloning is a separate concern), and loads all content into the broker's registries. IMPORTANT: The loader does NOT parse or template automation framework files (Ansible, Terraform, Salt). It only processes Bastion-owned files in Bastion-owned directories. """ from __future__ import annotations import logging import re import subprocess from dataclasses import dataclass, field from pathlib import Path from typing import Any import tomllib from gsap_broker.templates.manifest import TemplateManifest from gsap_broker.templates.policy import CompliancePolicy from gsap_broker.templates.registry import AccordRegistry, PolicyRegistry logger = logging.getLogger(__name__) # Bastion-owned directories where variable substitution is applied. # Files outside these directories are NEVER modified. _BASTION_OWNED_DIRS = frozenset({"policies", "accords", "harnesses", "hbom", "dashboards"}) @dataclass class LoadResult: """Result of loading a template.""" template_name: str = "" policies_loaded: list[str] = field(default_factory=list) accords_loaded: list[str] = field(default_factory=list) warnings: list[str] = field(default_factory=list) errors: list[str] = field(default_factory=list) provenance: dict[str, Any] = field(default_factory=dict) @property def success(self) -> bool: return len(self.errors) == 0 class TemplateLoader: """Loads bastion.toml template repos into broker registries.""" def __init__( self, policy_registry: PolicyRegistry, accord_registry: AccordRegistry, ): self._policies = policy_registry self._accords = accord_registry def load(self, template_dir: str | Path, variables: dict[str, Any]) -> LoadResult: """Load a template from a local directory. 1. Parse bastion.toml 2. Validate required variables are provided 3. Substitute variables into Bastion-owned files ONLY 4. Parse and load policies into policy registry 5. Parse and load accords into accord registry 6. Record template provenance (git commit hash if available) Returns LoadResult with loaded items and any warnings. """ root = Path(template_dir) result = LoadResult() # 1. Parse manifest manifest_path = root / "bastion.toml" if not manifest_path.exists(): result.errors.append(f"No bastion.toml found in {root}") return result manifest = TemplateManifest.from_toml(manifest_path) result.template_name = manifest.template.name # 2. Validate variables # Apply defaults first effective_vars = {} for name, var in manifest.variables.items(): if name in variables: effective_vars[name] = variables[name] elif var.default is not None: effective_vars[name] = var.default errors = manifest.validate_variables(effective_vars) if errors: result.errors.extend(errors) return result # 3. Provenance result.provenance = self._compute_provenance(root) # 4. Load policies policies_dir = root / manifest.contents.policies if policies_dir.is_dir(): for f in sorted(policies_dir.glob("*.toml")): try: content = self._substitute_variables( f.read_text(), effective_vars, manifest.variables ) # Parse from substituted content data = tomllib.loads(content) policy = CompliancePolicy.model_validate(data) self._policies.register(policy, result.provenance) result.policies_loaded.append(policy.name) except Exception as e: result.warnings.append(f"Failed to load policy {f.name}: {e}") # 5. Load accords accords_dir = root / manifest.contents.accords if accords_dir.is_dir(): for f in sorted(accords_dir.glob("*.toml")): try: content = self._substitute_variables( f.read_text(), effective_vars, manifest.variables ) data = tomllib.loads(content) name = data.get("name", f.stem) self._accords.register(name, data, result.provenance) result.accords_loaded.append(name) except Exception as e: result.warnings.append(f"Failed to load accord {f.name}: {e}") return result def _substitute_variables( self, content: str, values: dict[str, Any], var_defs: dict ) -> str: """Replace ${variable_name} in content with values. ONLY called on Bastion-owned TOML files. NEVER on Ansible/Terraform/script files. Warns on unresolved optional variables. """ def replacer(match: re.Match) -> str: var_name = match.group(1) if var_name in values: val = values[var_name] # Don't log sensitive values var_def = var_defs.get(var_name) if var_def and not var_def.sensitive: logger.debug("Substituting ${%s} = %s", var_name, val) else: logger.debug("Substituting ${%s} = [REDACTED]", var_name) return str(val) logger.warning("Unresolved variable: ${%s}", var_name) return match.group(0) return re.sub(r"\$\{(\w+)\}", replacer, content) def _compute_provenance(self, template_dir: Path) -> dict[str, Any]: """Compute git commit hash and repo origin for template provenance.""" provenance: dict[str, Any] = {"template_dir": str(template_dir)} try: result = subprocess.run( ["git", "rev-parse", "HEAD"], cwd=template_dir, capture_output=True, text=True, timeout=5, ) if result.returncode == 0: provenance["git_commit"] = result.stdout.strip() result = subprocess.run( ["git", "remote", "get-url", "origin"], cwd=template_dir, capture_output=True, text=True, timeout=5, ) if result.returncode == 0: provenance["git_origin"] = result.stdout.strip() except Exception: pass return provenance