fastapi-gsap/gsap_broker/delegations/router.py
Tyler J King 782f5654ac fix: shared bearer auth, delegation depth, SQLite permissions
C-4: MCP endpoint requires verified bearer token. Unauthenticated
     requests rejected. _extract_principal() replaced by verified
     AuthResult from middleware.
C-8: All delegation endpoints require verified bearer token.
     X-Delegator-DID header removed — identity from token only.
     delegator_ac_id validated to belong to authenticated principal.
     Only delegators can revoke. Only delegator/delegate can view.
H-6: SQLite file permissions restricted to 0o600 (owner-only).
     Umask set before creation. WAL/SHM files also restricted.
H-7: Delegation depth tracked and enforced against max_delegation_depth.
     Sub-delegations increment depth. Exceeded depth → 403.

Shared TokenAuthenticator auto-detects identity driver from JWT
issuer claim (Keycloak or Entra). verify_bearer FastAPI dependency
for all protected endpoints. Health endpoint remains public.

ALL 10 critical findings CLOSED. ALL 10 high findings CLOSED.

Signed-off-by: Tyler King <tking@guildhouse.dev>
2026-04-14 17:31:46 -04:00

176 lines
6.1 KiB
Python

# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""FastAPI router for delegation lifecycle.
Fix C-8: All endpoints require bearer token authentication.
Fix H-7: Delegation depth enforced via max_delegation_depth.
Endpoints:
POST /delegations/ create_delegation SS8.1
POST /delegations/{id}/revoke revoke_delegation SS8.2
GET /delegations/{id} get_delegation SS8.3
GET /delegations/ list_delegations SS8.4
"""
from datetime import datetime, UTC
from fastapi import APIRouter, Depends, HTTPException
from gsap_broker.auth.middleware import verify_bearer
from gsap_broker.drivers.base import AuthResult
from gsap_broker.settings import settings
from .lifecycle import DelegationManager
from .models import (
ActiveDelegation,
AgentListResponse,
DelegationInfo,
DelegationRequest,
DelegationResponse,
DelegationStatus,
RevokeRequest,
RevokeResponse,
)
from .storage import get_active_delegations, get_delegation as db_get
router = APIRouter(prefix="/delegations", tags=["Delegations"])
manager = DelegationManager()
@router.post("/", response_model=DelegationResponse)
async def create_delegation(
request: DelegationRequest,
auth: AuthResult = Depends(verify_bearer),
):
"""Request delegation of authority to an AI agent (SS8.1).
Fix C-8: delegator_did derived from verified token.
Fix H-7: delegation depth enforced.
"""
delegator_did = auth.principal_did
# Validate delegator_ac_id belongs to this principal
from gsap_broker.db import engine
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy import text
async with AsyncSession(engine) as session:
result = await session.execute(
text("SELECT principal_did, capability_mask FROM authorization_contexts WHERE context_id = :ctx_id AND status IN ('authorized', 'active')"),
{"ctx_id": request.delegator_ac_id.replace("-", "")},
)
ac_row = result.first()
if not ac_row or ac_row[0] != delegator_did:
raise HTTPException(403, "delegator_ac_id not found or does not belong to authenticated principal")
delegator_cap = ac_row[1] if ac_row[1] else 0x7
# Fix H-7: check delegation depth
parent_delegation = await _find_delegation_by_ac(request.delegator_ac_id)
new_depth = (parent_delegation.depth + 1) if parent_delegation and hasattr(parent_delegation, "depth") else 0
if new_depth >= settings.max_delegation_depth:
raise HTTPException(
403,
f"Maximum delegation depth ({settings.max_delegation_depth}) exceeded. "
f"Current depth: {new_depth}",
)
try:
result = await manager.create_delegation(
request, delegator_did,
delegator_capability_mask=delegator_cap,
)
return result
except ValueError as e:
raise HTTPException(403, str(e))
except RuntimeError as e:
raise HTTPException(502, str(e))
@router.post("/{delegation_id}/revoke", response_model=RevokeResponse)
async def revoke_delegation(
delegation_id: str,
request: RevokeRequest = RevokeRequest(),
auth: AuthResult = Depends(verify_bearer),
):
"""Revoke an active delegation (SS8.2). Only the delegator can revoke."""
delegation = await db_get(delegation_id)
if not delegation:
raise HTTPException(404, "Delegation not found")
if delegation.delegator_did != auth.principal_did:
raise HTTPException(403, "Only the delegator can revoke this delegation")
success = await manager.revoke_delegation(delegation_id, request.reason)
if not success:
raise HTTPException(404, "Delegation not found or not active")
return RevokeResponse(delegation_id=delegation_id, reason=request.reason)
@router.get("/{delegation_id}", response_model=DelegationInfo)
async def get_delegation(
delegation_id: str,
auth: AuthResult = Depends(verify_bearer),
):
"""Query delegation status (SS8.3). Delegator or delegate can view."""
d = await db_get(delegation_id)
if not d:
raise HTTPException(404, "Delegation not found")
if d.delegator_did != auth.principal_did and d.agent_did != auth.principal_did:
raise HTTPException(403, "Not authorized to view this delegation")
now = datetime.now(UTC).replace(tzinfo=None)
ttl_remaining = max(0, int((d.expires_at - now).total_seconds()))
return DelegationInfo(
delegation_id=d.delegation_id,
status=DelegationStatus(d.status),
agent_did=d.agent_did,
agent_type=d.agent_type,
delegator_did=d.delegator_did,
commands_executed=d.commands_executed,
commands_remaining=max(0, d.max_commands - d.commands_executed),
ttl_remaining_seconds=ttl_remaining,
created_at=d.created_at.isoformat(),
expires_at=d.expires_at.isoformat(),
)
@router.get("/", response_model=AgentListResponse)
async def list_delegations(
auth: AuthResult = Depends(verify_bearer),
):
"""List active delegations for the authenticated principal (SS8.4)."""
active = await get_active_delegations()
now = datetime.now(UTC).replace(tzinfo=None)
# Filter to only this principal's delegations
mine = [d for d in active if d.delegator_did == auth.principal_did]
return AgentListResponse(
active_delegations=[
ActiveDelegation(
delegation_id=d.delegation_id,
agent_type=d.agent_type,
delegator=d.delegator_did,
commands_executed=d.commands_executed,
ttl_remaining_seconds=max(
0, int((d.expires_at - now).total_seconds())
),
status=d.status,
)
for d in mine
],
total_active=len(mine),
)
async def _find_delegation_by_ac(ac_id: str):
"""Find a delegation that was created from this AC (for depth tracking)."""
from .storage import get_active_delegations
all_delegations = await get_active_delegations()
for d in all_delegations:
if d.delegated_ac_id == ac_id:
return d
return None