# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Tests for credential resolver, backends, and session connectors."""

from datetime import datetime, timedelta, UTC

import pytest

from gsap_broker.credentials.resolver import (
    BasculeCredential,
    Credential,
    CredentialResolver,
    CredentialResolutionError,
    KerberosCredential,
    NoBackendAvailable,
    OAuthCredential,
    SSHCertCredential,
)
from gsap_broker.credentials.stub_backend import StubCredentialBackend
from gsap_broker.connectors.base import ConnectorContext
from gsap_broker.connectors.bascule import BasculeConnector
from gsap_broker.connectors.powershell import PowerShellConnector
from gsap_broker.connectors.ansible import AnsibleConnector
from gsap_broker.connectors.orchestrator import WorkflowPlan, WorkflowStep
from gsap_broker.routing.device_router import DeviceRouter, UnknownDevice

# Authorization context handed to the resolver in every credential test.
# ``expires_at`` is computed once at import time, one hour in the future.
AC_CONTEXT = {
    "gsap_context_id": "test-ac",
    "expires_at": (datetime.now(UTC) + timedelta(hours=1)).isoformat(),
    "accord": {"template": "test-ops"},
    "principal": {"did": "did:web:test/p/alice"},
}

# Credential type name -> credential class the resolver is expected to return.
# Shared by the routing test and the expiry test so the two lists cannot drift.
_EXPECTED_CREDENTIAL_CLASSES = {
    "bascule_ac": BasculeCredential,
    "kerberos": KerberosCredential,
    "oauth": OAuthCredential,
    "ssh_cert": SSHCertCredential,
}


def _stub_resolver() -> CredentialResolver:
    """Return a fresh resolver with only the stub backend registered."""
    resolver = CredentialResolver()
    resolver.register(StubCredentialBackend())
    return resolver


def _connector_context() -> ConnectorContext:
    """Return the connector context used by every connector/router test."""
    return ConnectorContext(gsap_context_id="test-ac")


def _router_with_empty_registry() -> DeviceRouter:
    """Return a DeviceRouter backed by a registry with nothing registered."""
    # Imported lazily, as the original tests did, so the registry module is
    # only loaded by the tests that need it.
    from gsap_broker.connectors.registry import ConnectorRegistry

    return DeviceRouter(connector_registry=ConnectorRegistry())


# ── CredentialResolver ────────────────────────────────────────────


@pytest.mark.asyncio
async def test_resolver_routes_to_correct_backend():
    """TEST 2: Resolver routes each credential type to the stub backend."""
    resolver = _stub_resolver()

    for cred_type, expected_cls in _EXPECTED_CREDENTIAL_CLASSES.items():
        cred = await resolver.resolve(cred_type, "target-1", AC_CONTEXT)
        assert isinstance(cred, expected_cls)
        assert cred.expires_at is not None
        assert not cred.expired


@pytest.mark.asyncio
async def test_resolver_rejects_unknown_type():
    """TEST 3: Unknown credential type raises NoBackendAvailable."""
    resolver = _stub_resolver()

    with pytest.raises(NoBackendAvailable) as exc_info:
        await resolver.resolve("quantum_key", "target", AC_CONTEXT)

    # Must sit outside the ``with`` block: statements after the raising call
    # inside the block would never execute.
    assert "quantum_key" in str(exc_info.value)


@pytest.mark.asyncio
async def test_bascule_credential_preserves_ac():
    """TEST 4: Bascule credential passes AC through."""
    resolver = _stub_resolver()

    cred = await resolver.resolve("bascule_ac", "target-1", AC_CONTEXT)

    assert isinstance(cred, BasculeCredential)
    assert cred.authorization_context == AC_CONTEXT


@pytest.mark.asyncio
async def test_stub_credentials_not_expired():
    """TEST 5: Stub backend returns credentials that are not expired."""
    resolver = _stub_resolver()

    for cred_type in _EXPECTED_CREDENTIAL_CLASSES:
        cred = await resolver.resolve(cred_type, "target", AC_CONTEXT)
        assert not cred.expired


# ── SessionConnector lifecycle ────────────────────────────────────


@pytest.mark.asyncio
async def test_session_connector_success():
    """TEST 6: Full lifecycle — resolve, connect, execute, disconnect."""
    connector = BasculeConnector(credential_resolver=_stub_resolver())

    result = await connector.invoke(
        "run-script", {"target": "node-1"}, _connector_context()
    )

    assert result.success
    assert result.data["target"] == "node-1"
    assert result.data["command"] == "run-script"
    assert result.data["stub"] is True


@pytest.mark.asyncio
async def test_session_connector_missing_target():
    """Session connector requires target parameter."""
    connector = BasculeConnector(credential_resolver=_stub_resolver())

    # No "target" key in the parameters.
    result = await connector.invoke("run-script", {}, _connector_context())

    assert not result.success
    assert "target required" in result.error


