Reference implementation of GCAP-SPEC-SHELLBOUND-BROKER-0001
in FastAPI. Designed for ISVs and enterprises implementing
the governed shell authorization protocol.
Architecture:
FastAPI + SQLModel + Pydantic + async SQLite
Single container deployment: Dockerfile included
OpenAPI schema at /docs is the machine-readable GSAP contract
Broker Interface (§5):
POST /governance/authorize/ — Issue Authorization Context (AC)
GET /governance/authorize/{p}/ — Poll elevation
POST /governance/complete/ — Receive Completion Receipt (CR)
GET /governance/session/{id}/ — View chain of custody
POST /governance/elevate/ — JIT elevation
GET /governance/drivers/ — List drivers
Identity Driver Interface (§2.2):
IdentityDriver — abstract base (ISV extension point)
KeycloakDriver — Keycloak implementation
DriverRegistry — driver lookup and registration
Chronicle integration (§1.4):
Optional CloudEvents emission via CHRONICLE_WEBHOOK_URL
Forgejo push event format for receiver compatibility
Models:
Pydantic schemas for AC, CR, Principal, Accord, Operation
SQLModel DB models for persistence
Tests: 6 async tests including full AC→CR cycle
695 lines across 27 files.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
70 lines
3.3 KiB
Python
70 lines
3.3 KiB
Python
"""Tests for fastapi-gsap broker."""
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
|
|
@pytest.mark.asyncio
async def test_health(client: AsyncClient):
    """GET /health/ returns 200 and reports gsap_version "0.1.0"."""
    response = await client.get("/health/")
    assert response.status_code == 200

    payload = response.json()
    assert payload["gsap_version"] == "0.1.0"
|
|
|
|
@pytest.mark.asyncio
async def test_list_drivers(client: AsyncClient):
    """GET /governance/drivers/ returns 200 and lists the keycloak driver."""
    response = await client.get("/governance/drivers/")
    assert response.status_code == 200

    available = response.json()["drivers"]
    assert "keycloak" in available
|
|
|
|
@pytest.mark.asyncio
async def test_authorize_bad_driver(client: AsyncClient):
    """Authorizing with a driver_id of "nonexistent" is answered with 400."""
    request_body = {
        "playbook": "test",
        "corpus_entry_cid": "sha256:a",
        "parameters_cid": "sha256:b",
        "accord_template": "test",
        "driver_id": "nonexistent",
    }

    response = await client.post("/governance/authorize/", json=request_body)

    assert response.status_code == 400
|
|
|
|
@pytest.mark.asyncio
async def test_authorize_no_token(client: AsyncClient):
    """Authorizing through the keycloak driver without a token yields 403.

    This test attaches no credentials of its own: the request consists of
    the JSON body only, and the driver is not mocked.
    """
    request_body = {
        "playbook": "test",
        "corpus_entry_cid": "sha256:a",
        "parameters_cid": "sha256:b",
        "accord_template": "test",
        "driver_id": "keycloak",
    }

    response = await client.post("/governance/authorize/", json=request_body)

    assert response.status_code == 403
|
|
|
|
@pytest.mark.asyncio
async def test_full_ac_cr_cycle(client: AsyncClient, mocker):
    """Walk one context through authorize -> complete -> session lookup.

    KeycloakDriver.authenticate is patched to return an authorized result,
    so the flow exercises the broker endpoints rather than a real identity
    provider. The session view must end up "consumed" and carry the
    completion receipt that was posted.
    """
    from gsap_broker.drivers.base import AuthResult

    authorized = AuthResult(
        status=AuthResult.STATUS_AUTHORIZED,
        principal_did="did:web:test/p/sam",
        token_jti="jti-1",
        mfa_satisfied=True,
    )
    mocker.patch(
        "gsap_broker.drivers.keycloak.KeycloakDriver.authenticate",
        return_value=authorized,
    )

    # Step 1: obtain an authorization context.
    authorize_body = {
        "playbook": "test-echo",
        "corpus_entry_cid": "sha256:" + "a" * 64,
        "parameters_cid": "sha256:" + "b" * 64,
        "accord_template": "test-ops",
        "driver_id": "keycloak",
    }
    authorize_response = await client.post("/governance/authorize/", json=authorize_body)
    assert authorize_response.status_code == 200
    context_id = authorize_response.json()["authorization_context"]["context_id"]

    # Step 2: post the completion receipt for that context.
    completion_body = {
        "context_id": context_id,
        "outcome": "completed",
        "completed_at": "2026-01-01T00:00:00Z",
    }
    completion_response = await client.post("/governance/complete/", json=completion_body)
    assert completion_response.status_code == 200

    # Step 3: the session view reflects the consumed context and its receipt.
    session_response = await client.get(f"/governance/session/{context_id}/")
    assert session_response.status_code == 200
    session = session_response.json()
    assert session["status"] == "consumed"
    assert session["completion_receipt"]["outcome"] == "completed"
|
|
|
|
@pytest.mark.asyncio
async def test_consumed_ac_rejected(client: AsyncClient, mocker):
    """A second completion for an already-completed context is answered with 404.

    KeycloakDriver.authenticate is patched to return an authorized result.
    The authorize call and the first completion are asserted to succeed, so
    the final 404 can only come from re-submitting the same context and not
    from an earlier step failing.
    """
    from gsap_broker.drivers.base import AuthResult

    mocker.patch(
        "gsap_broker.drivers.keycloak.KeycloakDriver.authenticate",
        return_value=AuthResult(
            status=AuthResult.STATUS_AUTHORIZED,
            principal_did="did:web:test/p/sam",
            token_jti="jti-2",
        ),
    )

    auth_resp = await client.post("/governance/authorize/", json={
        "playbook": "test",
        "corpus_entry_cid": "sha256:x",
        "parameters_cid": "sha256:y",
        "accord_template": "test",
        "driver_id": "keycloak",
    })
    # Fail here with a clear status mismatch rather than a KeyError below.
    assert auth_resp.status_code == 200
    ctx_id = auth_resp.json()["authorization_context"]["context_id"]

    completion = {
        "context_id": ctx_id,
        "outcome": "completed",
        "completed_at": "2026-01-01T00:00:00Z",
    }

    # Without this check the test would pass vacuously if the first
    # completion were itself rejected.
    first = await client.post("/governance/complete/", json=completion)
    assert first.status_code == 200

    second = await client.post("/governance/complete/", json=completion)
    assert second.status_code == 404
|