fastapi-gsap/.venv/lib/python3.12/site-packages/jwt/utils.py
Tyler J King e744336385 fix: capability enforcement, credential safety, atomic delegations, input validation
C-6: ConnectorRuntime enforces capability_mask per operation.
     READ-only ACs cannot invoke MUTATE operations (wipe, lock, retire).
C-7: AC validated against database (exists, active, not expired)
     before connector invocation.
C-9: Delegated AC capability bounded by delegator's capability.
C-10: Command counter uses atomic SQL increment with limit check.
M-23: expire_stale() uses same atomic SQL pattern.

H-1: Sensitive credential fields hidden from repr/logs via repr=False.
H-2: Stub backend requires ALLOW_STUB_CREDENTIALS=true to activate.
H-3: Kerberos backend raises CredentialResolutionError instead of
     returning stub ticket.
H-4: Chronicle INTENT emitted before execution, RESULT after.
H-5: device_id validated as UUID before Graph API URL interpolation.
H-8: ConnectorRuntime enforces governance for all connector invocations.

Signed-off-by: Tyler King <tking@guildhouse.dev>
2026-04-14 08:13:27 -04:00

142 lines
3.6 KiB
Python

import base64
import binascii
import re
from typing import Optional, Union
try:
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve
from cryptography.hazmat.primitives.asymmetric.utils import (
decode_dss_signature,
encode_dss_signature,
)
except ModuleNotFoundError:
pass
def force_bytes(value: Union[bytes, str]) -> bytes:
if isinstance(value, str):
return value.encode("utf-8")
elif isinstance(value, bytes):
return value
else:
raise TypeError("Expected a string value")
def base64url_decode(input: Union[bytes, str]) -> bytes:
input_bytes = force_bytes(input)
rem = len(input_bytes) % 4
if rem > 0:
input_bytes += b"=" * (4 - rem)
return base64.urlsafe_b64decode(input_bytes)
def base64url_encode(input: bytes) -> bytes:
return base64.urlsafe_b64encode(input).replace(b"=", b"")
def to_base64url_uint(val: int, *, bit_length: Optional[int] = None) -> bytes:
if val < 0:
raise ValueError("Must be a positive integer")
int_bytes = bytes_from_int(val, bit_length=bit_length)
if len(int_bytes) == 0:
int_bytes = b"\x00"
return base64url_encode(int_bytes)
def from_base64url_uint(val: Union[bytes, str]) -> int:
data = base64url_decode(force_bytes(val))
return int.from_bytes(data, byteorder="big")
def number_to_bytes(num: int, num_bytes: int) -> bytes:
padded_hex = "%0*x" % (2 * num_bytes, num)
return binascii.a2b_hex(padded_hex.encode("ascii"))
def bytes_to_number(string: bytes) -> int:
return int(binascii.b2a_hex(string), 16)
def bytes_from_int(val: int, *, bit_length: Optional[int] = None) -> bytes:
if bit_length is None:
bit_length = val.bit_length()
byte_length = (bit_length + 7) // 8
return val.to_bytes(byte_length, "big", signed=False)
def der_to_raw_signature(der_sig: bytes, curve: "EllipticCurve") -> bytes:
num_bits = curve.key_size
num_bytes = (num_bits + 7) // 8
r, s = decode_dss_signature(der_sig)
return number_to_bytes(r, num_bytes) + number_to_bytes(s, num_bytes)
def raw_to_der_signature(raw_sig: bytes, curve: "EllipticCurve") -> bytes:
num_bits = curve.key_size
num_bytes = (num_bits + 7) // 8
if len(raw_sig) != 2 * num_bytes:
raise ValueError("Invalid signature")
r = bytes_to_number(raw_sig[:num_bytes])
s = bytes_to_number(raw_sig[num_bytes:])
return bytes(encode_dss_signature(r, s))
# Based on https://github.com/hynek/pem/blob/7ad94db26b0bc21d10953f5dbad3acfdfacf57aa/src/pem/_core.py#L224-L252
_PEMS = {
b"CERTIFICATE",
b"TRUSTED CERTIFICATE",
b"PRIVATE KEY",
b"PUBLIC KEY",
b"ENCRYPTED PRIVATE KEY",
b"OPENSSH PRIVATE KEY",
b"DSA PRIVATE KEY",
b"RSA PRIVATE KEY",
b"RSA PUBLIC KEY",
b"EC PRIVATE KEY",
b"DH PARAMETERS",
b"NEW CERTIFICATE REQUEST",
b"CERTIFICATE REQUEST",
b"SSH2 PUBLIC KEY",
b"SSH2 ENCRYPTED PRIVATE KEY",
b"X509 CRL",
}
_PEM_RE = re.compile(
b"----[- ]BEGIN ("
+ b"|".join(_PEMS)
+ b""")[- ]----\r?
.+?\r?
----[- ]END \\1[- ]----\r?\n?""",
re.DOTALL,
)
def is_pem_format(key: bytes) -> bool:
return bool(_PEM_RE.search(key))
# Based on https://github.com/pyca/cryptography/blob/bcb70852d577b3f490f015378c75cba74986297b/src/cryptography/hazmat/primitives/serialization/ssh.py#L40-L46
_SSH_KEY_FORMATS = (
b"ssh-ed25519",
b"ssh-rsa",
b"ssh-dss",
b"ecdsa-sha2-nistp256",
b"ecdsa-sha2-nistp384",
b"ecdsa-sha2-nistp521",
)
def is_ssh_key(key: bytes) -> bool:
return key.startswith(_SSH_KEY_FORMATS)