@pytest.mark.asyncio
async def test_session_connector_transport_failure():
    """TEST 7: Transport failure still calls disconnect (cleanup guarantee)."""
    from gsap_broker.connectors.session import SessionTransport, SessionConnector

    disconnect_called = False

    class FailingTransport(SessionTransport):
        """Transport that connects fine but raises on every execute()."""

        transport_id = "failing"

        async def connect(self, target, credential):
            pass

        async def execute(self, command, params=None):
            raise RuntimeError("Transport exploded")

        async def disconnect(self):
            # Record the call so the test can verify cleanup happened.
            nonlocal disconnect_called
            disconnect_called = True

        async def is_alive(self):
            return False

    class FailingConnector(SessionConnector):
        """Session connector wired to FailingTransport."""

        connector_id = "failing"
        corpus_entry_cid = "sha256:test"
        credential_type = "bascule_ac"
        transport_class = FailingTransport
        capability_mask = 1
        declared_endpoints = []

    connector = FailingConnector(credential_resolver=_stub_resolver())

    result = await connector.invoke(
        "crash", {"target": "node-1"}, _connector_context()
    )

    assert not result.success
    assert "exploded" in result.error
    assert disconnect_called, "disconnect() must be called even on failure"


# ── PowerShell connector ──────────────────────────────────────────


@pytest.mark.asyncio
async def test_powershell_connector_stub():
    """PowerShell connector lifecycle with stubbed transport."""
    connector = PowerShellConnector(credential_resolver=_stub_resolver())

    result = await connector.invoke(
        "Get-Process", {"target": "win-srv-01:5986"}, _connector_context()
    )

    assert result.success
    assert result.data["transport"] == "psrp"
    assert result.data["target"] == "win-srv-01:5986"


# ── OrchestratorConnector (Ansible) ──────────────────────────────


@pytest.mark.asyncio
async def test_orchestrator_all_steps_succeed():
    """TEST 8: All steps succeed → aggregate success."""
    connector = AnsibleConnector(credential_resolver=_stub_resolver())

    result = await connector.invoke(
        "playbook",
        {"playbook": "site.yml", "targets": ["host-1", "host-2"]},
        _connector_context(),
    )

    assert result.success
    assert len(result.data["steps"]) == 1
    assert result.data["steps"][0]["success"] is True


@pytest.mark.asyncio
async def test_orchestrator_required_step_fails():
    """TEST 9: Required step fails → early termination, partial results."""
    # WorkflowPlan / WorkflowStep come from the module-level import.
    from gsap_broker.connectors.orchestrator import OrchestratorConnector

    class FailingOrchestrator(OrchestratorConnector):
        """Three-step orchestrator whose second required step fails."""

        connector_id = "failing-orch"
        corpus_entry_cid = "sha256:test"
        capability_mask = 1
        declared_endpoints = []

        async def plan(self, operation, parameters, context):
            return WorkflowPlan(steps=[
                WorkflowStep(name="step-1", command="ok", required=True),
                WorkflowStep(name="step-2", command="fail", required=True),
                WorkflowStep(name="step-3", command="never", required=True),
            ])

        async def execute_step(self, step, context):
            if step.command == "fail":
                return {"success": False, "error": "boom"}
            return {"success": True}

    connector = FailingOrchestrator(credential_resolver=_stub_resolver())

    result = await connector.invoke("run", {}, _connector_context())

    assert not result.success
    assert result.data["failed_at"] == "step-2"
    # step-1 succeeded, step-2 failed
    # step-3 was never executed
    assert len(result.data["completed"]) == 2


# ── DeviceRouter ──────────────────────────────────────────────────


@pytest.mark.asyncio
async def test_router_api_operation_routes_to_intune():
    """TEST 12: API-mediated operations route to Intune."""
    router = _router_with_empty_registry()

    # The second tuple element is not checked here; unpacking still verifies
    # that route() yields exactly two values.
    connector_id, _op = await router.route(
        "get_compliance", "dev-1", _connector_context()
    )

    assert connector_id == "intune"


@pytest.mark.asyncio
async def test_router_fleet_operation_routes_to_ansible():
    """Fleet operations route to Ansible."""
    router = _router_with_empty_registry()

    connector_id, _op = await router.route(
        "playbook", "fleet", _connector_context()
    )

    assert connector_id == "ansible"


@pytest.mark.asyncio
async def test_router_unknown_device_raises():
    """TEST 13: Unknown device raises clear error."""
    router = _router_with_empty_registry()

    with pytest.raises(UnknownDevice):
        await router.route("exec", "mystery-host", _connector_context())


@pytest.mark.asyncio
async def test_router_invoke_missing_connector():
    """Router invoke returns error when connector not registered."""
    router = _router_with_empty_registry()

    result = await router.invoke(
        "get_compliance", "dev-1", {}, _connector_context()
    )

    assert not result.success
    assert "not registered" in result.error


# ── Catalog conditional registration ─────────────────────────────


@pytest.mark.asyncio
async def test_connectors_absent_when_disabled():
    """TEST 15: Optional connectors not in catalog when disabled."""
    from httpx import AsyncClient, ASGITransport
    from gsap_broker.app import app

    # Drive the ASGI app in-process; no real network socket is opened.
    async with AsyncClient(
        transport=ASGITransport(app=app), base_url="http://test"
    ) as client:
        resp = await client.get("/connectors/")

    assert resp.status_code == 200
    ids = [c["connector_id"] for c in resp.json()]
    # With default settings, only echo is registered
    assert "bascule" not in ids
    assert "powershell" not in ids
    assert "ansible" not in ids