"""GSAP broker client — requests delegated ACs via on_behalf_of. Uses the same /governance/authorize/ endpoint as any other AC request, with on_behalf_of set to the agent's DID. """ import logging import httpx logger = logging.getLogger(__name__) class GSAPClient: def __init__(self, broker_url: str, bearer_token: str): self.broker_url = broker_url.rstrip("/") self.bearer_token = bearer_token async def request_delegated_ac( self, delegator_ac_id: str, agent_did: str, delegation_id: str, corpus_entry_cid: str, capability_ceiling: str, ttl_minutes: int, ) -> dict: """Request an AC for the agent, delegated from the human's AC.""" if not self.bearer_token: logger.info("GSAP broker not configured — dev mode stub AC for %s", delegation_id) return { "status": "authorized", "context_id": f"ac-dev-{delegation_id}", "principal_did": agent_did, "delegation_id": delegation_id, "capability_ceiling": capability_ceiling, } headers = {} if self.bearer_token: headers["Authorization"] = f"Bearer {self.bearer_token}" request_body = { "driver_id": "keycloak", "principal": agent_did, "playbook": f"delegation:{delegation_id}", "corpus_entry_cid": corpus_entry_cid, "parameters_cid": f"sha256:delegation-{delegation_id}", "accord_template": "ai-delegation-standard", "session_mode": "delegation", "on_behalf_of": agent_did, } async with httpx.AsyncClient(timeout=10.0) as client: resp = await client.post( f"{self.broker_url}/governance/authorize/", json=request_body, headers=headers, ) resp.raise_for_status() data = resp.json() logger.info("Delegated AC issued: %s for %s", delegation_id, agent_did) return data async def validate_ac(self, poll_token: str) -> dict | None: """Validate that the delegator's AC is still active.""" async with httpx.AsyncClient(timeout=5.0) as client: resp = await client.get( f"{self.broker_url}/governance/authorize/{poll_token}/", ) if resp.status_code == 200: return resp.json() return None