diff --git a/gsap_broker/connectors/session.py b/gsap_broker/connectors/session.py new file mode 100644 index 0000000..e9b9a82 --- /dev/null +++ b/gsap_broker/connectors/session.py @@ -0,0 +1,138 @@ +# Copyright 2026 Guildhouse Dev +# SPDX-License-Identifier: Apache-2.0 + +"""Session-based connector framework. + +Session connectors establish stateful connections to target endpoints +(SSH, WinRM/PSRP, Shellstream/Bascule) and execute commands over them. + +Credential lifecycle: + 1. ``SessionConnector.invoke()`` is called with an operation and + target. + 2. The connector calls ``CredentialResolver.resolve()`` to acquire + a short-lived, scoped credential for that target. + 3. The credential is passed to ``SessionTransport.connect()`` which + uses it to establish the session. + 4. The command is executed via ``SessionTransport.execute()``. + 5. ``SessionTransport.disconnect()`` is called in a finally block + — guaranteed even on failure. + 6. The credential goes out of scope and is garbage-collected. + No reference is stored anywhere in the broker. + +Rust port note: + ``SessionTransport`` maps to an async trait with an associated + error type. ``SessionConnector`` becomes a generic struct + parameterized by the transport type. The finally-block cleanup + maps to Drop + an async shutdown method (or a wrapper that calls + disconnect on drop via ``tokio::spawn``). +""" + +from __future__ import annotations + +import logging +from abc import ABC, abstractmethod +from typing import Any, Optional, Type + +from gsap_broker.connectors.base import ConnectorContext, ConnectorPlugin, ConnectorResult +from gsap_broker.credentials.resolver import Credential, CredentialResolver + +logger = logging.getLogger(__name__) + + +class SessionTransport(ABC): + """A stateful connection to a target endpoint. + + Implementations wrap protocol-specific clients: + - ``BasculeTransport``: Shellstream via Bascule proxy + - ``PowerShellTransport``: PSRP via pypsrp + - ``SSHTransport``: SSH via asyncssh (future) + + Transports are ephemeral — created per invocation, not pooled. + """ + + transport_id: str = "" + + @abstractmethod + async def connect(self, target: str, credential: Credential) -> None: + """Establish the session using the provided credential.""" + ... + + @abstractmethod + async def execute(self, command: str, params: Optional[dict[str, Any]] = None) -> dict[str, Any]: + """Execute a command over the established session. + + Returns a dict with at minimum ``stdout``, ``stderr``, + ``exit_code`` keys (for shell transports) or + transport-specific structured output. + """ + ... + + @abstractmethod + async def disconnect(self) -> None: + """Tear down the session. MUST be idempotent.""" + ... + + @abstractmethod + async def is_alive(self) -> bool: + """Check if the session is still usable.""" + ... + + +class SessionConnector(ConnectorPlugin): + """Base for connectors that establish sessions to endpoints. + + Subclasses set ``credential_type`` and ``transport_class`` + to wire the connector to a specific transport and credential + backend. + + The ``invoke()`` method handles the full lifecycle: + credential acquisition → transport connect → execute → + disconnect, with guaranteed cleanup on failure. + """ + + credential_type: str = "" + transport_class: Type[SessionTransport] = SessionTransport # overridden by subclass + + def __init__(self, credential_resolver: CredentialResolver): + self._resolver = credential_resolver + + async def invoke( + self, operation: str, parameters: dict[str, Any], context: ConnectorContext + ) -> ConnectorResult: + target = parameters.get("target", "") + if not target: + return ConnectorResult(success=False, error="target required for session connector") + + # Build an AC-like context dict for the resolver. + ac_context = { + "gsap_context_id": context.gsap_context_id, + "accord": {"template": getattr(self, "accord_template", "")}, + } + + try: + credential = await self._resolver.resolve( + self.credential_type, target, ac_context + ) + except Exception as e: + return ConnectorResult(success=False, error=f"Credential resolution failed: {e}") + + transport = self.transport_class() + try: + await transport.connect(target, credential) + result = await transport.execute(operation, parameters) + return ConnectorResult(success=True, data=result) + except Exception as e: + logger.error("Session connector %s failed: %s", self.connector_id, e) + return ConnectorResult(success=False, error=str(e)) + finally: + try: + await transport.disconnect() + except Exception as cleanup_err: + logger.warning( + "Transport disconnect failed for %s: %s", + self.connector_id, + cleanup_err, + ) + + def health_check(self) -> bool: + return True