"""Keycloak Admin API client โ€” ephemeral agent client registration. Registers and deletes confidential Keycloak clients for AI agent delegations per GCAP-SPEC-LLM-PRINCIPAL-BROKER-0001 ยง4.1. """ import logging from typing import Optional import httpx logger = logging.getLogger(__name__) class KeycloakAdmin: def __init__(self, base_url: str, realm: str, client_id: str, client_secret: str): self.base_url = base_url.rstrip("/") self.realm = realm self.client_id = client_id self.client_secret = client_secret self._token: Optional[str] = None async def _get_admin_token(self) -> str: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.post( f"{self.base_url}/realms/{self.realm}/protocol/openid-connect/token", data={ "grant_type": "client_credentials", "client_id": self.client_id, "client_secret": self.client_secret, }, ) resp.raise_for_status() self._token = resp.json()["access_token"] return self._token async def _headers(self) -> dict: if not self._token: await self._get_admin_token() return {"Authorization": f"Bearer {self._token}"} async def register_agent_client( self, client_id: str, display_name: str, delegator_did: str, delegation_id: str, agent_type: str, ) -> dict: """Register ephemeral Keycloak client for an AI agent.""" if not self.client_secret: logger.info("Keycloak not configured โ€” dev mode stub for %s", client_id) return {"client_id": client_id, "client_secret": f"dev-secret-{delegation_id}", "client_uuid": None} headers = await self._headers() client_rep = { "clientId": client_id, "name": display_name, "enabled": True, "serviceAccountsEnabled": True, "directAccessGrantsEnabled": False, "publicClient": False, "protocol": "openid-connect", "attributes": { "agent_type": agent_type, "delegator_did": delegator_did, "delegation_id": delegation_id, }, } async with httpx.AsyncClient(timeout=10.0) as http: resp = await http.post( f"{self.base_url}/admin/realms/{self.realm}/clients", json=client_rep, headers=headers, ) if resp.status_code == 401: headers = {"Authorization": f"Bearer {await self._get_admin_token()}"} resp = await http.post( f"{self.base_url}/admin/realms/{self.realm}/clients", json=client_rep, headers=headers, ) resp.raise_for_status() # Retrieve the generated client secret location = resp.headers.get("Location", "") client_uuid = location.rstrip("/").split("/")[-1] if location else None client_secret = "" if client_uuid: secret_resp = await http.get( f"{self.base_url}/admin/realms/{self.realm}/clients/{client_uuid}/client-secret", headers=headers, ) if secret_resp.status_code == 200: client_secret = secret_resp.json().get("value", "") logger.info("Registered agent client: %s (uuid=%s)", client_id, client_uuid) return { "client_id": client_id, "client_secret": client_secret, "client_uuid": client_uuid, } async def delete_agent_client(self, client_id: str) -> bool: """Delete ephemeral agent client on revocation/expiry.""" if not self.client_secret: logger.info("Keycloak not configured โ€” dev mode stub delete for %s", client_id) return True headers = await self._headers() async with httpx.AsyncClient(timeout=10.0) as http: resp = await http.get( f"{self.base_url}/admin/realms/{self.realm}/clients", params={"clientId": client_id}, headers=headers, ) if resp.status_code != 200: return False clients = resp.json() if not clients: return False client_uuid = clients[0]["id"] del_resp = await http.delete( f"{self.base_url}/admin/realms/{self.realm}/clients/{client_uuid}", headers=headers, ) deleted = del_resp.status_code in (200, 204) if deleted: logger.info("Deleted agent client: %s", client_id) return deleted