feat(mcp): add Intune device management tools

MCP tools for list_devices, get_device_compliance, sync_device,
remote_lock. All route through governed IntuneConnector
invocation with Chronicle audit.

Signed-off-by: Tyler King <tking@guildhouse.dev>
This commit is contained in:
Tyler J King 2026-04-14 05:25:08 -04:00
parent 03a99b4aff
commit e24a87db6f
2 changed files with 121 additions and 0 deletions

View file

@ -158,6 +158,49 @@ TOOLS = [
"description": "Get current session details: principal, AC scope, delegation, DEFCON level.", "description": "Get current session details: principal, AC scope, delegation, DEFCON level.",
"inputSchema": {"type": "object", "properties": {}}, "inputSchema": {"type": "object", "properties": {}},
}, },
{
"name": "list_devices",
"description": "List managed devices from Intune. Requires Intune connector enabled.",
"inputSchema": {
"type": "object",
"properties": {
"top": {"type": "integer", "description": "Max devices to return (default: 50)"},
},
},
},
{
"name": "get_device_compliance",
"description": "Check compliance state of a specific device via Intune.",
"inputSchema": {
"type": "object",
"properties": {
"device_id": {"type": "string", "description": "Intune managed device ID"},
},
"required": ["device_id"],
},
},
{
"name": "sync_device",
"description": "Trigger Intune sync for a device. Requires PROPOSE capability.",
"inputSchema": {
"type": "object",
"properties": {
"device_id": {"type": "string", "description": "Intune managed device ID"},
},
"required": ["device_id"],
},
},
{
"name": "remote_lock",
"description": "Remote lock a managed device. Requires MUTATE capability. May require ceremony approval in production Accords.",
"inputSchema": {
"type": "object",
"properties": {
"device_id": {"type": "string", "description": "Intune managed device ID"},
},
"required": ["device_id"],
},
},
] ]
@ -411,6 +454,32 @@ async def _handle_session_info(request: Request) -> dict:
} }
async def _handle_intune_tool(tool_name: str, args: dict) -> dict:
"""Route Intune MCP tools through the governed IntuneConnector."""
from gsap_broker.routers.connectors import _registry
from gsap_broker.connectors.base import ConnectorContext
intune = _registry.get("intune")
if intune is None:
return {"error": "Intune connector not enabled. Set intune_enabled=True."}
op_map = {
"list_devices": "list_devices",
"get_device_compliance": "get_compliance",
"sync_device": "sync_device",
"remote_lock": "remote_lock",
}
operation = op_map.get(tool_name)
if not operation:
return {"error": f"Unknown Intune tool: {tool_name}"}
ctx = ConnectorContext(gsap_context_id=args.get("ac_id", "mcp-session"))
result = await intune.invoke(operation, args, ctx)
if result.success:
return {"data": result.data, "lineage_cid": result.lineage_cid}
return {"error": result.error}
# ── Tool Dispatch ──────────────────────────────────────────────── # ── Tool Dispatch ────────────────────────────────────────────────
@ -427,6 +496,10 @@ async def _dispatch_tool(request: Request, tool_name: str, arguments: dict) -> d
"get_posture": lambda: _handle_get_posture(arguments), "get_posture": lambda: _handle_get_posture(arguments),
"check_operation": lambda: _handle_check_operation(arguments, request), "check_operation": lambda: _handle_check_operation(arguments, request),
"session_info": lambda: _handle_session_info(request), "session_info": lambda: _handle_session_info(request),
"list_devices": lambda: _handle_intune_tool(tool_name, arguments),
"get_device_compliance": lambda: _handle_intune_tool(tool_name, arguments),
"sync_device": lambda: _handle_intune_tool(tool_name, arguments),
"remote_lock": lambda: _handle_intune_tool(tool_name, arguments),
} }
handler = handlers.get(tool_name) handler = handlers.get(tool_name)

48
tests/test_mcp_intune.py Normal file
View file

@ -0,0 +1,48 @@
# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Tests for Intune MCP tools."""
import pytest
from httpx import AsyncClient, ASGITransport
from gsap_broker.app import app
@pytest.fixture
async def client():
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
yield c
@pytest.mark.asyncio
async def test_mcp_tools_list_includes_intune(client):
"""MCP tools/list should include Intune tools."""
resp = await client.post("/mcp", json={
"jsonrpc": "2.0",
"method": "tools/list",
"id": 1,
})
assert resp.status_code == 200
tools = resp.json()["result"]["tools"]
tool_names = [t["name"] for t in tools]
assert "list_devices" in tool_names
assert "get_device_compliance" in tool_names
assert "sync_device" in tool_names
assert "remote_lock" in tool_names
@pytest.mark.asyncio
async def test_mcp_intune_tool_without_connector(client):
"""Intune MCP tool should return error when connector not enabled."""
resp = await client.post("/mcp", json={
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "list_devices",
"arguments": {},
},
"id": 2,
})
assert resp.status_code == 200
content = resp.json()["result"]["content"][0]["text"]
assert "not enabled" in content.lower() or "error" in content.lower()