fastapi-gsap/gsap_broker/auth/middleware.py
Tyler J King 782f5654ac fix: shared bearer auth, delegation depth, SQLite permissions
C-4: MCP endpoint requires verified bearer token. Unauthenticated
     requests rejected. _extract_principal() replaced by verified
     AuthResult from middleware.
C-8: All delegation endpoints require verified bearer token.
     X-Delegator-DID header removed — identity from token only.
     delegator_ac_id validated to belong to authenticated principal.
     Only delegators can revoke. Only delegator/delegate can view.
H-6: SQLite file permissions restricted to 0o600 (owner-only).
     Umask set before creation. WAL/SHM files also restricted.
H-7: Delegation depth tracked and enforced against max_delegation_depth.
     Sub-delegations increment depth. Exceeded depth → 403.

Shared TokenAuthenticator auto-detects identity driver from JWT
issuer claim (Keycloak or Entra). verify_bearer FastAPI dependency
for all protected endpoints. Health endpoint remains public.

ALL 10 critical findings CLOSED. ALL 10 high findings CLOSED.

Signed-off-by: Tyler King <tking@guildhouse.dev>
2026-04-14 17:31:46 -04:00

175 lines
6 KiB
Python

# Copyright 2026 Guildhouse Dev
# SPDX-License-Identifier: Apache-2.0
"""Shared bearer token authentication for Bastion.
Provides a FastAPI dependency (``verify_bearer``) that:
1. Extracts the bearer token from the Authorization header
2. Inspects the token's issuer claim (unverified peek) to
determine which identity driver to use
3. Verifies the token via the appropriate driver's JWKSVerifier
4. Returns a verified AuthResult with principal_did, roles, device_id
Fix C-4: MCP endpoint uses this dependency.
Fix C-8: Delegation endpoint uses this dependency.
SECURITY: The issuer peek (step 2) is done WITHOUT verification
to determine which JWKS endpoint to use. The actual claims are
ONLY trusted after full JWKSVerifier validation in step 3.
"""
from __future__ import annotations
import base64
import json
import logging
from typing import Optional
from fastapi import Depends, HTTPException, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from gsap_broker.drivers.base import AuthResult
from gsap_broker.drivers.jwks import AuthenticationError, JWKSVerifier
from gsap_broker.settings import settings
logger = logging.getLogger(__name__)
_bearer_scheme = HTTPBearer(auto_error=True)
# ── Issuer → verifier mapping, built at init time ────────────────
_verifiers: dict[str, JWKSVerifier] = {}
def init_authenticator() -> None:
"""Build issuer → JWKSVerifier mapping from settings.
Called once at broker startup.
"""
global _verifiers
_verifiers = {}
# Keycloak
if settings.keycloak_url and settings.keycloak_realm:
kc_issuer = f"{settings.keycloak_url}/realms/{settings.keycloak_realm}"
_verifiers[kc_issuer] = JWKSVerifier(
jwks_url=f"{kc_issuer}/protocol/openid-connect/certs",
audience=settings.keycloak_admin_client_id,
issuer=kc_issuer,
)
logger.info("Auth middleware: Keycloak issuer registered (%s)", kc_issuer)
# Entra
if settings.entra_tenant_id:
entra_issuer = f"https://login.microsoftonline.com/{settings.entra_tenant_id}/v2.0"
_verifiers[entra_issuer] = JWKSVerifier(
jwks_url=f"https://login.microsoftonline.com/{settings.entra_tenant_id}/discovery/v2.0/keys",
audience=settings.entra_client_id,
issuer=entra_issuer,
)
logger.info("Auth middleware: Entra issuer registered (%s)", entra_issuer)
def _peek_issuer(token: str) -> str:
"""Base64-decode the JWT payload to read the iss claim.
SECURITY: This is an UNVERIFIED peek. The iss value is used ONLY
to select the JWKS endpoint. No other claims are trusted.
"""
try:
parts = token.split(".")
if len(parts) != 3:
raise ValueError("Not a JWT (expected 3 parts)")
payload = parts[1]
payload += "=" * (4 - len(payload) % 4)
claims = json.loads(base64.urlsafe_b64decode(payload))
iss = claims.get("iss", "")
if not iss:
raise ValueError("Token has no iss claim")
return iss
except Exception as e:
raise HTTPException(status_code=401, detail=f"Malformed token: {e}")
async def _authenticate_token(token: str) -> AuthResult:
"""Verify a bearer token and return the authenticated identity."""
if not _verifiers:
raise HTTPException(
status_code=500,
detail="No identity drivers configured. Call init_authenticator() at startup.",
)
issuer = _peek_issuer(token)
verifier = _verifiers.get(issuer)
if verifier is None:
raise HTTPException(
status_code=401,
detail=f"Unknown token issuer: {issuer}",
)
try:
claims = await verifier.verify_or_refresh(token)
except AuthenticationError as e:
raise HTTPException(status_code=401, detail=str(e))
# Build AuthResult from verified claims
oid = claims.get("oid") or claims.get("sub", "")
domain = settings.keycloak_domain
# Determine DID format based on issuer type
if "microsoftonline.com" in issuer:
principal_did = f"did:web:{domain}:principal:{oid}"
else:
alias = claims.get("preferred_username", oid)
template = settings.keycloak_did_template
principal_did = template.format(stable_id=oid, domain=domain, alias=alias)
roles = claims.get("roles", [])
realm_roles = claims.get("realm_access", {}).get("roles", [])
all_roles = roles + realm_roles
amr = claims.get("amr", [])
device_id = claims.get("deviceid") or claims.get("device_id")
return AuthResult(
status=AuthResult.STATUS_AUTHORIZED,
principal_did=principal_did,
display_name=claims.get("name") or claims.get("preferred_username", ""),
stable_id=oid,
token_jti=claims.get("jti", ""),
elevation_active=[r for r in all_roles if r.endswith(settings.keycloak_elevated_role_suffix)],
mfa_satisfied="mfa" in amr or "otp" in amr or "ngcmfa" in amr,
device_id=device_id,
)
async def verify_bearer(
credentials: HTTPAuthorizationCredentials = Depends(_bearer_scheme),
) -> AuthResult:
"""FastAPI dependency for mandatory bearer token authentication.
Usage::
@router.post("/endpoint/")
async def handler(auth: AuthResult = Depends(verify_bearer)):
print(auth.principal_did) # verified identity
Returns AuthResult with verified claims.
Raises 401 if token is missing, malformed, or invalid.
"""
return await _authenticate_token(credentials.credentials)
async def optional_bearer(request: Request) -> Optional[AuthResult]:
"""FastAPI dependency for optional authentication.
Returns AuthResult if a valid bearer token is present.
Returns None if no Authorization header is provided.
Raises 401 if a token IS provided but is invalid.
"""
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
return None
token = auth_header[7:]
return await _authenticate_token(token)