# Copyright 2026 Guildhouse Dev # SPDX-License-Identifier: Apache-2.0 """Session-based connector framework. Session connectors establish stateful connections to target endpoints (SSH, WinRM/PSRP, Shellstream/Bascule) and execute commands over them. Credential lifecycle: 1. ``SessionConnector.invoke()`` is called with an operation and target. 2. The connector calls ``CredentialResolver.resolve()`` to acquire a short-lived, scoped credential for that target. 3. The credential is passed to ``SessionTransport.connect()`` which uses it to establish the session. 4. The command is executed via ``SessionTransport.execute()``. 5. ``SessionTransport.disconnect()`` is called in a finally block — guaranteed even on failure. 6. The credential goes out of scope and is garbage-collected. No reference is stored anywhere in the broker. Rust port note: ``SessionTransport`` maps to an async trait with an associated error type. ``SessionConnector`` becomes a generic struct parameterized by the transport type. The finally-block cleanup maps to Drop + an async shutdown method (or a wrapper that calls disconnect on drop via ``tokio::spawn``). """ from __future__ import annotations import logging from abc import ABC, abstractmethod from typing import Any, Optional, Type from gsap_broker.connectors.base import ConnectorContext, ConnectorPlugin, ConnectorResult from gsap_broker.credentials.resolver import Credential, CredentialResolver logger = logging.getLogger(__name__) class SessionTransport(ABC): """A stateful connection to a target endpoint. Implementations wrap protocol-specific clients: - ``BasculeTransport``: Shellstream via Bascule proxy - ``PowerShellTransport``: PSRP via pypsrp - ``SSHTransport``: SSH via asyncssh (future) Transports are ephemeral — created per invocation, not pooled. """ transport_id: str = "" @abstractmethod async def connect(self, target: str, credential: Credential) -> None: """Establish the session using the provided credential.""" ... @abstractmethod async def execute(self, command: str, params: Optional[dict[str, Any]] = None) -> dict[str, Any]: """Execute a command over the established session. Returns a dict with at minimum ``stdout``, ``stderr``, ``exit_code`` keys (for shell transports) or transport-specific structured output. """ ... @abstractmethod async def disconnect(self) -> None: """Tear down the session. MUST be idempotent.""" ... @abstractmethod async def is_alive(self) -> bool: """Check if the session is still usable.""" ... class SessionConnector(ConnectorPlugin): """Base for connectors that establish sessions to endpoints. Subclasses set ``credential_type`` and ``transport_class`` to wire the connector to a specific transport and credential backend. The ``invoke()`` method handles the full lifecycle: credential acquisition → transport connect → execute → disconnect, with guaranteed cleanup on failure. """ credential_type: str = "" transport_class: Type[SessionTransport] = SessionTransport # overridden by subclass def __init__(self, credential_resolver: CredentialResolver): self._resolver = credential_resolver async def invoke( self, operation: str, parameters: dict[str, Any], context: ConnectorContext ) -> ConnectorResult: target = parameters.get("target", "") if not target: return ConnectorResult(success=False, error="target required for session connector") # Build an AC-like context dict for the resolver. ac_context = { "gsap_context_id": context.gsap_context_id, "accord": {"template": getattr(self, "accord_template", "")}, } try: credential = await self._resolver.resolve( self.credential_type, target, ac_context ) except Exception as e: return ConnectorResult(success=False, error=f"Credential resolution failed: {e}") transport = self.transport_class() try: await transport.connect(target, credential) result = await transport.execute(operation, parameters) return ConnectorResult(success=True, data=result) except Exception as e: logger.error("Session connector %s failed: %s", self.connector_id, e) return ConnectorResult(success=False, error=str(e)) finally: try: await transport.disconnect() except Exception as cleanup_err: logger.warning( "Transport disconnect failed for %s: %s", self.connector_id, cleanup_err, ) def health_check(self) -> bool: return True