# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Keycloak identity driver — GSAP §2.2.

Fix C-1: JWT signatures are now verified via JWKS. Previously this driver
accepted any base64-decoded JWT without signature verification.
Now uses shared JWKSVerifier.
"""
import logging

from .base import IdentityDriver, AuthResult, ElevationRequired
from .jwks import AuthenticationError, JWKSVerifier

logger = logging.getLogger(__name__)


class KeycloakDriver(IdentityDriver):
    """Identity driver that authorizes a principal from a Keycloak-issued JWT.

    All inputs are read from ``self.config`` (presumably populated by the
    ``IdentityDriver`` base class — confirm against ``.base``).
    """

    async def authenticate(self) -> AuthResult:
        """Verify the raw token and map its claims to an ``AuthResult``.

        Config keys read: ``_raw_token``, ``keycloak_url``, ``keycloak_realm``,
        ``keycloak_client_id``, ``requested_accord``, ``accord_roles``,
        ``elevated_suffix``, ``did_template`` and ``domain``.

        Returns:
            * ``STATUS_DENIED`` when the token, URL or realm is missing, or
              when the verifier raises ``AuthenticationError`` (its message
              becomes ``denial_reason``).
            * ``STATUS_PENDING_ELEVATION`` when the role mapped to the
              requested accord is not among the token's realm roles.
            * ``STATUS_AUTHORIZED`` otherwise, populated from the verified
              claims.
        """
        raw_token = self.config.get("_raw_token", "")
        # Strip trailing slashes so a base URL configured as "https://kc/"
        # does not produce "https://kc//realms/..." — that string would be
        # passed as the expected issuer and could never match the token's.
        keycloak_url = (
            self.config.get("keycloak_url", "http://localhost:8080") or ""
        ).rstrip("/")
        keycloak_realm = self.config.get("keycloak_realm", "substrate")
        keycloak_client_id = self.config.get("keycloak_client_id", "")

        # No raw token or no Keycloak config — cannot verify.
        if not (raw_token and keycloak_url and keycloak_realm):
            return AuthResult(
                status=AuthResult.STATUS_DENIED,
                denial_reason="No token or Keycloak configuration missing.",
            )

        # Fix C-1: verify JWT signature via JWKS before trusting claims.
        realm_base = f"{keycloak_url}/realms/{keycloak_realm}"
        verifier = JWKSVerifier(
            jwks_url=f"{realm_base}/protocol/openid-connect/certs",
            audience=keycloak_client_id,
            issuer=realm_base,
        )
        try:
            token_data = await verifier.verify_or_refresh(raw_token)
        except AuthenticationError as e:
            return AuthResult(
                status=AuthResult.STATUS_DENIED,
                denial_reason=str(e),
            )

        # A claim that is present but null must behave like an absent claim
        # rather than raising on ``None.get`` / ``in None``.
        realm_access = token_data.get("realm_access") or {}
        realm_roles = realm_access.get("roles") or []

        requested_accord = self.config.get("requested_accord", "")
        required_role = self.config.get("accord_roles", {}).get(requested_accord, "")
        if required_role and required_role not in realm_roles:
            return AuthResult(
                status=AuthResult.STATUS_PENDING_ELEVATION,
                elevation_required=ElevationRequired(
                    role=required_role,
                    activation_url="/governance/elevate/",
                    instructions=(
                        f"Request elevation to '{required_role}' "
                        "via POST /governance/elevate/"
                    ),
                ),
            )

        # NOTE(review): an empty ``elevated_suffix`` makes every role match,
        # because ``str.endswith("")`` is always true — confirm that such a
        # configuration is rejected upstream.
        suffix = self.config.get("elevated_suffix", "-elevated")
        elevation_active = [r for r in realm_roles if r.endswith(suffix)]

        stable_id = token_data.get("sub", "")
        template = self.config.get("did_template", "did:web:{domain}/principal/{alias}")
        domain = self.config.get("domain", "example.com")
        # Fall back to the subject identifier when no username claim exists.
        alias = token_data.get("preferred_username", stable_id)
        amr = token_data.get("amr") or []

        return AuthResult(
            status=AuthResult.STATUS_AUTHORIZED,
            principal_did=template.format(stable_id=stable_id, domain=domain, alias=alias),
            display_name=token_data.get("name", alias),
            stable_id=stable_id,
            token_jti=token_data.get("jti", ""),
            elevation_active=elevation_active,
            mfa_satisfied="otp" in amr or "mfa" in amr,
        )

    async def revoke(self, session_id: str) -> None:
        """Log a revocation request for ``session_id``.

        Only a log record is emitted here; no request is sent to Keycloak.
        """
        logger.info("Keycloak revoke: %s", session_id)