feat: AgentRegistrar abstraction + Entra Agent ID driver

Extracts agent identity registration behind AgentRegistrar protocol.
Three implementations:
  KeycloakRegistrar — Keycloak Admin REST API (existing, refactored)
  EntraRegistrar    — Microsoft Entra Agent ID platform (NEW)
  StubRegistrar     — dev mode without real IdP

Driver selection via AGENT_REGISTRAR env var:
  auto     — prefers Entra if configured, Keycloak fallback, stub default
  keycloak — explicit Keycloak
  entra    — explicit Entra Agent ID

Entra integration:
  Registers agent as Entra app + service principal via Graph API
  Tags with delegation metadata (agent_type, delegator, governed:true)
  Client secret TTL matches delegation expiry
  Deletes application on revocation (cascades to SP)
  Uses msal for token acquisition
  Future: native Agent ID Blueprint API when GA

Files:
  registrar.py         — AgentRegistrar protocol + AgentCredentials dataclass
  registrar_keycloak.py — refactored from keycloak.py
  registrar_entra.py   — NEW Entra Graph API driver
  registrar_stub.py    — dev mode stub
  registrar_factory.py — driver selection factory
  delegation.py        — updated to use registrar abstraction
  settings.py          — added Entra config + agent_registrar field

All 7 smoke tests pass with stub registrar.
Implements GCAP-SPEC-LLM-PRINCIPAL-BROKER-0001 §4.1-§4.3.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Tyler King 2026-04-04 16:52:45 -04:00
parent 944b3fde19
commit f3ffeb38ae
9 changed files with 355 additions and 56 deletions

View file

@ -16,6 +16,15 @@ KEYCLOAK_ADMIN_CLIENT_SECRET=
# Chronicle (optional — events posted as Forgejo push webhooks) # Chronicle (optional — events posted as Forgejo push webhooks)
CHRONICLE_WEBHOOK_URL= CHRONICLE_WEBHOOK_URL=
# Agent registrar driver: auto | keycloak | entra
AGENT_REGISTRAR=auto
# Entra Agent ID (when AGENT_REGISTRAR=entra or auto)
ENTRA_TENANT_ID=
ENTRA_CLIENT_ID=
ENTRA_CLIENT_SECRET=
ENTRA_AGENT_BLUEPRINT_ID=
# Delegation Defaults # Delegation Defaults
DEFAULT_DELEGATION_TTL_MINUTES=60 DEFAULT_DELEGATION_TTL_MINUTES=60
DEFAULT_MAX_COMMANDS=500 DEFAULT_MAX_COMMANDS=500

View file

