Reference implementation of GCAP-SPEC-SHELLBOUND-BROKER-0001
in FastAPI. Designed for ISVs and enterprises implementing
the governed shell authorization protocol.
Architecture:
FastAPI + SQLModel + Pydantic + async SQLite
Single container deployment: Dockerfile included
OpenAPI schema at /docs is the machine-readable GSAP contract
Broker Interface (§5):
POST /governance/authorize/ — Issue AC
GET /governance/authorize/{p}/ — Poll elevation
POST /governance/complete/ — Receive CR
GET /governance/session/{id}/ — View chain of custody
POST /governance/elevate/ — JIT elevation
GET /governance/drivers/ — List drivers
Identity Driver Interface (§2.2):
IdentityDriver — abstract base (ISV extension point)
KeycloakDriver — Keycloak implementation
DriverRegistry — driver lookup and registration
Chronicle integration (§1.4):
Optional CloudEvents emission via CHRONICLE_WEBHOOK_URL
Forgejo push event format for receiver compatibility
Models:
Pydantic schemas for AC, CR, Principal, Accord, Operation
SQLModel DB models for persistence
Tests: 6 async tests including full AC→CR cycle
695 lines across 27 files.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
121 lines
3.3 KiB
Python
121 lines
3.3 KiB
Python
"""GSAP Pydantic models — GCAP-SPEC-SHELLBOUND-BROKER-0001."""
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from typing import Optional
|
|
from uuid import UUID, uuid4
|
|
from pydantic import BaseModel, Field
|
|
|
|
class ACStatus(str, Enum):
|
|
PENDING = "pending"
|
|
AUTHORIZED = "authorized"
|
|
CONSUMED = "consumed"
|
|
EXPIRED = "expired"
|
|
REVOKED = "revoked"
|
|
|
|
class Outcome(str, Enum):
|
|
COMPLETED = "completed"
|
|
FAILED = "failed"
|
|
VIOLATED = "violated"
|
|
TIMED_OUT = "timed_out"
|
|
|
|
class BehaviorStatus(str, Enum):
|
|
VERIFIED = "verified"
|
|
VIOLATED = "violated"
|
|
UNAVAILABLE = "unavailable"
|
|
|
|
class Principal(BaseModel):
|
|
did: str
|
|
display_name: str = ""
|
|
broker_session_id: str = ""
|
|
driver_id: str = ""
|
|
|
|
class Accord(BaseModel):
|
|
template: str
|
|
capability_mask: int = 3
|
|
|
|
class Operation(BaseModel):
|
|
playbook: str
|
|
corpus_entry_cid: str
|
|
parameters_cid: str
|
|
apply_authorized_cid: Optional[str] = None
|
|
|
|
class IdentityProof(BaseModel):
|
|
idp_vendor: str = "keycloak"
|
|
token_jti: str = ""
|
|
elevation_active: list[str] = []
|
|
mfa_satisfied: bool = False
|
|
|
|
class AuthorizationContext(BaseModel):
|
|
gsap_version: str = "0.1.0"
|
|
context_id: UUID = Field(default_factory=uuid4)
|
|
issued_at: datetime
|
|
expires_at: datetime
|
|
principal: Principal
|
|
accord: Accord
|
|
operation: Operation
|
|
identity_proof: IdentityProof
|
|
broker: dict = Field(default_factory=dict)
|
|
signature: Optional[dict] = None
|
|
|
|
class ChronicleEvidence(BaseModel):
|
|
session_id: Optional[str] = None
|
|
events: list[dict] = []
|
|
merkle_root: Optional[str] = None
|
|
|
|
class BehavioralAttestation(BaseModel):
|
|
status: BehaviorStatus = BehaviorStatus.UNAVAILABLE
|
|
observed_behavior_cid: Optional[str] = None
|
|
declared_behavior_cid: Optional[str] = None
|
|
|
|
class AuthorizeRequest(BaseModel):
|
|
playbook: str
|
|
corpus_entry_cid: str
|
|
parameters_cid: str
|
|
accord_template: str
|
|
driver_id: str
|
|
|
|
class AuthorizeResponse(BaseModel):
|
|
status: str
|
|
authorization_context: Optional[AuthorizationContext] = None
|
|
poll_token: Optional[str] = None
|
|
elevation_instructions: Optional[str] = None
|
|
activation_url: Optional[str] = None
|
|
|
|
class CompleteRequest(BaseModel):
|
|
context_id: UUID
|
|
outcome: Outcome
|
|
completed_at: datetime
|
|
failure_reason: Optional[str] = None
|
|
chronicle_session_id: Optional[str] = None
|
|
chronicle_evidence: ChronicleEvidence = Field(default_factory=ChronicleEvidence)
|
|
behavioral_attestation: BehavioralAttestation = Field(default_factory=BehavioralAttestation)
|
|
ffc: dict = Field(default_factory=dict)
|
|
signature: Optional[dict] = None
|
|
|
|
class CompleteResponse(BaseModel):
|
|
status: str = "received"
|
|
receipt_id: UUID
|
|
signature_verified: bool
|
|
chronicle_event_cid: Optional[str] = None
|
|
|
|
class SessionResponse(BaseModel):
|
|
context_id: UUID
|
|
principal_did: str
|
|
accord_template: str
|
|
playbook: str
|
|
status: ACStatus
|
|
issued_at: datetime
|
|
expires_at: datetime
|
|
consumed_at: Optional[datetime] = None
|
|
chronicle_event_cid: Optional[str] = None
|
|
completion_receipt: Optional[dict] = None
|
|
|
|
class ElevateRequest(BaseModel):
|
|
role_name: str
|
|
justification: str = ""
|
|
duration_minutes: int = Field(default=60, ge=5, le=480)
|
|
|
|
class ElevateResponse(BaseModel):
|
|
status: str
|
|
elevation_id: Optional[str] = None
|
|
message: str = ""
|