Reference implementation of GCAP-SPEC-SHELLBOUND-BROKER-0001
in FastAPI. Designed for ISVs and enterprises implementing
the governed shell authorization protocol.
Architecture:
FastAPI + SQLModel + Pydantic + async SQLite
Single container deployment: Dockerfile included
OpenAPI schema at /docs is the machine-readable GSAP contract
Broker Interface (§5):
POST /governance/authorize/ — Issue AC
GET /governance/authorize/{p}/ — Poll elevation
POST /governance/complete/ — Receive CR
GET /governance/session/{id}/ — View chain of custody
POST /governance/elevate/ — JIT elevation
GET /governance/drivers/ — List drivers
Identity Driver Interface (§2.2):
IdentityDriver — abstract base (ISV extension point)
KeycloakDriver — Keycloak implementation
DriverRegistry — driver lookup and registration
Chronicle integration (§1.4):
Optional CloudEvents emission via CHRONICLE_WEBHOOK_URL
Forgejo push event format for receiver compatibility
Models:
Pydantic schemas for AC, CR, Principal, Accord, Operation
SQLModel DB models for persistence
Tests: 6 async tests including full AC→CR cycle
695 lines across 27 files.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
43 lines
2 KiB
Python
43 lines
2 KiB
Python
"""Keycloak identity driver — GSAP §2.2."""
|
|
import logging
|
|
from .base import IdentityDriver, AuthResult, ElevationRequired
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class KeycloakDriver(IdentityDriver):
|
|
async def authenticate(self) -> AuthResult:
|
|
token_data = self.config.get("_token_data", {})
|
|
if not token_data:
|
|
return AuthResult(status=AuthResult.STATUS_DENIED, denial_reason="No token in context.")
|
|
|
|
realm_roles = token_data.get("realm_access", {}).get("roles", [])
|
|
requested_accord = self.config.get("requested_accord", "")
|
|
required_role = self.config.get("accord_roles", {}).get(requested_accord, "")
|
|
suffix = self.config.get("elevated_suffix", "-elevated")
|
|
elevation_active = [r for r in realm_roles if r.endswith(suffix)]
|
|
|
|
if required_role and required_role not in realm_roles:
|
|
return AuthResult(
|
|
status=AuthResult.STATUS_PENDING_ELEVATION,
|
|
elevation_required=ElevationRequired(
|
|
role=required_role,
|
|
activation_url="/governance/elevate/",
|
|
instructions=f"Request elevation to '{required_role}' via POST /governance/elevate/",
|
|
))
|
|
|
|
stable_id = token_data.get("sub", "")
|
|
template = self.config.get("did_template", "did:web:{domain}/principal/{alias}")
|
|
domain = self.config.get("domain", "example.com")
|
|
alias = token_data.get("preferred_username", stable_id)
|
|
|
|
return AuthResult(
|
|
status=AuthResult.STATUS_AUTHORIZED,
|
|
principal_did=template.format(stable_id=stable_id, domain=domain, alias=alias),
|
|
display_name=token_data.get("name", alias),
|
|
stable_id=stable_id, token_jti=token_data.get("jti", ""),
|
|
elevation_active=elevation_active,
|
|
mfa_satisfied="otp" in token_data.get("amr", []) or "mfa" in token_data.get("amr", []),
|
|
)
|
|
|
|
async def revoke(self, session_id: str) -> None:
|
|
logger.info(f"Keycloak revoke: {session_id}")
|