Implements ConnectorPlugin for Intune Graph API operations. Governed invocation: every Intune call requires an active AC and emits a Chronicle CONNECTOR_INVOKED event. Operations: list, get, compliance check, sync, lock, retire, wipe. In-memory compliance cache with configurable TTL. Conditional registration via intune_enabled setting. Signed-off-by: Tyler King <tking@guildhouse.dev>
39 lines
1.2 KiB
Python
39 lines
1.2 KiB
Python
# Copyright 2026 Guildhouse Dev
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
|
|
"""In-memory cache of Intune device compliance state."""
|
|
|
|
import time
|
|
from typing import Optional
|
|
|
|
from gsap_broker.models.intune import ComplianceState
|
|
|
|
|
|
class DeviceComplianceCache:
    """In-memory cache with TTL for device compliance state.

    Entries are keyed by device id and stamped with a ``time.monotonic()``
    reading when they are stored. An entry counts as expired once strictly
    more than ``ttl`` seconds have elapsed since it was stored.

    A monotonic clock is used instead of wall-clock time so that system
    clock adjustments can neither extend the life of a stale entry nor
    expire a fresh one early.

    The cache performs no locking of its own.
    """

    def __init__(self, ttl_seconds: int = 300):
        """Create an empty cache.

        Args:
            ttl_seconds: Lifetime of a cached entry, in seconds.
        """
        self.ttl = ttl_seconds
        # device_id -> (state, time.monotonic() reading taken at store time)
        self._store: dict[str, tuple[ComplianceState, float]] = {}

    def _is_expired(self, stored_at: float, now: float) -> bool:
        """Return True if an entry stamped *stored_at* has outlived the TTL."""
        return (now - stored_at) > self.ttl

    def _evict_expired(self) -> int:
        """Drop every expired entry and return how many were removed."""
        now = time.monotonic()
        # Collect keys first: the dict must not change size while iterating.
        expired = [
            device_id
            for device_id, (_, stored_at) in self._store.items()
            if self._is_expired(stored_at, now)
        ]
        for device_id in expired:
            del self._store[device_id]
        return len(expired)

    async def get(self, device_id: str) -> Optional[ComplianceState]:
        """Get cached compliance state, or None if expired/missing.

        An expired entry is removed from the cache as a side effect.

        Args:
            device_id: Key the state was stored under.

        Returns:
            The cached state, or None when there is no live entry.
        """
        entry = self._store.get(device_id)
        if entry is None:
            return None
        state, stored_at = entry
        if self._is_expired(stored_at, time.monotonic()):
            self._store.pop(device_id, None)
            return None
        return state

    async def set(self, device_id: str, state: ComplianceState) -> None:
        """Cache a compliance state, replacing any existing entry.

        Args:
            device_id: Key to store the state under.
            state: Compliance state to cache; its TTL starts now.
        """
        self._store[device_id] = (state, time.monotonic())

    async def invalidate(self, device_id: str) -> None:
        """Remove a device from cache; a missing device is ignored."""
        self._store.pop(device_id, None)

    def size(self) -> int:
        """Return the number of live (unexpired) entries.

        Expired entries are evicted first, so devices that are never looked
        up again do not stay in memory or inflate the reported size.
        """
        self._evict_expired()
        return len(self._store)
|