fastapi-gsap/gsap_broker/connectors/intune.py
Tyler J King e744336385 fix: capability enforcement, credential safety, atomic delegations, input validation
C-6: ConnectorRuntime enforces capability_mask per operation.
     READ-only ACs cannot invoke MUTATE operations (wipe, lock, retire).
C-7: AC validated against database (exists, active, not expired)
     before connector invocation.
C-9: Delegated AC capability bounded by delegator's capability.
C-10: Command counter uses atomic SQL increment with limit check.
M-23: expire_stale() uses same atomic SQL pattern.

H-1: Sensitive credential fields hidden from repr/logs via repr=False.
H-2: Stub backend requires ALLOW_STUB_CREDENTIALS=true to activate.
H-3: Kerberos backend raises CredentialResolutionError instead of
     returning stub ticket.
H-4: Chronicle INTENT emitted before execution, RESULT after.
H-5: device_id validated as UUID before Graph API URL interpolation.
H-8: ConnectorRuntime enforces governance for all connector invocations.

Signed-off-by: Tyler King <tking@guildhouse.dev>
2026-04-14 08:13:27 -04:00

213 lines
8.7 KiB
Python

# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Intune device management connector — governed Graph API invocation.
Implements ConnectorPlugin for Intune Graph API operations.
Every invocation requires an active AC and emits a Chronicle
CONNECTOR_INVOKED event via the ConnectorRuntime.
"""
from __future__ import annotations
import logging
from datetime import datetime, UTC
from typing import Any
import re
from gsap_broker.connectors.base import (
CAP_MUTATE, CAP_PROPOSE, CAP_READ,
ConnectorContext, ConnectorPlugin, ConnectorResult,
)
from gsap_broker.intune.device_cache import DeviceComplianceCache
from gsap_broker.intune.graph_client import GraphClient
from gsap_broker.models.intune import ComplianceState, DeviceSummary
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Path of the managed-devices collection. The connector's Graph calls below
# build their request paths from this prefix, optionally followed by
# "/<device_id>" and "/<action>".
# NOTE(review): presumably resolved by GraphClient against the Graph v1.0
# base URL — confirm in graph_client.
_GRAPH_DEVICES = "/deviceManagement/managedDevices"
# Fix H-5: device_id must be a UUID to prevent path traversal
_UUID_RE = re.compile(
r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
re.IGNORECASE,
)
def _validate_device_id(device_id: str) -> str:
"""Fix H-5: validate device_id as UUID before Graph API URL interpolation."""
if not device_id:
raise ValueError("device_id required")
if not _UUID_RE.match(device_id):
raise ValueError(f"Invalid device_id: must be UUID format, got '{device_id}'")
return device_id
class IntuneConnector(ConnectorPlugin):
    """Connector that maps named operations onto Intune managed-device calls.

    ``invoke`` looks the operation name up in a fixed table of private
    handlers; each handler issues a request through the injected
    ``GraphClient`` and wraps the outcome in a ``ConnectorResult``.
    Operations that take a ``device_id`` validate it with
    ``_validate_device_id`` before it is placed in a request path.
    """

    connector_id = "intune"
    corpus_entry_cid = "sha256:intune-connector-v1"
    capability_mask = 0x7  # READ | PROPOSE | MUTATE
    declared_endpoints = [
        "graph.microsoft.com/v1.0/deviceManagement/managedDevices",
    ]
    accord_template = "device-management"
    gsap_required = True
    chronicle_enabled = True

    # Fix C-6: per-operation capability requirements
    operation_capabilities = {
        "list_devices": CAP_READ,
        "get_device": CAP_READ,
        "get_compliance": CAP_READ,
        "sync_device": CAP_PROPOSE,
        "remote_lock": CAP_MUTATE,
        "retire_device": CAP_MUTATE,
        "wipe_device": CAP_MUTATE,
    }

    # Fields requested by list_devices when the caller supplies no "select".
    _DEFAULT_DEVICE_SELECT = (
        "id,deviceName,operatingSystem,osVersion,"
        "complianceState,lastSyncDateTime,userPrincipalName,azureADDeviceId"
    )

    def __init__(self, graph_client: GraphClient, cache: DeviceComplianceCache | None = None):
        """Store the Graph client and the compliance cache.

        Args:
            graph_client: Client used for every Graph request.
            cache: Compliance cache to use; a new ``DeviceComplianceCache``
                is created only when this is ``None``.
        """
        self.graph = graph_client
        # Compare against None explicitly: `cache or ...` would discard an
        # injected cache object that happens to evaluate as falsy.
        self.cache = cache if cache is not None else DeviceComplianceCache()

    async def invoke(
        self, operation: str, parameters: dict[str, Any], context: ConnectorContext
    ) -> ConnectorResult:
        """Route to the appropriate Graph API call.

        Args:
            operation: Operation name; must be one of the keys in the
                handler table below.
            parameters: Operation-specific parameters, passed to the handler.
            context: Invocation context, passed through to the handler.

        Returns:
            The handler's ``ConnectorResult``. An unknown operation, or any
            ``Exception`` raised by the handler, yields a result with
            ``success=False`` and an error string instead of raising.
        """
        # Explicit allow-list: only these names can be dispatched.
        handlers = {
            "list_devices": self._list_devices,
            "get_device": self._get_device,
            "get_compliance": self._get_compliance,
            "sync_device": self._sync_device,
            "remote_lock": self._remote_lock,
            "retire_device": self._retire_device,
            "wipe_device": self._wipe_device,
        }
        handler = handlers.get(operation)
        if handler is None:
            return ConnectorResult(
                success=False, error=f"Unknown operation: {operation}"
            )
        try:
            return await handler(parameters, context)
        except Exception as e:
            # Boundary handler: convert the failure into a result, but keep
            # the traceback in the log so the root cause stays diagnosable.
            logger.exception("Intune connector error: %s %s", operation, e)
            return ConnectorResult(success=False, error=str(e))

    def health_check(self) -> bool:
        """Return True when the Graph client has a tenant_id and a client_id.

        This only inspects configuration on ``self.graph``; it makes no
        request.
        """
        # Synchronous check — can't call async Graph API here.
        # Return True if graph client is configured.
        return bool(self.graph.tenant_id and self.graph.client_id)

    # ── READ operations ──────────────────────────────────────────
    async def _list_devices(
        self, params: dict[str, Any], ctx: ConnectorContext
    ) -> ConnectorResult:
        """List managed devices as JSON-mode ``DeviceSummary`` dumps.

        Reads optional ``top`` (default 50) and ``select`` from ``params``
        and forwards them as ``$top`` / ``$select``. Entries are taken from
        the response's ``"value"`` list; a missing list yields an empty
        result. An entry without ``"id"`` raises ``KeyError``.
        """
        top = params.get("top", 50)
        select = params.get("select", self._DEFAULT_DEVICE_SELECT)
        data = await self.graph.get(
            _GRAPH_DEVICES, params={"$top": top, "$select": select}
        )
        devices = [
            DeviceSummary(
                device_id=d["id"],
                device_name=d.get("deviceName", ""),
                os_type=d.get("operatingSystem", ""),
                os_version=d.get("osVersion", ""),
                compliance_state=d.get("complianceState", "unknown"),
                last_sync=d.get("lastSyncDateTime"),
                user_principal_name=d.get("userPrincipalName"),
                entra_device_id=d.get("azureADDeviceId"),
            ).model_dump(mode="json")
            for d in data.get("value", [])
        ]
        return ConnectorResult(success=True, data=devices)

    async def _get_device(
        self, params: dict[str, Any], ctx: ConnectorContext
    ) -> ConnectorResult:
        """Fetch one device by ``params["device_id"]`` and return the raw payload.

        An invalid or missing ``device_id`` produces a failed result carrying
        the validation message; no request is made in that case.
        """
        try:
            device_id = _validate_device_id(params.get("device_id", ""))
        except ValueError as e:
            return ConnectorResult(success=False, error=str(e))
        data = await self.graph.get(f"{_GRAPH_DEVICES}/{device_id}")
        return ConnectorResult(success=True, data=data)

    async def _get_compliance(
        self, params: dict[str, Any], ctx: ConnectorContext
    ) -> ConnectorResult:
        """Return the compliance state for ``params["device_id"]``.

        A cached entry is returned when the cache has one; otherwise the
        state is fetched, stored in the cache and returned. An invalid or
        missing ``device_id`` produces a failed result.
        """
        try:
            device_id = _validate_device_id(params.get("device_id", ""))
        except ValueError as e:
            return ConnectorResult(success=False, error=str(e))

        # Check cache first
        cached = await self.cache.get(device_id)
        if cached is not None:
            return ConnectorResult(success=True, data=cached.model_dump(mode="json"))

        # Fetch from Graph API
        data = await self.graph.get(
            f"{_GRAPH_DEVICES}/{device_id}",
            params={"$select": "id,complianceState,lastSyncDateTime,complianceGracePeriodExpirationDateTime"},
        )
        raw_state = data.get("complianceState", "unknown")
        state = ComplianceState(
            device_id=device_id,
            compliant=raw_state == "compliant",
            state=raw_state,
            # Time of this lookup, not the device's lastSyncDateTime.
            last_evaluated=datetime.now(UTC),
        )
        await self.cache.set(device_id, state)
        return ConnectorResult(success=True, data=state.model_dump(mode="json"))

    async def _post_device_action(
        self,
        params: dict[str, Any],
        *,
        action: str,
        ok_data: dict[str, Any],
        failure_label: str,
        invalidate_cache: bool = False,
    ) -> ConnectorResult:
        """POST ``<devices>/<device_id>/<action>`` and translate the response.

        Args:
            params: Operation parameters; ``device_id`` is read from here.
            action: Final path segment of the device action.
            ok_data: Payload returned when the response status is 200 or 204.
            failure_label: Prefix for the error message on any other status.
            invalidate_cache: When true, drop the device's cache entry after
                a successful call.

        Returns:
            A successful result carrying ``ok_data``, or a failed result with
            either the validation message or ``"<label> failed: HTTP <code>"``.
        """
        try:
            device_id = _validate_device_id(params.get("device_id", ""))
        except ValueError as e:
            return ConnectorResult(success=False, error=str(e))
        resp = await self.graph.post(f"{_GRAPH_DEVICES}/{device_id}/{action}")
        if resp.status_code in (200, 204):
            if invalidate_cache:
                await self.cache.invalidate(device_id)
            return ConnectorResult(success=True, data=ok_data)
        return ConnectorResult(
            success=False, error=f"{failure_label} failed: HTTP {resp.status_code}"
        )

    # ── PROPOSE operations ───────────────────────────────────────
    async def _sync_device(
        self, params: dict[str, Any], ctx: ConnectorContext
    ) -> ConnectorResult:
        """Request a device sync; on success also invalidate its cache entry."""
        return await self._post_device_action(
            params,
            action="syncDevice",
            ok_data={"synced": True},
            failure_label="Sync",
            invalidate_cache=True,
        )

    # ── MUTATE operations ────────────────────────────────────────
    # NOTE(review): unlike sync_device, none of the operations below touch
    # the compliance cache — confirm whether retire/wipe should invalidate it.
    async def _remote_lock(
        self, params: dict[str, Any], ctx: ConnectorContext
    ) -> ConnectorResult:
        """Request a remote lock of the device."""
        return await self._post_device_action(
            params,
            action="remoteLock",
            ok_data={"locked": True},
            failure_label="Lock",
        )

    async def _retire_device(
        self, params: dict[str, Any], ctx: ConnectorContext
    ) -> ConnectorResult:
        """Request retirement of the device."""
        return await self._post_device_action(
            params,
            action="retire",
            ok_data={"retired": True},
            failure_label="Retire",
        )

    async def _wipe_device(
        self, params: dict[str, Any], ctx: ConnectorContext
    ) -> ConnectorResult:
        """Request a wipe of the device."""
        return await self._post_device_action(
            params,
            action="wipe",
            ok_data={"wiped": True},
            failure_label="Wipe",
        )