fastapi-gsap/tests/test_broker.py
Tyler J King cbc9ad73f7 feat: fastapi-gsap — lightweight GSAP broker PoC
Reference implementation of GCAP-SPEC-SHELLBOUND-BROKER-0001
in FastAPI. Designed for ISVs and enterprises implementing
the governed shell authorization protocol.

Architecture:
  FastAPI + SQLModel + Pydantic + async SQLite
  Single container deployment: Dockerfile included
  OpenAPI schema at /docs is the machine-readable GSAP contract

Broker Interface (§5):
  POST   /governance/authorize/     — Issue AC
  GET    /governance/authorize/{p}/ — Poll elevation
  POST   /governance/complete/      — Receive CR
  GET    /governance/session/{id}/  — View chain of custody
  POST   /governance/elevate/       — JIT elevation
  GET    /governance/drivers/       — List drivers

Identity Driver Interface (§2.2):
  IdentityDriver — abstract base (ISV extension point)
  KeycloakDriver — Keycloak implementation
  DriverRegistry — driver lookup and registration

Chronicle integration (§1.4):
  Optional CloudEvents emission via CHRONICLE_WEBHOOK_URL
  Forgejo push event format for receiver compatibility

Models:
  Pydantic schemas for AC, CR, Principal, Accord, Operation
  SQLModel DB models for persistence

Tests: 6 async tests including full AC→CR cycle

695 lines across 27 files.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 14:10:21 -04:00

70 lines
3.3 KiB
Python

"""Tests for fastapi-gsap broker."""
import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_health(client: AsyncClient):
resp = await client.get("/health/")
assert resp.status_code == 200
assert resp.json()["gsap_version"] == "0.1.0"
@pytest.mark.asyncio
async def test_list_drivers(client: AsyncClient):
resp = await client.get("/governance/drivers/")
assert resp.status_code == 200
assert "keycloak" in resp.json()["drivers"]
@pytest.mark.asyncio
async def test_authorize_bad_driver(client: AsyncClient):
resp = await client.post("/governance/authorize/", json={
"playbook": "test", "corpus_entry_cid": "sha256:a", "parameters_cid": "sha256:b",
"accord_template": "test", "driver_id": "nonexistent"})
assert resp.status_code == 400
@pytest.mark.asyncio
async def test_authorize_no_token(client: AsyncClient):
resp = await client.post("/governance/authorize/", json={
"playbook": "test", "corpus_entry_cid": "sha256:a", "parameters_cid": "sha256:b",
"accord_template": "test", "driver_id": "keycloak"})
assert resp.status_code == 403
@pytest.mark.asyncio
async def test_full_ac_cr_cycle(client: AsyncClient, mocker):
from gsap_broker.drivers.base import AuthResult
mocker.patch("gsap_broker.drivers.keycloak.KeycloakDriver.authenticate",
return_value=AuthResult(status=AuthResult.STATUS_AUTHORIZED,
principal_did="did:web:test/p/sam", token_jti="jti-1", mfa_satisfied=True))
auth_resp = await client.post("/governance/authorize/", json={
"playbook": "test-echo", "corpus_entry_cid": "sha256:" + "a"*64,
"parameters_cid": "sha256:" + "b"*64, "accord_template": "test-ops", "driver_id": "keycloak"})
assert auth_resp.status_code == 200
ctx_id = auth_resp.json()["authorization_context"]["context_id"]
cr_resp = await client.post("/governance/complete/", json={
"context_id": ctx_id, "outcome": "completed", "completed_at": "2026-01-01T00:00:00Z"})
assert cr_resp.status_code == 200
session_resp = await client.get(f"/governance/session/{ctx_id}/")
assert session_resp.status_code == 200
assert session_resp.json()["status"] == "consumed"
assert session_resp.json()["completion_receipt"]["outcome"] == "completed"
@pytest.mark.asyncio
async def test_consumed_ac_rejected(client: AsyncClient, mocker):
from gsap_broker.drivers.base import AuthResult
mocker.patch("gsap_broker.drivers.keycloak.KeycloakDriver.authenticate",
return_value=AuthResult(status=AuthResult.STATUS_AUTHORIZED,
principal_did="did:web:test/p/sam", token_jti="jti-2"))
auth_resp = await client.post("/governance/authorize/", json={
"playbook": "test", "corpus_entry_cid": "sha256:x", "parameters_cid": "sha256:y",
"accord_template": "test", "driver_id": "keycloak"})
ctx_id = auth_resp.json()["authorization_context"]["context_id"]
await client.post("/governance/complete/", json={
"context_id": ctx_id, "outcome": "completed", "completed_at": "2026-01-01T00:00:00Z"})
second = await client.post("/governance/complete/", json={
"context_id": ctx_id, "outcome": "completed", "completed_at": "2026-01-01T00:00:00Z"})
assert second.status_code == 404