fastapi-gsap/tests/test_intune.py
Tyler J King 871541f0eb feat(connectors): add Intune device management connector
Implements ConnectorPlugin for Intune Graph API operations.
Governed invocation: every Intune call requires an active AC
and emits a Chronicle CONNECTOR_INVOKED event.
Operations: list, get, compliance check, sync, lock, retire, wipe.
In-memory compliance cache with configurable TTL.
Conditional registration via intune_enabled setting.

Signed-off-by: Tyler King <tking@guildhouse.dev>
2026-04-14 05:21:47 -04:00

224 lines
7 KiB
Python

# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Tests for Intune connector and device compliance cache."""
import time
import pytest
from datetime import datetime, UTC
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
from gsap_broker.connectors.base import ConnectorContext
from gsap_broker.connectors.intune import IntuneConnector
from gsap_broker.intune.device_cache import DeviceComplianceCache
from gsap_broker.intune.graph_client import GraphClient
from gsap_broker.models.intune import ComplianceState
@pytest.fixture
def mock_graph():
graph = MagicMock(spec=GraphClient)
graph.tenant_id = "test-tenant"
graph.client_id = "test-client"
graph.get = AsyncMock()
graph.post = AsyncMock()
return graph
@pytest.fixture
def cache():
return DeviceComplianceCache(ttl_seconds=5)
@pytest.fixture
def connector(mock_graph, cache):
return IntuneConnector(graph_client=mock_graph, cache=cache)
@pytest.fixture
def ctx():
return ConnectorContext(gsap_context_id="test-ac-123")
# ── list_devices ──────────────────────────────────────────────
@pytest.mark.asyncio
async def test_list_devices(connector, mock_graph, ctx):
mock_graph.get.return_value = {
"value": [
{
"id": "dev-1",
"deviceName": "LAPTOP-001",
"operatingSystem": "Windows",
"osVersion": "10.0.19045",
"complianceState": "compliant",
"lastSyncDateTime": "2026-04-14T00:00:00Z",
"userPrincipalName": "alice@contoso.com",
"azureADDeviceId": "entra-dev-1",
},
{
"id": "dev-2",
"deviceName": "PHONE-001",
"operatingSystem": "iOS",
"osVersion": "17.0",
"complianceState": "noncompliant",
},
]
}
result = await connector.invoke("list_devices", {"top": 10}, ctx)
assert result.success
assert len(result.data) == 2
assert result.data[0]["device_id"] == "dev-1"
assert result.data[0]["compliance_state"] == "compliant"
assert result.data[1]["compliance_state"] == "noncompliant"
# ── get_compliance ────────────────────────────────────────────
@pytest.mark.asyncio
async def test_get_compliance_compliant(connector, mock_graph, ctx):
mock_graph.get.return_value = {
"id": "dev-1",
"complianceState": "compliant",
"lastSyncDateTime": "2026-04-14T00:00:00Z",
}
result = await connector.invoke("get_compliance", {"device_id": "dev-1"}, ctx)
assert result.success
assert result.data["compliant"] is True
assert result.data["state"] == "compliant"
@pytest.mark.asyncio
async def test_get_compliance_noncompliant(connector, mock_graph, ctx):
mock_graph.get.return_value = {
"id": "dev-1",
"complianceState": "noncompliant",
}
result = await connector.invoke("get_compliance", {"device_id": "dev-1"}, ctx)
assert result.success
assert result.data["compliant"] is False
assert result.data["state"] == "noncompliant"
@pytest.mark.asyncio
async def test_get_compliance_uses_cache(connector, mock_graph, cache, ctx):
# Pre-populate cache
state = ComplianceState(
device_id="dev-cached", compliant=True, state="compliant",
last_evaluated=datetime.now(UTC),
)
await cache.set("dev-cached", state)
result = await connector.invoke("get_compliance", {"device_id": "dev-cached"}, ctx)
assert result.success
assert result.data["compliant"] is True
# Graph API should NOT have been called
mock_graph.get.assert_not_called()
@pytest.mark.asyncio
async def test_get_compliance_missing_device_id(connector, ctx):
result = await connector.invoke("get_compliance", {}, ctx)
assert not result.success
assert "device_id required" in result.error
# ── remote_lock ───────────────────────────────────────────────
@pytest.mark.asyncio
async def test_remote_lock(connector, mock_graph, ctx):
resp = MagicMock()
resp.status_code = 204
mock_graph.post.return_value = resp
result = await connector.invoke("remote_lock", {"device_id": "dev-1"}, ctx)
assert result.success
assert result.data["locked"] is True
# ── unknown operation ─────────────────────────────────────────
@pytest.mark.asyncio
async def test_unknown_operation(connector, ctx):
result = await connector.invoke("hack_device", {}, ctx)
assert not result.success
assert "Unknown operation" in result.error
# ── health_check ──────────────────────────────────────────────
def test_health_check(connector):
assert connector.health_check() is True
def test_health_check_unconfigured():
graph = MagicMock(spec=GraphClient)
graph.tenant_id = ""
graph.client_id = ""
conn = IntuneConnector(graph_client=graph)
assert conn.health_check() is False
# ── DeviceComplianceCache ─────────────────────────────────────
@pytest.mark.asyncio
async def test_cache_set_and_get(cache):
state = ComplianceState(
device_id="dev-1", compliant=True, state="compliant",
last_evaluated=datetime.now(UTC),
)
await cache.set("dev-1", state)
result = await cache.get("dev-1")
assert result is not None
assert result.compliant is True
@pytest.mark.asyncio
async def test_cache_ttl_expiry():
cache = DeviceComplianceCache(ttl_seconds=0)
state = ComplianceState(
device_id="dev-1", compliant=True, state="compliant",
last_evaluated=datetime.now(UTC),
)
await cache.set("dev-1", state)
# TTL is 0, so it should be expired immediately
time.sleep(0.01)
result = await cache.get("dev-1")
assert result is None
@pytest.mark.asyncio
async def test_cache_miss():
cache = DeviceComplianceCache()
result = await cache.get("nonexistent")
assert result is None
# ── Connector catalog conditional registration ────────────────
@pytest.mark.asyncio
async def test_intune_not_in_catalog_when_disabled(client):
"""Intune connector should NOT appear when intune_enabled=False (default)."""
resp = await client.get("/connectors/")
assert resp.status_code == 200
ids = [c["connector_id"] for c in resp.json()]
assert "intune" not in ids
@pytest.fixture
async def client():
"""Reuse the broker test client fixture pattern."""
from httpx import AsyncClient, ASGITransport
from gsap_broker.app import app
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
yield c