fastapi-gsap/gsap_broker/connectors/registry.py
Tyler J King 0987704264 feat: governed connector module
GCAP-SPEC-CONNECTOR-DESCRIPTOR-0001 implementation.

ConnectorPlugin — abstract base for governed connectors.
ConnectorRegistry — register/discover connectors.
ConnectorRuntime — wraps invoke with Chronicle lineage.
EchoConnector — dev/test example.

API endpoints:
  GET  /connectors/           — browse catalogue
  GET  /connectors/{id}/      — connector descriptor
  POST /connectors/{id}/invoke/ — governed invocation
  GET  /connectors/{id}/health/ — health check

5 tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 16:42:38 -04:00

25 lines
888 B
Python

"""ConnectorRegistry — catalogue of governed connectors."""
from __future__ import annotations
from .base import ConnectorPlugin
class ConnectorRegistry:
def __init__(self) -> None:
self._connectors: dict[str, ConnectorPlugin] = {}
def register(self, connector: ConnectorPlugin) -> None:
if not connector.connector_id:
raise ValueError("connector_id must be non-empty")
if not connector.corpus_entry_cid:
raise ValueError("corpus_entry_cid must be non-empty")
self._connectors[connector.connector_id] = connector
def get(self, connector_id: str) -> ConnectorPlugin | None:
return self._connectors.get(connector_id)
def catalogue(self) -> list[dict]:
return [c.descriptor() for c in self._connectors.values()]
def list_ids(self) -> list[str]:
return list(self._connectors.keys())