GCAP-SPEC-FUNCTION-DESCRIPTOR-0001 implementation.
Mirrors connector runtime pattern exactly.

- FunctionPlugin — trigger_events, handle(), descriptor(), knative_manifest()
- FunctionRegistry — trigger_index for event-driven routing
- FunctionRuntime — invoke() + dispatch() with Chronicle lineage
- governed_function decorator — SDK surface for function authors
- BillingProcessor — GSAP_CR_RECEIVED → billable event with Chronicle CID
- EchoFunction — dev/test

API: /functions/ catalogue, invoke, dispatch, manifest, health

8 tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
120 lines
3.7 KiB
Python
120 lines
3.7 KiB
Python
"""Functions router — governed serverless function catalogue and invocation."""
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from fastapi import APIRouter, HTTPException
|
|
from pydantic import BaseModel
|
|
|
|
from gsap_broker.functions.base import FunctionContext
|
|
from gsap_broker.functions.registry import FunctionRegistry
|
|
from gsap_broker.functions.runtime import FunctionRuntime
|
|
from gsap_broker.functions.examples.billing import BillingProcessor
|
|
from gsap_broker.functions.examples.echo import EchoFunction
|
|
|
|
# Router object that the endpoint decorators in this module attach to.
router = APIRouter()

# A single registry/runtime pair is created at import time and shared by
# every handler below; the runtime is constructed around this registry.
_registry = FunctionRegistry()
_runtime = FunctionRuntime(registry=_registry)

# Built-in functions are registered as an import-time side effect, in this
# order: billing first, then echo.
_registry.register(BillingProcessor())
_registry.register(EchoFunction())
|
|
|
|
|
|
class InvokeRequest(BaseModel):
    """Request body accepted by the ``invoke_function`` endpoint.

    Every field is optional. ``event`` is passed to the runtime as the event
    payload; the remaining string fields are copied one-to-one into the
    ``FunctionContext`` built for the invocation.
    """

    # Event payload handed to the runtime; defaults to an empty mapping.
    event: dict[str, Any] = {}

    # Context fields forwarded to FunctionContext under the same names.
    trigger_event_kind: str = ""
    trigger_event_cid: str = ""
    chronicle_session_id: str = ""
    gsap_context_id: str = ""
    pipeline_run_id: str = ""
|
|
|
|
|
|
class DispatchRequest(BaseModel):
    """Request body accepted by the ``dispatch_event`` endpoint.

    ``event_kind`` is required; the other two fields have empty defaults.
    All three are passed positionally to ``FunctionRuntime.dispatch``.
    """

    # Required: the event kind to dispatch.
    event_kind: str

    # Optional payload accompanying the event.
    event_data: dict[str, Any] = {}

    # Optional CID passed as the third dispatch argument.
    trigger_cid: str = ""
|
|
|
|
|
|
class InvokeResponse(BaseModel):
    """Response body produced by the ``invoke_function`` endpoint.

    Only ``success`` is required. The endpoint fills the other fields from
    the runtime's invocation result, plus the ``function_id`` taken from the
    request path.
    """

    # Required outcome flag.
    success: bool

    # Result payload and error text; both default to None.
    data: Any = None
    error: str | None = None

    # Identifiers and timing reported alongside the result.
    lineage_cid: str = ""
    function_id: str = ""
    duration_ms: float = 0.0
|
|
|
|
|
|
class DispatchResponse(BaseModel):
    """Response body produced by the ``dispatch_event`` endpoint.

    All fields are required. The endpoint sets ``dispatched_count`` to the
    length of ``results``.
    """

    # Echo of the event kind that was dispatched.
    event_kind: str

    # Number of entries in ``results``.
    dispatched_count: int

    # Results returned by the runtime's dispatch call.
    results: list[dict[str, Any]]
|
|
|
|
|
|
@router.get("/")
async def catalogue() -> dict:
    """Return the registry's catalogue and trigger index.

    Returns:
        A dict with two keys: ``functions`` (the value of
        ``_registry.catalogue()``) and ``trigger_index`` (the value of
        ``_registry.trigger_index()``).
    """
    # Query the registry in the same order the response lists the keys.
    listing = _registry.catalogue()
    triggers = _registry.trigger_index()
    return {"functions": listing, "trigger_index": triggers}
