fastapi-gsap/tests/test_ansible_collection.py
Tyler J King 85afbd8d61 feat(ansible): add guildhouse.bastion Ansible Galaxy collection
Dynamic inventory plugin — queries Bastion for managed devices,
groups by OS and compliance state, bastion_* host vars, zero
credentials in inventory.

Credential lookup plugin — resolves short-lived credentials from
Bastion's CredentialResolver at execution time. Graceful
degradation when broker unavailable.

Chronicle callback plugin — reports playbook lifecycle events
(started, task completed, completed) to Chronicle. Optionally
triggers compliance re-evaluation after playbook completion.

Shared BastionClient for all plugins using stdlib urllib.

Signed-off-by: Tyler King <tking@guildhouse.dev>
2026-04-14 11:23:03 -04:00

168 lines
6.1 KiB
Python

# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Tests for the Ansible collection plugins — inventory, credential, callback."""
import sys
import os
import pytest
# Add the collection to the path so plugins are importable
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "ansible_collection"))
from guildhouse.bastion.plugins.inventory.bastion import build_inventory, _CREDENTIAL_FIELDS
from guildhouse.bastion.plugins.lookup.credential import resolve_credential, _AUTO_TYPE_MAP
from guildhouse.bastion.plugins.callback.bastion_chronicle import ChronicleCallback
# ── Inventory plugin ─────────────────────────────────────────────
MOCK_DEVICES = [
{
"device_id": "00000000-0000-0000-0000-000000000001",
"device_name": "LAPTOP-WIN-01",
"os_type": "Windows",
"os_version": "10.0.19045",
"compliance_state": "compliant",
"last_sync": "2026-04-14T00:00:00Z",
"user_principal_name": "alice@contoso.com",
"entra_device_id": "entra-001",
},
{
"device_id": "00000000-0000-0000-0000-000000000002",
"device_name": "SRV-LINUX-01",
"os_type": "Linux",
"os_version": "6.6",
"compliance_state": "noncompliant",
},
{
"device_id": "00000000-0000-0000-0000-000000000003",
"device_name": "MAC-01",
"os_type": "macOS",
"os_version": "14.0",
"compliance_state": "compliant",
},
]
def test_inventory_groups_by_os():
"""TEST 12: Devices grouped by OS correctly."""
inv = build_inventory(MOCK_DEVICES)
assert "laptop-win-01" in inv["os_windows"]["hosts"]
assert "srv-linux-01" in inv["os_linux"]["hosts"]
assert "mac-01" in inv["os_macos"]["hosts"]
def test_inventory_groups_by_compliance():
"""TEST 12: Devices grouped by compliance state."""
inv = build_inventory(MOCK_DEVICES)
assert "laptop-win-01" in inv["compliance_compliant"]["hosts"]
assert "mac-01" in inv["compliance_compliant"]["hosts"]
assert "srv-linux-01" in inv["compliance_noncompliant"]["hosts"]
def test_inventory_host_vars_have_bastion_prefix():
"""TEST 12: Host variables prefixed with bastion_."""
inv = build_inventory(MOCK_DEVICES)
hostvars = inv["_meta"]["hostvars"]["laptop-win-01"]
assert hostvars["bastion_device_id"] == "00000000-0000-0000-0000-000000000001"
assert hostvars["bastion_compliance_state"] == "compliant"
assert hostvars["bastion_os_type"] == "windows"
assert hostvars["bastion_management_channel"] == "intune"
def test_inventory_no_credentials_in_host_vars():
"""TEST 13: No credential variables in host vars."""
inv = build_inventory(MOCK_DEVICES)
for hostname, hostvars in inv["_meta"]["hostvars"].items():
for field in _CREDENTIAL_FIELDS:
assert field not in hostvars, f"Credential field '{field}' found in {hostname} host vars"
def test_inventory_empty_device_list():
"""Empty device list produces valid empty inventory."""
inv = build_inventory([])
assert inv["all"]["hosts"] == []
assert inv["_meta"]["hostvars"] == {}
# ── Credential lookup plugin ─────────────────────────────────────
def test_credential_auto_detect_windows():
"""Auto-detect type from bastion_os_type."""
cred_type = _AUTO_TYPE_MAP.get("windows")
assert cred_type == "kerberos"
def test_credential_auto_detect_linux():
cred_type = _AUTO_TYPE_MAP.get("linux")
assert cred_type == "ssh_cert"
def test_credential_graceful_degradation():
"""TEST 14: Returns None when broker unavailable."""
result = resolve_credential("host-1", credential_type="kerberos")
assert result is None # Broker not running — graceful degradation
# ── Chronicle callback plugin ────────────────────────────────────
def test_callback_playbook_started():
"""TEST 15: PLAYBOOK_STARTED event emitted."""
cb = ChronicleCallback()
cb.on_playbook_start("test-playbook.yml", ["host-1", "host-2"])
assert len(cb._events) == 1
assert cb._events[0]["kind"] == "ANSIBLE_PLAYBOOK_STARTED"
assert cb._events[0]["playbook"] == "test-playbook.yml"
def test_callback_task_completed():
"""TEST 15: TASK_COMPLETED event emitted per host."""
cb = ChronicleCallback()
cb.on_task_completed("host-1", "Ping", "ok", changed=False)
cb.on_task_completed("host-2", "Ping", "failed", changed=False)
assert len(cb._events) == 2
assert cb._events[0]["kind"] == "ANSIBLE_TASK_COMPLETED"
assert cb._events[1]["status"] == "failed"
def test_callback_playbook_completed():
"""TEST 15: PLAYBOOK_COMPLETED event with stats."""
cb = ChronicleCallback()
cb.on_task_completed("host-1", "Ping", "ok")
cb.on_playbook_completed({"ok": 1, "failed": 0, "changed": 0})
events = [e for e in cb._events if e["kind"] == "ANSIBLE_PLAYBOOK_COMPLETED"]
assert len(events) == 1
assert "host-1" in events[0]["affected_hosts"]
def test_callback_full_lifecycle():
"""TEST 15: Full playbook lifecycle audited with 3 events."""
cb = ChronicleCallback()
cb.on_playbook_start("site.yml", ["host-1"])
cb.on_task_completed("host-1", "Install packages", "changed", changed=True)
cb.on_playbook_completed({"ok": 0, "changed": 1, "failed": 0})
kinds = [e["kind"] for e in cb._events]
assert kinds == [
"ANSIBLE_PLAYBOOK_STARTED",
"ANSIBLE_TASK_COMPLETED",
"ANSIBLE_PLAYBOOK_COMPLETED",
]
def test_callback_compliance_recheck_triggered(monkeypatch):
"""TEST 16: Compliance re-eval triggered when configured."""
monkeypatch.setenv("BASTION_RECHECK_COMPLIANCE", "true")
cb = ChronicleCallback()
cb._recheck = True # env already set but constructor ran before monkeypatch
cb._affected_hosts = {"host-1", "host-2"}
# Without a real client, _trigger_compliance_recheck is a no-op
# but should not raise
cb.on_playbook_completed({"ok": 2})
assert len(cb._events) == 1 # PLAYBOOK_COMPLETED emitted