@ -7,13 +7,13 @@ from datetime import datetime, timedelta
from llm_broker import chronicle, db from llm_broker import chronicle, db
from llm_broker.db import DelegationDB from llm_broker.db import DelegationDB
from llm_broker.gsap import GSAPClient from llm_broker.gsap import GSAPClient
from llm_broker.keycloak import KeycloakAdmin
from llm_broker.models import ( from llm_broker.models import (
AgentPrincipal, AgentPrincipal,
DelegationRequest, DelegationRequest,
DelegationResponse, DelegationResponse,
DelegationScope, DelegationScope,
) )
from llm_broker.registrar_factory import create_registrar
from llm_broker.settings import Settings from llm_broker.settings import Settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -22,12 +22,7 @@ logger = logging.getLogger(__name__)
class DelegationManager: class DelegationManager:
def __init__(self, config: Settings): def __init__(self, config: Settings):
self.config = config self.config = config
self.keycloak = KeycloakAdmin( self.registrar = create_registrar(config)
config.keycloak_url,
config.keycloak_realm,
config.keycloak_admin_client_id,
config.keycloak_admin_client_secret,
)
self.gsap = GSAPClient(config.gsap_broker_url, config.gsap_bearer_token) self.gsap = GSAPClient(config.gsap_broker_url, config.gsap_bearer_token)
async def create_delegation( async def create_delegation(
@ -39,17 +34,17 @@ class DelegationManager:
expires_at = now + timedelta(minutes=scope.max_ttl_minutes) expires_at = now + timedelta(minutes=scope.max_ttl_minutes)
agent_did = f"did:web:guildhouse.dev/agent/{request.agent_type}-{delegation_id}" agent_did = f"did:web:guildhouse.dev/agent/{request.agent_type}-{delegation_id}"
agent_client_id = f"agent-{request.agent_type}-{delegation_id}"
delegator_short = delegator_did.rsplit("/", 1)[-1] delegator_short = delegator_did.rsplit("/", 1)[-1]
agent_display = f"{request.agent_type} (delegated by {delegator_short})" agent_display = f"{request.agent_type} (delegated by {delegator_short})"
# 1. Register agent in Keycloak # 1. Register agent identity via the configured registrar
kc_result = await self.keycloak.register_agent_client( credentials = await self.registrar.register_agent(
client_id=agent_client_id,
display_name=agent_display,
delegator_did=delegator_did,
delegation_id=delegation_id, delegation_id=delegation_id,
agent_type=request.agent_type, agent_type=request.agent_type,
delegator_id=delegator_did,
display_name=agent_display,
expires_at=expires_at.isoformat(),
metadata={"model": request.agent_model},
) )
# 2. Request delegated AC from GSAP broker # 2. Request delegated AC from GSAP broker
@ -63,7 +58,7 @@ class DelegationManager:
ttl_minutes=scope.max_ttl_minutes, ttl_minutes=scope.max_ttl_minutes,
) )
except Exception as e: except Exception as e:
await self.keycloak.delete_agent_client(agent_client_id) await self.registrar.delete_agent(credentials.client_id)
raise RuntimeError(f"Failed to request delegated AC: {e}") raise RuntimeError(f"Failed to request delegated AC: {e}")
# 3. Record in Chronicle # 3. Record in Chronicle
@ -82,7 +77,7 @@ class DelegationManager:
agent_type=request.agent_type, agent_type=request.agent_type,
agent_model=request.agent_model, agent_model=request.agent_model,
agent_did=agent_did, agent_did=agent_did,
agent_keycloak_client_id=agent_client_id, agent_keycloak_client_id=credentials.client_id,
delegator_did=delegator_did, delegator_did=delegator_did,
delegator_ac_id=request.delegator_ac_id, delegator_ac_id=request.delegator_ac_id,
delegated_ac_id=ac_result.get("context_id", ""), delegated_ac_id=ac_result.get("context_id", ""),
@ -96,18 +91,19 @@ class DelegationManager:
await db.create_delegation(delegation) await db.create_delegation(delegation)
logger.info( logger.info(
"Delegation created: %s (%s%s)", delegation_id, delegator_did, agent_did "Delegation created: %s (%s%s) via %s",
delegation_id, delegator_did, agent_did, credentials.idp_backend,
) )
return DelegationResponse( return DelegationResponse(
delegation_id=delegation_id, delegation_id=delegation_id,
agent_principal=AgentPrincipal( agent_principal=AgentPrincipal(
did=agent_did, did=agent_did,
keycloak_client_id=agent_client_id, keycloak_client_id=credentials.client_id,
display_name=agent_display, display_name=credentials.agent_display_name,
), ),
delegated_ac=ac_result, delegated_ac=ac_result,
agent_token=kc_result.get("client_secret", ""), agent_token=credentials.client_secret,
expires_at=expires_at.isoformat(), expires_at=expires_at.isoformat(),
max_commands=scope.max_commands, max_commands=scope.max_commands,
chronicle_cid=chronicle_cid, chronicle_cid=chronicle_cid,
@ -119,7 +115,7 @@ class DelegationManager:
return False return False
if delegation.agent_keycloak_client_id: if delegation.agent_keycloak_client_id:
await self.keycloak.delete_agent_client(delegation.agent_keycloak_client_id) await self.registrar.delete_agent(delegation.agent_keycloak_client_id)
await db.revoke_delegation(delegation_id, reason) await db.revoke_delegation(delegation_id, reason)
await chronicle.delegation_revoked(delegation_id, reason) await chronicle.delegation_revoked(delegation_id, reason)
@ -128,11 +124,11 @@ class DelegationManager:
return True return True
async def cleanup_expired(self) -> int: async def cleanup_expired(self) -> int:
"""Expire stale delegations and clean up Keycloak clients.""" """Expire stale delegations and clean up IdP registrations."""
expired = await db.expire_stale() expired = await db.expire_stale()
for d in expired: for d in expired:
if d.agent_keycloak_client_id: if d.agent_keycloak_client_id:
await self.keycloak.delete_agent_client(d.agent_keycloak_client_id) await self.registrar.delete_agent(d.agent_keycloak_client_id)
await chronicle.delegation_expired(d.delegation_id) await chronicle.delegation_expired(d.delegation_id)
if expired: if expired:
logger.info("Expired %d stale delegations", len(expired)) logger.info("Expired %d stale delegations", len(expired))

36
llm_broker/registrar.py Normal file
View file

@ -0,0 +1,36 @@
"""AgentRegistrar protocol — abstract interface for agent identity registration.
Implementations:
KeycloakRegistrar Keycloak Admin REST API (§4.1)
EntraRegistrar Microsoft Entra Agent ID platform (§4.2)
StubRegistrar dev mode without a real IdP
"""
from dataclasses import dataclass
from typing import Protocol, runtime_checkable
@dataclass
class AgentCredentials:
"""Credentials returned after registering an agent identity."""
client_id: str
client_secret: str
agent_display_name: str
idp_backend: str # "keycloak" | "entra" | "stub"
@runtime_checkable
class AgentRegistrar(Protocol):
async def register_agent(
self,
delegation_id: str,
agent_type: str,
delegator_id: str,
display_name: str,
expires_at: str,
metadata: dict | None = None,
) -> AgentCredentials: ...
async def delete_agent(self, client_id: str) -> bool: ...
async def get_agent_token(self, client_id: str) -> str | None: ...

View file

@ -0,0 +1,153 @@
"""Entra Agent ID registrar — registers agent identities via Microsoft Graph.
Implements AgentRegistrar for GCAP-SPEC-LLM-PRINCIPAL-BROKER-0001 §4.2.
Uses standard Graph application registration with agent metadata tags.
When Entra Agent ID Blueprint APIs reach GA, this driver should be updated
to use the dedicated /agentIdentityBlueprints and /agentIdentities endpoints.
"""
import logging
from typing import Optional
import httpx
import msal
from .registrar import AgentCredentials
logger = logging.getLogger(__name__)
GRAPH_API = "https://graph.microsoft.com/v1.0"
class EntraRegistrar:
"""AgentRegistrar implementation using Microsoft Entra + Graph API."""
def __init__(
self,
tenant_id: str,
client_id: str,
client_secret: str,
agent_blueprint_id: str = "",
):
self.tenant_id = tenant_id
self.client_id = client_id
self.client_secret = client_secret
self.agent_blueprint_id = agent_blueprint_id
self._app = msal.ConfidentialClientApplication(
client_id=self.client_id,
client_credential=self.client_secret,
authority=f"https://login.microsoftonline.com/{self.tenant_id}",
)
async def _get_token(self) -> str:
result = self._app.acquire_token_for_client(
scopes=["https://graph.microsoft.com/.default"]
)
if "access_token" in result:
return result["access_token"]
raise RuntimeError(
f"Entra token error: {result.get('error_description', result.get('error', 'unknown'))}"
)
async def _headers(self) -> dict:
token = await self._get_token()
return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
async def register_agent(
self,
delegation_id: str,
agent_type: str,
delegator_id: str,
display_name: str,
expires_at: str,
metadata: dict | None = None,
) -> AgentCredentials:
headers = await self._headers()
tags = [
f"agent_type:{agent_type}",
f"delegation_id:{delegation_id}",
f"delegator:{delegator_id}",
"governed:true",
"HideApp",
]
if self.agent_blueprint_id:
tags.append(f"blueprint:{self.agent_blueprint_id}")
app_body = {
"displayName": display_name,
"signInAudience": "AzureADMyOrg",
"tags": tags,
"notes": f"Governed AI agent. Delegator: {delegator_id}. Expires: {expires_at}",
"passwordCredentials": [],
}
async with httpx.AsyncClient(timeout=15.0) as http:
resp = await http.post(f"{GRAPH_API}/applications", json=app_body, headers=headers)
if resp.status_code == 401:
headers = await self._headers()
resp = await http.post(f"{GRAPH_API}/applications", json=app_body, headers=headers)
resp.raise_for_status()
app_data = resp.json()
app_id = app_data["appId"]
object_id = app_data["id"]
# Add client secret with TTL matching delegation expiry
secret_resp = await http.post(
f"{GRAPH_API}/applications/{object_id}/addPassword",
json={
"passwordCredential": {
"displayName": f"delegation-{delegation_id}",
"endDateTime": expires_at,
}
},
headers=headers,
)
secret_resp.raise_for_status()
client_secret = secret_resp.json().get("secretText", "")
# Create service principal
sp_resp = await http.post(
f"{GRAPH_API}/servicePrincipals",
json={"appId": app_id, "displayName": display_name, "tags": tags},
headers=headers,
)
if sp_resp.status_code not in (200, 201, 409):
sp_resp.raise_for_status()
logger.info("Entra: registered agent %s (appId=%s)", display_name, app_id)
return AgentCredentials(
client_id=app_id,
client_secret=client_secret,
agent_display_name=display_name,
idp_backend="entra",
)
async def delete_agent(self, client_id: str) -> bool:
headers = await self._headers()
async with httpx.AsyncClient(timeout=10.0) as http:
resp = await http.get(
f"{GRAPH_API}/applications",
params={"$filter": f"appId eq '{client_id}'"},
headers=headers,
)
resp.raise_for_status()
apps = resp.json().get("value", [])
if not apps:
return False
object_id = apps[0]["id"]
del_resp = await http.delete(
f"{GRAPH_API}/applications/{object_id}",
headers=headers,
)
deleted = del_resp.status_code in (200, 204)
if deleted:
logger.info("Entra: deleted agent app %s", client_id)
return deleted
async def get_agent_token(self, client_id: str) -> str | None:
return None

View file

@ -0,0 +1,64 @@
"""Registrar factory — selects the appropriate AgentRegistrar based on config."""
import logging
from .registrar import AgentRegistrar
from .registrar_stub import StubRegistrar
logger = logging.getLogger(__name__)
def create_registrar(config) -> AgentRegistrar:
"""Create the appropriate registrar based on AGENT_REGISTRAR setting."""
driver = config.agent_registrar
if driver == "keycloak":
if not config.keycloak_admin_client_secret:
logger.warning("Keycloak secret not configured, using stub")
return StubRegistrar()
from .registrar_keycloak import KeycloakRegistrar
return KeycloakRegistrar(
base_url=config.keycloak_url,
realm=config.keycloak_realm,
client_id=config.keycloak_admin_client_id,
client_secret=config.keycloak_admin_client_secret,
)
elif driver == "entra":
if not config.entra_client_secret:
logger.warning("Entra secret not configured, using stub")
return StubRegistrar()
from .registrar_entra import EntraRegistrar
return EntraRegistrar(
tenant_id=config.entra_tenant_id,
client_id=config.entra_client_id,
client_secret=config.entra_client_secret,
agent_blueprint_id=config.entra_agent_blueprint_id,
)
elif driver == "auto":
if config.entra_client_secret:
from .registrar_entra import EntraRegistrar
logger.info("Auto-selected Entra registrar")
return EntraRegistrar(
tenant_id=config.entra_tenant_id,
client_id=config.entra_client_id,
client_secret=config.entra_client_secret,
agent_blueprint_id=config.entra_agent_blueprint_id,
)
elif config.keycloak_admin_client_secret:
from .registrar_keycloak import KeycloakRegistrar
logger.info("Auto-selected Keycloak registrar")
return KeycloakRegistrar(
base_url=config.keycloak_url,
realm=config.keycloak_realm,
client_id=config.keycloak_admin_client_id,
client_secret=config.keycloak_admin_client_secret,
)
else:
logger.warning("No IdP configured, using stub registrar")
return StubRegistrar()
else:
logger.warning("Unknown registrar driver: %s, using stub", driver)
return StubRegistrar()

View file

@ -1,7 +1,6 @@
"""Keycloak Admin API client — ephemeral agent client registration. """Keycloak registrar — registers ephemeral agent clients via Admin REST API.
Registers and deletes confidential Keycloak clients for AI agent Implements AgentRegistrar for GCAP-SPEC-LLM-PRINCIPAL-BROKER-0001 §4.1.
delegations per GCAP-SPEC-LLM-PRINCIPAL-BROKER-0001 §4.1.
""" """
import logging import logging
@ -9,10 +8,14 @@ from typing import Optional
import httpx import httpx
from .registrar import AgentCredentials
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class KeycloakAdmin: class KeycloakRegistrar:
"""AgentRegistrar implementation using Keycloak Admin REST API."""
def __init__(self, base_url: str, realm: str, client_id: str, client_secret: str): def __init__(self, base_url: str, realm: str, client_id: str, client_secret: str):
self.base_url = base_url.rstrip("/") self.base_url = base_url.rstrip("/")
self.realm = realm self.realm = realm
@ -39,23 +42,20 @@ class KeycloakAdmin:
await self._get_admin_token() await self._get_admin_token()
return {"Authorization": f"Bearer {self._token}"} return {"Authorization": f"Bearer {self._token}"}
async def register_agent_client( async def register_agent(
self, self,
client_id: str,
display_name: str,
delegator_did: str,
delegation_id: str, delegation_id: str,
agent_type: str, agent_type: str,
) -> dict: delegator_id: str,
"""Register ephemeral Keycloak client for an AI agent.""" display_name: str,
if not self.client_secret: expires_at: str,
logger.info("Keycloak not configured — dev mode stub for %s", client_id) metadata: dict | None = None,
return {"client_id": client_id, "client_secret": f"dev-secret-{delegation_id}", "client_uuid": None} ) -> AgentCredentials:
headers = await self._headers() headers = await self._headers()
kc_client_id = f"agent-{agent_type}-{delegation_id}"
client_rep = { client_rep = {
"clientId": client_id, "clientId": kc_client_id,
"name": display_name, "name": display_name,
"enabled": True, "enabled": True,
"serviceAccountsEnabled": True, "serviceAccountsEnabled": True,
@ -64,7 +64,7 @@ class KeycloakAdmin:
"protocol": "openid-connect", "protocol": "openid-connect",
"attributes": { "attributes": {
"agent_type": agent_type, "agent_type": agent_type,
"delegator_did": delegator_did, "delegator_did": delegator_id,
"delegation_id": delegation_id, "delegation_id": delegation_id,
}, },
} }
@ -75,7 +75,6 @@ class KeycloakAdmin:
json=client_rep, json=client_rep,
headers=headers, headers=headers,
) )
if resp.status_code == 401: if resp.status_code == 401:
headers = {"Authorization": f"Bearer {await self._get_admin_token()}"} headers = {"Authorization": f"Bearer {await self._get_admin_token()}"}
resp = await http.post( resp = await http.post(
@ -83,13 +82,11 @@ class KeycloakAdmin:
json=client_rep, json=client_rep,
headers=headers, headers=headers,
) )
resp.raise_for_status() resp.raise_for_status()
# Retrieve the generated client secret
location = resp.headers.get("Location", "") location = resp.headers.get("Location", "")
client_uuid = location.rstrip("/").split("/")[-1] if location else None client_uuid = location.rstrip("/").split("/")[-1] if location else None
client_secret = "" secret = ""
if client_uuid: if client_uuid:
secret_resp = await http.get( secret_resp = await http.get(
@ -97,21 +94,17 @@ class KeycloakAdmin:
headers=headers, headers=headers,
) )
if secret_resp.status_code == 200: if secret_resp.status_code == 200:
client_secret = secret_resp.json().get("value", "") secret = secret_resp.json().get("value", "")
logger.info("Registered agent client: %s (uuid=%s)", client_id, client_uuid) logger.info("Keycloak: registered agent %s (uuid=%s)", kc_client_id, client_uuid)
return { return AgentCredentials(
"client_id": client_id, client_id=kc_client_id,
"client_secret": client_secret, client_secret=secret,
"client_uuid": client_uuid, agent_display_name=display_name,
} idp_backend="keycloak",
)
async def delete_agent_client(self, client_id: str) -> bool:
"""Delete ephemeral agent client on revocation/expiry."""
if not self.client_secret:
logger.info("Keycloak not configured — dev mode stub delete for %s", client_id)
return True
async def delete_agent(self, client_id: str) -> bool:
headers = await self._headers() headers = await self._headers()
async with httpx.AsyncClient(timeout=10.0) as http: async with httpx.AsyncClient(timeout=10.0) as http:
@ -134,5 +127,8 @@ class KeycloakAdmin:
) )
deleted = del_resp.status_code in (200, 204) deleted = del_resp.status_code in (200, 204)
if deleted: if deleted:
logger.info("Deleted agent client: %s", client_id) logger.info("Keycloak: deleted agent %s", client_id)
return deleted return deleted
async def get_agent_token(self, client_id: str) -> str | None:
return None

