fastapi-gsap/tests/test_security.py
Tyler J King e744336385 fix: capability enforcement, credential safety, atomic delegations, input validation
C-6: ConnectorRuntime enforces capability_mask per operation.
     READ-only ACs cannot invoke MUTATE operations (wipe, lock, retire).
C-7: AC validated against database (exists, active, not expired)
     before connector invocation.
C-9: Delegated AC capability bounded by delegator's capability.
C-10: Command counter uses atomic SQL increment with limit check.
M-23: expire_stale() uses same atomic SQL pattern.

H-1: Sensitive credential fields hidden from repr/logs via repr=False.
H-2: Stub backend requires ALLOW_STUB_CREDENTIALS=true to activate.
H-3: Kerberos backend raises CredentialResolutionError instead of
     returning stub ticket.
H-4: Chronicle INTENT emitted before execution, RESULT after.
H-5: device_id validated as UUID before Graph API URL interpolation.
H-8: ConnectorRuntime enforces governance for all connector invocations.

Signed-off-by: Tyler King <tking@guildhouse.dev>
2026-04-14 08:13:27 -04:00

156 lines
6 KiB
Python

# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Security regression tests for audit findings C-2, C-6, C-7, C-9, H-1, H-5."""
import pytest
from datetime import datetime, timedelta, UTC
from unittest.mock import AsyncMock, MagicMock
from gsap_broker.connectors.base import CAP_MUTATE, CAP_READ, ConnectorContext, ConnectorResult
from gsap_broker.connectors.intune import IntuneConnector, _validate_device_id
from gsap_broker.credentials.resolver import (
BasculeCredential, KerberosCredential, OAuthCredential, SSHCertCredential,
)
# ── H-1: Credential repr does not leak secrets ──────────────────
def test_oauth_credential_repr_hides_token():
    """H-1: repr() of an OAuthCredential must not expose its access_token."""
    secret_token = "super-secret-token-123"
    one_hour_ahead = datetime.now(UTC) + timedelta(hours=1)
    credential = OAuthCredential(
        target="target",
        expires_at=one_hour_ahead,
        access_token=secret_token,
    )
    # The raw token value must be absent from the textual representation.
    assert secret_token not in repr(credential)
def test_kerberos_credential_repr_hides_ticket():
    """H-1: repr() of a KerberosCredential must not expose the ticket bytes."""
    secret_text = "SECRET_KERBEROS_TICKET_BYTES"
    one_hour_ahead = datetime.now(UTC) + timedelta(hours=1)
    credential = KerberosCredential(
        target="target",
        expires_at=one_hour_ahead,
        ticket=secret_text.encode("ascii"),
    )
    # repr() of bytes embeds the ASCII text, so checking for the text
    # also catches a leaked bytes value.
    assert secret_text not in repr(credential)
def test_ssh_credential_repr_hides_key():
    """H-1: repr() of an SSHCertCredential must not expose its private_key."""
    key_material = "PRIVATE-KEY-MATERIAL"
    one_hour_ahead = datetime.now(UTC) + timedelta(hours=1)
    credential = SSHCertCredential(
        target="target",
        expires_at=one_hour_ahead,
        certificate="cert-data",
        private_key=key_material,
    )
    # Only the private key is checked here; the certificate is not asserted on.
    assert key_material not in repr(credential)
# ── H-5: device_id path traversal rejected ──────────────────────
def test_device_id_path_traversal_rejected():
    """H-5: a device_id made of path-traversal segments raises ValueError."""
    traversal_attempt = "../../users/admin"
    with pytest.raises(ValueError, match="UUID format"):
        _validate_device_id(traversal_attempt)
def test_device_id_valid_uuid_accepted():
    """H-5: a well-formed UUID is returned unchanged by the validator."""
    well_formed = "550e8400-e29b-41d4-a716-446655440000"
    assert _validate_device_id(well_formed) == well_formed
def test_device_id_empty_rejected():
    """H-5: an empty device_id raises ValueError matching "device_id required"."""
    blank_id = ""
    with pytest.raises(ValueError, match="device_id required"):
        _validate_device_id(blank_id)
def test_device_id_not_uuid_rejected():
    """H-5: a non-UUID device_id raises ValueError matching "UUID format"."""
    malformed_id = "not-a-uuid"
    with pytest.raises(ValueError, match="UUID format"):
        _validate_device_id(malformed_id)
