initial: substrate-sdk-python v0.1.0
Python SDK for shellbound Django applications. Provides ShellApp, ShardContext, ShellboundMiddleware. Emits Chronicle events to stdout in dev mode. Includes fix for IndexError in apps.py when DJANGO_SETTINGS_MODULE has no dots (e.g. instance_settings). Shard name now falls back safely without eager default argument parsing. Implements SHELLBOUND-APP-0001 §4 (dev mode). Wired into entropyopposition as of 2026-03-18.
This commit is contained in:
commit
89a054d656
14 changed files with 733 additions and 0 deletions
6
.gitignore
vendored
Normal file
6
.gitignore
vendored
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
__pycache__/
|
||||
*.pyc
|
||||
.pytest_cache/
|
||||
*.egg-info/
|
||||
dist/
|
||||
build/
|
||||
68
README.md
Normal file
68
README.md
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
# substrate-sdk-python
|
||||
|
||||
Build shellbound applications. Any Python framework. Any deployment. Trust rooted in hardware.
|
||||
|
||||
## Install
|
||||
|
||||
```bash
|
||||
pip install substrate-sdk[django]
|
||||
```
|
||||
|
||||
## Django (2 lines)
|
||||
|
||||
```python
|
||||
# settings.py
|
||||
INSTALLED_APPS = [
|
||||
...
|
||||
'substrate_sdk.django',
|
||||
]
|
||||
MIDDLEWARE = [
|
||||
...
|
||||
'substrate_sdk.django.ShellboundMiddleware',
|
||||
]
|
||||
```
|
||||
|
||||
Every request is Chronicle-attributed. Every model save is Chronicle-recorded.
|
||||
If a bascule shell is present: fully governed. If not: development mode, works normally.
|
||||
|
||||
## Direct usage
|
||||
|
||||
```python
|
||||
from substrate_sdk import ShellApp
|
||||
|
||||
app = ShellApp(shard_name="my-app", capabilities={"network": True})
|
||||
context = app.register()
|
||||
print(context.is_governed) # True if shell present
|
||||
print(context.shard_did) # DID assigned by shell
|
||||
```
|
||||
|
||||
## What this does
|
||||
|
||||
When running inside a Substrate FFC:
|
||||
- Your process gets a DID + SPIFFE SVID
|
||||
- Your process gets a governed accord
|
||||
- Every request is Chronicle-attributed
|
||||
- Every operation is audit-verifiable
|
||||
- You participate in the governance fabric
|
||||
|
||||
When running locally (no shell):
|
||||
- Development mode — everything works normally
|
||||
- Chronicle events go to stdout
|
||||
- No governance enforcement
|
||||
- Zero impact on application behavior
|
||||
|
||||
## Phase B: HFL Direct Binding (planned)
|
||||
|
||||
When the HFL kernel module is present, `substrate_sdk` will route Chronicle
|
||||
events through `substrate-hfl-python` (PyO3) instead of stdout. This upgrades
|
||||
a Tier 4 deployment to Tier 1/2 with **zero application code changes**.
|
||||
|
||||
```
|
||||
Phase A (current): SDK → stdout → bascule-filter → Chronicle
|
||||
Phase B (planned): SDK → substrate-hfl-python (PyO3) → HFL → Chronicle
|
||||
```
|
||||
|
||||
`substrate-hfl-python` is a new crate in the substrate workspace:
|
||||
- Depends on `hfl-types`
|
||||
- Exposes: `chronicle_write()`, `accord_check()`, `session_get()`
|
||||
- `substrate_sdk/chronicle.py` detects HFL availability and routes automatically
|
||||
19
pyproject.toml
Normal file
19
pyproject.toml
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "substrate-sdk"
|
||||
version = "0.1.0"
|
||||
description = "Shellbound application SDK for Python. Build governed applications for the Substrate FFC consortium."
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.12"
|
||||
license = {text = "Apache-2.0"}
|
||||
dependencies = []
|
||||
|
||||
[project.optional-dependencies]
|
||||
django = ["Django>=4.2"]
|
||||
all = ["Django>=4.2"]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
testpaths = ["tests"]
|
||||
15
substrate_sdk/__init__.py
Normal file
15
substrate_sdk/__init__.py
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
from .app import ShellApp
|
||||
from .chronicle import LAYER_APPLICATION, LAYER_GOVERNANCE, LAYER_IDENTITY, LAYER_TCB, ShardEmitter, emit_event
|
||||
from .context import ShardContext
|
||||
|
||||
__version__ = "0.1.0"
|
||||
__all__ = [
|
||||
"ShellApp",
|
||||
"ShardContext",
|
||||
"ShardEmitter",
|
||||
"emit_event",
|
||||
"LAYER_TCB",
|
||||
"LAYER_IDENTITY",
|
||||
"LAYER_GOVERNANCE",
|
||||
"LAYER_APPLICATION",
|
||||
]
|
||||
149
substrate_sdk/app.py
Normal file
149
substrate_sdk/app.py
Normal file
|
|
@ -0,0 +1,149 @@
|
|||
"""ShellApp — the main SDK entry point.
|
||||
|
||||
Usage:
|
||||
from substrate_sdk import ShellApp
|
||||
|
||||
app = ShellApp(shard_name="my-app", version="1.0.0", capabilities={"network": True})
|
||||
context = app.register()
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import uuid
|
||||
from typing import Optional
|
||||
|
||||
from .chronicle import LAYER_APPLICATION, ShardEmitter, emit_event
|
||||
from .context import ShardContext
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SHELL_SOCKET = os.environ.get("SUBSTRATE_SHELL_SOCKET", "/run/bascule/shard.sock")
|
||||
REGISTER_TIMEOUT = float(os.environ.get("SUBSTRATE_REGISTER_TIMEOUT", "5.0"))
|
||||
|
||||
|
||||
class ShellApp:
|
||||
"""A shellbound application instance.
|
||||
|
||||
Non-fatal everywhere: shell unavailable → dev mode. Emit fails → logged.
|
||||
Never raises due to shell issues. Never blocks startup >5 seconds.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
shard_name: str,
|
||||
version: str = "0.0.0",
|
||||
capabilities: Optional[dict] = None,
|
||||
language: str = "python",
|
||||
):
|
||||
self.shard_name = shard_name
|
||||
self.version = version
|
||||
self.capabilities = capabilities or {}
|
||||
self.language = language
|
||||
self._context: Optional[ShardContext] = None
|
||||
self._emitter: Optional[ShardEmitter] = None
|
||||
self._socket: Optional[socket.socket] = None
|
||||
|
||||
def register(self) -> ShardContext:
|
||||
"""Register with bascule shell. Falls back to dev mode if unavailable."""
|
||||
context = ShardContext(
|
||||
shard_name=self.shard_name,
|
||||
shard_id=str(uuid.uuid4()),
|
||||
language=self.language,
|
||||
version=self.version,
|
||||
)
|
||||
|
||||
if os.path.exists(SHELL_SOCKET):
|
||||
try:
|
||||
context = self._register_with_shell(context)
|
||||
except Exception as e:
|
||||
logger.warning("[substrate-sdk] Shell registration failed (dev mode): %s", e)
|
||||
else:
|
||||
logger.info("[substrate-sdk] %s: running without shell — development mode", self.shard_name)
|
||||
|
||||
self._context = context
|
||||
self._emitter = ShardEmitter(context)
|
||||
|
||||
self._emitter.emit(
|
||||
"APP_SHARD_STARTED",
|
||||
LAYER_APPLICATION,
|
||||
{
|
||||
"shard_name": self.shard_name,
|
||||
"shard_id": context.shard_id,
|
||||
"shard_did": context.shard_did,
|
||||
"language": self.language,
|
||||
"version": self.version,
|
||||
"capabilities": self.capabilities,
|
||||
"ffc_did": context.ffc_did,
|
||||
"governed": context.is_governed,
|
||||
},
|
||||
)
|
||||
|
||||
return context
|
||||
|
||||
def shutdown(self, reason: str = "normal") -> None:
|
||||
"""Deregister from shell. Emits APP_SHARD_STOPPED."""
|
||||
if self._emitter and self._context:
|
||||
self._emitter.emit(
|
||||
"APP_SHARD_STOPPED",
|
||||
LAYER_APPLICATION,
|
||||
{"shard_name": self.shard_name, "shard_id": self._context.shard_id, "reason": reason},
|
||||
)
|
||||
|
||||
if self._socket:
|
||||
try:
|
||||
msg = json.dumps({
|
||||
"type": "SHARD_DEREGISTER",
|
||||
"shard_id": self._context.shard_id if self._context else "",
|
||||
"reason": reason,
|
||||
})
|
||||
self._socket.sendall((msg + "\n").encode())
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
self._socket.close()
|
||||
|
||||
@property
|
||||
def emitter(self) -> Optional[ShardEmitter]:
|
||||
return self._emitter
|
||||
|
||||
@property
|
||||
def context(self) -> Optional[ShardContext]:
|
||||
return self._context
|
||||
|
||||
def _register_with_shell(self, context: ShardContext) -> ShardContext:
|
||||
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
s.settimeout(REGISTER_TIMEOUT)
|
||||
s.connect(SHELL_SOCKET)
|
||||
|
||||
msg = json.dumps({
|
||||
"type": "SHARD_REGISTER",
|
||||
"shard_name": self.shard_name,
|
||||
"version": self.version,
|
||||
"pid": os.getpid(),
|
||||
"capabilities": self.capabilities,
|
||||
"sdk_version": "0.1.0",
|
||||
"language": self.language,
|
||||
})
|
||||
s.sendall((msg + "\n").encode())
|
||||
|
||||
buf = b""
|
||||
while b"\n" not in buf:
|
||||
chunk = s.recv(4096)
|
||||
if not chunk:
|
||||
break
|
||||
buf += chunk
|
||||
|
||||
response = json.loads(buf.split(b"\n")[0])
|
||||
|
||||
if response.get("type") == "SHARD_REGISTERED":
|
||||
context.shard_did = response.get("shard_did", "")
|
||||
context.svid = response.get("svid", "")
|
||||
context.accord_hash = response.get("accord_hash", "")
|
||||
context.ffc_did = response.get("ffc_did", "")
|
||||
context.is_governed = True
|
||||
self._socket = s
|
||||
logger.info("[substrate-sdk] %s registered. DID: %s", self.shard_name, context.shard_did)
|
||||
|
||||
return context
|
||||
110
substrate_sdk/chronicle.py
Normal file
110
substrate_sdk/chronicle.py
Normal file
|
|
@ -0,0 +1,110 @@
|
|||
"""Chronicle event emission via stdout (Phase A) or HFL direct binding (Phase B).
|
||||
|
||||
Phase A (current): Writes substrate_event JSON lines to stdout.
|
||||
bascule-filter-python reads and translates to ChronicleEntry records.
|
||||
|
||||
Phase B (planned): When substrate-hfl-python (PyO3) is installed,
|
||||
routes events through HFL kernel module directly:
|
||||
chronicle_write(), accord_check(), session_get()
|
||||
Tier 4 → Tier 1/2 upgrade with zero application code changes.
|
||||
Detection: try import substrate_hfl; if available, use HFL path.
|
||||
|
||||
All writes are non-blocking. All failures are silently logged.
|
||||
Chronicle emission MUST NOT affect application behavior.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# TODO Phase B: HFL direct binding detection
|
||||
# When substrate-hfl-python (PyO3) is installed, use HFL path:
|
||||
# try:
|
||||
# import substrate_hfl
|
||||
# _hfl_available = True
|
||||
# except ImportError:
|
||||
# _hfl_available = False
|
||||
# Then in emit_event(): if _hfl_available, call substrate_hfl.chronicle_write()
|
||||
# instead of stdout print(). Zero application code changes required.
|
||||
_hfl_available = False
|
||||
|
||||
LAYER_TCB = 0
|
||||
LAYER_IDENTITY = 1
|
||||
LAYER_GOVERNANCE = 2
|
||||
LAYER_APPLICATION = 3
|
||||
|
||||
|
||||
def emit_event(
|
||||
kind: str,
|
||||
layer: int,
|
||||
payload: dict,
|
||||
actor_did: str = "",
|
||||
accord_hash: str = "",
|
||||
shard_id: str = "",
|
||||
) -> bool:
|
||||
"""Emit a Chronicle event to stdout.
|
||||
|
||||
Returns True if written, False on error (non-fatal).
|
||||
The substrate_event marker tells bascule-filter this is a Chronicle event.
|
||||
"""
|
||||
event = {
|
||||
"substrate_event": True,
|
||||
"kind": kind,
|
||||
"layer": layer,
|
||||
"actor_did": actor_did,
|
||||
"accord_hash": accord_hash,
|
||||
"shard_id": shard_id,
|
||||
"occurred_at": time.time_ns(),
|
||||
"payload": payload,
|
||||
}
|
||||
try:
|
||||
line = json.dumps(event, separators=(",", ":"), sort_keys=True)
|
||||
print(line, flush=True)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning("Chronicle emit failed (non-fatal): %s", e)
|
||||
return False
|
||||
|
||||
|
||||
class ShardEmitter:
|
||||
"""Chronicle emitter scoped to a shard. Carries shard_id, actor_did, accord_hash."""
|
||||
|
||||
def __init__(self, context):
|
||||
self._ctx = context
|
||||
|
||||
def emit(self, kind: str, layer: int, payload: dict, actor_did: Optional[str] = None) -> bool:
|
||||
return emit_event(
|
||||
kind=kind,
|
||||
layer=layer,
|
||||
payload=payload,
|
||||
actor_did=actor_did or self._ctx.actor_did,
|
||||
accord_hash=self._ctx.accord_hash,
|
||||
shard_id=self._ctx.shard_id,
|
||||
)
|
||||
|
||||
def request(
|
||||
self,
|
||||
method: str,
|
||||
path: str,
|
||||
status: int,
|
||||
duration_ms: float,
|
||||
actor_did: Optional[str] = None,
|
||||
view_name: Optional[str] = None,
|
||||
) -> bool:
|
||||
return self.emit(
|
||||
"APP_DJANGO_REQUEST",
|
||||
LAYER_APPLICATION,
|
||||
{"method": method, "path": path, "status": status, "duration_ms": duration_ms, "view_name": view_name},
|
||||
actor_did=actor_did,
|
||||
)
|
||||
|
||||
def signal(self, signal_name: str, model: str, instance_id: Optional[str] = None) -> bool:
|
||||
return self.emit(
|
||||
"APP_DJANGO_SIGNAL",
|
||||
LAYER_APPLICATION,
|
||||
{"signal": signal_name, "model": model, "instance_id": instance_id},
|
||||
)
|
||||
27
substrate_sdk/context.py
Normal file
27
substrate_sdk/context.py
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
"""Shard context — identity and governance state from shell registration."""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
import uuid
|
||||
|
||||
|
||||
@dataclass
|
||||
class ShardContext:
|
||||
"""Context returned by ShellApp.register().
|
||||
|
||||
Holds all identity and governance information for this shard instance.
|
||||
"""
|
||||
|
||||
shard_name: str
|
||||
shard_id: str = field(default_factory=lambda: str(uuid.uuid4()))
|
||||
shard_did: str = ""
|
||||
svid: str = ""
|
||||
accord_hash: str = ""
|
||||
ffc_did: str = ""
|
||||
is_governed: bool = False
|
||||
language: str = "python"
|
||||
version: str = "0.0.0"
|
||||
|
||||
@property
|
||||
def actor_did(self) -> str:
|
||||
"""The DID for events from this shard."""
|
||||
return self.shard_did or f"did:web:local:shard:{self.shard_name}"
|
||||
1
substrate_sdk/django/__init__.py
Normal file
1
substrate_sdk/django/__init__.py
Normal file
|
|
@ -0,0 +1 @@
|
|||
default_app_config = "substrate_sdk.django.apps.ShellboundDjangoApp"
|
||||
28
substrate_sdk/django/apps.py
Normal file
28
substrate_sdk/django/apps.py
Normal file
|
|
@ -0,0 +1,28 @@
|
|||
"""Django app config — registers shard with shell at startup."""
|
||||
|
||||
import os
|
||||
|
||||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class ShellboundDjangoApp(AppConfig):
|
||||
name = "substrate_sdk.django"
|
||||
label = "substrate_sdk"
|
||||
verbose_name = "Substrate Shell SDK"
|
||||
default_auto_field = "django.db.models.BigAutoField"
|
||||
|
||||
def ready(self):
|
||||
from substrate_sdk import ShellApp
|
||||
from substrate_sdk.django.middleware import set_emitter
|
||||
from substrate_sdk.django.signals import connect_signals
|
||||
|
||||
shard_name = os.environ.get("SUBSTRATE_SHARD_NAME")
|
||||
if not shard_name:
|
||||
parts = os.environ.get("DJANGO_SETTINGS_MODULE", "django-app").split(".")
|
||||
shard_name = parts[-2] if len(parts) >= 2 else parts[0]
|
||||
version = os.environ.get("SUBSTRATE_SHARD_VERSION", "0.0.0")
|
||||
|
||||
app = ShellApp(shard_name=shard_name, version=version)
|
||||
app.register()
|
||||
set_emitter(app.emitter)
|
||||
connect_signals(app.emitter)
|
||||
66
substrate_sdk/django/middleware.py
Normal file
66
substrate_sdk/django/middleware.py
Normal file
|
|
@ -0,0 +1,66 @@
|
|||
"""Django middleware — emits APP_DJANGO_REQUEST for every request. Non-fatal."""
|
||||
|
||||
import logging
|
||||
import time
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_emitter = None
|
||||
|
||||
|
||||
def set_emitter(emitter):
|
||||
global _emitter
|
||||
_emitter = emitter
|
||||
|
||||
|
||||
class ShellboundMiddleware:
|
||||
"""WSGI middleware emitting Chronicle events per Django request."""
|
||||
|
||||
def __init__(self, get_response):
|
||||
self.get_response = get_response
|
||||
|
||||
def __call__(self, request):
|
||||
start = time.monotonic()
|
||||
response = self.get_response(request)
|
||||
duration_ms = (time.monotonic() - start) * 1000
|
||||
|
||||
try:
|
||||
if _emitter:
|
||||
_emitter.request(
|
||||
method=request.method,
|
||||
path=request.path,
|
||||
status=response.status_code,
|
||||
duration_ms=round(duration_ms, 2),
|
||||
actor_did=_extract_did(request),
|
||||
view_name=_extract_view(request),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("Chronicle middleware non-fatal: %s", e)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
def _extract_did(request) -> str:
|
||||
"""Extract actor DID from request auth context."""
|
||||
if hasattr(request, "auth") and request.auth:
|
||||
claims = getattr(request.auth, "payload", {})
|
||||
if isinstance(claims, dict):
|
||||
did = claims.get("substrate_did", "")
|
||||
if did:
|
||||
return did
|
||||
|
||||
if hasattr(request, "user") and request.user and getattr(request.user, "is_authenticated", False):
|
||||
user = request.user
|
||||
if hasattr(user, "external_id") and user.external_id:
|
||||
org = getattr(user, "organization", None)
|
||||
if org:
|
||||
return f"did:web:guildhouse.dev:{org.slug}:user:{user.external_id}"
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
def _extract_view(request) -> str:
|
||||
resolver = getattr(request, "resolver_match", None)
|
||||
if resolver:
|
||||
return resolver.view_name or getattr(resolver.func, "__name__", "")
|
||||
return ""
|
||||
25
substrate_sdk/django/signals.py
Normal file
25
substrate_sdk/django/signals.py
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
"""Connect Django signals to Chronicle events."""
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def connect_signals(emitter):
|
||||
"""Connect Django model signals to Chronicle. No-op if emitter is None."""
|
||||
if not emitter:
|
||||
return
|
||||
|
||||
from django.db.models.signals import post_save
|
||||
|
||||
def on_post_save(sender, instance, created, **kwargs):
|
||||
try:
|
||||
emitter.signal(
|
||||
signal_name="post_save_create" if created else "post_save_update",
|
||||
model=sender.__name__,
|
||||
instance_id=str(getattr(instance, "pk", "")),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("Signal Chronicle non-fatal: %s", e)
|
||||
|
||||
post_save.connect(on_post_save)
|
||||
73
tests/test_app.py
Normal file
73
tests/test_app.py
Normal file
|
|
@ -0,0 +1,73 @@
|
|||
"""Tests for ShellApp registration and lifecycle."""
|
||||
|
||||
import json
|
||||
import os
|
||||
from unittest.mock import patch
|
||||
|
||||
from substrate_sdk.app import ShellApp
|
||||
|
||||
|
||||
class TestShellAppDevMode:
|
||||
|
||||
def test_register_no_socket(self, capsys):
|
||||
with patch.dict(os.environ, {"SUBSTRATE_SHELL_SOCKET": "/tmp/nonexistent.sock"}):
|
||||
app = ShellApp(shard_name="test-app", version="1.0.0")
|
||||
ctx = app.register()
|
||||
assert ctx.is_governed is False
|
||||
assert ctx.shard_id # UUID assigned
|
||||
|
||||
def test_emits_shard_started(self, capsys):
|
||||
with patch.dict(os.environ, {"SUBSTRATE_SHELL_SOCKET": "/tmp/nonexistent.sock"}):
|
||||
app = ShellApp(shard_name="test-app")
|
||||
app.register()
|
||||
lines = capsys.readouterr().out.strip().split("\n")
|
||||
events = [json.loads(l) for l in lines if l.strip()]
|
||||
started = [e for e in events if e.get("kind") == "APP_SHARD_STARTED"]
|
||||
assert len(started) == 1
|
||||
assert started[0]["payload"]["governed"] is False
|
||||
|
||||
def test_shutdown_emits_shard_stopped(self, capsys):
|
||||
with patch.dict(os.environ, {"SUBSTRATE_SHELL_SOCKET": "/tmp/nonexistent.sock"}):
|
||||
app = ShellApp(shard_name="test-app")
|
||||
app.register()
|
||||
app.shutdown("normal")
|
||||
lines = capsys.readouterr().out.strip().split("\n")
|
||||
events = [json.loads(l) for l in lines if l.strip()]
|
||||
stopped = [e for e in events if e.get("kind") == "APP_SHARD_STOPPED"]
|
||||
assert len(stopped) == 1
|
||||
|
||||
def test_register_timeout_fallback(self, tmp_path):
|
||||
sock_path = str(tmp_path / "shard.sock")
|
||||
# Create file so os.path.exists returns True, but nothing listens
|
||||
open(sock_path, "w").close()
|
||||
with patch.dict(os.environ, {"SUBSTRATE_SHELL_SOCKET": sock_path, "SUBSTRATE_REGISTER_TIMEOUT": "0.1"}):
|
||||
app = ShellApp(shard_name="timeout-test")
|
||||
ctx = app.register()
|
||||
assert ctx.is_governed is False
|
||||
|
||||
def test_shard_name_in_context(self):
|
||||
with patch.dict(os.environ, {"SUBSTRATE_SHELL_SOCKET": "/tmp/nonexistent.sock"}):
|
||||
app = ShellApp(shard_name="my-shard")
|
||||
ctx = app.register()
|
||||
assert ctx.shard_name == "my-shard"
|
||||
|
||||
def test_emitter_available(self):
|
||||
with patch.dict(os.environ, {"SUBSTRATE_SHELL_SOCKET": "/tmp/nonexistent.sock"}):
|
||||
app = ShellApp(shard_name="test")
|
||||
app.register()
|
||||
assert app.emitter is not None
|
||||
|
||||
def test_capabilities_in_started(self, capsys):
|
||||
with patch.dict(os.environ, {"SUBSTRATE_SHELL_SOCKET": "/tmp/nonexistent.sock"}):
|
||||
app = ShellApp(shard_name="cap-test", capabilities={"network": True})
|
||||
app.register()
|
||||
lines = capsys.readouterr().out.strip().split("\n")
|
||||
started = json.loads(lines[0])
|
||||
assert started["payload"]["capabilities"]["network"] is True
|
||||
|
||||
def test_version_in_started(self, capsys):
|
||||
with patch.dict(os.environ, {"SUBSTRATE_SHELL_SOCKET": "/tmp/nonexistent.sock"}):
|
||||
app = ShellApp(shard_name="ver-test", version="1.2.3")
|
||||
app.register()
|
||||
started = json.loads(capsys.readouterr().out.strip().split("\n")[0])
|
||||
assert started["payload"]["version"] == "1.2.3"
|
||||
52
tests/test_chronicle.py
Normal file
52
tests/test_chronicle.py
Normal file
|
|
@ -0,0 +1,52 @@
|
|||
"""Tests for Chronicle event emission."""
|
||||
|
||||
import json
|
||||
from io import StringIO
|
||||
from unittest.mock import patch
|
||||
|
||||
from substrate_sdk.chronicle import LAYER_APPLICATION, ShardEmitter, emit_event
|
||||
from substrate_sdk.context import ShardContext
|
||||
|
||||
|
||||
class TestEmitEvent:
|
||||
|
||||
def test_writes_to_stdout(self, capsys):
|
||||
emit_event("TEST_EVENT", 3, {"key": "val"})
|
||||
out = capsys.readouterr().out.strip()
|
||||
data = json.loads(out)
|
||||
assert data["substrate_event"] is True
|
||||
|
||||
def test_kind_in_output(self, capsys):
|
||||
emit_event("APP_SHARD_STARTED", 3, {})
|
||||
data = json.loads(capsys.readouterr().out.strip())
|
||||
assert data["kind"] == "APP_SHARD_STARTED"
|
||||
|
||||
def test_layer_in_output(self, capsys):
|
||||
emit_event("X", 2, {})
|
||||
data = json.loads(capsys.readouterr().out.strip())
|
||||
assert data["layer"] == 2
|
||||
|
||||
def test_non_fatal_on_error(self):
|
||||
with patch("builtins.print", side_effect=Exception("boom")):
|
||||
result = emit_event("TEST", 3, {})
|
||||
assert result is False
|
||||
|
||||
|
||||
class TestShardEmitter:
|
||||
|
||||
def test_carries_context(self, capsys):
|
||||
ctx = ShardContext(shard_name="test", shard_id="abc-123", accord_hash="xyz-456")
|
||||
emitter = ShardEmitter(ctx)
|
||||
emitter.emit("TEST", 3, {})
|
||||
data = json.loads(capsys.readouterr().out.strip())
|
||||
assert data["accord_hash"] == "xyz-456"
|
||||
assert data["shard_id"] == "abc-123"
|
||||
|
||||
def test_request_helper(self, capsys):
|
||||
ctx = ShardContext(shard_name="test", shard_id="s1")
|
||||
emitter = ShardEmitter(ctx)
|
||||
emitter.request("GET", "/api/", 200, 42.0)
|
||||
data = json.loads(capsys.readouterr().out.strip())
|
||||
assert data["kind"] == "APP_DJANGO_REQUEST"
|
||||
assert data["payload"]["method"] == "GET"
|
||||
assert data["payload"]["status"] == 200
|
||||
94
tests/test_django_middleware.py
Normal file
94
tests/test_django_middleware.py
Normal file
|
|
@ -0,0 +1,94 @@
|
|||
"""Tests for Django middleware. Requires Django."""
|
||||
|
||||
import json
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
try:
|
||||
from substrate_sdk.django.middleware import ShellboundMiddleware, _extract_did, set_emitter
|
||||
HAS_DJANGO = True
|
||||
except ImportError:
|
||||
HAS_DJANGO = False
|
||||
|
||||
pytestmark = pytest.mark.skipif(not HAS_DJANGO, reason="Django not installed")
|
||||
|
||||
|
||||
class TestShellboundMiddleware:
|
||||
|
||||
def test_emits_request_event(self, capsys):
|
||||
emitter = MagicMock()
|
||||
set_emitter(emitter)
|
||||
|
||||
request = MagicMock()
|
||||
request.method = "GET"
|
||||
request.path = "/api/test/"
|
||||
request.resolver_match = MagicMock(view_name="test-view")
|
||||
request.auth = None
|
||||
request.user = MagicMock(is_authenticated=False)
|
||||
|
||||
response = MagicMock(status_code=200)
|
||||
get_response = MagicMock(return_value=response)
|
||||
mw = ShellboundMiddleware(get_response)
|
||||
result = mw(request)
|
||||
|
||||
emitter.request.assert_called_once()
|
||||
call_kwargs = emitter.request.call_args
|
||||
assert call_kwargs.kwargs["method"] == "GET"
|
||||
assert call_kwargs.kwargs["status"] == 200
|
||||
|
||||
def test_captures_status_code(self):
|
||||
emitter = MagicMock()
|
||||
set_emitter(emitter)
|
||||
|
||||
request = MagicMock(method="POST", path="/api/create/", auth=None)
|
||||
request.user = MagicMock(is_authenticated=False)
|
||||
request.resolver_match = MagicMock(view_name="create")
|
||||
|
||||
response = MagicMock(status_code=201)
|
||||
mw = ShellboundMiddleware(MagicMock(return_value=response))
|
||||
mw(request)
|
||||
|
||||
assert emitter.request.call_args.kwargs["status"] == 201
|
||||
|
||||
def test_non_fatal_when_emitter_none(self):
|
||||
set_emitter(None)
|
||||
|
||||
request = MagicMock(method="GET", path="/")
|
||||
response = MagicMock(status_code=200)
|
||||
mw = ShellboundMiddleware(MagicMock(return_value=response))
|
||||
result = mw(request)
|
||||
assert result.status_code == 200
|
||||
|
||||
def test_extract_did_from_user(self):
|
||||
request = MagicMock()
|
||||
request.auth = None
|
||||
user = MagicMock()
|
||||
user.is_authenticated = True
|
||||
user.external_id = "entra-oid-123"
|
||||
user.organization = MagicMock(slug="acme")
|
||||
request.user = user
|
||||
|
||||
did = _extract_did(request)
|
||||
assert "entra-oid-123" in did
|
||||
assert "acme" in did
|
||||
|
||||
def test_extract_did_anonymous(self):
|
||||
request = MagicMock()
|
||||
request.auth = None
|
||||
request.user = MagicMock(is_authenticated=False)
|
||||
|
||||
assert _extract_did(request) == ""
|
||||
|
||||
def test_duration_ms_positive(self):
|
||||
emitter = MagicMock()
|
||||
set_emitter(emitter)
|
||||
|
||||
request = MagicMock(method="GET", path="/", auth=None)
|
||||
request.user = MagicMock(is_authenticated=False)
|
||||
request.resolver_match = MagicMock(view_name="root")
|
||||
|
||||
mw = ShellboundMiddleware(MagicMock(return_value=MagicMock(status_code=200)))
|
||||
mw(request)
|
||||
|
||||
assert emitter.request.call_args.kwargs["duration_ms"] >= 0
|
||||
Reference in a new issue