fastapi-gsap/gsap_broker/models/gsap.py
Tyler J King cbc9ad73f7 feat: fastapi-gsap — lightweight GSAP broker PoC
Reference implementation of GCAP-SPEC-SHELLBOUND-BROKER-0001
in FastAPI. Designed for ISVs and enterprises implementing
the governed shell authorization protocol.

Architecture:
  FastAPI + SQLModel + Pydantic + async SQLite
  Single container deployment: Dockerfile included
  OpenAPI schema at /docs is the machine-readable GSAP contract

Broker Interface (§5):
  POST   /governance/authorize/     — Issue AC
  GET    /governance/authorize/{p}/ — Poll elevation
  POST   /governance/complete/      — Receive CR
  GET    /governance/session/{id}/  — View chain of custody
  POST   /governance/elevate/       — JIT elevation
  GET    /governance/drivers/       — List drivers

Identity Driver Interface (§2.2):
  IdentityDriver — abstract base (ISV extension point)
  KeycloakDriver — Keycloak implementation
  DriverRegistry — driver lookup and registration

Chronicle integration (§1.4):
  Optional CloudEvents emission via CHRONICLE_WEBHOOK_URL
  Forgejo push event format for receiver compatibility

Models:
  Pydantic schemas for AC, CR, Principal, Accord, Operation
  SQLModel DB models for persistence

Tests: 6 async tests including full AC→CR cycle

695 lines across 27 files.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 14:10:21 -04:00

121 lines
3.3 KiB
Python

"""GSAP Pydantic models — GCAP-SPEC-SHELLBOUND-BROKER-0001."""
from datetime import datetime
from enum import Enum
from typing import Optional
from uuid import UUID, uuid4
from pydantic import BaseModel, Field
class ACStatus(str, Enum):
    # Status values for an authorization context; this is the type of
    # SessionResponse.status.  The ``str`` mixin makes each member compare
    # equal to its plain string value.
    # NOTE(review): the allowed transitions between these states are not
    # defined in this module — confirm against the broker logic.
    PENDING = "pending"
    AUTHORIZED = "authorized"
    CONSUMED = "consumed"
    EXPIRED = "expired"
    REVOKED = "revoked"
class Outcome(str, Enum):
    # Result values accepted in CompleteRequest.outcome.  The ``str`` mixin
    # makes each member compare equal to its plain string value.
    COMPLETED = "completed"
    FAILED = "failed"
    VIOLATED = "violated"
    TIMED_OUT = "timed_out"
class BehaviorStatus(str, Enum):
    # Status values for BehavioralAttestation.status, which defaults to
    # UNAVAILABLE.  The ``str`` mixin makes each member compare equal to its
    # plain string value.
    VERIFIED = "verified"
    VIOLATED = "violated"
    UNAVAILABLE = "unavailable"
class Principal(BaseModel):
    # Identity block embedded in AuthorizationContext.principal.
    # Only ``did`` is required; the remaining fields default to "".
    # NOTE(review): ``did`` presumably holds a decentralized identifier and
    # ``driver_id`` presumably names the identity driver — confirm.
    did: str
    display_name: str = ""
    broker_session_id: str = ""
    driver_id: str = ""
class Accord(BaseModel):
    # Accord block embedded in AuthorizationContext.accord.
    # ``template`` is required.
    template: str
    # NOTE(review): the meaning of the individual bits, and of the default
    # value 3, is not defined in this module — confirm against the spec.
    capability_mask: int = 3
class Operation(BaseModel):
    # Operation block embedded in AuthorizationContext.operation.
    # The playbook name and both ``*_cid`` strings are required.
    # NOTE(review): "cid" presumably means content identifier; no format is
    # validated here beyond being a string — confirm.
    playbook: str
    corpus_entry_cid: str
    parameters_cid: str
    # Optional; None when not supplied.
    apply_authorized_cid: Optional[str] = None
class IdentityProof(BaseModel):
    # Identity-provider details embedded in
    # AuthorizationContext.identity_proof.  Every field has a default, so
    # the model can be built with no arguments.
    idp_vendor: str = "keycloak"
    # NOTE(review): presumably the ``jti`` claim of the IdP token — confirm.
    token_jti: str = ""
    # Literal list default kept as written.
    # NOTE(review): relies on Pydantic copying mutable defaults per instance
    # — confirm for the pinned Pydantic version.
    elevation_active: list[str] = []
    mfa_satisfied: bool = False
