# Copyright 2026 Guildhouse Dev # SPDX-License-Identifier: Apache-2.0 """Shared bearer token authentication for Bastion. Provides a FastAPI dependency (``verify_bearer``) that: 1. Extracts the bearer token from the Authorization header 2. Inspects the token's issuer claim (unverified peek) to determine which identity driver to use 3. Verifies the token via the appropriate driver's JWKSVerifier 4. Returns a verified AuthResult with principal_did, roles, device_id Fix C-4: MCP endpoint uses this dependency. Fix C-8: Delegation endpoint uses this dependency. SECURITY: The issuer peek (step 2) is done WITHOUT verification to determine which JWKS endpoint to use. The actual claims are ONLY trusted after full JWKSVerifier validation in step 3. """ from __future__ import annotations import base64 import json import logging from typing import Optional from fastapi import Depends, HTTPException, Request from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from gsap_broker.drivers.base import AuthResult from gsap_broker.drivers.jwks import AuthenticationError, JWKSVerifier from gsap_broker.settings import settings logger = logging.getLogger(__name__) _bearer_scheme = HTTPBearer(auto_error=True) # ── Issuer → verifier mapping, built at init time ──────────────── _verifiers: dict[str, JWKSVerifier] = {} def init_authenticator() -> None: """Build issuer → JWKSVerifier mapping from settings. Called once at broker startup. """ global _verifiers _verifiers = {} # Keycloak if settings.keycloak_url and settings.keycloak_realm: kc_issuer = f"{settings.keycloak_url}/realms/{settings.keycloak_realm}" _verifiers[kc_issuer] = JWKSVerifier( jwks_url=f"{kc_issuer}/protocol/openid-connect/certs", audience=settings.keycloak_admin_client_id, issuer=kc_issuer, ) logger.info("Auth middleware: Keycloak issuer registered (%s)", kc_issuer) # Entra if settings.entra_tenant_id: entra_issuer = f"https://login.microsoftonline.com/{settings.entra_tenant_id}/v2.0" _verifiers[entra_issuer] = JWKSVerifier( jwks_url=f"https://login.microsoftonline.com/{settings.entra_tenant_id}/discovery/v2.0/keys", audience=settings.entra_client_id, issuer=entra_issuer, ) logger.info("Auth middleware: Entra issuer registered (%s)", entra_issuer) def _peek_issuer(token: str) -> str: """Base64-decode the JWT payload to read the iss claim. SECURITY: This is an UNVERIFIED peek. The iss value is used ONLY to select the JWKS endpoint. No other claims are trusted. """ try: parts = token.split(".") if len(parts) != 3: raise ValueError("Not a JWT (expected 3 parts)") payload = parts[1] payload += "=" * (4 - len(payload) % 4) claims = json.loads(base64.urlsafe_b64decode(payload)) iss = claims.get("iss", "") if not iss: raise ValueError("Token has no iss claim") return iss except Exception as e: raise HTTPException(status_code=401, detail=f"Malformed token: {e}") async def _authenticate_token(token: str) -> AuthResult: """Verify a bearer token and return the authenticated identity.""" if not _verifiers: raise HTTPException( status_code=500, detail="No identity drivers configured. Call init_authenticator() at startup.", ) issuer = _peek_issuer(token) verifier = _verifiers.get(issuer) if verifier is None: raise HTTPException( status_code=401, detail=f"Unknown token issuer: {issuer}", ) try: claims = await verifier.verify_or_refresh(token) except AuthenticationError as e: raise HTTPException(status_code=401, detail=str(e)) # Build AuthResult from verified claims oid = claims.get("oid") or claims.get("sub", "") domain = settings.keycloak_domain # Determine DID format based on issuer type if "microsoftonline.com" in issuer: principal_did = f"did:web:{domain}:principal:{oid}" else: alias = claims.get("preferred_username", oid) template = settings.keycloak_did_template principal_did = template.format(stable_id=oid, domain=domain, alias=alias) roles = claims.get("roles", []) realm_roles = claims.get("realm_access", {}).get("roles", []) all_roles = roles + realm_roles amr = claims.get("amr", []) device_id = claims.get("deviceid") or claims.get("device_id") return AuthResult( status=AuthResult.STATUS_AUTHORIZED, principal_did=principal_did, display_name=claims.get("name") or claims.get("preferred_username", ""), stable_id=oid, token_jti=claims.get("jti", ""), elevation_active=[r for r in all_roles if r.endswith(settings.keycloak_elevated_role_suffix)], mfa_satisfied="mfa" in amr or "otp" in amr or "ngcmfa" in amr, device_id=device_id, ) async def verify_bearer( credentials: HTTPAuthorizationCredentials = Depends(_bearer_scheme), ) -> AuthResult: """FastAPI dependency for mandatory bearer token authentication. Usage:: @router.post("/endpoint/") async def handler(auth: AuthResult = Depends(verify_bearer)): print(auth.principal_did) # verified identity Returns AuthResult with verified claims. Raises 401 if token is missing, malformed, or invalid. """ return await _authenticate_token(credentials.credentials) async def optional_bearer(request: Request) -> Optional[AuthResult]: """FastAPI dependency for optional authentication. Returns AuthResult if a valid bearer token is present. Returns None if no Authorization header is provided. Raises 401 if a token IS provided but is invalid. """ auth_header = request.headers.get("Authorization", "") if not auth_header.startswith("Bearer "): return None token = auth_header[7:] return await _authenticate_token(token)