kedge/yang/compiler/compile.py
Tyler King 6058e62348 Initial commit: Kedge network automation platform
Go-based network automation with YANG models, gRPC, Ansible,
Terraform, and Kubernetes integration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 12:09:30 -05:00

129 lines
4.4 KiB
Python

#!/usr/bin/env python3
"""
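YANG compiler entry point.

Parses a sovereign-sdwan site configuration (XML instance data), extracts
its zone policies and WAN circuits, and dispatches the zones to
vendor-specific compiler targets to produce device configuration payloads.

NOTE: this module only reads the XML; it does not itself load the YANG
schema or validate the instance data against it.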
"""
import argparse
import json
import sys
from pathlib import Path
from typing import Any
from lxml import etree
from . import to_fortios, to_unifi, to_vyos
# The "models" directory located in the parent of this file's directory.
# NOTE(review): presumably where the YANG model files live; nothing in the
# visible part of this module reads from it -- confirm it is still needed.
YANG_MODELS_DIR = Path(__file__).parent.parent / "models"
# XML namespace URI bound to the "s" prefix in the element lookups below.
NAMESPACE = "urn:sovereign:sdwan"
def parse_site_config(path: str) -> etree._Element:
    """Read the site config XML document at *path* and return its root element.

    Any error raised by ``etree.parse`` (unreadable file, malformed XML)
    propagates to the caller unchanged.
    """
    return etree.parse(path).getroot()
def extract_zones(root: etree._Element) -> list[dict[str, Any]]:
    """Extract zone-policy entries from the site config.

    Args:
        root: Root element of a parsed site config document.

    Returns:
        One dict per ``zone-policy/zone`` element found anywhere under
        *root*, with the keys ``name``, ``subnet``, ``vlan_id`` (int),
        ``owner_device`` and ``policies``. Each policy is a dict with
        ``dst_zone``, ``action`` (``"deny"`` when the element is absent)
        and ``services`` (the text of every ``services`` child that has
        text). Missing string leaves become ``""``; a missing or empty
        ``vlan-id`` becomes ``0``.

    Raises:
        ValueError: If ``vlan-id`` contains text that is not an integer.
    """
    ns = {"s": NAMESPACE}
    zones = []
    for zone_elem in root.findall(".//s:zone-policy/s:zone", ns):
        # findtext() only falls back to ``default`` when the element is
        # missing; a present-but-empty <vlan-id/> yields "" and int("")
        # raises ValueError. Treat an empty leaf the same as a missing one.
        vlan_text = zone_elem.findtext("s:vlan-id", default="", namespaces=ns).strip()
        zone: dict[str, Any] = {
            "name": zone_elem.findtext("s:name", default="", namespaces=ns),
            "subnet": zone_elem.findtext("s:subnet", default="", namespaces=ns),
            "vlan_id": int(vlan_text or "0"),
            "owner_device": zone_elem.findtext("s:owner-device", default="", namespaces=ns),
            "policies": [
                {
                    "dst_zone": policy_elem.findtext("s:dst-zone", default="", namespaces=ns),
                    "action": policy_elem.findtext("s:action", default="deny", namespaces=ns),
                    # Each <services> child contributes one entry; children
                    # without text are skipped.
                    "services": [
                        svc.text for svc in policy_elem.findall("s:services", ns) if svc.text
                    ],
                }
                for policy_elem in zone_elem.findall("s:policy", ns)
            ],
        }
        zones.append(zone)
    return zones
def _sla_number_text(sla_elem: etree._Element, tag: str, ns: dict[str, str]) -> str:
    """Return the stripped text of SLA child *tag*, or ``"0"`` if missing or empty."""
    # findtext() returns "" (not the default) for a present-but-empty element,
    # so the "0" fallback has to be applied after the lookup.
    return sla_elem.findtext(f"s:{tag}", default="", namespaces=ns).strip() or "0"


def extract_circuits(root: etree._Element) -> list[dict[str, Any]]:
    """Extract WAN circuit entries from the site config.

    Args:
        root: Root element of a parsed site config document.

    Returns:
        One dict per ``wan-circuits/circuit`` element found anywhere under
        *root*, with the keys ``name``, ``type`` and ``interface`` (``""``
        when the element is absent). When the circuit has an ``sla`` child,
        an ``sla`` dict is added with ``latency_ms`` (int), ``jitter_ms``
        (int) and ``loss_pct`` (float); each is ``0`` when its element is
        missing or empty.

    Raises:
        ValueError: If an SLA target contains text that is not a number of
            the expected kind.
    """
    ns = {"s": NAMESPACE}
    circuits = []
    for circuit_elem in root.findall(".//s:wan-circuits/s:circuit", ns):
        circuit: dict[str, Any] = {
            "name": circuit_elem.findtext("s:name", default="", namespaces=ns),
            "type": circuit_elem.findtext("s:type", default="", namespaces=ns),
            "interface": circuit_elem.findtext("s:interface-name", default="", namespaces=ns),
        }
        sla_elem = circuit_elem.find("s:sla", ns)
        # The "sla" key is only present when the circuit declares an <sla>.
        if sla_elem is not None:
            circuit["sla"] = {
                "latency_ms": int(_sla_number_text(sla_elem, "latency-target-ms", ns)),
                "jitter_ms": int(_sla_number_text(sla_elem, "jitter-target-ms", ns)),
                "loss_pct": float(_sla_number_text(sla_elem, "loss-target-pct", ns)),
            }
        circuits.append(circuit)
    return circuits
def compile_site(site_config_path: str, output_format: str = "json") -> dict[str, Any]:
    """
    Compile a site configuration into vendor-specific payloads.

    Zones are grouped by their ``owner_device`` and each group is handed to
    the compiler target selected by a case-insensitive substring match on
    the device name: "fortigate", then "vyos", then "udr" or "unifi".
    Zones with no owner device, and devices matching none of those
    substrings, are left out of the result.

    ``output_format`` is accepted but not consulted by this function.

    Returns a dict with ``site_config`` (the input path), ``devices``
    (device name -> compiled payload) and, only when at least one circuit
    was found, ``circuits``.
    """
    root = parse_site_config(site_config_path)
    zones = extract_zones(root)
    circuits = extract_circuits(root)

    # Bucket zones under the device that owns them, skipping ownerless zones.
    zones_by_owner: dict[str, list[dict[str, Any]]] = {}
    for zone in zones:
        owner = zone["owner_device"]
        if not owner:
            continue
        zones_by_owner.setdefault(owner, []).append(zone)

    # Pick the vendor target per device; first matching substring wins.
    compiled: dict[str, Any] = {}
    for owner, owned_zones in zones_by_owner.items():
        lowered = owner.lower()
        if "fortigate" in lowered:
            target = to_fortios
        elif "vyos" in lowered:
            target = to_vyos
        elif "udr" in lowered or "unifi" in lowered:
            target = to_unifi
        else:
            continue
        compiled[owner] = target.compile_zones(owned_zones)

    result: dict[str, Any] = {"site_config": site_config_path, "devices": compiled}
    if circuits:
        result["circuits"] = circuits
    return result
def main() -> None:
    """Command-line entry point: compile one site config and print it as JSON.

    Reads ``--site-config`` (required) and ``--output-format`` (only
    ``json`` is accepted) from the command line, then writes the compiled
    result to stdout with two-space indentation and a trailing newline.
    """
    cli = argparse.ArgumentParser(description="YANG site config compiler")
    cli.add_argument("--site-config", required=True, help="Path to site config XML")
    cli.add_argument("--output-format", default="json", choices=["json"], help="Output format")
    options = cli.parse_args()

    compiled = compile_site(options.site_config, options.output_format)
    json.dump(compiled, sys.stdout, indent=2)
    # json.dump() does not end the document with a newline; add one.
    sys.stdout.write("\n")
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()