This repository has been archived on 2026-04-16. You can view files and clone it, but cannot push or open issues or pull requests.
llm-principal-broker/llm_broker/delegation.py
Tyler King f3ffeb38ae feat: AgentRegistrar abstraction + Entra Agent ID driver
Extracts agent identity registration behind AgentRegistrar protocol.
Three implementations:
  KeycloakRegistrar — Keycloak Admin REST API (existing, refactored)
  EntraRegistrar    — Microsoft Entra Agent ID platform (NEW)
  StubRegistrar     — dev mode without real IdP

Driver selection via AGENT_REGISTRAR env var:
  auto     — prefers Entra if configured, Keycloak fallback, stub default
  keycloak — explicit Keycloak
  entra    — explicit Entra Agent ID

Entra integration:
  Registers agent as Entra app + service principal via Graph API
  Tags with delegation metadata (agent_type, delegator, governed:true)
  Client secret TTL matches delegation expiry
  Deletes application on revocation (cascades to SP)
  Uses msal for token acquisition
  Future: native Agent ID Blueprint API when GA

Files:
  registrar.py         — AgentRegistrar protocol + AgentCredentials dataclass
  registrar_keycloak.py — refactored from keycloak.py
  registrar_entra.py   — NEW Entra Graph API driver
  registrar_stub.py    — dev mode stub
  registrar_factory.py — driver selection factory
  delegation.py        — updated to use registrar abstraction
  settings.py          — added Entra config + agent_registrar field

All 7 smoke tests pass with stub registrar.
Implements GCAP-SPEC-LLM-PRINCIPAL-BROKER-0001 §4.1-§4.3.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 16:52:45 -04:00

135 lines
5.1 KiB
Python

