# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Entra identity driver — GSAP §2.2.

Validates Entra-issued JWTs directly via JWKS verification.
Extracts device_id for compliance gating, MFA status, roles, and
constructs a did:web DID from the configured domain + oid.

Fix C-3: JWKS fetch failure results in denial. Never falls back to unverified claims.
Fix H-10: JWKS cache refreshes on kid miss for key rotation.
"""

import logging
from typing import Optional

from .base import AuthResult, ElevationRequired, IdentityDriver
from .jwks import AuthenticationError, JWKSVerifier

logger = logging.getLogger(__name__)


def _claim_as_list(value: object) -> list:
    """Normalise a multi-valued claim to a list of strings.

    A bare string becomes a one-element list so that later ``in`` checks are
    exact-match membership tests rather than substring tests. Non-string
    entries and unsupported shapes (including ``None``) are dropped.
    """
    if isinstance(value, str):
        return [value]
    if isinstance(value, (list, tuple, set, frozenset)):
        return [item for item in value if isinstance(item, str)]
    return []


class EntraDriver(IdentityDriver):
    """Identity driver for direct Entra JWT validation."""

    @staticmethod
    def _denied(reason: str) -> AuthResult:
        """Build a denied ``AuthResult`` carrying *reason*."""
        return AuthResult(
            status=AuthResult.STATUS_DENIED,
            denial_reason=reason,
        )

    async def authenticate(self) -> AuthResult:
        """Verify the raw token from config and map its claims to an AuthResult.

        Reads ``_raw_token``, ``entra_tenant_id``, ``entra_client_id``,
        ``requested_accord``, ``accord_roles``, ``elevated_suffix`` and
        ``domain`` from ``self.config``.

        Returns:
            * ``STATUS_DENIED`` when the token, tenant id or client id is
              missing, when verification raises ``AuthenticationError``, or
              when the verified token has no ``oid`` claim.
            * ``STATUS_PENDING_ELEVATION`` when the requested accord maps to a
              role that is not in the token's ``roles`` claim.
            * ``STATUS_AUTHORIZED`` otherwise.

        Only ``AuthenticationError`` is converted into a denial; any other
        exception raised by the verifier propagates to the caller.
        """
        raw_token = self.config.get("_raw_token", "")
        tenant_id = self.config.get("entra_tenant_id", "")
        expected_audience = self.config.get("entra_client_id", "")

        if not raw_token:
            return self._denied("No token in context.")
        if not tenant_id:
            return self._denied("Entra tenant_id not configured.")
        if not expected_audience:
            # Fail closed: without a configured audience we cannot confirm the
            # token was issued for this application.
            return self._denied("Entra client_id not configured.")

        # Fix C-3: verify via JWKS — no fallback on failure
        verifier = JWKSVerifier(
            jwks_url=f"https://login.microsoftonline.com/{tenant_id}/discovery/v2.0/keys",
            audience=expected_audience,
            issuer=f"https://login.microsoftonline.com/{tenant_id}/v2.0",
        )
        try:
            token_data = await verifier.verify_or_refresh(raw_token)
        except AuthenticationError as e:
            return self._denied(str(e))

        # Extract claims from VERIFIED token data
        oid = token_data.get("oid", "")
        if not oid:
            return self._denied("Token missing oid claim.")

        roles = _claim_as_list(token_data.get("roles"))
        amr = _claim_as_list(token_data.get("amr"))
        device_id = token_data.get("deviceid") or token_data.get("device_id")
        upn = token_data.get("preferred_username") or token_data.get("upn", "")
        # Fall back to the UPN when the name claim is absent or empty.
        display_name = token_data.get("name") or upn

        # Check role requirement for requested accord
        requested_accord = self.config.get("requested_accord", "")
        accord_roles = self.config.get("accord_roles") or {}
        required_role = accord_roles.get(requested_accord, "")
        if required_role and required_role not in roles:
            return AuthResult(
                status=AuthResult.STATUS_PENDING_ELEVATION,
                elevation_required=ElevationRequired(
                    role=required_role,
                    activation_url="/governance/elevate/",
                    instructions=f"Request elevation to '{required_role}' via POST /governance/elevate/",
                    mechanism="entra_pim",
                ),
            )

        # Roles whose name ends with the configured suffix count as active elevations.
        suffix = self.config.get("elevated_suffix", "-elevated")
        elevation_active = [r for r in roles if r.endswith(suffix)]

        # Construct DID
        domain = self.config.get("domain", "guildhouse.dev")
        principal_did = f"did:web:{domain}:principal:{oid}"

        # MFA detection
        mfa_satisfied = "mfa" in amr or "ngcmfa" in amr

        return AuthResult(
            status=AuthResult.STATUS_AUTHORIZED,
            principal_did=principal_did,
            display_name=display_name,
            stable_id=oid,
            token_jti=token_data.get("jti", ""),
            elevation_active=elevation_active,
            mfa_satisfied=mfa_satisfied,
            device_id=device_id,
        )

    async def revoke(self, session_id: str) -> None:
        """Log the revocation request for *session_id*; nothing else is done here."""
        logger.info("Entra revoke: %s", session_id)