feat(connectors): add Intune device management connector

Implements ConnectorPlugin for Intune Graph API operations.
Governed invocation: every Intune call requires an active AC
and emits a Chronicle CONNECTOR_INVOKED event.
Operations: list, get, compliance check, sync, lock, retire, wipe.
In-memory compliance cache with configurable TTL.
Conditional registration via intune_enabled setting.

Signed-off-by: Tyler King <tking@guildhouse.dev>
This commit is contained in:
Tyler J King 2026-04-14 05:21:47 -04:00
parent 8196396ce6
commit 871541f0eb
6 changed files with 490 additions and 0 deletions

View file

@ -0,0 +1,177 @@
# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Intune device management connector — governed Graph API invocation.
Implements ConnectorPlugin for Intune Graph API operations.
Every invocation requires an active AC and emits a Chronicle
CONNECTOR_INVOKED event via the ConnectorRuntime.
"""
from __future__ import annotations
import logging
from datetime import datetime, UTC
from typing import Any
from gsap_broker.connectors.base import ConnectorContext, ConnectorPlugin, ConnectorResult
from gsap_broker.intune.device_cache import DeviceComplianceCache
from gsap_broker.intune.graph_client import GraphClient
from gsap_broker.models.intune import ComplianceState, DeviceSummary
logger = logging.getLogger(__name__)
_GRAPH_DEVICES = "/deviceManagement/managedDevices"
class IntuneConnector(ConnectorPlugin):
connector_id = "intune"
corpus_entry_cid = "sha256:intune-connector-v1"
capability_mask = 0x7 # READ | PROPOSE | MUTATE
declared_endpoints = [
"graph.microsoft.com/v1.0/deviceManagement/managedDevices",
]
accord_template = "device-management"
gsap_required = True
chronicle_enabled = True
def __init__(self, graph_client: GraphClient, cache: DeviceComplianceCache | None = None):
self.graph = graph_client
self.cache = cache or DeviceComplianceCache()
async def invoke(
self, operation: str, parameters: dict[str, Any], context: ConnectorContext
) -> ConnectorResult:
"""Route to the appropriate Graph API call."""
handlers = {
"list_devices": self._list_devices,
"get_device": self._get_device,
"get_compliance": self._get_compliance,
"sync_device": self._sync_device,
"remote_lock": self._remote_lock,
"retire_device": self._retire_device,
"wipe_device": self._wipe_device,
}
handler = handlers.get(operation)
if handler is None:
return ConnectorResult(
success=False, error=f"Unknown operation: {operation}"
)
try:
return await handler(parameters, context)
except Exception as e:
logger.error("Intune connector error: %s %s", operation, e)
return ConnectorResult(success=False, error=str(e))
def health_check(self) -> bool:
# Synchronous check — can't call async Graph API here.
# Return True if graph client is configured.
return bool(self.graph.tenant_id and self.graph.client_id)
# ── READ operations ──────────────────────────────────────────
async def _list_devices(
self, params: dict[str, Any], ctx: ConnectorContext
) -> ConnectorResult:
top = params.get("top", 50)
select = params.get("select", "id,deviceName,operatingSystem,osVersion,complianceState,lastSyncDateTime,userPrincipalName,azureADDeviceId")
data = await self.graph.get(_GRAPH_DEVICES, params={"$top": top, "$select": select})
devices = [
DeviceSummary(
device_id=d["id"],
device_name=d.get("deviceName", ""),
os_type=d.get("operatingSystem", ""),
os_version=d.get("osVersion", ""),
compliance_state=d.get("complianceState", "unknown"),
last_sync=d.get("lastSyncDateTime"),
user_principal_name=d.get("userPrincipalName"),
entra_device_id=d.get("azureADDeviceId"),
).model_dump(mode="json")
for d in data.get("value", [])
]
return ConnectorResult(success=True, data=devices)
async def _get_device(
self, params: dict[str, Any], ctx: ConnectorContext
) -> ConnectorResult:
device_id = params.get("device_id", "")
if not device_id:
return ConnectorResult(success=False, error="device_id required")
data = await self.graph.get(f"{_GRAPH_DEVICES}/{device_id}")
return ConnectorResult(success=True, data=data)
async def _get_compliance(
self, params: dict[str, Any], ctx: ConnectorContext
) -> ConnectorResult:
device_id = params.get("device_id", "")
if not device_id:
return ConnectorResult(success=False, error="device_id required")
# Check cache first
cached = await self.cache.get(device_id)
if cached is not None:
return ConnectorResult(success=True, data=cached.model_dump(mode="json"))
# Fetch from Graph API
data = await self.graph.get(
f"{_GRAPH_DEVICES}/{device_id}",
params={"$select": "id,complianceState,lastSyncDateTime,complianceGracePeriodExpirationDateTime"},
)
raw_state = data.get("complianceState", "unknown")
state = ComplianceState(
device_id=device_id,
compliant=raw_state == "compliant",
state=raw_state,
last_evaluated=datetime.now(UTC),
)
await self.cache.set(device_id, state)
return ConnectorResult(success=True, data=state.model_dump(mode="json"))
# ── PROPOSE operations ───────────────────────────────────────
async def _sync_device(
self, params: dict[str, Any], ctx: ConnectorContext
) -> ConnectorResult:
device_id = params.get("device_id", "")
if not device_id:
return ConnectorResult(success=False, error="device_id required")
resp = await self.graph.post(f"{_GRAPH_DEVICES}/{device_id}/syncDevice")
if resp.status_code in (200, 204):
await self.cache.invalidate(device_id)
return ConnectorResult(success=True, data={"synced": True})
return ConnectorResult(success=False, error=f"Sync failed: HTTP {resp.status_code}")
# ── MUTATE operations ────────────────────────────────────────
async def _remote_lock(
self, params: dict[str, Any], ctx: ConnectorContext
) -> ConnectorResult:
device_id = params.get("device_id", "")
if not device_id:
return ConnectorResult(success=False, error="device_id required")
resp = await self.graph.post(f"{_GRAPH_DEVICES}/{device_id}/remoteLock")
if resp.status_code in (200, 204):
return ConnectorResult(success=True, data={"locked": True})
return ConnectorResult(success=False, error=f"Lock failed: HTTP {resp.status_code}")
async def _retire_device(
self, params: dict[str, Any], ctx: ConnectorContext
) -> ConnectorResult:
device_id = params.get("device_id", "")
if not device_id:
return ConnectorResult(success=False, error="device_id required")
resp = await self.graph.post(f"{_GRAPH_DEVICES}/{device_id}/retire")
if resp.status_code in (200, 204):
return ConnectorResult(success=True, data={"retired": True})
return ConnectorResult(success=False, error=f"Retire failed: HTTP {resp.status_code}")
async def _wipe_device(
self, params: dict[str, Any], ctx: ConnectorContext
) -> ConnectorResult:
device_id = params.get("device_id", "")
if not device_id:
return ConnectorResult(success=False, error="device_id required")
resp = await self.graph.post(f"{_GRAPH_DEVICES}/{device_id}/wipe")
if resp.status_code in (200, 204):
return ConnectorResult(success=True, data={"wiped": True})
return ConnectorResult(success=False, error=f"Wipe failed: HTTP {resp.status_code}")

View file

@ -0,0 +1,39 @@
# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""In-memory cache of Intune device compliance state."""
import time
from typing import Optional
from gsap_broker.models.intune import ComplianceState
class DeviceComplianceCache:
"""In-memory cache with TTL for device compliance state."""
def __init__(self, ttl_seconds: int = 300):
self.ttl = ttl_seconds
self._store: dict[str, tuple[ComplianceState, float]] = {}
async def get(self, device_id: str) -> Optional[ComplianceState]:
"""Get cached compliance state, or None if expired/missing."""
entry = self._store.get(device_id)
if entry is None:
return None
state, stored_at = entry
if (time.time() - stored_at) > self.ttl:
del self._store[device_id]
return None
return state
async def set(self, device_id: str, state: ComplianceState) -> None:
"""Cache a compliance state."""
self._store[device_id] = (state, time.time())
async def invalidate(self, device_id: str) -> None:
"""Remove a device from cache."""
self._store.pop(device_id, None)
def size(self) -> int:
return len(self._store)