View file

@ -0,0 +1,35 @@
"""Stub registrar for development without a real IdP."""
import logging
from .registrar import AgentCredentials
logger = logging.getLogger(__name__)
class StubRegistrar:
"""AgentRegistrar stub — returns dev credentials without calling any IdP."""
async def register_agent(
self,
delegation_id: str,
agent_type: str,
delegator_id: str,
display_name: str,
expires_at: str,
metadata: dict | None = None,
) -> AgentCredentials:
logger.info("Stub registrar: register %s for delegation %s", agent_type, delegation_id)
return AgentCredentials(
client_id=f"stub-agent-{agent_type}-{delegation_id}",
client_secret=f"stub-secret-{delegation_id}",
agent_display_name=display_name,
idp_backend="stub",
)
async def delete_agent(self, client_id: str) -> bool:
logger.info("Stub registrar: delete %s", client_id)
return True
async def get_agent_token(self, client_id: str) -> str | None:
return f"stub-token-{client_id}"

View file

@ -21,6 +21,15 @@ class Settings(BaseSettings):
keycloak_admin_client_id: str = "llm-broker-admin" keycloak_admin_client_id: str = "llm-broker-admin"
keycloak_admin_client_secret: str = "" keycloak_admin_client_secret: str = ""
# Agent registrar driver: auto | keycloak | entra
agent_registrar: str = "auto"
# Entra Agent ID
entra_tenant_id: str = ""
entra_client_id: str = ""
entra_client_secret: str = ""
entra_agent_blueprint_id: str = ""
# Chronicle # Chronicle
chronicle_webhook_url: Optional[str] = None chronicle_webhook_url: Optional[str] = None

View file

@ -16,6 +16,7 @@ dependencies = [
"sqlmodel>=0.0.19", "sqlmodel>=0.0.19",
"aiosqlite>=0.20.0", "aiosqlite>=0.20.0",
"structlog>=24.1.0", "structlog>=24.1.0",
"msal>=1.28.0",
] ]
[project.optional-dependencies] [project.optional-dependencies]