fastapi-gsap/gsap_broker/drivers/base.py
Tyler J King cbc9ad73f7 feat: fastapi-gsap — lightweight GSAP broker PoC
Reference implementation of GCAP-SPEC-SHELLBOUND-BROKER-0001
in FastAPI. Designed for ISVs and enterprises implementing
the governed shell authorization protocol.

Architecture:
  FastAPI + SQLModel + Pydantic + async SQLite
  Single container deployment: Dockerfile included
  OpenAPI schema at /docs is the machine-readable GSAP contract

Broker Interface (§5):
  POST   /governance/authorize/     — Issue AC
  GET    /governance/authorize/{p}/ — Poll elevation
  POST   /governance/complete/      — Receive CR
  GET    /governance/session/{id}/  — View chain of custody
  POST   /governance/elevate/       — JIT elevation
  GET    /governance/drivers/       — List drivers

Identity Driver Interface (§2.2):
  IdentityDriver — abstract base (ISV extension point)
  KeycloakDriver — Keycloak implementation
  DriverRegistry — driver lookup and registration

Chronicle integration (§1.4):
  Optional CloudEvents emission via CHRONICLE_WEBHOOK_URL
  Forgejo push event format for receiver compatibility

Models:
  Pydantic schemas for AC, CR, Principal, Accord, Operation
  SQLModel DB models for persistence

Tests: 6 async tests including full AC→CR cycle

695 lines across 27 files.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 14:10:21 -04:00

39 lines
1.1 KiB
Python

"""Identity Driver Interface — GSAP §2.2. Zero framework imports."""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class ElevationRequired:
role: str
activation_url: str
instructions: str
mechanism: str = "custom"
@dataclass
class AuthResult:
STATUS_AUTHORIZED = "authorized"
STATUS_PENDING_ELEVATION = "pending_elevation"
STATUS_DENIED = "denied"
status: str
principal_did: str = ""
display_name: str = ""
stable_id: str = ""
token_jti: str = ""
elevation_active: list = field(default_factory=list)
mfa_satisfied: bool = False
elevation_required: Optional[ElevationRequired] = None
denial_reason: str = ""
@property
def is_authorized(self): return self.status == self.STATUS_AUTHORIZED
@property
def needs_elevation(self): return self.status == self.STATUS_PENDING_ELEVATION
class IdentityDriver(ABC):
def __init__(self, config: dict): self.config = config
@abstractmethod
async def authenticate(self) -> AuthResult: ...
@abstractmethod
async def revoke(self, session_id: str) -> None: ...