Go-based network automation with YANG models, gRPC, Ansible, Terraform, and Kubernetes integration. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
129 lines
4.4 KiB
Python
129 lines
4.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
YANG compiler entry point.
|
|
|
|
Loads the sovereign-sdwan YANG schema, validates site instance data,
|
|
and dispatches to vendor-specific compiler targets to produce
|
|
device configuration payloads.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from lxml import etree
|
|
|
|
from . import to_fortios, to_unifi, to_vyos
|
|
|
|
# "models" directory that sits beside this module's containing directory
# (presumably where the YANG schema files live -- nothing in this module
# reads it directly).
YANG_MODELS_DIR = Path(__file__).parent.parent.joinpath("models")

# XML namespace of the site config documents; the extract_* helpers bind it
# to the "s" prefix for their element lookups.
NAMESPACE = "urn:sovereign:sdwan"
|
|
|
|
|
|
def parse_site_config(path: str) -> etree._Element:
    """Load the site config XML at *path* and hand back its document root.

    Args:
        path: Location of the site configuration XML file.

    Returns:
        The root element of the parsed document.
    """
    return etree.parse(path).getroot()
|
|
|
|
|
|
def extract_zones(root: etree._Element) -> list[dict[str, Any]]:
    """Collect every ``zone-policy/zone`` entry of the site config as a dict.

    Args:
        root: Root element of a parsed site config document.

    Returns:
        One dict per zone, in document order, with the keys ``name``,
        ``subnet``, ``vlan_id`` (int, 0 when the element is absent),
        ``owner_device`` and ``policies``.  Each policy is a dict with
        ``dst_zone``, ``action`` ("deny" when the element is absent) and
        ``services`` (texts of the non-empty ``services`` children).
    """
    nsmap = {"s": NAMESPACE}

    def text_of(node: Any, tag_path: str, fallback: str = "") -> Any:
        # Namespaced child-text lookup with a fallback for a missing child.
        return node.findtext(tag_path, default=fallback, namespaces=nsmap)

    def policy_record(policy_node: Any) -> dict[str, Any]:
        # Flatten one <policy> element into a plain dictionary.
        return {
            "dst_zone": text_of(policy_node, "s:dst-zone"),
            "action": text_of(policy_node, "s:action", "deny"),
            "services": [
                entry.text
                for entry in policy_node.findall("s:services", nsmap)
                if entry.text
            ],
        }

    return [
        {
            "name": text_of(zone_node, "s:name"),
            "subnet": text_of(zone_node, "s:subnet"),
            "vlan_id": int(text_of(zone_node, "s:vlan-id", "0")),
            "owner_device": text_of(zone_node, "s:owner-device"),
            "policies": [
                policy_record(policy_node)
                for policy_node in zone_node.findall("s:policy", nsmap)
            ],
        }
        for zone_node in root.findall(".//s:zone-policy/s:zone", nsmap)
    ]
|
|
|
|
|
|
def extract_circuits(root: etree._Element) -> list[dict[str, Any]]:
    """Collect every ``wan-circuits/circuit`` entry of the site config.

    Args:
        root: Root element of a parsed site config document.

    Returns:
        One dict per circuit, in document order, with the keys ``name``,
        ``type`` and ``interface``.  When the circuit has an ``sla`` child an
        ``sla`` dict is added holding ``latency_ms`` (int), ``jitter_ms``
        (int) and ``loss_pct`` (float); each defaults to 0 when its element
        is absent.
    """
    nsmap = {"s": NAMESPACE}

    def text_of(node: Any, tag_path: str, fallback: str = "") -> Any:
        # Namespaced child-text lookup with a fallback for a missing child.
        return node.findtext(tag_path, default=fallback, namespaces=nsmap)

    def circuit_record(circuit_node: Any) -> dict[str, Any]:
        # Flatten one <circuit> element, attaching SLA targets if present.
        record: dict[str, Any] = {
            "name": text_of(circuit_node, "s:name"),
            "type": text_of(circuit_node, "s:type"),
            "interface": text_of(circuit_node, "s:interface-name"),
        }
        sla_node = circuit_node.find("s:sla", nsmap)
        if sla_node is None:
            return record
        record["sla"] = {
            "latency_ms": int(text_of(sla_node, "s:latency-target-ms", "0")),
            "jitter_ms": int(text_of(sla_node, "s:jitter-target-ms", "0")),
            "loss_pct": float(text_of(sla_node, "s:loss-target-pct", "0")),
        }
        return record

    return [
        circuit_record(circuit_node)
        for circuit_node in root.findall(".//s:wan-circuits/s:circuit", nsmap)
    ]