"""Core delegation lifecycle — GCAP-SPEC-LLM-PRINCIPAL-BROKER-0001 §3."""
import logging
import uuid
from datetime import datetime, timedelta
from llm_broker import chronicle, db
from llm_broker.db import DelegationDB
from llm_broker.gsap import GSAPClient
from llm_broker.models import (
AgentPrincipal,
DelegationRequest,
DelegationResponse,
DelegationScope,
)
from llm_broker.registrar_factory import create_registrar
from llm_broker.settings import Settings
logger = logging.getLogger(__name__)
class DelegationManager:
    """Create, revoke, and expire agent delegations.

    Each delegation ties together an agent identity registered through the
    registrar, a delegated AC requested from the GSAP broker, a Chronicle
    record, and a row in the delegation database.
    """

    # Corpus entry CID sent with every delegated-AC request.
    # NOTE(review): hard-coded value that looks dev-specific — confirm before
    # relying on it outside a dev environment.
    _CORPUS_ENTRY_CID = "sha256:dev-jumphost"

    def __init__(self, config: Settings):
        """Build the registrar and the GSAP client from *config*."""
        self.config = config
        self.registrar = create_registrar(config)
        self.gsap = GSAPClient(config.gsap_broker_url, config.gsap_bearer_token)

    async def _rollback_agent(self, client_id: str, delegation_id: str) -> None:
        """Best-effort deletion of an agent identity registered moments ago.

        A failure here is logged rather than raised so that it cannot replace
        the error that triggered the rollback.
        """
        try:
            await self.registrar.delete_agent(client_id)
        except Exception:
            logger.exception(
                "Rollback failed: could not delete agent %s for delegation %s",
                client_id, delegation_id,
            )

    async def create_delegation(
        self, request: DelegationRequest, delegator_did: str
    ) -> DelegationResponse:
        """Create a new delegation on behalf of *delegator_did*.

        Steps: register the agent identity, request a delegated AC from the
        GSAP broker, record the event in Chronicle, then persist the
        delegation. If any step after registration fails, the agent
        registration is rolled back (best effort) so no identity is left
        behind without a database row to drive later cleanup.

        Args:
            request: The delegation request; ``request.scope`` falls back to
                a default ``DelegationScope()`` when falsy.
            delegator_did: DID of the principal delegating authority.

        Returns:
            A ``DelegationResponse`` carrying the agent principal, the
            delegated AC result, the agent's client secret, and the expiry.

        Raises:
            RuntimeError: If the GSAP delegated-AC request fails; the
                underlying error is chained as ``__cause__``.
            Exception: Any error raised by the Chronicle write or database
                persist is re-raised unchanged after the rollback.
        """
        delegation_id = f"del-{uuid.uuid4().hex[:8]}"
        scope = request.scope or DelegationScope()
        # Naive UTC timestamp. ``utcnow()`` is deprecated since Python 3.12,
        # but it is kept so persisted/serialized values stay timezone-naive.
        now = datetime.utcnow()
        expires_at = now + timedelta(minutes=scope.max_ttl_minutes)
        agent_did = f"did:web:guildhouse.dev/agent/{request.agent_type}-{delegation_id}"
        # Last path segment of the delegator DID, used only for display.
        delegator_short = delegator_did.rsplit("/", 1)[-1]
        agent_display = f"{request.agent_type} (delegated by {delegator_short})"

        # 1. Register agent identity via the configured registrar
        credentials = await self.registrar.register_agent(
            delegation_id=delegation_id,
            agent_type=request.agent_type,
            delegator_id=delegator_did,
            display_name=agent_display,
            expires_at=expires_at.isoformat(),
            metadata={"model": request.agent_model},
        )

        # 2. Request delegated AC from GSAP broker
        try:
            ac_result = await self.gsap.request_delegated_ac(
                delegator_ac_id=request.delegator_ac_id,
                agent_did=agent_did,
                delegation_id=delegation_id,
                corpus_entry_cid=self._CORPUS_ENTRY_CID,
                capability_ceiling=scope.capability_ceiling,
                ttl_minutes=scope.max_ttl_minutes,
            )
        except Exception as e:
            await self._rollback_agent(credentials.client_id, delegation_id)
            raise RuntimeError(f"Failed to request delegated AC: {e}") from e

        try:
            # 3. Record in Chronicle
            chronicle_cid = await chronicle.delegation_created(
                delegation_id=delegation_id,
                delegator_did=delegator_did,
                agent_did=agent_did,
                agent_type=request.agent_type,
                scope=scope.model_dump(),
            )

            # 4. Persist
            delegation = DelegationDB(
                delegation_id=delegation_id,
                status="active",
                agent_type=request.agent_type,
                agent_model=request.agent_model,
                agent_did=agent_did,
                # Holds the registrar's client id whatever backend issued it.
                agent_keycloak_client_id=credentials.client_id,
                delegator_did=delegator_did,
                delegator_ac_id=request.delegator_ac_id,
                delegated_ac_id=ac_result.get("context_id", ""),
                capability_ceiling=scope.capability_ceiling,
                ceremony_required_for=",".join(scope.ceremony_required_for),
                max_commands=scope.max_commands,
                created_at=now,
                expires_at=expires_at,
                chronicle_cid=chronicle_cid or None,
            )
            await db.create_delegation(delegation)
        except Exception:
            # Without a persisted row, cleanup_expired() would never find this
            # agent, so remove the registration now and surface the error.
            await self._rollback_agent(credentials.client_id, delegation_id)
            raise

        logger.info(
            "Delegation created: %s (%s -> %s) via %s",
            delegation_id, delegator_did, agent_did, credentials.idp_backend,
        )
        return DelegationResponse(
            delegation_id=delegation_id,
            agent_principal=AgentPrincipal(
                did=agent_did,
                keycloak_client_id=credentials.client_id,
                display_name=credentials.agent_display_name,
            ),
            delegated_ac=ac_result,
            agent_token=credentials.client_secret,
            expires_at=expires_at.isoformat(),
            max_commands=scope.max_commands,
            chronicle_cid=chronicle_cid,
        )

    async def revoke_delegation(self, delegation_id: str, reason: str) -> bool:
        """Revoke an active delegation.

        Deletes the agent's registrar identity (when a client id is stored),
        marks the delegation revoked in the database, and records the
        revocation in Chronicle.

        Returns:
            ``True`` if the delegation was revoked; ``False`` if it does not
            exist or its status is not ``"active"``.
        """
        delegation = await db.get_delegation(delegation_id)
        if not delegation or delegation.status != "active":
            return False
        if delegation.agent_keycloak_client_id:
            await self.registrar.delete_agent(delegation.agent_keycloak_client_id)
        await db.revoke_delegation(delegation_id, reason)
        await chronicle.delegation_revoked(delegation_id, reason)
        logger.info("Delegation revoked: %s (%s)", delegation_id, reason)
        return True

    async def cleanup_expired(self) -> int:
        """Expire stale delegations and clean up IdP registrations.

        Every delegation returned by ``db.expire_stale()`` is processed
        independently: a failure while deleting one agent or writing one
        Chronicle record is logged and does not stop the remaining
        delegations from being cleaned up.

        Returns:
            The number of delegations returned by ``db.expire_stale()``.
        """
        expired = await db.expire_stale()
        for d in expired:
            if d.agent_keycloak_client_id:
                try:
                    await self.registrar.delete_agent(d.agent_keycloak_client_id)
                except Exception:
                    logger.exception(
                        "Failed to delete agent %s for expired delegation %s",
                        d.agent_keycloak_client_id, d.delegation_id,
                    )
            try:
                await chronicle.delegation_expired(d.delegation_id)
            except Exception:
                logger.exception(
                    "Failed to record expiry in Chronicle for delegation %s",
                    d.delegation_id,
                )
        if expired:
            logger.info("Expired %d stale delegations", len(expired))
        return len(expired)