"""Entra Agent ID registrar — registers agent identities via Microsoft Graph. Implements AgentRegistrar for GCAP-SPEC-LLM-PRINCIPAL-BROKER-0001 §4.2. Uses standard Graph application registration with agent metadata tags. When Entra Agent ID Blueprint APIs reach GA, this driver should be updated to use the dedicated /agentIdentityBlueprints and /agentIdentities endpoints. """ import logging from typing import Optional import httpx import msal from .registrar import AgentCredentials logger = logging.getLogger(__name__) GRAPH_API = "https://graph.microsoft.com/v1.0" class EntraRegistrar: """AgentRegistrar implementation using Microsoft Entra + Graph API.""" def __init__( self, tenant_id: str, client_id: str, client_secret: str, agent_blueprint_id: str = "", ): self.tenant_id = tenant_id self.client_id = client_id self.client_secret = client_secret self.agent_blueprint_id = agent_blueprint_id self._app = msal.ConfidentialClientApplication( client_id=self.client_id, client_credential=self.client_secret, authority=f"https://login.microsoftonline.com/{self.tenant_id}", ) async def _get_token(self) -> str: result = self._app.acquire_token_for_client( scopes=["https://graph.microsoft.com/.default"] ) if "access_token" in result: return result["access_token"] raise RuntimeError( f"Entra token error: {result.get('error_description', result.get('error', 'unknown'))}" ) async def _headers(self) -> dict: token = await self._get_token() return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} async def register_agent( self, delegation_id: str, agent_type: str, delegator_id: str, display_name: str, expires_at: str, metadata: dict | None = None, ) -> AgentCredentials: headers = await self._headers() tags = [ f"agent_type:{agent_type}", f"delegation_id:{delegation_id}", f"delegator:{delegator_id}", "governed:true", "HideApp", ] if self.agent_blueprint_id: tags.append(f"blueprint:{self.agent_blueprint_id}") app_body = { "displayName": display_name, "signInAudience": "AzureADMyOrg", "tags": tags, "notes": f"Governed AI agent. Delegator: {delegator_id}. Expires: {expires_at}", "passwordCredentials": [], } async with httpx.AsyncClient(timeout=15.0) as http: resp = await http.post(f"{GRAPH_API}/applications", json=app_body, headers=headers) if resp.status_code == 401: headers = await self._headers() resp = await http.post(f"{GRAPH_API}/applications", json=app_body, headers=headers) resp.raise_for_status() app_data = resp.json() app_id = app_data["appId"] object_id = app_data["id"] # Add client secret with TTL matching delegation expiry secret_resp = await http.post( f"{GRAPH_API}/applications/{object_id}/addPassword", json={ "passwordCredential": { "displayName": f"delegation-{delegation_id}", "endDateTime": expires_at, } }, headers=headers, ) secret_resp.raise_for_status() client_secret = secret_resp.json().get("secretText", "") # Create service principal sp_resp = await http.post( f"{GRAPH_API}/servicePrincipals", json={"appId": app_id, "displayName": display_name, "tags": tags}, headers=headers, ) if sp_resp.status_code not in (200, 201, 409): sp_resp.raise_for_status() logger.info("Entra: registered agent %s (appId=%s)", display_name, app_id) return AgentCredentials( client_id=app_id, client_secret=client_secret, agent_display_name=display_name, idp_backend="entra", ) async def delete_agent(self, client_id: str) -> bool: headers = await self._headers() async with httpx.AsyncClient(timeout=10.0) as http: resp = await http.get( f"{GRAPH_API}/applications", params={"$filter": f"appId eq '{client_id}'"}, headers=headers, ) resp.raise_for_status() apps = resp.json().get("value", []) if not apps: return False object_id = apps[0]["id"] del_resp = await http.delete( f"{GRAPH_API}/applications/{object_id}", headers=headers, ) deleted = del_resp.status_code in (200, 204) if deleted: logger.info("Entra: deleted agent app %s", client_id) return deleted async def get_agent_token(self, client_id: str) -> str | None: return None