All connectors registered conditionally based on settings. CredentialResolver with Entra backend (production) or Stub backend (dev mode). 15 new tests covering credential resolution, session lifecycle, orchestrator workflows, and device routing. Signed-off-by: Tyler King <tking@guildhouse.dev>
316 lines
11 KiB
Python
316 lines
11 KiB
Python
# Copyright 2026 Guildhouse Dev
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
|
|
"""Tests for credential resolver, backends, and session connectors."""
|
|
|
|
import pytest
|
|
from datetime import datetime, timedelta, UTC
|
|
|
|
from gsap_broker.credentials.resolver import (
|
|
BasculeCredential,
|
|
Credential,
|
|
CredentialResolver,
|
|
CredentialResolutionError,
|
|
KerberosCredential,
|
|
NoBackendAvailable,
|
|
OAuthCredential,
|
|
SSHCertCredential,
|
|
)
|
|
from gsap_broker.credentials.stub_backend import StubCredentialBackend
|
|
from gsap_broker.connectors.base import ConnectorContext
|
|
from gsap_broker.connectors.bascule import BasculeConnector
|
|
from gsap_broker.connectors.powershell import PowerShellConnector
|
|
from gsap_broker.connectors.ansible import AnsibleConnector
|
|
from gsap_broker.connectors.orchestrator import WorkflowPlan, WorkflowStep
|
|
from gsap_broker.routing.device_router import DeviceRouter, UnknownDevice
|
|
|
|
|
|
AC_CONTEXT = {
|
|
"gsap_context_id": "test-ac",
|
|
"expires_at": (datetime.now(UTC) + timedelta(hours=1)).isoformat(),
|
|
"accord": {"template": "test-ops"},
|
|
"principal": {"did": "did:web:test/p/alice"},
|
|
}
|
|
|
|
|
|
# ── CredentialResolver ────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_resolver_routes_to_correct_backend():
    """TEST 2: Each supported credential type resolves to its credential class.

    Only the stub backend is registered. Every resolved credential must also
    carry an expiry and report itself as not expired.
    """
    expected_class_by_type = {
        "bascule_ac": BasculeCredential,
        "kerberos": KerberosCredential,
        "oauth": OAuthCredential,
        "ssh_cert": SSHCertCredential,
    }

    stub_resolver = CredentialResolver()
    stub_resolver.register(StubCredentialBackend())

    for kind, credential_cls in expected_class_by_type.items():
        resolved = await stub_resolver.resolve(kind, "target-1", AC_CONTEXT)
        assert isinstance(resolved, credential_cls)
        assert resolved.expires_at is not None
        assert not resolved.expired
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_resolver_rejects_unknown_type():
    """TEST 3: Resolving an unsupported credential type raises NoBackendAvailable.

    The raised error's text must mention the rejected type name.
    """
    stub_resolver = CredentialResolver()
    stub_resolver.register(StubCredentialBackend())
    unknown_type = "quantum_key"

    with pytest.raises(NoBackendAvailable) as raised:
        await stub_resolver.resolve(unknown_type, "target", AC_CONTEXT)

    # Checked after the ``with`` block: a statement placed after the raising
    # call inside the block would never execute.
    assert unknown_type in str(raised.value)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_bascule_credential_preserves_ac():
    """TEST 4: A resolved Bascule credential carries the authorization context it was given."""
    stub_resolver = CredentialResolver()
    stub_resolver.register(StubCredentialBackend())

    bascule_cred = await stub_resolver.resolve("bascule_ac", "target-1", AC_CONTEXT)

    assert isinstance(bascule_cred, BasculeCredential)
    # The context handed to resolve() must come back on the credential, equal by value.
    assert bascule_cred.authorization_context == AC_CONTEXT
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_stub_credentials_not_expired():
    """TEST 5: Credentials resolved through the stub backend report ``expired`` as false."""
    stub_resolver = CredentialResolver()
    stub_resolver.register(StubCredentialBackend())

    supported_types = ("bascule_ac", "kerberos", "oauth", "ssh_cert")
    for kind in supported_types:
        resolved = await stub_resolver.resolve(kind, "target", AC_CONTEXT)
        assert not resolved.expired