View file

@ -0,0 +1,28 @@
# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Pydantic models for Intune device management."""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
class DeviceSummary(BaseModel):
device_id: str
device_name: str = ""
os_type: str = "" # windows, linux, macOS, android, iOS
os_version: str = ""
compliance_state: str = "" # compliant, noncompliant, unknown, configManager, ...
last_sync: Optional[datetime] = None
user_principal_name: Optional[str] = None
entra_device_id: Optional[str] = None
class ComplianceState(BaseModel):
device_id: str
compliant: bool
state: str = "" # compliant, noncompliant, configManager, ...
detail: Optional[str] = None
last_evaluated: datetime

View file

@ -10,6 +10,7 @@ from gsap_broker.connectors.base import ConnectorContext
from gsap_broker.connectors.registry import ConnectorRegistry from gsap_broker.connectors.registry import ConnectorRegistry
from gsap_broker.connectors.runtime import ConnectorRuntime from gsap_broker.connectors.runtime import ConnectorRuntime
from gsap_broker.connectors.examples.echo_connector import EchoConnector from gsap_broker.connectors.examples.echo_connector import EchoConnector
from gsap_broker.settings import settings
router = APIRouter() router = APIRouter()
@ -20,6 +21,21 @@ _runtime = ConnectorRuntime(registry=_registry)
# Register built-in connectors # Register built-in connectors
_registry.register(EchoConnector()) _registry.register(EchoConnector())
# Conditionally register Intune connector
if settings.intune_enabled and settings.entra_client_secret:
from gsap_broker.intune.graph_client import GraphClient
from gsap_broker.intune.device_cache import DeviceComplianceCache
from gsap_broker.connectors.intune import IntuneConnector
_intune_graph = GraphClient(
tenant_id=settings.entra_tenant_id,
client_id=settings.entra_client_id,
client_secret=settings.entra_client_secret,
)
_intune_cache = DeviceComplianceCache(ttl_seconds=settings.intune_compliance_cache_ttl)
_intune_connector = IntuneConnector(graph_client=_intune_graph, cache=_intune_cache)
_registry.register(_intune_connector)
class InvokeRequest(BaseModel): class InvokeRequest(BaseModel):
operation: str operation: str

