kedge/yang/compiler/to_fortios.py
Tyler King 6058e62348 Initial commit: Kedge network automation platform
Go-based network automation with YANG models, gRPC, Ansible,
Terraform, and Kubernetes integration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 12:09:30 -05:00

97 lines
2.9 KiB
Python

"""
FortiOS REST API payload generator.
Transforms YANG zone-policy data into FortiOS REST API payloads
for zone, interface, and firewall policy configuration.
"""
from typing import Any
def compile_zones(zones: list[dict[str, Any]]) -> dict[str, Any]:
    """
    Build FortiOS REST API request descriptors from zone definitions.

    Every zone dict must provide "name", "vlan_id" and "subnet". An optional
    "policies" list holds dicts with "action" and "dst_zone" plus an optional
    "services" list of service-name strings.

    The returned dict carries the fixed "target" and "api_version" markers
    and three lists -- "zones", "interfaces" and "policies" -- whose entries
    each consist of a "path", a "method" and a "body".
    """
    zone_requests: list[dict[str, Any]] = []
    interface_requests: list[dict[str, Any]] = []
    policy_requests: list[dict[str, Any]] = []

    for zone in zones:
        zone_name = zone["name"]
        vlan_id = zone["vlan_id"]
        # The zone member and the VLAN interface share one derived name.
        vlan_ifname = f"vlan{vlan_id}"

        # Zone object referencing its single VLAN member interface.
        zone_requests.append(
            {
                "path": "system/zone",
                "method": "POST",
                "body": {
                    "name": zone_name,
                    "interface": [{"interface-name": vlan_ifname}],
                },
            }
        )

        # VLAN interface addressed from the zone's subnet.
        interface_requests.append(
            {
                "path": "system/interface",
                "method": "POST",
                "body": {
                    "name": vlan_ifname,
                    "type": "vlan",
                    "vlanid": vlan_id,
                    "ip": _subnet_to_ip_mask(zone["subnet"]),
                    "allowaccess": "ping",
                    "role": "lan",
                },
            }
        )

        # One firewall policy per rule, from this zone to the rule's
        # destination zone.
        for rule in zone.get("policies", []):
            fortios_action = _map_action(rule["action"])
            destination_zone = rule["dst_zone"]

            requested_services = rule.get("services")
            if requested_services:
                # Service names are emitted upper-cased.
                service_entries = [
                    {"name": service.upper()} for service in requested_services
                ]
            else:
                # No explicit services means the catch-all "ALL" service.
                service_entries = [{"name": "ALL"}]

            policy_requests.append(
                {
                    "path": "firewall/policy",
                    "method": "POST",
                    "body": {
                        "srcintf": [{"name": zone_name}],
                        "dstintf": [{"name": destination_zone}],
                        "srcaddr": [{"name": "all"}],
                        "dstaddr": [{"name": "all"}],
                        "action": fortios_action,
                        "schedule": "always",
                        "logtraffic": "all",
                        "service": service_entries,
                    },
                }
            )

    return {
        "target": "fortios",
        "api_version": "v2",
        "zones": zone_requests,
        "interfaces": interface_requests,
        "policies": policy_requests,
    }
def _map_action(yang_action: str) -> str:
"""Map YANG action enum to FortiOS action string."""
mapping = {
"allow-stateful": "accept",
"allow-restricted": "accept",
"deny": "deny",
}
return mapping.get(yang_action, "deny")
def _subnet_to_ip_mask(subnet: str) -> str:
"""Convert CIDR notation to FortiOS ip/mask format."""
import ipaddress
network = ipaddress.IPv4Network(subnet, strict=False)
# FortiOS uses first usable IP as the interface IP.
first_host = str(next(network.hosts()))
return f"{first_host} {network.netmask}"