C-1: Keycloak driver now verifies JWT signatures via JWKS.
Forged tokens are rejected. Previously any base64 JWT was accepted.
C-2: on_behalf_of requires gsap:impersonate role in JWT claims.
C-3: Entra driver denies on JWKS failure (no unverified fallback).
H-10: JWKS cache refreshes on kid miss for key rotation.
Shared JWKSVerifier used by both drivers. alg=none blocked.
iss, aud, exp validated for all tokens.
Signed-off-by: Tyler King <tking@guildhouse.dev>
76 lines
3.3 KiB
Python
76 lines
3.3 KiB
Python
# Copyright 2026 Guildhouse Dev
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
|
|
"""Keycloak identity driver — GSAP §2.2.
|
|
|
|
Fix C-1: JWT signatures are now verified via JWKS.
|
|
Previously this driver accepted any base64-decoded JWT without
|
|
signature verification. Now uses shared JWKSVerifier.
|
|
"""
|
|
|
|
import logging
|
|
from .base import IdentityDriver, AuthResult, ElevationRequired
|
|
from .jwks import AuthenticationError, JWKSVerifier
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class KeycloakDriver(IdentityDriver):
|
|
async def authenticate(self) -> AuthResult:
|
|
raw_token = self.config.get("_raw_token", "")
|
|
keycloak_url = self.config.get("keycloak_url", "http://localhost:8080")
|
|
keycloak_realm = self.config.get("keycloak_realm", "substrate")
|
|
keycloak_client_id = self.config.get("keycloak_client_id", "")
|
|
|
|
# Fix C-1: verify JWT signature via JWKS before trusting claims.
|
|
if raw_token and keycloak_url and keycloak_realm:
|
|
verifier = JWKSVerifier(
|
|
jwks_url=f"{keycloak_url}/realms/{keycloak_realm}/protocol/openid-connect/certs",
|
|
audience=keycloak_client_id,
|
|
issuer=f"{keycloak_url}/realms/{keycloak_realm}",
|
|
)
|
|
try:
|
|
token_data = await verifier.verify_or_refresh(raw_token)
|
|
except AuthenticationError as e:
|
|
return AuthResult(
|
|
status=AuthResult.STATUS_DENIED,
|
|
denial_reason=str(e),
|
|
)
|
|
else:
|
|
# No raw token or no Keycloak config — cannot verify
|
|
return AuthResult(
|
|
status=AuthResult.STATUS_DENIED,
|
|
denial_reason="No token or Keycloak configuration missing.",
|
|
)
|
|
|
|
realm_roles = token_data.get("realm_access", {}).get("roles", [])
|
|
requested_accord = self.config.get("requested_accord", "")
|
|
required_role = self.config.get("accord_roles", {}).get(requested_accord, "")
|
|
suffix = self.config.get("elevated_suffix", "-elevated")
|
|
elevation_active = [r for r in realm_roles if r.endswith(suffix)]
|
|
|
|
if required_role and required_role not in realm_roles:
|
|
return AuthResult(
|
|
status=AuthResult.STATUS_PENDING_ELEVATION,
|
|
elevation_required=ElevationRequired(
|
|
role=required_role,
|
|
activation_url="/governance/elevate/",
|
|
instructions=f"Request elevation to '{required_role}' via POST /governance/elevate/",
|
|
))
|
|
|
|
stable_id = token_data.get("sub", "")
|
|
template = self.config.get("did_template", "did:web:{domain}/principal/{alias}")
|
|
domain = self.config.get("domain", "example.com")
|
|
alias = token_data.get("preferred_username", stable_id)
|
|
|
|
return AuthResult(
|
|
status=AuthResult.STATUS_AUTHORIZED,
|
|
principal_did=template.format(stable_id=stable_id, domain=domain, alias=alias),
|
|
display_name=token_data.get("name", alias),
|
|
stable_id=stable_id, token_jti=token_data.get("jti", ""),
|
|
elevation_active=elevation_active,
|
|
mfa_satisfied="otp" in token_data.get("amr", []) or "mfa" in token_data.get("amr", []),
|
|
)
|
|
|
|
async def revoke(self, session_id: str) -> None:
|
|
logger.info(f"Keycloak revoke: {session_id}")
|