|
|
|
|
|
|
@router.get("/{function_id}/")
async def get_descriptor(function_id: str) -> dict:
    """Return the descriptor of one registered function.

    Args:
        function_id: Identifier looked up in the module-level registry.

    Returns:
        Whatever the function's ``descriptor()`` method returns.

    Raises:
        HTTPException: 404 when the registry has no entry for ``function_id``.
    """
    plugin = _registry.get(function_id)
    if plugin is not None:
        return plugin.descriptor()
    raise HTTPException(status_code=404, detail=f"Function not found: {function_id}")
|
|
|
|
|
|
@router.get("/{function_id}/manifest/")
async def knative_manifest(
    function_id: str,
    image: str = "gcr.io/default/function:latest",
    namespace: str = "default",
) -> dict:
    """Return the manifest generated by one registered function.

    Args:
        function_id: Identifier looked up in the module-level registry.
        image: Forwarded to the function's ``knative_manifest`` as ``image``.
        namespace: Forwarded to the function's ``knative_manifest`` as
            ``namespace``.

    Returns:
        Whatever the function's ``knative_manifest()`` method returns.

    Raises:
        HTTPException: 404 when the registry has no entry for ``function_id``.
    """
    plugin = _registry.get(function_id)
    if plugin is None:
        raise HTTPException(status_code=404, detail=f"Function not found: {function_id}")

    manifest = plugin.knative_manifest(image=image, namespace=namespace)
    return manifest
|
|
|
|
|
|
@router.post("/{function_id}/invoke/")
async def invoke_function(function_id: str, body: InvokeRequest) -> InvokeResponse:
    """Invoke one registered function through the shared runtime.

    Args:
        function_id: Identifier looked up in the module-level registry and
            passed to the runtime.
        body: Event payload plus the context fields copied into the
            ``FunctionContext`` for this call.

    Returns:
        An ``InvokeResponse`` populated from the runtime result, with
        ``function_id`` taken from the path parameter.

    Raises:
        HTTPException: 404 when the registry has no entry for ``function_id``.
    """
    # The registry entry is only needed to confirm the function exists; the
    # runtime resolves it again from the id.
    if _registry.get(function_id) is None:
        raise HTTPException(status_code=404, detail=f"Function not found: {function_id}")

    context = FunctionContext(
        trigger_event_kind=body.trigger_event_kind,
        trigger_event_cid=body.trigger_event_cid,
        chronicle_session_id=body.chronicle_session_id,
        gsap_context_id=body.gsap_context_id,
        pipeline_run_id=body.pipeline_run_id,
    )
    outcome = await _runtime.invoke(function_id, body.event, context)

    return InvokeResponse(
        success=outcome.success,
        data=outcome.data,
        error=outcome.error,
        lineage_cid=outcome.lineage_cid,
        function_id=function_id,
        duration_ms=outcome.duration_ms,
    )
|
|
|
|
|
|
@router.post("/dispatch/")
async def dispatch_event(body: DispatchRequest) -> DispatchResponse:
    """Dispatch an event through the shared runtime and report the results.

    Args:
        body: Event kind, event data and trigger CID, passed positionally to
            ``_runtime.dispatch`` in that order.

    Returns:
        A ``DispatchResponse`` echoing the event kind, with the runtime's
        results and their count.
    """
    outcomes = await _runtime.dispatch(body.event_kind, body.event_data, body.trigger_cid)
    dispatched = len(outcomes)
    return DispatchResponse(
        event_kind=body.event_kind,
        dispatched_count=dispatched,
        results=outcomes,
    )
|
|
|
|
|
|
@router.get("/{function_id}/health/")
async def health_check(function_id: str) -> dict:
    """Report the health of one registered function.

    Args:
        function_id: Identifier looked up in the module-level registry.

    Returns:
        A dict with ``function_id`` and ``healthy``, where ``healthy`` is the
        value returned by the function's ``health_check()`` method.

    Raises:
        HTTPException: 404 when the registry has no entry for ``function_id``.
    """
    plugin = _registry.get(function_id)
    if plugin is None:
        raise HTTPException(status_code=404, detail=f"Function not found: {function_id}")

    status = plugin.health_check()
    return {"function_id": function_id, "healthy": status}
|