#!/usr/bin/env python3 """ YANG compiler entry point. Loads the sovereign-sdwan YANG schema, validates site instance data, and dispatches to vendor-specific compiler targets to produce device configuration payloads. """ import argparse import json import sys from pathlib import Path from typing import Any from lxml import etree from . import to_fortios, to_unifi, to_vyos YANG_MODELS_DIR = Path(__file__).parent.parent / "models" NAMESPACE = "urn:sovereign:sdwan" def parse_site_config(path: str) -> etree._Element: """Parse and return the root element of a site config XML file.""" tree = etree.parse(path) return tree.getroot() def extract_zones(root: etree._Element) -> list[dict[str, Any]]: """Extract zone-policy entries from the site config.""" ns = {"s": NAMESPACE} zones = [] for zone_elem in root.findall(".//s:zone-policy/s:zone", ns): zone: dict[str, Any] = { "name": zone_elem.findtext("s:name", default="", namespaces=ns), "subnet": zone_elem.findtext("s:subnet", default="", namespaces=ns), "vlan_id": int(zone_elem.findtext("s:vlan-id", default="0", namespaces=ns)), "owner_device": zone_elem.findtext("s:owner-device", default="", namespaces=ns), "policies": [], } for policy_elem in zone_elem.findall("s:policy", ns): policy = { "dst_zone": policy_elem.findtext("s:dst-zone", default="", namespaces=ns), "action": policy_elem.findtext("s:action", default="deny", namespaces=ns), "services": [ svc.text for svc in policy_elem.findall("s:services", ns) if svc.text ], } zone["policies"].append(policy) zones.append(zone) return zones def extract_circuits(root: etree._Element) -> list[dict[str, Any]]: """Extract WAN circuit entries from the site config.""" ns = {"s": NAMESPACE} circuits = [] for circuit_elem in root.findall(".//s:wan-circuits/s:circuit", ns): circuit: dict[str, Any] = { "name": circuit_elem.findtext("s:name", default="", namespaces=ns), "type": circuit_elem.findtext("s:type", default="", namespaces=ns), "interface": circuit_elem.findtext("s:interface-name", default="", namespaces=ns), } sla_elem = circuit_elem.find("s:sla", ns) if sla_elem is not None: circuit["sla"] = { "latency_ms": int(sla_elem.findtext("s:latency-target-ms", default="0", namespaces=ns)), "jitter_ms": int(sla_elem.findtext("s:jitter-target-ms", default="0", namespaces=ns)), "loss_pct": float(sla_elem.findtext("s:loss-target-pct", default="0", namespaces=ns)), } circuits.append(circuit) return circuits def compile_site(site_config_path: str, output_format: str = "json") -> dict[str, Any]: """ Compile a site configuration into vendor-specific payloads. Returns a dict mapping device type to its compiled payloads. """ root = parse_site_config(site_config_path) zones = extract_zones(root) circuits = extract_circuits(root) result: dict[str, Any] = {"site_config": site_config_path, "devices": {}} # Group zones by owner device and compile per-device payloads. device_zones: dict[str, list[dict[str, Any]]] = {} for zone in zones: device = zone["owner_device"] if device: device_zones.setdefault(device, []).append(zone) for device, dev_zones in device_zones.items(): if "fortigate" in device.lower(): result["devices"][device] = to_fortios.compile_zones(dev_zones) elif "vyos" in device.lower(): result["devices"][device] = to_vyos.compile_zones(dev_zones) elif "udr" in device.lower() or "unifi" in device.lower(): result["devices"][device] = to_unifi.compile_zones(dev_zones) if circuits: result["circuits"] = circuits return result def main() -> None: parser = argparse.ArgumentParser(description="YANG site config compiler") parser.add_argument("--site-config", required=True, help="Path to site config XML") parser.add_argument("--output-format", default="json", choices=["json"], help="Output format") args = parser.parse_args() result = compile_site(args.site_config, args.output_format) json.dump(result, sys.stdout, indent=2) print() if __name__ == "__main__": main()