# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""FastAPI router for delegation lifecycle.

Fix C-8: All endpoints require bearer token authentication.
Fix H-7: Delegation depth enforced via max_delegation_depth.

Endpoints:
  POST /delegations/              create_delegation  SS8.1
  POST /delegations/{id}/revoke   revoke_delegation  SS8.2
  GET  /delegations/{id}          get_delegation     SS8.3
  GET  /delegations/              list_delegations   SS8.4
"""

from datetime import datetime, UTC

from fastapi import APIRouter, Depends, HTTPException

from gsap_broker.auth.middleware import verify_bearer
from gsap_broker.drivers.base import AuthResult
from gsap_broker.settings import settings
from .lifecycle import DelegationManager
from .models import (
    ActiveDelegation,
    AgentListResponse,
    DelegationInfo,
    DelegationRequest,
    DelegationResponse,
    DelegationStatus,
    RevokeRequest,
    RevokeResponse,
)
from .storage import get_active_delegations, get_delegation as db_get

# Endpoint docstrings are published by FastAPI as OpenAPI descriptions.
# Text placed after the form-feed marker inside a docstring is
# maintainer-only and is not included in the generated API docs.
router = APIRouter(prefix="/delegations", tags=["Delegations"])
manager = DelegationManager()


def _utc_now_naive() -> datetime:
    """Return the current UTC time with tzinfo stripped.

    The stored ``expires_at`` values are subtracted from this value, so they
    are presumably naive UTC as well -- TODO confirm against the storage
    layer.
    """
    return datetime.now(UTC).replace(tzinfo=None)


def _ttl_remaining_seconds(expires_at: datetime, now: datetime) -> int:
    """Return whole seconds from *now* until *expires_at*, floored at 0."""
    return max(0, int((expires_at - now).total_seconds()))


def _normalize_ac_id(value) -> str:
    """Return *value* as a string with hyphens removed.

    This mirrors the normalisation applied to ``delegator_ac_id`` before the
    ``authorization_contexts`` lookup in ``create_delegation``.
    """
    return str(value).replace("-", "")


@router.post("/", response_model=DelegationResponse)
async def create_delegation(
    request: DelegationRequest,
    auth: AuthResult = Depends(verify_bearer),
):
    """Request delegation of authority to an AI agent (SS8.1).

    Fix C-8: delegator_did derived from verified token.
    Fix H-7: delegation depth enforced.
    \f
    Maintainer notes.

    Args:
        request: Delegation request body; ``delegator_ac_id`` names the
            authorization context being delegated from.
        auth: Result of bearer verification; ``principal_did`` is used as
            the delegator identity.

    Returns:
        Whatever ``manager.create_delegation`` returns.

    Raises:
        HTTPException: 403 if the authorization context is missing, not in
            status 'authorized'/'active', or owned by another principal;
            403 if the resulting depth reaches
            ``settings.max_delegation_depth``; 403 if the manager raises
            ``ValueError``; 502 if the manager raises ``RuntimeError``.
    """
    # Imports are kept at function scope as in the original module.
    from gsap_broker.db import engine
    from sqlmodel.ext.asyncio.session import AsyncSession
    from sqlalchemy import text

    # Fix C-8: the delegator identity comes from the verified token, never
    # from the request body.
    delegator_did = auth.principal_did

    # Validate delegator_ac_id belongs to this principal.
    async with AsyncSession(engine) as session:
        lookup = await session.execute(
            text(
                "SELECT principal_did, capability_mask "
                "FROM authorization_contexts "
                "WHERE context_id = :ctx_id "
                "AND status IN ('authorized', 'active')"
            ),
            # The lookup uses the hyphen-less form of the id.
            {"ctx_id": request.delegator_ac_id.replace("-", "")},
        )
        ac_row = lookup.first()

    if not ac_row or ac_row[0] != delegator_did:
        raise HTTPException(
            403,
            "delegator_ac_id not found or does not belong to authenticated principal",
        )

    # NOTE(review): a stored capability_mask of 0 is falsy and therefore also
    # falls back to 0x7 here. Confirm whether 0 means "unset" or "no
    # capabilities"; if the latter, this should test `is not None` instead.
    delegator_cap = ac_row[1] if ac_row[1] else 0x7

    # Fix H-7: check delegation depth. If the AC being delegated from was
    # itself produced by a delegation, the new delegation is one level deeper
    # than that parent. A parent whose depth is missing or None is treated as
    # depth 0, so the child is still counted as depth 1 rather than as a
    # root delegation.
    parent_delegation = await _find_delegation_by_ac(request.delegator_ac_id)
    if parent_delegation is None:
        new_depth = 0
    else:
        new_depth = (getattr(parent_delegation, "depth", None) or 0) + 1

    if new_depth >= settings.max_delegation_depth:
        raise HTTPException(
            403,
            f"Maximum delegation depth ({settings.max_delegation_depth}) exceeded. "
            f"Current depth: {new_depth}",
        )

    try:
        return await manager.create_delegation(
            request,
            delegator_did,
            delegator_capability_mask=delegator_cap,
        )
    except ValueError as e:
        raise HTTPException(403, str(e)) from e
    except RuntimeError as e:
        raise HTTPException(502, str(e)) from e


@router.post("/{delegation_id}/revoke", response_model=RevokeResponse)
async def revoke_delegation(
    delegation_id: str,
    request: RevokeRequest = RevokeRequest(),
    auth: AuthResult = Depends(verify_bearer),
):
    """Revoke an active delegation (SS8.2).

    Only the delegator can revoke.
    \f
    Maintainer notes.

    Args:
        delegation_id: Identifier of the delegation to revoke.
        request: Optional body carrying the revocation ``reason``.
        auth: Result of bearer verification.

    Returns:
        A ``RevokeResponse`` echoing the delegation id and reason.

    Raises:
        HTTPException: 404 if the delegation does not exist, 403 if the
            caller is not its delegator, 404 if the manager reports that
            nothing was revoked.
    """
    delegation = await db_get(delegation_id)
    if not delegation:
        raise HTTPException(404, "Delegation not found")
    if delegation.delegator_did != auth.principal_did:
        raise HTTPException(403, "Only the delegator can revoke this delegation")

    # The manager reports failure with a falsy return value; this covers a
    # delegation that exists but could not be revoked.
    success = await manager.revoke_delegation(delegation_id, request.reason)
    if not success:
        raise HTTPException(404, "Delegation not found or not active")

    return RevokeResponse(delegation_id=delegation_id, reason=request.reason)


@router.get("/{delegation_id}", response_model=DelegationInfo)
async def get_delegation(
    delegation_id: str,
    auth: AuthResult = Depends(verify_bearer),
):
    """Query delegation status (SS8.3). Delegator or delegate can view.
    \f
    Maintainer notes.

    Args:
        delegation_id: Identifier of the delegation to look up.
        auth: Result of bearer verification.

    Returns:
        A ``DelegationInfo`` including remaining commands and remaining TTL,
        both floored at 0.

    Raises:
        HTTPException: 404 if the delegation does not exist, 403 if the
            caller is neither its delegator nor its agent.
    """
    d = await db_get(delegation_id)
    if not d:
        raise HTTPException(404, "Delegation not found")
    if auth.principal_did not in (d.delegator_did, d.agent_did):
        raise HTTPException(403, "Not authorized to view this delegation")

    return DelegationInfo(
        delegation_id=d.delegation_id,
        status=DelegationStatus(d.status),
        agent_did=d.agent_did,
        agent_type=d.agent_type,
        delegator_did=d.delegator_did,
        commands_executed=d.commands_executed,
        commands_remaining=max(0, d.max_commands - d.commands_executed),
        ttl_remaining_seconds=_ttl_remaining_seconds(d.expires_at, _utc_now_naive()),
        created_at=d.created_at.isoformat(),
        expires_at=d.expires_at.isoformat(),
    )


@router.get("/", response_model=AgentListResponse)
async def list_delegations(
    auth: AuthResult = Depends(verify_bearer),
):
    """List active delegations for the authenticated principal (SS8.4).
    \f
    Maintainer notes.

    Args:
        auth: Result of bearer verification.

    Returns:
        An ``AgentListResponse`` containing only the delegations whose
        delegator is the authenticated principal.
    """
    active = await get_active_delegations()
    now = _utc_now_naive()

    # Filter to only this principal's delegations. All active delegations
    # are fetched and filtered here rather than in the storage query.
    mine = [d for d in active if d.delegator_did == auth.principal_did]

    return AgentListResponse(
        active_delegations=[
            ActiveDelegation(
                delegation_id=d.delegation_id,
                agent_type=d.agent_type,
                delegator=d.delegator_did,
                commands_executed=d.commands_executed,
                ttl_remaining_seconds=_ttl_remaining_seconds(d.expires_at, now),
                status=d.status,
            )
            for d in mine
        ],
        total_active=len(mine),
    )


async def _find_delegation_by_ac(ac_id: str):
    """Find a delegation that was created from this AC (for depth tracking).

    Ids are compared with hyphens removed on both sides, matching the
    normalisation used for the authorization-context lookup, so a hyphenated
    id and its hyphen-less form are treated as the same AC.

    Args:
        ac_id: Authorization-context id to search for.

    Returns:
        The first active delegation whose ``delegated_ac_id`` matches, or
        ``None`` if there is none.
    """
    wanted = _normalize_ac_id(ac_id)
    for d in await get_active_delegations():
        candidate = d.delegated_ac_id
        if candidate is not None and _normalize_ac_id(candidate) == wanted:
            return d
    return None