|
|
|
|
|
|
# ── SessionConnector lifecycle ────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_session_connector_success():
    """TEST 6: Invoking the Bascule connector with a stub-backed resolver succeeds.

    Intended to cover the session lifecycle (resolve, connect, execute,
    disconnect); the assertions themselves only inspect the returned result.
    """
    stub_resolver = CredentialResolver()
    stub_resolver.register(StubCredentialBackend())

    bascule = BasculeConnector(credential_resolver=stub_resolver)
    invoke_ctx = ConnectorContext(gsap_context_id="test-ac")

    outcome = await bascule.invoke("run-script", {"target": "node-1"}, invoke_ctx)

    assert outcome.success
    # The result data must echo the request and be flagged as stub output.
    assert outcome.data["target"] == "node-1"
    assert outcome.data["command"] == "run-script"
    assert outcome.data["stub"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_session_connector_missing_target():
    """Invoking a session connector without a ``target`` parameter yields an error result."""
    stub_resolver = CredentialResolver()
    stub_resolver.register(StubCredentialBackend())

    bascule = BasculeConnector(credential_resolver=stub_resolver)
    invoke_ctx = ConnectorContext(gsap_context_id="test-ac")

    # Empty parameters: no "target" key is supplied.
    outcome = await bascule.invoke("run-script", {}, invoke_ctx)

    assert not outcome.success
    assert "target required" in outcome.error
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_session_connector_transport_failure():
    """TEST 7: disconnect() is still called when the transport's execute() raises.

    A throwaway transport and connector pair is defined inline. The transport
    records each disconnect() call so the cleanup guarantee can be asserted.
    """
    from gsap_broker.connectors.session import SessionTransport, SessionConnector
    from gsap_broker.credentials.resolver import Credential  # kept from the original; not referenced below

    # One entry is appended per disconnect() call; truthy once cleanup has run.
    disconnect_calls = []

    # Transport whose connect() does nothing and whose execute() always raises.
    class FailingTransport(SessionTransport):
        transport_id = "failing"

        async def connect(self, target, credential):
            return None

        async def execute(self, command, params=None):
            raise RuntimeError("Transport exploded")

        async def disconnect(self):
            disconnect_calls.append(True)

        async def is_alive(self):
            return False

    # Connector wired to the failing transport above.
    class FailingConnector(SessionConnector):
        connector_id = "failing"
        corpus_entry_cid = "sha256:test"
        credential_type = "bascule_ac"
        transport_class = FailingTransport
        capability_mask = 1
        declared_endpoints = []

    stub_resolver = CredentialResolver()
    stub_resolver.register(StubCredentialBackend())

    failing = FailingConnector(credential_resolver=stub_resolver)
    invoke_ctx = ConnectorContext(gsap_context_id="test-ac")

    outcome = await failing.invoke("crash", {"target": "node-1"}, invoke_ctx)

    # The failure is reported through the result rather than propagated.
    assert not outcome.success
    assert "exploded" in outcome.error
    assert disconnect_calls, "disconnect() must be called even on failure"
|
|
|
|
|
|
# ── PowerShell connector ──────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_powershell_connector_stub():
    """Invoke the PowerShell connector with a stub-backed resolver.

    The result must succeed, report the "psrp" transport, and echo the target.
    """
    stub_resolver = CredentialResolver()
    stub_resolver.register(StubCredentialBackend())

    powershell = PowerShellConnector(credential_resolver=stub_resolver)
    invoke_ctx = ConnectorContext(gsap_context_id="test-ac")
    request = {"target": "win-srv-01:5986"}

    outcome = await powershell.invoke("Get-Process", request, invoke_ctx)

    assert outcome.success
    assert outcome.data["transport"] == "psrp"
    assert outcome.data["target"] == "win-srv-01:5986"
|
|
|
|
|
|
# ── OrchestratorConnector (Ansible) ──────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_orchestrator_all_steps_succeed():
    """TEST 8: A playbook run whose steps all succeed yields an aggregate success.

    The Ansible connector is invoked for two targets; the result is expected
    to contain exactly one step, and that step must be marked successful.
    """
    stub_resolver = CredentialResolver()
    stub_resolver.register(StubCredentialBackend())

    ansible = AnsibleConnector(credential_resolver=stub_resolver)
    invoke_ctx = ConnectorContext(gsap_context_id="test-ac")
    request = {"playbook": "site.yml", "targets": ["host-1", "host-2"]}

    outcome = await ansible.invoke("playbook", request, invoke_ctx)

    assert outcome.success
    assert len(outcome.data["steps"]) == 1
    assert outcome.data["steps"][0]["success"] is True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_orchestrator_required_step_fails():
    """TEST 9: A failing required step stops the workflow and yields partial results.

    An inline orchestrator plans three required steps, the second of which
    reports failure. The test checks the aggregate result and also records
    every command handed to ``execute_step``, so that the "third step never
    runs" expectation is asserted instead of only being stated in a comment.
    """
    from gsap_broker.connectors.orchestrator import (
        OrchestratorConnector,
        WorkflowPlan,
        WorkflowStep,
    )

    resolver = CredentialResolver()
    resolver.register(StubCredentialBackend())

    # Commands seen by execute_step, in call order.
    executed_commands = []

    # Orchestrator with a fixed three-step plan whose second step fails.
    class FailingOrchestrator(OrchestratorConnector):
        connector_id = "failing-orch"
        corpus_entry_cid = "sha256:test"
        capability_mask = 1
        declared_endpoints = []

        async def plan(self, operation, parameters, context):
            return WorkflowPlan(steps=[
                WorkflowStep(name="step-1", command="ok", required=True),
                WorkflowStep(name="step-2", command="fail", required=True),
                WorkflowStep(name="step-3", command="never", required=True),
            ])

        async def execute_step(self, step, context):
            executed_commands.append(step.command)
            if step.command == "fail":
                return {"success": False, "error": "boom"}
            return {"success": True}

    connector = FailingOrchestrator(credential_resolver=resolver)
    ctx = ConnectorContext(gsap_context_id="test-ac")

    result = await connector.invoke("run", {}, ctx)

    assert not result.success
    assert result.data["failed_at"] == "step-2"
    assert len(result.data["completed"]) == 2  # step-1 succeeded, step-2 failed
    # Early termination: step-3 must never have reached execute_step.
    assert "never" not in executed_commands
|
|
|
|
|
|
# ── DeviceRouter ──────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_router_api_operation_routes_to_intune():
    """TEST 12: Routing "get_compliance" for device "dev-1" selects the "intune" connector."""
    from gsap_broker.connectors.registry import ConnectorRegistry

    # An empty registry is used here; route() is only asked which connector it selects.
    device_router = DeviceRouter(connector_registry=ConnectorRegistry())
    route_ctx = ConnectorContext(gsap_context_id="test-ac")

    # route() yields a (connector_id, operation) pair; only the id is checked.
    chosen_connector, _operation = await device_router.route("get_compliance", "dev-1", route_ctx)

    assert chosen_connector == "intune"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_router_fleet_operation_routes_to_ansible():
    """Routing the "playbook" operation for the "fleet" target selects the "ansible" connector."""
    from gsap_broker.connectors.registry import ConnectorRegistry

    device_router = DeviceRouter(connector_registry=ConnectorRegistry())
    route_ctx = ConnectorContext(gsap_context_id="test-ac")

    # route() yields a (connector_id, operation) pair; only the id is checked.
    chosen_connector, _operation = await device_router.route("playbook", "fleet", route_ctx)

    assert chosen_connector == "ansible"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_router_unknown_device_raises():
    """TEST 13: Routing "exec" for the unrecognised device "mystery-host" raises UnknownDevice."""
    from gsap_broker.connectors.registry import ConnectorRegistry

    device_router = DeviceRouter(connector_registry=ConnectorRegistry())
    route_ctx = ConnectorContext(gsap_context_id="test-ac")

    with pytest.raises(UnknownDevice):
        await device_router.route("exec", "mystery-host", route_ctx)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_router_invoke_missing_connector():
    """Router invoke yields an error result when the routed connector is not in the registry."""
    from gsap_broker.connectors.registry import ConnectorRegistry

    # Nothing is registered, so the connector chosen by routing cannot be found.
    empty_registry = ConnectorRegistry()
    device_router = DeviceRouter(connector_registry=empty_registry)
    invoke_ctx = ConnectorContext(gsap_context_id="test-ac")

    outcome = await device_router.invoke("get_compliance", "dev-1", {}, invoke_ctx)

    assert not outcome.success
    assert "not registered" in outcome.error
|
|
|
|
|
|
# ── Catalog conditional registration ─────────────────────────────
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_connectors_absent_when_disabled():
    """TEST 15: The connector catalog omits the optional connectors when they are disabled.

    The application is queried in-process through an ASGI transport; no
    network listener is involved.
    """
    from httpx import AsyncClient, ASGITransport
    from gsap_broker.app import app

    in_process = ASGITransport(app=app)
    async with AsyncClient(transport=in_process, base_url="http://test") as client:
        resp = await client.get("/connectors/")
        assert resp.status_code == 200

        listed_ids = [entry["connector_id"] for entry in resp.json()]

        # The original note says only the echo connector is registered under
        # default settings; this test checks just that the optional ones are absent.
        for optional_id in ("bascule", "powershell", "ansible"):
            assert optional_id not in listed_ids
|