GCAP-SPEC-CONNECTOR-DESCRIPTOR-0001 implementation.
ConnectorPlugin — abstract base for governed connectors.
ConnectorRegistry — register/discover connectors.
ConnectorRuntime — wraps invoke with Chronicle lineage.
EchoConnector — dev/test example.
API endpoints:
GET /connectors/ — browse catalogue
GET /connectors/{id}/ — connector descriptor
POST /connectors/{id}/invoke/ — governed invocation
GET /connectors/{id}/health/ — health check
5 tests passing.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
84 lines
2.5 KiB
Python
84 lines
2.5 KiB
Python
"""Connectors router — governed connector catalogue and invocation."""
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from fastapi import APIRouter, HTTPException
|
|
from pydantic import BaseModel
|
|
|
|
from gsap_broker.connectors.base import ConnectorContext
|
|
from gsap_broker.connectors.registry import ConnectorRegistry
|
|
from gsap_broker.connectors.runtime import ConnectorRuntime
|
|
from gsap_broker.connectors.examples.echo_connector import EchoConnector
|
|
|
|
router = APIRouter()
|
|
|
|
# Module-level registry and runtime
|
|
_registry = ConnectorRegistry()
|
|
_runtime = ConnectorRuntime(registry=_registry)
|
|
|
|
# Register built-in connectors
|
|
_registry.register(EchoConnector())
|
|
|
|
|
|
class InvokeRequest(BaseModel):
|
|
operation: str
|
|
parameters: dict[str, Any] = {}
|
|
chronicle_session_id: str = ""
|
|
gsap_context_id: str = ""
|
|
pipeline_run_id: str = ""
|
|
dag_id: str = ""
|
|
|
|
|
|
class InvokeResponse(BaseModel):
|
|
success: bool
|
|
data: Any = None
|
|
error: str | None = None
|
|
lineage_cid: str = ""
|
|
connector_id: str = ""
|
|
operation: str = ""
|
|
|
|
|
|
@router.get("/")
|
|
async def catalogue() -> list[dict]:
|
|
return _registry.catalogue()
|
|
|
|
|
|
@router.get("/{connector_id}/")
|
|
async def get_descriptor(connector_id: str) -> dict:
|
|
connector = _registry.get(connector_id)
|
|
if connector is None:
|
|
raise HTTPException(status_code=404, detail=f"Connector not found: {connector_id}")
|
|
return connector.descriptor()
|
|
|
|
|
|
@router.post("/{connector_id}/invoke/")
|
|
async def invoke_connector(connector_id: str, body: InvokeRequest) -> InvokeResponse:
|
|
connector = _registry.get(connector_id)
|
|
if connector is None:
|
|
raise HTTPException(status_code=404, detail=f"Connector not found: {connector_id}")
|
|
|
|
ctx = ConnectorContext(
|
|
chronicle_session_id=body.chronicle_session_id,
|
|
gsap_context_id=body.gsap_context_id,
|
|
pipeline_run_id=body.pipeline_run_id,
|
|
dag_id=body.dag_id,
|
|
)
|
|
result = await _runtime.invoke(connector_id, body.operation, body.parameters, ctx)
|
|
return InvokeResponse(
|
|
success=result.success,
|
|
data=result.data,
|
|
error=result.error,
|
|
lineage_cid=result.lineage_cid,
|
|
connector_id=connector_id,
|
|
operation=body.operation,
|
|
)
|
|
|
|
|
|
@router.get("/{connector_id}/health/")
|
|
async def health_check(connector_id: str) -> dict:
|
|
connector = _registry.get(connector_id)
|
|
if connector is None:
|
|
raise HTTPException(status_code=404, detail=f"Connector not found: {connector_id}")
|
|
healthy = connector.health_check()
|
|
return {"connector_id": connector_id, "healthy": healthy}
|