Go-based network automation with YANG models, gRPC, Ansible, Terraform, and Kubernetes integration. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
98 lines
3.1 KiB
Python
98 lines
3.1 KiB
Python
"""
|
|
VyOS NETCONF XML generator.
|
|
|
|
Transforms YANG zone-policy data into VyOS NETCONF edit-config XML
|
|
payloads for interface, firewall zone, and static route configuration.
|
|
"""
|
|
|
|
from typing import Any
|
|
|
|
from lxml import etree
|
|
|
|
|
|
def compile_zones(zones: list[dict[str, Any]]) -> dict[str, Any]:
|
|
"""
|
|
Compile zone definitions into VyOS NETCONF XML payloads.
|
|
|
|
Returns a dict with 'edit_configs' containing XML strings
|
|
for NETCONF edit-config operations.
|
|
"""
|
|
result: dict[str, Any] = {
|
|
"target": "vyos",
|
|
"protocol": "netconf",
|
|
"edit_configs": [],
|
|
}
|
|
|
|
for zone in zones:
|
|
# VyOS interface configuration.
|
|
iface_xml = _build_interface_config(zone)
|
|
result["edit_configs"].append({
|
|
"description": f"Configure VLAN {zone['vlan_id']} interface for zone {zone['name']}",
|
|
"xml": etree.tostring(iface_xml, pretty_print=True).decode(),
|
|
})
|
|
|
|
# VyOS zone-policy firewall rules.
|
|
for policy in zone.get("policies", []):
|
|
fw_xml = _build_firewall_rule(zone["name"], policy)
|
|
result["edit_configs"].append({
|
|
"description": f"Firewall rule: {zone['name']} -> {policy['dst_zone']}",
|
|
"xml": etree.tostring(fw_xml, pretty_print=True).decode(),
|
|
})
|
|
|
|
return result
|
|
|
|
|
|
def _build_interface_config(zone: dict[str, Any]) -> etree._Element:
|
|
"""Build VyOS NETCONF XML for a VLAN interface."""
|
|
config = etree.Element("config")
|
|
interfaces = etree.SubElement(config, "interfaces")
|
|
ethernet = etree.SubElement(interfaces, "ethernet")
|
|
|
|
# VyOS VLAN sub-interface: ethX vif <vlan-id>.
|
|
tagnode = etree.SubElement(ethernet, "tagnode")
|
|
tagnode.text = "eth0" # Parent interface — configurable per-site.
|
|
|
|
vif = etree.SubElement(ethernet, "vif")
|
|
vif_tagnode = etree.SubElement(vif, "tagnode")
|
|
vif_tagnode.text = str(zone["vlan_id"])
|
|
|
|
address = etree.SubElement(vif, "address")
|
|
address.text = zone["subnet"]
|
|
|
|
description = etree.SubElement(vif, "description")
|
|
description.text = f"Kedge managed: {zone['name']}"
|
|
|
|
return config
|
|
|
|
|
|
def _build_firewall_rule(src_zone: str, policy: dict[str, Any]) -> etree._Element:
|
|
"""Build VyOS NETCONF XML for a zone-policy firewall rule."""
|
|
config = etree.Element("config")
|
|
zone_policy = etree.SubElement(config, "zone-policy")
|
|
zone = etree.SubElement(zone_policy, "zone")
|
|
|
|
name = etree.SubElement(zone, "name")
|
|
name.text = src_zone
|
|
|
|
from_elem = etree.SubElement(zone, "from")
|
|
from_zone = etree.SubElement(from_elem, "zone")
|
|
from_zone.text = policy["dst_zone"]
|
|
|
|
firewall = etree.SubElement(from_elem, "firewall")
|
|
fw_name = etree.SubElement(firewall, "name")
|
|
fw_name.text = f"{src_zone}-to-{policy['dst_zone']}"
|
|
|
|
action = etree.SubElement(firewall, "default-action")
|
|
action.text = _map_action(policy["action"])
|
|
|
|
return config
|
|
|
|
|
|
def _map_action(yang_action: str) -> str:
|
|
"""Map YANG action enum to VyOS firewall action."""
|
|
mapping = {
|
|
"allow-stateful": "accept",
|
|
"allow-restricted": "accept",
|
|
"deny": "drop",
|
|
}
|
|
return mapping.get(yang_action, "drop")
|