class AuthorizationContext(BaseModel):
    # Authorization context document; returned to callers through
    # AuthorizeResponse.authorization_context.
    gsap_version: str = "0.1.0"
    # A new random UUID is generated for each instance when none is given.
    context_id: UUID = Field(default_factory=uuid4)
    # Both timestamps are required.  This model declares no validator
    # relating them, so ordering (expires_at after issued_at) is not
    # enforced here.
    issued_at: datetime
    expires_at: datetime
    principal: Principal
    accord: Accord
    operation: Operation
    identity_proof: IdentityProof
    # Free-form mapping; defaults to a new empty dict per instance.
    broker: dict = Field(default_factory=dict)
    # None until a signature is attached; its structure is not modelled here.
    signature: Optional[dict] = None
class ChronicleEvidence(BaseModel):
    # Evidence bundle carried in CompleteRequest.chronicle_evidence.
    # Every field has a default, so the model can be built with no arguments.
    session_id: Optional[str] = None
    # Untyped event dicts; literal list default kept as written.
    # NOTE(review): relies on Pydantic copying mutable defaults per instance
    # — confirm for the pinned Pydantic version.
    events: list[dict] = []
    merkle_root: Optional[str] = None
class BehavioralAttestation(BaseModel):
    # Attestation carried in CompleteRequest.behavioral_attestation.
    # With no arguments the status is UNAVAILABLE and both CIDs are None.
    status: BehaviorStatus = BehaviorStatus.UNAVAILABLE
    observed_behavior_cid: Optional[str] = None
    declared_behavior_cid: Optional[str] = None
class AuthorizeRequest(BaseModel):
    # Request schema with five required string fields.  The first three
    # share names with the required fields of Operation.
    # NOTE(review): presumably the body of POST /governance/authorize/ —
    # confirm against the router.
    playbook: str
    corpus_entry_cid: str
    parameters_cid: str
    accord_template: str
    driver_id: str
class AuthorizeResponse(BaseModel):
    # Response schema for an authorization attempt.  Only ``status`` is
    # required; every other field defaults to None.
    # NOTE(review): ``status`` is a free-form string here, not an enum; the
    # set of values the broker emits is not defined in this module.
    status: str
    authorization_context: Optional[AuthorizationContext] = None
    # NOTE(review): the three fields below presumably describe a pending
    # elevation that the caller polls for — confirm against the router.
    poll_token: Optional[str] = None
    elevation_instructions: Optional[str] = None
    activation_url: Optional[str] = None
class CompleteRequest(BaseModel):
    # Completion report for an authorization context.  ``context_id``,
    # ``outcome`` and ``completed_at`` are required.
    # NOTE(review): presumably the body of POST /governance/complete/ —
    # confirm against the router.
    context_id: UUID
    outcome: Outcome
    completed_at: datetime
    failure_reason: Optional[str] = None
    chronicle_session_id: Optional[str] = None
    # Nested models default to a new default-constructed instance each.
    chronicle_evidence: ChronicleEvidence = Field(default_factory=ChronicleEvidence)
    behavioral_attestation: BehavioralAttestation = Field(default_factory=BehavioralAttestation)
    # Free-form mapping; defaults to a new empty dict per instance.
    # NOTE(review): "ffc" is not expanded anywhere in this module — confirm.
    ffc: dict = Field(default_factory=dict)
    # None when unsigned; the signature structure is not modelled here.
    signature: Optional[dict] = None
class CompleteResponse(BaseModel):
    # Acknowledgement schema paired by name with CompleteRequest.
    # ``receipt_id`` and ``signature_verified`` are required; ``status``
    # defaults to "received".
    status: str = "received"
    receipt_id: UUID
    signature_verified: bool
    chronicle_event_cid: Optional[str] = None
class SessionResponse(BaseModel):
    # Flattened view of one authorization context: identifiers, status and
    # timestamps as top-level fields instead of nested models.
    # NOTE(review): presumably returned by GET /governance/session/{id}/ —
    # confirm against the router.
    context_id: UUID
    principal_did: str
    accord_template: str
    playbook: str
    status: ACStatus
    issued_at: datetime
    expires_at: datetime
    # The remaining fields are optional and default to None.
    consumed_at: Optional[datetime] = None
    chronicle_event_cid: Optional[str] = None
    # Untyped dict; its structure is not modelled in this module.
    completion_receipt: Optional[dict] = None
class ElevateRequest(BaseModel):
    # Elevation request schema.  ``role_name`` is required; the
    # justification may be left empty.
    # NOTE(review): presumably the body of POST /governance/elevate/ —
    # confirm against the router.
    role_name: str
    justification: str = ""
    # Defaults to 60; validation accepts 5 through 480 inclusive.
    duration_minutes: int = Field(default=60, ge=5, le=480)
class ElevateResponse(BaseModel):
    # Elevation response schema.  Only ``status`` is required.
    # NOTE(review): ``status`` is a free-form string; the values the broker
    # emits are not defined in this module.
    status: str
    elevation_id: Optional[str] = None
    message: str = ""