fastapi-gsap/tests/test_connectors.py
Tyler J King 0987704264 feat: governed connector module
GCAP-SPEC-CONNECTOR-DESCRIPTOR-0001 implementation.

ConnectorPlugin — abstract base for governed connectors.
ConnectorRegistry — register/discover connectors.
ConnectorRuntime — wraps invoke with Chronicle lineage.
EchoConnector — dev/test example.

API endpoints:
  GET  /connectors/           — browse catalogue
  GET  /connectors/{id}/      — connector descriptor
  POST /connectors/{id}/invoke/ — governed invocation
  GET  /connectors/{id}/health/ — health check

5 tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 16:42:38 -04:00

52 lines
1.4 KiB
Python

"""Tests for governed connector module."""
import pytest
@pytest.mark.asyncio
async def test_list_connectors(client):
resp = await client.get("/connectors/")
assert resp.status_code == 200
catalogue = resp.json()
ids = [c["connector_id"] for c in catalogue]
assert "echo" in ids
@pytest.mark.asyncio
async def test_get_descriptor(client):
resp = await client.get("/connectors/echo/")
assert resp.status_code == 200
desc = resp.json()
assert "corpus_entry_cid" in desc
assert desc["corpus_entry_cid"].startswith("sha256:")
@pytest.mark.asyncio
async def test_invoke_echo(client):
resp = await client.post(
"/connectors/echo/invoke/",
json={"operation": "echo", "parameters": {"msg": "hello"}},
)
assert resp.status_code == 200
body = resp.json()
assert body["success"] is True
assert body["data"]["msg"] == "hello"
assert body["connector_id"] == "echo"
assert body["operation"] == "echo"
@pytest.mark.asyncio
async def test_invoke_unknown_404(client):
resp = await client.post(
"/connectors/nonexistent/invoke/",
json={"operation": "test", "parameters": {}},
)
assert resp.status_code == 404
@pytest.mark.asyncio
async def test_health_check(client):
resp = await client.get("/connectors/echo/health/")
assert resp.status_code == 200
body = resp.json()
assert body["healthy"] is True
assert body["connector_id"] == "echo"