# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Intune device management connector — governed Graph API invocation.

Implements ConnectorPlugin for Intune Graph API operations.
Every invocation requires an active AC and emits a Chronicle
CONNECTOR_INVOKED event via the ConnectorRuntime.
"""

from __future__ import annotations

import logging
from datetime import datetime, UTC
from typing import Any
from urllib.parse import quote

from gsap_broker.connectors.base import ConnectorContext, ConnectorPlugin, ConnectorResult
from gsap_broker.intune.device_cache import DeviceComplianceCache
from gsap_broker.intune.graph_client import GraphClient
from gsap_broker.models.intune import ComplianceState, DeviceSummary

logger = logging.getLogger(__name__)

_GRAPH_DEVICES = "/deviceManagement/managedDevices"

# Default ``$select`` projection for ``list_devices``; callers may override it
# with the ``select`` parameter.
_DEFAULT_DEVICE_SELECT = (
    "id,deviceName,operatingSystem,osVersion,complianceState,"
    "lastSyncDateTime,userPrincipalName,azureADDeviceId"
)

# ``$select`` projection used when fetching a single device's compliance state.
_COMPLIANCE_SELECT = (
    "id,complianceState,lastSyncDateTime,complianceGracePeriodExpirationDateTime"
)

# HTTP status codes treated as success for device action POSTs.
_ACTION_OK_STATUSES = (200, 204)


class IntuneConnector(ConnectorPlugin):
    """Connector that routes named operations to Intune managed-device Graph calls.

    ``invoke`` dispatches an operation name to one of the private handlers
    below. Every handler returns a ``ConnectorResult``; failures are reported
    through ``success=False`` and ``error`` rather than by raising.
    """

    connector_id = "intune"
    corpus_entry_cid = "sha256:intune-connector-v1"
    capability_mask = 0x7  # READ | PROPOSE | MUTATE
    declared_endpoints = [
        "graph.microsoft.com/v1.0/deviceManagement/managedDevices",
    ]
    accord_template = "device-management"
    gsap_required = True
    chronicle_enabled = True

    def __init__(self, graph_client: GraphClient, cache: DeviceComplianceCache | None = None):
        """Store the Graph client and the compliance cache.

        Args:
            graph_client: Client used for all Graph ``get``/``post`` calls.
            cache: Compliance cache to use. A new ``DeviceComplianceCache`` is
                created only when this is ``None``.
        """
        self.graph = graph_client
        # Compare against None explicitly: ``cache or ...`` would silently
        # replace a caller-supplied cache that happens to evaluate as falsy.
        self.cache = cache if cache is not None else DeviceComplianceCache()

    async def invoke(
        self, operation: str, parameters: dict[str, Any], context: ConnectorContext
    ) -> ConnectorResult:
        """Route to the appropriate Graph API call.

        Args:
            operation: Operation name; one of the keys of the handler table.
            parameters: Operation parameters, passed through to the handler.
            context: Invocation context, passed through to the handler.

        Returns:
            The handler's result. An unknown operation, or any exception raised
            by the handler, yields a result with ``success=False`` and the
            reason in ``error``.
        """
        handlers = {
            "list_devices": self._list_devices,
            "get_device": self._get_device,
            "get_compliance": self._get_compliance,
            "sync_device": self._sync_device,
            "remote_lock": self._remote_lock,
            "retire_device": self._retire_device,
            "wipe_device": self._wipe_device,
        }
        handler = handlers.get(operation)
        if handler is None:
            return ConnectorResult(
                success=False, error=f"Unknown operation: {operation}"
            )
        try:
            return await handler(parameters, context)
        except Exception as e:
            # Boundary handler: convert any failure into a result, but keep the
            # traceback in the log so the root cause is not lost.
            logger.exception("Intune connector error: %s %s", operation, e)
            return ConnectorResult(success=False, error=str(e))

    def health_check(self) -> bool:
        """Return True when the Graph client has a tenant ID and a client ID."""
        # Synchronous check — can't call async Graph API here.
        # Return True if graph client is configured.
        return bool(self.graph.tenant_id and self.graph.client_id)

    # ── Shared helpers ───────────────────────────────────────────

    @staticmethod
    def _device_id_error(device_id: Any) -> str | None:
        """Return an error message for an unusable ``device_id``, else ``None``."""
        if not device_id:
            return "device_id required"
        # Percent-encoding leaves dots untouched, so dot segments must be
        # rejected explicitly to prevent the path resolving to a parent resource.
        if str(device_id) in (".", ".."):
            return "device_id invalid"
        return None

    @staticmethod
    def _device_path(device_id: Any, action: str = "") -> str:
        """Build the Graph path for one device, optionally with an action suffix.

        The ID is percent-encoded as a single path segment so characters such
        as ``/``, ``?`` and ``#`` cannot redirect the request to a different
        endpoint or action than the one the operation is meant to call.
        """
        path = f"{_GRAPH_DEVICES}/{quote(str(device_id), safe='')}"
        return f"{path}/{action}" if action else path

    async def _post_device_action(
        self,
        params: dict[str, Any],
        *,
        action: str,
        label: str,
        flag: str,
        invalidate_cache: bool = False,
    ) -> ConnectorResult:
        """POST a device action and translate the HTTP status into a result.

        Args:
            params: Operation parameters; ``device_id`` is read from here.
            action: Graph action name appended to the device path.
            label: Prefix for the ``"<label> failed: HTTP <status>"`` error.
            flag: Key set to ``True`` in the result data on success.
            invalidate_cache: Drop the cached compliance state on success.
        """
        device_id = params.get("device_id", "")
        problem = self._device_id_error(device_id)
        if problem is not None:
            return ConnectorResult(success=False, error=problem)

        resp = await self.graph.post(self._device_path(device_id, action))
        if resp.status_code not in _ACTION_OK_STATUSES:
            return ConnectorResult(
                success=False, error=f"{label} failed: HTTP {resp.status_code}"
            )
        if invalidate_cache:
            await self.cache.invalidate(device_id)
        return ConnectorResult(success=True, data={flag: True})

    # ── READ operations ──────────────────────────────────────────

    async def _list_devices(
        self, params: dict[str, Any], ctx: ConnectorContext
    ) -> ConnectorResult:
        """List managed devices as JSON-serialised ``DeviceSummary`` dicts.

        Reads optional ``top`` (default 50) and ``select`` parameters and
        forwards them as ``$top`` / ``$select``. ``ctx`` is unused.
        """
        query = {
            "$top": params.get("top", 50),
            "$select": params.get("select", _DEFAULT_DEVICE_SELECT),
        }
        data = await self.graph.get(_GRAPH_DEVICES, params=query)
        devices = [
            DeviceSummary(
                device_id=d["id"],
                device_name=d.get("deviceName", ""),
                os_type=d.get("operatingSystem", ""),
                os_version=d.get("osVersion", ""),
                compliance_state=d.get("complianceState", "unknown"),
                last_sync=d.get("lastSyncDateTime"),
                user_principal_name=d.get("userPrincipalName"),
                entra_device_id=d.get("azureADDeviceId"),
            ).model_dump(mode="json")
            for d in data.get("value", [])
        ]
        return ConnectorResult(success=True, data=devices)

    async def _get_device(
        self, params: dict[str, Any], ctx: ConnectorContext
    ) -> ConnectorResult:
        """Fetch one managed device and return the Graph payload unchanged.

        Requires the ``device_id`` parameter. ``ctx`` is unused.
        """
        device_id = params.get("device_id", "")
        problem = self._device_id_error(device_id)
        if problem is not None:
            return ConnectorResult(success=False, error=problem)
        data = await self.graph.get(self._device_path(device_id))
        return ConnectorResult(success=True, data=data)

    async def _get_compliance(
        self, params: dict[str, Any], ctx: ConnectorContext
    ) -> ConnectorResult:
        """Return the compliance state for one device, preferring the cache.

        On a cache miss the state is fetched from Graph, stored in the cache
        and returned. Requires the ``device_id`` parameter. ``ctx`` is unused.
        """
        device_id = params.get("device_id", "")
        problem = self._device_id_error(device_id)
        if problem is not None:
            return ConnectorResult(success=False, error=problem)

        # Check cache first
        cached = await self.cache.get(device_id)
        if cached is not None:
            return ConnectorResult(success=True, data=cached.model_dump(mode="json"))

        # Fetch from Graph API
        data = await self.graph.get(
            self._device_path(device_id),
            params={"$select": _COMPLIANCE_SELECT},
        )
        raw_state = data.get("complianceState", "unknown")
        state = ComplianceState(
            device_id=device_id,
            compliant=raw_state == "compliant",
            state=raw_state,
            last_evaluated=datetime.now(UTC),
        )
        await self.cache.set(device_id, state)
        return ConnectorResult(success=True, data=state.model_dump(mode="json"))

    # ── PROPOSE operations ───────────────────────────────────────

    async def _sync_device(
        self, params: dict[str, Any], ctx: ConnectorContext
    ) -> ConnectorResult:
        """Request a device sync; on success drop its cached compliance state."""
        return await self._post_device_action(
            params,
            action="syncDevice",
            label="Sync",
            flag="synced",
            invalidate_cache=True,
        )

    # ── MUTATE operations ────────────────────────────────────────

    async def _remote_lock(
        self, params: dict[str, Any], ctx: ConnectorContext
    ) -> ConnectorResult:
        """Request a remote lock of the device."""
        return await self._post_device_action(
            params, action="remoteLock", label="Lock", flag="locked"
        )

    async def _retire_device(
        self, params: dict[str, Any], ctx: ConnectorContext
    ) -> ConnectorResult:
        """Request retirement of the device; on success drop its cached state."""
        # Invalidate like sync does, so a retired device is not served from a
        # stale cached compliance entry.
        return await self._post_device_action(
            params,
            action="retire",
            label="Retire",
            flag="retired",
            invalidate_cache=True,
        )

    async def _wipe_device(
        self, params: dict[str, Any], ctx: ConnectorContext
    ) -> ConnectorResult:
        """Request a wipe of the device; on success drop its cached state."""
        # Invalidate like sync does, so a wiped device is not served from a
        # stale cached compliance entry.
        return await self._post_device_action(
            params,
            action="wipe",
            label="Wipe",
            flag="wiped",
            invalidate_cache=True,
        )