feat: absorb llm-principal-broker as gsap_broker/delegations/

Merges the standalone llm-principal-broker (1,132 LOC) into fastapi-gsap
as an in-process module. The previous architecture had two FastAPI
processes where the broker called GSAP over HTTP on every delegation
creation; now the lifecycle code uses GSAP's own async DB engine
directly and inserts AuthorizationContextDB rows in the same
transaction context.

New module: gsap_broker/delegations/
  models.py             Pydantic request/response shapes
  storage.py            DelegationDB SQLModel sharing the GSAP engine
  lifecycle.py          DelegationManager — in-process AC issuance via
                        AuthorizationContextDB.insert (no HTTP self-call)
  cleanup.py            30s background task for stale delegations
  router.py             /delegations/* FastAPI router (4 endpoints)
  registrars/
    base.py             AgentRegistrar Protocol + AgentCredentials
    stub.py             dev-mode no-op
    keycloak.py         Keycloak Admin REST API
    entra.py            Microsoft Entra Agent ID via Graph (lazy import)
    factory.py          driver selection (auto/stub/keycloak/entra)

Wiring:
  app.py mounts the delegations router and starts the cleanup task in
  the existing lifespan context manager.
  settings.py absorbs the keycloak_admin_*, entra_*, and
  agent_registrar fields from the old broker's settings.
  pyproject.toml adds an optional `entra` extra for the msal dep.

Behaviour preservation:
  - Endpoints kept identical: POST /, POST /{id}/revoke, GET /{id}, GET /
  - Chronicle event codes preserved: 0x3001 / 0x3003 / 0x3004
  - DelegationScope defaults unchanged (max_ttl_minutes=60, max_commands=500)
  - Capability ceiling -> capability_mask conversion documented inline

Smoke test: `python -c "from gsap_broker.app import app"` loads cleanly
with 26 routes including the four /delegations/ endpoints.

The standalone llm-principal-broker repo is archived to
~/projects/archive/llm-principal-broker.

Signed-off-by: Tyler King <tking@guildhouse.dev>
This commit is contained in:
Tyler King 2026-04-08 13:37:06 -04:00
parent 3ed75169c7
commit f7c49387c1
15 changed files with 1076 additions and 4 deletions

View file

@ -1,4 +1,12 @@
"""fastapi-gsap: Lightweight GSAP broker — GCAP-SPEC-SHELLBOUND-BROKER-0001."""
"""fastapi-gsap: Lightweight GSAP broker — GCAP-SPEC-SHELLBOUND-BROKER-0001.
Also hosts the delegation lifecycle module
(GCAP-SPEC-LLM-PRINCIPAL-BROKER-0001) absorbed from the standalone
llm-principal-broker service. The delegation router is mounted at
``/delegations`` and the storage layer reuses GSAP's existing async DB
engine no separate process, no HTTP self-call.
"""
import asyncio
import structlog
from contextlib import asynccontextmanager
from fastapi import FastAPI
@ -7,14 +15,30 @@ from gsap_broker.settings import settings
from gsap_broker.db import init_db
from gsap_broker.routers import authorize, complete, session, elevate, health, drivers, connectors, functions
from gsap_broker import mcp
from gsap_broker.delegations.router import router as delegations_router, manager as delegation_manager
from gsap_broker.delegations.cleanup import cleanup_loop as delegation_cleanup_loop
# Import storage so SQLModel.metadata picks up the delegations table at init_db().
from gsap_broker.delegations import storage as _delegations_storage # noqa: F401
logger = structlog.get_logger()
@asynccontextmanager
async def lifespan(app: FastAPI):
await init_db()
logger.info("fastapi-gsap started", broker_did=settings.broker_did)
cleanup_task = asyncio.create_task(delegation_cleanup_loop(delegation_manager))
logger.info(
"fastapi-gsap started",
broker_did=settings.broker_did,
delegations_router="/delegations",
)
try:
yield
finally:
cleanup_task.cancel()
try:
await cleanup_task
except asyncio.CancelledError:
pass
app = FastAPI(title="fastapi-gsap", description="GSAP broker PoC — GCAP-SPEC-SHELLBOUND-BROKER-0001",
version="0.1.0", lifespan=lifespan)
@ -29,3 +53,4 @@ app.include_router(connectors.router, prefix="/connectors", tags=["Connectors"])
app.include_router(functions.router, prefix="/functions", tags=["Functions"])
app.include_router(health.router, tags=["Health"])
app.include_router(mcp.router, tags=["MCP"])
app.include_router(delegations_router)

View file

View file

@ -0,0 +1,21 @@
"""Background task: expire stale delegations every 30 seconds.
Started by ``gsap_broker/app.py`` lifespan. Stopped on shutdown.
"""
import asyncio
import logging
logger = logging.getLogger(__name__)
async def cleanup_loop(manager) -> None:
"""Periodically expire stale delegations (30s interval)."""
while True:
try:
await manager.cleanup_expired()
except asyncio.CancelledError:
raise
except Exception as e:
logger.warning("delegation cleanup error: %s", e)
await asyncio.sleep(30)

View file

@ -0,0 +1,241 @@
"""Delegation lifecycle — GCAP-SPEC-LLM-PRINCIPAL-BROKER-0001 §3.
Originally a separate llm-principal-broker service that called GSAP's
``/governance/authorize/`` endpoint over HTTP. Now an in-process module:
``create_delegation`` opens an AsyncSession against GSAP's own DB engine
and inserts an ``AuthorizationContextDB`` row directly. No process boundary,
no HTTP hop, same Chronicle event emission.
"""
import logging
import uuid
from datetime import datetime, timedelta, UTC
from sqlmodel.ext.asyncio.session import AsyncSession
from gsap_broker import chronicle
from gsap_broker.db import engine
from gsap_broker.db_models import AuthorizationContextDB
from gsap_broker.settings import settings as gsap_settings
from .models import (
AgentPrincipal,
DelegationRequest,
DelegationResponse,
DelegationScope,
)
from .registrars.factory import create_registrar
from .storage import (
DelegationDB,
create_delegation as db_create,
expire_stale,
get_delegation as db_get,
revoke_delegation as db_revoke,
)
logger = logging.getLogger(__name__)
class DelegationManager:
"""Owns the delegation lifecycle. Constructed once at app startup."""
def __init__(self, config=gsap_settings):
self.config = config
self.registrar = create_registrar(config)
async def create_delegation(
self, request: DelegationRequest, delegator_did: str
) -> DelegationResponse:
delegation_id = f"del-{uuid.uuid4().hex[:8]}"
scope = request.scope or DelegationScope()
now = datetime.now(UTC)
expires_at = now + timedelta(minutes=scope.max_ttl_minutes)
agent_did = f"did:web:guildhouse.dev/agent/{request.agent_type}-{delegation_id}"
delegator_short = delegator_did.rsplit("/", 1)[-1]
agent_display = f"{request.agent_type} (delegated by {delegator_short})"
# 1. Register agent identity via the configured registrar.
credentials = await self.registrar.register_agent(
delegation_id=delegation_id,
agent_type=request.agent_type,
delegator_id=delegator_did,
display_name=agent_display,
expires_at=expires_at.isoformat(),
metadata={"model": request.agent_model},
)
# 2. Issue a delegated AuthorizationContext directly against the
# GSAP DB. Mirrors routers/authorize.py for the on_behalf_of
# trusted-caller path. We bypass the HTTP layer because GSAP
# is calling itself.
try:
ac_result = await self._issue_delegated_ac(
delegation_id=delegation_id,
agent_did=agent_did,
accord_template=request.accord_template,
expires_at=expires_at,
scope=scope,
)
except Exception as e:
await self.registrar.delete_agent(credentials.client_id)
raise RuntimeError(f"Failed to issue delegated AC: {e}")
# 3. Chronicle event for the delegation creation.
chronicle_cid = await chronicle.emit(
"DELEGATION_CREATED",
{
"event_code": "0x3001",
"delegation_id": delegation_id,
"delegator_did": delegator_did,
"agent_did": agent_did,
"agent_type": request.agent_type,
"scope": scope.model_dump(),
"timestamp": now.isoformat(),
},
)
# 4. Persist the delegation row.
delegation = DelegationDB(
delegation_id=delegation_id,
status="active",
agent_type=request.agent_type,
agent_model=request.agent_model,
agent_did=agent_did,
agent_keycloak_client_id=credentials.client_id,
delegator_did=delegator_did,
delegator_ac_id=request.delegator_ac_id,
delegated_ac_id=ac_result.get("context_id", ""),
capability_ceiling=scope.capability_ceiling,
ceremony_required_for=",".join(scope.ceremony_required_for),
max_commands=scope.max_commands,
created_at=now.replace(tzinfo=None),
expires_at=expires_at.replace(tzinfo=None),
chronicle_cid=chronicle_cid or None,
)
await db_create(delegation)
logger.info(
"Delegation created: %s (%s -> %s) via %s",
delegation_id,
delegator_did,
agent_did,
credentials.idp_backend,
)
return DelegationResponse(
delegation_id=delegation_id,
agent_principal=AgentPrincipal(
did=agent_did,
keycloak_client_id=credentials.client_id,
display_name=credentials.agent_display_name,
),
delegated_ac=ac_result,
agent_token=credentials.client_secret,
expires_at=expires_at.isoformat(),
max_commands=scope.max_commands,
chronicle_cid=chronicle_cid,
)
async def _issue_delegated_ac(
self,
delegation_id: str,
agent_did: str,
accord_template: str,
expires_at: datetime,
scope: DelegationScope,
) -> dict:
"""Insert an AuthorizationContextDB row representing the delegated AC.
Mirrors the trusted-caller (on_behalf_of) path of
``routers/authorize.authorize`` without going through HTTP.
"""
ctx_id = uuid.uuid4()
now = datetime.now(UTC)
ac_db = AuthorizationContextDB(
context_id=ctx_id,
principal_did=agent_did,
driver_id="delegation",
playbook=f"delegation:{delegation_id}",
corpus_entry_cid="sha256:delegated-agent",
parameters_cid=f"sha256:delegation-{delegation_id}",
accord_template=accord_template,
capability_mask=_capability_mask_for(scope.capability_ceiling),
idp_vendor="delegation",
token_jti="",
elevation_active=[],
mfa_satisfied=False,
status="authorized",
issued_at=now.replace(tzinfo=None),
expires_at=expires_at.replace(tzinfo=None),
session_mode=True,
)
async with AsyncSession(engine) as session:
session.add(ac_db)
await session.commit()
return {
"status": "authorized",
"context_id": str(ctx_id),
"principal_did": agent_did,
"delegation_id": delegation_id,
"capability_ceiling": scope.capability_ceiling,
"expires_at": expires_at.isoformat(),
}
async def revoke_delegation(self, delegation_id: str, reason: str) -> bool:
delegation = await db_get(delegation_id)
if not delegation or delegation.status != "active":
return False
if delegation.agent_keycloak_client_id:
await self.registrar.delete_agent(delegation.agent_keycloak_client_id)
await db_revoke(delegation_id, reason)
await chronicle.emit(
"DELEGATION_REVOKED",
{
"event_code": "0x3003",
"delegation_id": delegation_id,
"reason": reason,
"timestamp": datetime.now(UTC).isoformat(),
},
)
logger.info("Delegation revoked: %s (%s)", delegation_id, reason)
return True
async def cleanup_expired(self) -> int:
"""Expire stale delegations and clean up IdP registrations."""
expired = await expire_stale()
for d in expired:
if d.agent_keycloak_client_id:
await self.registrar.delete_agent(d.agent_keycloak_client_id)
await chronicle.emit(
"DELEGATION_EXPIRED",
{
"event_code": "0x3004",
"delegation_id": d.delegation_id,
"timestamp": datetime.now(UTC).isoformat(),
},
)
if expired:
logger.info("Expired %d stale delegations", len(expired))
return len(expired)
def _capability_mask_for(ceiling: str) -> int:
"""Convert a capability ceiling string to the GSAP capability mask bits."""
bits = {"CAP_READ": 1, "CAP_PROPOSE": 2, "CAP_MUTATE": 4, "CAP_ADMIN": 8}
if ceiling in bits:
# Ceiling is inclusive: CAP_MUTATE means READ | PROPOSE | MUTATE.
order = ["CAP_READ", "CAP_PROPOSE", "CAP_MUTATE", "CAP_ADMIN"]
idx = order.index(ceiling)
mask = 0
for cap in order[: idx + 1]:
mask |= bits[cap]
return mask
return 1 # default to READ

View file

@ -0,0 +1,96 @@
"""Pydantic models for delegation lifecycle.
Originally from llm-principal-broker (GCAP-SPEC-LLM-PRINCIPAL-BROKER-0001
§3, §8). Absorbed into fastapi-gsap as the delegations submodule so that
the lifecycle bookkeeping shares process and database with the AC issuance
endpoints. The previous standalone service made an HTTP call back to GSAP
on every delegation creation; now it is an in-process function call.
"""
from pydantic import BaseModel, Field
from typing import Optional
from enum import Enum
class DelegationStatus(str, Enum):
REQUESTED = "requested"
ACTIVE = "active"
EXPIRED = "expired"
REVOKED = "revoked"
class DelegationScope(BaseModel):
inherit_corpus: bool = True
inherit_contexts: bool = True
capability_ceiling: str = "CAP_MUTATE"
ceremony_required_for: list[str] = Field(
default_factory=lambda: ["delete", "destroy", "drop"]
)
prohibited_commands: list[str] = Field(default_factory=list)
max_ttl_minutes: int = 60
max_commands: int = 500
class DelegationRequest(BaseModel):
"""POST /delegations/ request body — §8.1."""
delegator_ac_id: str
agent_type: str = "claude-code"
agent_model: Optional[str] = None
scope: Optional[DelegationScope] = None
accord_template: str = "ai-delegation-standard"
class AgentPrincipal(BaseModel):
did: str
keycloak_client_id: str
display_name: str
class DelegationResponse(BaseModel):
"""POST /delegations/ response — §3.2."""
delegation_id: str
agent_principal: AgentPrincipal
delegated_ac: dict
agent_token: str
expires_at: str
max_commands: int
chronicle_cid: Optional[str] = None
class DelegationInfo(BaseModel):
"""GET /delegations/{id} response — §8.3."""
delegation_id: str
status: DelegationStatus
agent_did: str
agent_type: str
delegator_did: str
commands_executed: int
commands_remaining: int
ttl_remaining_seconds: int
created_at: str
expires_at: str
class RevokeRequest(BaseModel):
reason: str = "manual_revocation"
class RevokeResponse(BaseModel):
delegation_id: str
status: str = "revoked"
reason: str
chronicle_cid: Optional[str] = None
class ActiveDelegation(BaseModel):
delegation_id: str
agent_type: str
delegator: str
commands_executed: int
ttl_remaining_seconds: int
status: str
class AgentListResponse(BaseModel):
active_delegations: list[ActiveDelegation]
total_active: int

View file

@ -0,0 +1,36 @@
"""AgentRegistrar protocol — abstract interface for agent identity registration.
Implementations:
KeycloakRegistrar Keycloak Admin REST API (§4.1)
EntraRegistrar Microsoft Entra Agent ID platform (§4.2)
StubRegistrar dev mode without a real IdP
"""
from dataclasses import dataclass
from typing import Protocol, runtime_checkable
@dataclass
class AgentCredentials:
"""Credentials returned after registering an agent identity."""
client_id: str
client_secret: str
agent_display_name: str
idp_backend: str # "keycloak" | "entra" | "stub"
@runtime_checkable
class AgentRegistrar(Protocol):
async def register_agent(
self,
delegation_id: str,
agent_type: str,
delegator_id: str,
display_name: str,
expires_at: str,
metadata: dict | None = None,
) -> AgentCredentials: ...
async def delete_agent(self, client_id: str) -> bool: ...
async def get_agent_token(self, client_id: str) -> str | None: ...

View file

@ -0,0 +1,150 @@
"""Entra Agent ID registrar — registers agent identities via Microsoft Graph.
Implements AgentRegistrar for GCAP-SPEC-LLM-PRINCIPAL-BROKER-0001 §4.2.
Uses standard Graph application registration with agent metadata tags.
When Entra Agent ID Blueprint APIs reach GA, this driver should be updated
to use the dedicated /agentIdentityBlueprints and /agentIdentities endpoints.
"""
import logging
import httpx
import msal
from .base import AgentCredentials
logger = logging.getLogger(__name__)
GRAPH_API = "https://graph.microsoft.com/v1.0"
class EntraRegistrar:
"""AgentRegistrar implementation using Microsoft Entra + Graph API."""
def __init__(
self,
tenant_id: str,
client_id: str,
client_secret: str,
agent_blueprint_id: str = "",
):
self.tenant_id = tenant_id
self.client_id = client_id
self.client_secret = client_secret
self.agent_blueprint_id = agent_blueprint_id
self._app = msal.ConfidentialClientApplication(
client_id=self.client_id,
client_credential=self.client_secret,
authority=f"https://login.microsoftonline.com/{self.tenant_id}",
)
async def _get_token(self) -> str:
result = self._app.acquire_token_for_client(
scopes=["https://graph.microsoft.com/.default"]
)
if "access_token" in result:
return result["access_token"]
raise RuntimeError(
f"Entra token error: {result.get('error_description', result.get('error', 'unknown'))}"
)
async def _headers(self) -> dict:
token = await self._get_token()
return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
async def register_agent(
self,
delegation_id: str,
agent_type: str,
delegator_id: str,
display_name: str,
expires_at: str,
metadata: dict | None = None,
) -> AgentCredentials:
headers = await self._headers()
tags = [
f"agent_type:{agent_type}",
f"delegation_id:{delegation_id}",
f"delegator:{delegator_id}",
"governed:true",
"HideApp",
]
if self.agent_blueprint_id:
tags.append(f"blueprint:{self.agent_blueprint_id}")
app_body = {
"displayName": display_name,
"signInAudience": "AzureADMyOrg",
"tags": tags,
"notes": f"Governed AI agent. Delegator: {delegator_id}. Expires: {expires_at}",
"passwordCredentials": [],
}
async with httpx.AsyncClient(timeout=15.0) as http:
resp = await http.post(f"{GRAPH_API}/applications", json=app_body, headers=headers)
if resp.status_code == 401:
headers = await self._headers()
resp = await http.post(f"{GRAPH_API}/applications", json=app_body, headers=headers)
resp.raise_for_status()
app_data = resp.json()
app_id = app_data["appId"]
object_id = app_data["id"]
secret_resp = await http.post(
f"{GRAPH_API}/applications/{object_id}/addPassword",
json={
"passwordCredential": {
"displayName": f"delegation-{delegation_id}",
"endDateTime": expires_at,
}
},
headers=headers,
)
secret_resp.raise_for_status()
client_secret = secret_resp.json().get("secretText", "")
sp_resp = await http.post(
f"{GRAPH_API}/servicePrincipals",
json={"appId": app_id, "displayName": display_name, "tags": tags},
headers=headers,
)
if sp_resp.status_code not in (200, 201, 409):
sp_resp.raise_for_status()
logger.info("Entra: registered agent %s (appId=%s)", display_name, app_id)
return AgentCredentials(
client_id=app_id,
client_secret=client_secret,
agent_display_name=display_name,
idp_backend="entra",
)
async def delete_agent(self, client_id: str) -> bool:
headers = await self._headers()
async with httpx.AsyncClient(timeout=10.0) as http:
resp = await http.get(
f"{GRAPH_API}/applications",
params={"$filter": f"appId eq '{client_id}'"},
headers=headers,
)
resp.raise_for_status()
apps = resp.json().get("value", [])
if not apps:
return False
object_id = apps[0]["id"]
del_resp = await http.delete(
f"{GRAPH_API}/applications/{object_id}",
headers=headers,
)
deleted = del_resp.status_code in (200, 204)
if deleted:
logger.info("Entra: deleted agent app %s", client_id)
return deleted
async def get_agent_token(self, client_id: str) -> str | None:
return None

View file

@ -0,0 +1,65 @@
"""Registrar factory — selects the appropriate AgentRegistrar based on settings."""
import logging
from .base import AgentRegistrar
from .stub import StubRegistrar
logger = logging.getLogger(__name__)
def create_registrar(config) -> AgentRegistrar:
"""Create the appropriate registrar based on agent_registrar setting."""
driver = config.agent_registrar
if driver == "stub":
return StubRegistrar()
if driver == "keycloak":
if not config.keycloak_admin_client_secret:
logger.warning("Keycloak secret not configured, using stub")
return StubRegistrar()
from .keycloak import KeycloakRegistrar
return KeycloakRegistrar(
base_url=config.keycloak_url,
realm=config.keycloak_realm,
client_id=config.keycloak_admin_client_id,
client_secret=config.keycloak_admin_client_secret,
)
if driver == "entra":
if not config.entra_client_secret:
logger.warning("Entra secret not configured, using stub")
return StubRegistrar()
from .entra import EntraRegistrar
return EntraRegistrar(
tenant_id=config.entra_tenant_id,
client_id=config.entra_client_id,
client_secret=config.entra_client_secret,
agent_blueprint_id=config.entra_agent_blueprint_id,
)
if driver == "auto":
if config.entra_client_secret:
from .entra import EntraRegistrar
logger.info("Auto-selected Entra registrar")
return EntraRegistrar(
tenant_id=config.entra_tenant_id,
client_id=config.entra_client_id,
client_secret=config.entra_client_secret,
agent_blueprint_id=config.entra_agent_blueprint_id,
)
if config.keycloak_admin_client_secret:
from .keycloak import KeycloakRegistrar
logger.info("Auto-selected Keycloak registrar")
return KeycloakRegistrar(
base_url=config.keycloak_url,
realm=config.keycloak_realm,
client_id=config.keycloak_admin_client_id,
client_secret=config.keycloak_admin_client_secret,
)
logger.warning("No IdP configured, using stub registrar")
return StubRegistrar()
logger.warning("Unknown registrar driver: %s, using stub", driver)
return StubRegistrar()

View file

@ -0,0 +1,134 @@
"""Keycloak registrar — registers ephemeral agent clients via Admin REST API.
Implements AgentRegistrar for GCAP-SPEC-LLM-PRINCIPAL-BROKER-0001 §4.1.
"""
import logging
from typing import Optional
import httpx
from .base import AgentCredentials
logger = logging.getLogger(__name__)
class KeycloakRegistrar:
"""AgentRegistrar implementation using Keycloak Admin REST API."""
def __init__(self, base_url: str, realm: str, client_id: str, client_secret: str):
self.base_url = base_url.rstrip("/")
self.realm = realm
self.client_id = client_id
self.client_secret = client_secret
self._token: Optional[str] = None
async def _get_admin_token(self) -> str:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(
f"{self.base_url}/realms/{self.realm}/protocol/openid-connect/token",
data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
},
)
resp.raise_for_status()
self._token = resp.json()["access_token"]
return self._token
async def _headers(self) -> dict:
if not self._token:
await self._get_admin_token()
return {"Authorization": f"Bearer {self._token}"}
async def register_agent(
self,
delegation_id: str,
agent_type: str,
delegator_id: str,
display_name: str,
expires_at: str,
metadata: dict | None = None,
) -> AgentCredentials:
headers = await self._headers()
kc_client_id = f"agent-{agent_type}-{delegation_id}"
client_rep = {
"clientId": kc_client_id,
"name": display_name,
"enabled": True,
"serviceAccountsEnabled": True,
"directAccessGrantsEnabled": False,
"publicClient": False,
"protocol": "openid-connect",
"attributes": {
"agent_type": agent_type,
"delegator_did": delegator_id,
"delegation_id": delegation_id,
},
}
async with httpx.AsyncClient(timeout=10.0) as http:
resp = await http.post(
f"{self.base_url}/admin/realms/{self.realm}/clients",
json=client_rep,
headers=headers,
)
if resp.status_code == 401:
headers = {"Authorization": f"Bearer {await self._get_admin_token()}"}
resp = await http.post(
f"{self.base_url}/admin/realms/{self.realm}/clients",
json=client_rep,
headers=headers,
)
resp.raise_for_status()
location = resp.headers.get("Location", "")
client_uuid = location.rstrip("/").split("/")[-1] if location else None
secret = ""
if client_uuid:
secret_resp = await http.get(
f"{self.base_url}/admin/realms/{self.realm}/clients/{client_uuid}/client-secret",
headers=headers,
)
if secret_resp.status_code == 200:
secret = secret_resp.json().get("value", "")
logger.info("Keycloak: registered agent %s (uuid=%s)", kc_client_id, client_uuid)
return AgentCredentials(
client_id=kc_client_id,
client_secret=secret,
agent_display_name=display_name,
idp_backend="keycloak",
)
async def delete_agent(self, client_id: str) -> bool:
headers = await self._headers()
async with httpx.AsyncClient(timeout=10.0) as http:
resp = await http.get(
f"{self.base_url}/admin/realms/{self.realm}/clients",
params={"clientId": client_id},
headers=headers,
)
if resp.status_code != 200:
return False
clients = resp.json()
if not clients:
return False
client_uuid = clients[0]["id"]
del_resp = await http.delete(
f"{self.base_url}/admin/realms/{self.realm}/clients/{client_uuid}",
headers=headers,
)
deleted = del_resp.status_code in (200, 204)
if deleted:
logger.info("Keycloak: deleted agent %s", client_id)
return deleted
async def get_agent_token(self, client_id: str) -> str | None:
return None

View file

@ -0,0 +1,35 @@
"""Stub registrar for development without a real IdP."""
import logging
from .base import AgentCredentials
logger = logging.getLogger(__name__)
class StubRegistrar:
"""AgentRegistrar stub — returns dev credentials without calling any IdP."""
async def register_agent(
self,
delegation_id: str,
agent_type: str,
delegator_id: str,
display_name: str,
expires_at: str,
metadata: dict | None = None,
) -> AgentCredentials:
logger.info("Stub registrar: register %s for delegation %s", agent_type, delegation_id)
return AgentCredentials(
client_id=f"stub-agent-{agent_type}-{delegation_id}",
client_secret=f"stub-secret-{delegation_id}",
agent_display_name=display_name,
idp_backend="stub",
)
async def delete_agent(self, client_id: str) -> bool:
logger.info("Stub registrar: delete %s", client_id)
return True
async def get_agent_token(self, client_id: str) -> str | None:
return f"stub-token-{client_id}"

View file

@ -0,0 +1,103 @@
"""FastAPI router for delegation lifecycle.
Endpoints (originally from llm-principal-broker, now in-process):
POST /delegations/ create_delegation §8.1
POST /delegations/{id}/revoke revoke_delegation §8.2
GET /delegations/{id} get_delegation §8.3
GET /delegations/ list_delegations §8.4
"""
from datetime import datetime
from fastapi import APIRouter, Header, HTTPException
from .lifecycle import DelegationManager
from .models import (
ActiveDelegation,
AgentListResponse,
DelegationInfo,
DelegationRequest,
DelegationResponse,
DelegationStatus,
RevokeRequest,
RevokeResponse,
)
from .storage import get_active_delegations, get_delegation as db_get
router = APIRouter(prefix="/delegations", tags=["Delegations"])
# A single DelegationManager instance is shared across requests. It holds the
# AgentRegistrar (Keycloak/Entra/Stub) and is constructed once at import time.
manager = DelegationManager()
@router.post("/", response_model=DelegationResponse)
async def create_delegation(
request: DelegationRequest,
x_delegator_did: str = Header(..., alias="X-Delegator-DID"),
):
"""Request delegation of authority to an AI agent (§8.1)."""
try:
return await manager.create_delegation(request, x_delegator_did)
except RuntimeError as e:
raise HTTPException(status_code=502, detail=str(e))
@router.post("/{delegation_id}/revoke", response_model=RevokeResponse)
async def revoke_delegation(
delegation_id: str,
request: RevokeRequest = RevokeRequest(),
):
"""Revoke an active delegation (§8.2)."""
success = await manager.revoke_delegation(delegation_id, request.reason)
if not success:
raise HTTPException(status_code=404, detail="Delegation not found or not active")
return RevokeResponse(delegation_id=delegation_id, reason=request.reason)
@router.get("/{delegation_id}", response_model=DelegationInfo)
async def get_delegation(delegation_id: str):
"""Query delegation status (§8.3)."""
d = await db_get(delegation_id)
if not d:
raise HTTPException(status_code=404, detail="Delegation not found")
now = datetime.utcnow()
ttl_remaining = max(0, int((d.expires_at - now).total_seconds()))
return DelegationInfo(
delegation_id=d.delegation_id,
status=DelegationStatus(d.status),
agent_did=d.agent_did,
agent_type=d.agent_type,
delegator_did=d.delegator_did,
commands_executed=d.commands_executed,
commands_remaining=max(0, d.max_commands - d.commands_executed),
ttl_remaining_seconds=ttl_remaining,
created_at=d.created_at.isoformat(),
expires_at=d.expires_at.isoformat(),
)
@router.get("/", response_model=AgentListResponse)
async def list_delegations():
"""List all active agent delegations (§8.4)."""
active = await get_active_delegations()
now = datetime.utcnow()
return AgentListResponse(
active_delegations=[
ActiveDelegation(
delegation_id=d.delegation_id,
agent_type=d.agent_type,
delegator=d.delegator_did,
commands_executed=d.commands_executed,
ttl_remaining_seconds=max(
0, int((d.expires_at - now).total_seconds())
),
status=d.status,
)
for d in active
],
total_active=len(active),
)

View file

@ -0,0 +1,136 @@
"""Delegation persistence — shares the GSAP async engine.
The DelegationDB SQLModel is registered against the same SQLModel.metadata
as the rest of GSAP, so init_db() in gsap_broker.db creates this table
alongside the AuthorizationContextDB and friends.
"""
from datetime import datetime
from typing import Optional
from sqlmodel import SQLModel, Field, select
from sqlmodel.ext.asyncio.session import AsyncSession
from gsap_broker.db import engine
class DelegationDB(SQLModel, table=True):
__tablename__ = "delegations"
delegation_id: str = Field(primary_key=True)
status: str = Field(default="active", index=True)
agent_type: str
agent_model: Optional[str] = None
agent_did: str
agent_keycloak_client_id: Optional[str] = None
delegator_did: str
delegator_ac_id: str
delegated_ac_id: Optional[str] = None
capability_ceiling: str = "CAP_MUTATE"
ceremony_required_for: str = ""
max_commands: int = 500
commands_executed: int = 0
created_at: datetime = Field(default_factory=lambda: datetime.utcnow())
expires_at: datetime = Field(default_factory=lambda: datetime.utcnow())
revoked_at: Optional[datetime] = None
revoke_reason: Optional[str] = None
chronicle_cid: Optional[str] = None
async def create_delegation(delegation: DelegationDB) -> None:
async with AsyncSession(engine) as session:
session.add(delegation)
await session.commit()
async def get_delegation(delegation_id: str) -> Optional[DelegationDB]:
async with AsyncSession(engine) as session:
result = await session.exec(
select(DelegationDB).where(DelegationDB.delegation_id == delegation_id)
)
return result.first()
async def revoke_delegation(delegation_id: str, reason: str) -> bool:
async with AsyncSession(engine) as session:
result = await session.exec(
select(DelegationDB).where(
DelegationDB.delegation_id == delegation_id,
DelegationDB.status == "active",
)
)
d = result.first()
if not d:
return False
d.status = "revoked"
d.revoked_at = datetime.utcnow()
d.revoke_reason = reason
session.add(d)
await session.commit()
return True
async def get_active_delegations() -> list[DelegationDB]:
async with AsyncSession(engine) as session:
result = await session.exec(
select(DelegationDB).where(DelegationDB.status == "active")
)
return list(result.all())
async def increment_commands(delegation_id: str) -> int:
async with AsyncSession(engine) as session:
result = await session.exec(
select(DelegationDB).where(DelegationDB.delegation_id == delegation_id)
)
d = result.first()
if not d:
return 0
d.commands_executed += 1
session.add(d)
await session.commit()
return d.commands_executed
async def expire_stale() -> list[DelegationDB]:
"""Find and expire delegations past TTL or command limit."""
now = datetime.utcnow()
async with AsyncSession(engine) as session:
result = await session.exec(
select(DelegationDB).where(DelegationDB.status == "active")
)
expired = []
for d in result.all():
if now > d.expires_at or d.commands_executed >= d.max_commands:
d.status = "expired"
d.revoke_reason = (
"command_limit"
if d.commands_executed >= d.max_commands
else "ttl_elapsed"
)
d.revoked_at = now
session.add(d)
expired.append(d)
await session.commit()
return expired
async def revoke_by_delegator_ac(delegator_ac_id: str) -> int:
"""Cascading revocation — all delegations from a specific AC."""
now = datetime.utcnow()
async with AsyncSession(engine) as session:
result = await session.exec(
select(DelegationDB).where(
DelegationDB.status == "active",
DelegationDB.delegator_ac_id == delegator_ac_id,
)
)
count = 0
for d in result.all():
d.status = "revoked"
d.revoked_at = now
d.revoke_reason = "delegator_ac_revoked"
session.add(d)
count += 1
await session.commit()
return count

View file

@ -2,7 +2,7 @@ from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Optional
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)
model_config = SettingsConfigDict(env_file=".env", case_sensitive=False, extra="ignore")
broker_did: str = "did:web:gsap-broker.example.com"
broker_name: str = "fastapi-gsap"
ac_ttl_minutes: int = 30
@ -13,4 +13,31 @@ class Settings(BaseSettings):
database_url: str = "sqlite+aiosqlite:///./gsap_broker.db"
cors_origins: list[str] = ["http://localhost:3000", "http://localhost:8000"]
# ─── Delegation lifecycle (absorbed from llm-principal-broker) ───
# The delegation router lives in gsap_broker/delegations/ and shares
# the same async engine as the rest of GSAP. It used to be a separate
# service (llm-principal-broker) that called this broker over HTTP;
# now it's an in-process router that invokes the authorize handler
# directly. See gsap_broker/delegations/router.py.
# Keycloak Admin API (for the Keycloak agent registrar)
keycloak_url: str = "http://localhost:8080"
keycloak_realm: str = "substrate"
keycloak_admin_client_id: str = "llm-broker-admin"
keycloak_admin_client_secret: str = ""
# Agent registrar driver: auto | keycloak | entra | stub
agent_registrar: str = "auto"
# Microsoft Entra Agent ID
entra_tenant_id: str = ""
entra_client_id: str = ""
entra_client_secret: str = ""
entra_agent_blueprint_id: str = ""
# Delegation defaults
default_delegation_ttl_minutes: int = 60
default_max_commands: int = 500
max_delegation_depth: int = 1
settings = Settings()

View file

@ -21,6 +21,9 @@ dependencies = [
[project.optional-dependencies]
dev = ["pytest>=8.0", "pytest-asyncio>=0.23", "httpx>=0.27", "ruff>=0.4", "pytest-mock>=3.14"]
# Optional driver for the delegations module (Microsoft Entra Agent ID).
# Stub and Keycloak registrars work without this.
entra = ["msal>=1.28.0"]
[tool.hatch.build.targets.wheel]
packages = ["gsap_broker"]