fastapi-gsap/gsap_broker/functions/registry.py
Tyler J King 0c77943ceb feat: governed function runtime + billing drain
GCAP-SPEC-FUNCTION-DESCRIPTOR-0001 implementation.
Mirrors connector runtime pattern exactly.

FunctionPlugin — trigger_events, handle(), descriptor(), knative_manifest()
FunctionRegistry — trigger_index for event-driven routing
FunctionRuntime — invoke() + dispatch() with Chronicle lineage
governed_function decorator — SDK surface for function authors
BillingProcessor — GSAP_CR_RECEIVED → billable event with Chronicle CID
EchoFunction — dev/test

API: /functions/ catalogue, invoke, dispatch, manifest, health
8 tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 22:12:29 -04:00

37 lines
1.4 KiB
Python

"""FunctionRegistry — catalogue of governed serverless functions."""
from __future__ import annotations
from .base import FunctionPlugin
class FunctionRegistry:
    """In-memory catalogue of function plugins with a trigger-event index.

    Functions are stored by ``function_id``. A secondary index maps each
    trigger event kind to the ids of the functions that declared it, so that
    ``by_trigger`` can resolve the handlers for an event without scanning
    every registered function.
    """

    def __init__(self) -> None:
        # function_id -> plugin instance
        self._functions: dict[str, FunctionPlugin] = {}
        # event kind -> ids of the functions triggered by it, in registration order
        self._trigger_index: dict[str, list[str]] = {}

    def register(self, function: FunctionPlugin) -> None:
        """Add *function* to the registry and index its trigger events.

        Registering a ``function_id`` that is already present replaces the
        previous plugin: its old trigger entries are removed before the new
        ones are added, so the index never holds duplicate or stale ids.

        Raises:
            ValueError: if ``function_id`` or ``corpus_entry_cid`` is empty.
        """
        if not function.function_id:
            raise ValueError("function_id must be non-empty")
        if not function.corpus_entry_cid:
            raise ValueError("corpus_entry_cid must be non-empty")

        function_id = function.function_id
        # Snapshot (and de-duplicate, preserving order) before touching any
        # state, so a bad ``trigger_events`` cannot leave the registry
        # half-updated.
        event_kinds = list(dict.fromkeys(function.trigger_events))

        if function_id in self._functions:
            self._unindex(function_id)
        self._functions[function_id] = function

        for event_kind in event_kinds:
            self._trigger_index.setdefault(event_kind, []).append(function_id)

    def _unindex(self, function_id: str) -> None:
        """Remove every trigger-index entry that points at *function_id*."""
        for event_kind in list(self._trigger_index):
            remaining = [
                fid for fid in self._trigger_index[event_kind] if fid != function_id
            ]
            if remaining:
                self._trigger_index[event_kind] = remaining
            else:
                # Drop event kinds that no longer have any subscriber.
                del self._trigger_index[event_kind]

    def get(self, function_id: str) -> FunctionPlugin | None:
        """Return the plugin registered under *function_id*, or ``None``."""
        return self._functions.get(function_id)

    def catalogue(self) -> list[dict]:
        """Return ``descriptor()`` of every registered function."""
        return [f.descriptor() for f in self._functions.values()]

    def list_ids(self) -> list[str]:
        """Return the ids of all registered functions."""
        return list(self._functions)

    def trigger_index(self) -> dict[str, list[str]]:
        """Return a copy of the event-kind -> function-ids index.

        Both the mapping and its lists are copied, so callers may mutate the
        result without affecting the registry.
        """
        return {kind: list(ids) for kind, ids in self._trigger_index.items()}

    def by_trigger(self, event_kind: str) -> list[FunctionPlugin]:
        """Return all functions registered for a given trigger event kind."""
        ids = self._trigger_index.get(event_kind, [])
        return [self._functions[fid] for fid in ids if fid in self._functions]