"""Keycloak registrar — registers ephemeral agent clients via Admin REST API. Implements AgentRegistrar for GCAP-SPEC-LLM-PRINCIPAL-BROKER-0001 §4.1. """ import logging from typing import Optional import httpx from .base import AgentCredentials logger = logging.getLogger(__name__) class KeycloakRegistrar: """AgentRegistrar implementation using Keycloak Admin REST API.""" def __init__(self, base_url: str, realm: str, client_id: str, client_secret: str): self.base_url = base_url.rstrip("/") self.realm = realm self.client_id = client_id self.client_secret = client_secret self._token: Optional[str] = None async def _get_admin_token(self) -> str: async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.post( f"{self.base_url}/realms/{self.realm}/protocol/openid-connect/token", data={ "grant_type": "client_credentials", "client_id": self.client_id, "client_secret": self.client_secret, }, ) resp.raise_for_status() self._token = resp.json()["access_token"] return self._token async def _headers(self) -> dict: if not self._token: await self._get_admin_token() return {"Authorization": f"Bearer {self._token}"} async def register_agent( self, delegation_id: str, agent_type: str, delegator_id: str, display_name: str, expires_at: str, metadata: dict | None = None, ) -> AgentCredentials: headers = await self._headers() kc_client_id = f"agent-{agent_type}-{delegation_id}" client_rep = { "clientId": kc_client_id, "name": display_name, "enabled": True, "serviceAccountsEnabled": True, "directAccessGrantsEnabled": False, "publicClient": False, "protocol": "openid-connect", "attributes": { "agent_type": agent_type, "delegator_did": delegator_id, "delegation_id": delegation_id, }, } async with httpx.AsyncClient(timeout=10.0) as http: resp = await http.post( f"{self.base_url}/admin/realms/{self.realm}/clients", json=client_rep, headers=headers, ) if resp.status_code == 401: headers = {"Authorization": f"Bearer {await self._get_admin_token()}"} resp = await http.post( f"{self.base_url}/admin/realms/{self.realm}/clients", json=client_rep, headers=headers, ) resp.raise_for_status() location = resp.headers.get("Location", "") client_uuid = location.rstrip("/").split("/")[-1] if location else None secret = "" if client_uuid: secret_resp = await http.get( f"{self.base_url}/admin/realms/{self.realm}/clients/{client_uuid}/client-secret", headers=headers, ) if secret_resp.status_code == 200: secret = secret_resp.json().get("value", "") logger.info("Keycloak: registered agent %s (uuid=%s)", kc_client_id, client_uuid) return AgentCredentials( client_id=kc_client_id, client_secret=secret, agent_display_name=display_name, idp_backend="keycloak", ) async def delete_agent(self, client_id: str) -> bool: headers = await self._headers() async with httpx.AsyncClient(timeout=10.0) as http: resp = await http.get( f"{self.base_url}/admin/realms/{self.realm}/clients", params={"clientId": client_id}, headers=headers, ) if resp.status_code != 200: return False clients = resp.json() if not clients: return False client_uuid = clients[0]["id"] del_resp = await http.delete( f"{self.base_url}/admin/realms/{self.realm}/clients/{client_uuid}", headers=headers, ) deleted = del_resp.status_code in (200, 204) if deleted: logger.info("Keycloak: deleted agent %s", client_id) return deleted async def get_agent_token(self, client_id: str) -> str | None: return None