# ── C-6: capability_mask enforcement ─────────────────────────────
@pytest.mark.asyncio
async def test_read_only_ac_cannot_invoke_wipe():
    """C-6: destructive Intune operations are declared MUTATE, queries READ.

    Only the connector's operation-to-capability declarations are checked
    here; no operation is actually invoked.
    """
    graph_stub = MagicMock()
    graph_stub.tenant_id = "t"
    graph_stub.client_id = "c"
    intune = IntuneConnector(graph_client=graph_stub)

    # Checked in a fixed order: destructive operations first, then queries.
    expected_capabilities = (
        ("wipe_device", CAP_MUTATE),
        ("remote_lock", CAP_MUTATE),
        ("retire_device", CAP_MUTATE),
        ("list_devices", CAP_READ),
        ("get_compliance", CAP_READ),
    )
    for operation, required in expected_capabilities:
        assert intune.capability_for_operation(operation) == required
# ── C-9: delegation capability bounding ──────────────────────────
@pytest.mark.asyncio
async def test_delegation_capability_exceeding_delegator_rejected():
    """C-9: delegated capability cannot exceed delegator's.

    Builds a delegation request whose scope asks for a "CAP_ADMIN"
    capability ceiling, then calls ``create_delegation`` with a delegator
    mask of ``CAP_READ``. The call must raise ``ValueError`` with a message
    matching "exceeds delegator".
    """
    # Only the names this test actually uses are imported; the private
    # ``_capability_mask_for`` helper was imported but never referenced.
    from gsap_broker.delegations.lifecycle import DelegationManager
    from gsap_broker.delegations.models import DelegationRequest, DelegationScope

    manager = DelegationManager()
    request = DelegationRequest(
        delegator_ac_id="test-ac",
        agent_type="claude-code",
        scope=DelegationScope(capability_ceiling="CAP_ADMIN"),
    )
    # The delegator holds only READ capability, so an ADMIN ceiling must fail.
    with pytest.raises(ValueError, match="exceeds delegator"):
        await manager.create_delegation(
            request,
            delegator_did="did:web:test/p/alice",
            delegator_capability_mask=CAP_READ,
        )
# ── C-2: on_behalf_of gate ───────────────────────────────────────
@pytest.mark.asyncio
async def test_on_behalf_of_without_impersonate_role_rejected(mocker):
    """C-2: on_behalf_of without gsap:impersonate role is rejected.

    Points the broker's database module at a throwaway SQLite engine,
    patches ``KeycloakDriver.authenticate`` to return an AUTHORIZED result
    for ``did:web:test/p/alice``, and posts an authorize request that sets
    ``on_behalf_of`` to a different principal. The endpoint must answer
    403 with "gsap:impersonate" in the response detail.

    NOTE(review): the patched AuthResult presumably carries no
    gsap:impersonate role by default -- confirm against AuthResult.

    Args:
        mocker: pytest-mock fixture; every patch it applies is undone
            automatically when the test finishes.
    """
    from pathlib import Path

    from httpx import AsyncClient, ASGITransport
    from sqlmodel import SQLModel
    from sqlalchemy.ext.asyncio import create_async_engine
    from gsap_broker.app import app
    from gsap_broker import db as db_module
    from gsap_broker.drivers.base import AuthResult

    engine = create_async_engine("sqlite+aiosqlite:///./test_security.db")
    # Swap the engine through the mocker fixture so the previous value is
    # restored at teardown instead of leaking this throwaway engine into
    # tests that run later in the same session.
    mocker.patch.object(db_module, "engine", engine, create=True)
    try:
        async with engine.begin() as conn:
            await conn.run_sync(SQLModel.metadata.create_all)
        try:
            mocker.patch(
                "gsap_broker.drivers.keycloak.KeycloakDriver.authenticate",
                return_value=AuthResult(
                    status=AuthResult.STATUS_AUTHORIZED,
                    principal_did="did:web:test/p/alice",
                    token_jti="jti",
                ),
            )
            async with AsyncClient(
                transport=ASGITransport(app=app), base_url="http://test"
            ) as client:
                resp = await client.post("/governance/authorize/", json={
                    "playbook": "test",
                    "corpus_entry_cid": "sha256:" + "a" * 64,
                    "parameters_cid": "sha256:" + "b" * 64,
                    "accord_template": "test",
                    "driver_id": "keycloak",
                    "on_behalf_of": "did:web:test/p/admin",
                })
            assert resp.status_code == 403
            assert "gsap:impersonate" in resp.json()["detail"]
        finally:
            async with engine.begin() as conn:
                await conn.run_sync(SQLModel.metadata.drop_all)
    finally:
        # Close pooled connections before removing the database file so no
        # open handle keeps it alive.
        await engine.dispose()
        Path("test_security.db").unlink(missing_ok=True)