Reference implementation of GCAP-SPEC-SHELLBOUND-BROKER-0001
in FastAPI. Designed for ISVs and enterprises implementing
the governed shell authorization protocol.
Architecture:
FastAPI + SQLModel + Pydantic + async SQLite
Single container deployment: Dockerfile included
OpenAPI schema at /docs is the machine-readable GSAP contract
Broker Interface (§5):
POST /governance/authorize/ — Issue AC
GET /governance/authorize/{p}/ — Poll elevation
POST /governance/complete/ — Receive CR
GET /governance/session/{id}/ — View chain of custody
POST /governance/elevate/ — JIT elevation
GET /governance/drivers/ — List drivers
Identity Driver Interface (§2.2):
IdentityDriver — abstract base (ISV extension point)
KeycloakDriver — Keycloak implementation
DriverRegistry — driver lookup and registration
Chronicle integration (§1.4):
Optional CloudEvents emission via CHRONICLE_WEBHOOK_URL
Forgejo push event format for receiver compatibility
Models:
Pydantic schemas for AC, CR, Principal, Accord, Operation
SQLModel DB models for persistence
Tests: 6 async tests including full AC→CR cycle
695 lines across 27 files.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
39 lines
1.1 KiB
Python
39 lines
1.1 KiB
Python
"""Identity Driver Interface — GSAP §2.2. Zero framework imports."""
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional
|
|
|
|
@dataclass
|
|
class ElevationRequired:
|
|
role: str
|
|
activation_url: str
|
|
instructions: str
|
|
mechanism: str = "custom"
|
|
|
|
@dataclass
|
|
class AuthResult:
|
|
STATUS_AUTHORIZED = "authorized"
|
|
STATUS_PENDING_ELEVATION = "pending_elevation"
|
|
STATUS_DENIED = "denied"
|
|
|
|
status: str
|
|
principal_did: str = ""
|
|
display_name: str = ""
|
|
stable_id: str = ""
|
|
token_jti: str = ""
|
|
elevation_active: list = field(default_factory=list)
|
|
mfa_satisfied: bool = False
|
|
elevation_required: Optional[ElevationRequired] = None
|
|
denial_reason: str = ""
|
|
|
|
@property
|
|
def is_authorized(self): return self.status == self.STATUS_AUTHORIZED
|
|
@property
|
|
def needs_elevation(self): return self.status == self.STATUS_PENDING_ELEVATION
|
|
|
|
class IdentityDriver(ABC):
|
|
def __init__(self, config: dict): self.config = config
|
|
@abstractmethod
|
|
async def authenticate(self) -> AuthResult: ...
|
|
@abstractmethod
|
|
async def revoke(self, session_id: str) -> None: ...
|