View file

@ -35,6 +35,12 @@ class Settings(BaseSettings):
entra_client_secret: str = "" entra_client_secret: str = ""
entra_agent_blueprint_id: str = "" entra_agent_blueprint_id: str = ""
# ── Intune / Device Management ──
intune_enabled: bool = False
intune_compliance_required: bool = False # global default for accord templates
intune_compliance_strict: bool = False # reject if no device_id present
intune_compliance_cache_ttl: int = 300 # seconds
# Delegation defaults # Delegation defaults
default_delegation_ttl_minutes: int = 60 default_delegation_ttl_minutes: int = 60
default_max_commands: int = 500 default_max_commands: int = 500

224
tests/test_intune.py Normal file
View file

@ -0,0 +1,224 @@
# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Tests for Intune connector and device compliance cache."""
import time
import pytest
from datetime import datetime, UTC
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
from gsap_broker.connectors.base import ConnectorContext
from gsap_broker.connectors.intune import IntuneConnector
from gsap_broker.intune.device_cache import DeviceComplianceCache
from gsap_broker.intune.graph_client import GraphClient
from gsap_broker.models.intune import ComplianceState
@pytest.fixture
def mock_graph():
graph = MagicMock(spec=GraphClient)
graph.tenant_id = "test-tenant"
graph.client_id = "test-client"
graph.get = AsyncMock()
graph.post = AsyncMock()
return graph
@pytest.fixture
def cache():
return DeviceComplianceCache(ttl_seconds=5)
@pytest.fixture
def connector(mock_graph, cache):
return IntuneConnector(graph_client=mock_graph, cache=cache)
@pytest.fixture
def ctx():
return ConnectorContext(gsap_context_id="test-ac-123")
# ── list_devices ──────────────────────────────────────────────
@pytest.mark.asyncio
async def test_list_devices(connector, mock_graph, ctx):
mock_graph.get.return_value = {
"value": [
{
"id": "dev-1",
"deviceName": "LAPTOP-001",
"operatingSystem": "Windows",
"osVersion": "10.0.19045",
"complianceState": "compliant",
"lastSyncDateTime": "2026-04-14T00:00:00Z",
"userPrincipalName": "alice@contoso.com",
"azureADDeviceId": "entra-dev-1",
},
{
"id": "dev-2",
"deviceName": "PHONE-001",
"operatingSystem": "iOS",
"osVersion": "17.0",
"complianceState": "noncompliant",
},
]
}
result = await connector.invoke("list_devices", {"top": 10}, ctx)
assert result.success
assert len(result.data) == 2
assert result.data[0]["device_id"] == "dev-1"
assert result.data[0]["compliance_state"] == "compliant"
assert result.data[1]["compliance_state"] == "noncompliant"
# ── get_compliance ────────────────────────────────────────────
@pytest.mark.asyncio
async def test_get_compliance_compliant(connector, mock_graph, ctx):
mock_graph.get.return_value = {
"id": "dev-1",
"complianceState": "compliant",
"lastSyncDateTime": "2026-04-14T00:00:00Z",
}
result = await connector.invoke("get_compliance", {"device_id": "dev-1"}, ctx)
assert result.success
assert result.data["compliant"] is True
assert result.data["state"] == "compliant"
@pytest.mark.asyncio
async def test_get_compliance_noncompliant(connector, mock_graph, ctx):
mock_graph.get.return_value = {
"id": "dev-1",
"complianceState": "noncompliant",
}
result = await connector.invoke("get_compliance", {"device_id": "dev-1"}, ctx)
assert result.success
assert result.data["compliant"] is False
assert result.data["state"] == "noncompliant"
@pytest.mark.asyncio
async def test_get_compliance_uses_cache(connector, mock_graph, cache, ctx):
# Pre-populate cache
state = ComplianceState(
device_id="dev-cached", compliant=True, state="compliant",
last_evaluated=datetime.now(UTC),
)
await cache.set("dev-cached", state)
result = await connector.invoke("get_compliance", {"device_id": "dev-cached"}, ctx)
assert result.success
assert result.data["compliant"] is True
# Graph API should NOT have been called
mock_graph.get.assert_not_called()
@pytest.mark.asyncio
async def test_get_compliance_missing_device_id(connector, ctx):
result = await connector.invoke("get_compliance", {}, ctx)
assert not result.success
assert "device_id required" in result.error
# ── remote_lock ───────────────────────────────────────────────
@pytest.mark.asyncio
async def test_remote_lock(connector, mock_graph, ctx):
resp = MagicMock()
resp.status_code = 204
mock_graph.post.return_value = resp
result = await connector.invoke("remote_lock", {"device_id": "dev-1"}, ctx)
assert result.success
assert result.data["locked"] is True
# ── unknown operation ─────────────────────────────────────────
@pytest.mark.asyncio
async def test_unknown_operation(connector, ctx):
result = await connector.invoke("hack_device", {}, ctx)
assert not result.success
assert "Unknown operation" in result.error
# ── health_check ──────────────────────────────────────────────
def test_health_check(connector):
assert connector.health_check() is True
def test_health_check_unconfigured():
graph = MagicMock(spec=GraphClient)
graph.tenant_id = ""
graph.client_id = ""
conn = IntuneConnector(graph_client=graph)
assert conn.health_check() is False
# ── DeviceComplianceCache ─────────────────────────────────────
@pytest.mark.asyncio
async def test_cache_set_and_get(cache):
state = ComplianceState(
device_id="dev-1", compliant=True, state="compliant",
last_evaluated=datetime.now(UTC),
)
await cache.set("dev-1", state)
result = await cache.get("dev-1")
assert result is not None
assert result.compliant is True
@pytest.mark.asyncio
async def test_cache_ttl_expiry():
cache = DeviceComplianceCache(ttl_seconds=0)
state = ComplianceState(
device_id="dev-1", compliant=True, state="compliant",
last_evaluated=datetime.now(UTC),
)
await cache.set("dev-1", state)
# TTL is 0, so it should be expired immediately
time.sleep(0.01)
result = await cache.get("dev-1")
assert result is None
@pytest.mark.asyncio
async def test_cache_miss():
cache = DeviceComplianceCache()
result = await cache.get("nonexistent")
assert result is None
# ── Connector catalog conditional registration ────────────────
@pytest.mark.asyncio
async def test_intune_not_in_catalog_when_disabled(client):
"""Intune connector should NOT appear when intune_enabled=False (default)."""
resp = await client.get("/connectors/")
assert resp.status_code == 200
ids = [c["connector_id"] for c in resp.json()]
assert "intune" not in ids
@pytest.fixture
async def client():
"""Reuse the broker test client fixture pattern."""
from httpx import AsyncClient, ASGITransport
from gsap_broker.app import app
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
yield c