|
|
|
|
|
|
def compile_site(site_config_path: str, output_format: str = "json") -> dict[str, Any]:
    """
    Compile a site configuration into vendor-specific payloads.

    Zones are grouped by their ``owner_device`` value and each group is
    handed to the compiler target whose keyword appears in the device name
    (case-insensitive): "fortigate" -> to_fortios, "vyos" -> to_vyos,
    "udr"/"unifi" -> to_unifi.  Zones without an owner device are not
    assigned to any payload.  Devices matching no keyword are skipped, and a
    warning is logged so the omission is visible to the operator.

    Args:
        site_config_path: Path to the site config XML file.
        output_format: Accepted for interface compatibility; this function
            does not consult it.

    Returns:
        A dict with ``site_config`` (the input path), ``devices`` (device
        name -> payload returned by the target's ``compile_zones``) and,
        when any WAN circuits were found, ``circuits``.
    """
    # Function-scope import so this block does not depend on a file-level one.
    import logging

    log = logging.getLogger(__name__)

    root = parse_site_config(site_config_path)
    zones = extract_zones(root)
    circuits = extract_circuits(root)

    # Group zones by owner device, preserving first-seen device order.
    zones_by_device: dict[str, list[dict[str, Any]]] = {}
    for zone in zones:
        owner = zone["owner_device"]
        if owner:
            zones_by_device.setdefault(owner, []).append(zone)

    # Ordered dispatch table: the first entry with a matching keyword wins,
    # which keeps the fortigate > vyos > udr/unifi precedence.
    targets = (
        (("fortigate",), to_fortios),
        (("vyos",), to_vyos),
        (("udr", "unifi"), to_unifi),
    )

    compiled: dict[str, Any] = {}
    for device, dev_zones in zones_by_device.items():
        lowered = device.lower()
        backend = next(
            (
                module
                for keywords, module in targets
                if any(keyword in lowered for keyword in keywords)
            ),
            None,
        )
        if backend is None:
            # Previously dropped silently; keep skipping, but say so.
            log.warning(
                "No compiler target for device %r; skipping %d zone(s)",
                device,
                len(dev_zones),
            )
            continue
        compiled[device] = backend.compile_zones(dev_zones)

    result: dict[str, Any] = {"site_config": site_config_path, "devices": compiled}
    if circuits:
        result["circuits"] = circuits

    return result
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: compile one site config and print it as JSON.

    Reads ``--site-config`` (required) and ``--output-format`` (only "json"
    is accepted) from the command line, compiles the site, and writes the
    result to stdout as indented JSON followed by a newline.
    """
    cli = argparse.ArgumentParser(description="YANG site config compiler")
    cli.add_argument("--site-config", required=True, help="Path to site config XML")
    cli.add_argument("--output-format", default="json", choices=["json"], help="Output format")
    options = cli.parse_args()

    compiled = compile_site(options.site_config, options.output_format)
    json.dump(compiled, sys.stdout, indent=2)
    # json.dump emits no trailing newline; finish the line explicitly.
    sys.stdout.write("\n")


# Run the CLI only when this module is executed as a script.
if __name__ == "__main__":
    main()
|