fastapi-gsap/gsap_broker/connectors/examples/echo_connector.py
Tyler J King 0987704264 feat: governed connector module
GCAP-SPEC-CONNECTOR-DESCRIPTOR-0001 implementation.

ConnectorPlugin — abstract base for governed connectors.
ConnectorRegistry — register/discover connectors.
ConnectorRuntime — wraps invoke with Chronicle lineage.
EchoConnector — dev/test example.

API endpoints:
  GET  /connectors/           — browse catalogue
  GET  /connectors/{id}/      — connector descriptor
  POST /connectors/{id}/invoke/ — governed invocation
  GET  /connectors/{id}/health/ — health check

5 tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 16:42:38 -04:00

24 lines
716 B
Python

"""EchoConnector — minimal example governed connector."""
from __future__ import annotations
from typing import Any
from gsap_broker.connectors.base import ConnectorContext, ConnectorPlugin, ConnectorResult
class EchoConnector(ConnectorPlugin):
connector_id = "echo"
corpus_entry_cid = "sha256:" + "e" * 64
capability_mask = 1
declared_endpoints = ["localhost"]
accord_template = ""
gsap_required = False
chronicle_enabled = True
async def invoke(
self, operation: str, parameters: dict[str, Any], context: ConnectorContext
) -> ConnectorResult:
return ConnectorResult(success=True, data=parameters)
def health_check(self) -> bool:
return True