Some checks failed
K8S Fission Deployment / Deployment fission functions (push) Failing after 22s
143 lines
3.6 KiB
Python
143 lines
3.6 KiB
Python
import base64
|
|
|
|
import nacl.secret
|
|
|
|
|
|
def string_to_hex(text: str) -> str:
|
|
"""
|
|
Convert a string to hexadecimal representation.
|
|
|
|
Args:
|
|
text: Input string to convert
|
|
|
|
Returns:
|
|
Hexadecimal string representation
|
|
"""
|
|
return text.encode("utf-8").hex()
|
|
|
|
|
|
def hex_to_string(hex_string: str) -> str | None:
|
|
"""
|
|
Convert a hexadecimal string back to regular string.
|
|
|
|
Args:
|
|
hex_string: Hexadecimal string to convert
|
|
|
|
Returns:
|
|
Decoded string
|
|
|
|
Raises:
|
|
ValueError: If hex_string is not valid hexadecimal
|
|
"""
|
|
return bytes.fromhex(hex_string).decode("utf-8")
|
|
|
|
|
|
def decrypt_vault(vault: str, key: str) -> str:
|
|
"""
|
|
Decrypt a vault string encrypted with PyNaCl SecretBox.
|
|
|
|
Vault format: "vault:v1:<base64_encrypted_data>"
|
|
|
|
Args:
|
|
vault: Vault-formatted string (e.g., "vault:v1:eW91cl9lbmNyeXB0ZWRfZGF0YQ==")
|
|
key: Hex string representation of 32-byte encryption key
|
|
|
|
Returns:
|
|
Decrypted string
|
|
|
|
Raises:
|
|
ValueError: If vault format is invalid or key is not valid hex
|
|
nacl.exceptions.CryptoError: If decryption fails (wrong key or corrupted data)
|
|
"""
|
|
# Parse vault format
|
|
parts = vault.split(":", 2)
|
|
if len(parts) != 3 or parts[0] != "vault" or parts[1] != "v1":
|
|
raise ValueError("Invalid vault format. Expected 'vault:v1:<encrypted_data>'")
|
|
|
|
encrypted_string = parts[2]
|
|
# Convert hex string key to bytes
|
|
key_bytes = bytes.fromhex(key)
|
|
|
|
# Create a SecretBox instance with the key
|
|
box = nacl.secret.SecretBox(key_bytes)
|
|
|
|
# Decode the base64-encoded encrypted string
|
|
encrypted_data = base64.b64decode(encrypted_string)
|
|
|
|
# Decrypt the data
|
|
decrypted_bytes = box.decrypt(encrypted_data)
|
|
|
|
# Convert bytes to string
|
|
return decrypted_bytes.decode("utf-8")
|
|
|
|
|
|
def encrypt_vault(plaintext: str, key: str) -> str:
|
|
"""
|
|
Encrypt a string and return it in vault format.
|
|
|
|
Args:
|
|
plaintext: String to encrypt
|
|
key: Hex string representation of 32-byte encryption key
|
|
|
|
Returns:
|
|
Vault-formatted encrypted string (e.g., "vault:v1:<encrypted_data>")
|
|
|
|
Raises:
|
|
ValueError: If key is not valid hex string
|
|
"""
|
|
# Convert hex string key to bytes
|
|
key_bytes = bytes.fromhex(key)
|
|
|
|
# Create a SecretBox instance with the key
|
|
box = nacl.secret.SecretBox(key_bytes)
|
|
|
|
# Encrypt the data
|
|
encrypted = box.encrypt(plaintext.encode("utf-8"))
|
|
|
|
# Encode to base64
|
|
encrypted_string = base64.b64encode(encrypted).decode("utf-8")
|
|
|
|
# Return in vault format
|
|
return f"vault:v1:{encrypted_string}"
|
|
|
|
|
|
def is_valid_vault_format(vault: str) -> bool:
|
|
"""
|
|
Check if a string is in valid vault format.
|
|
|
|
Vault format: "vault:v1:<base64_encrypted_data>"
|
|
|
|
Args:
|
|
vault: String to validate
|
|
|
|
Returns:
|
|
True if the string matches vault format structure, False otherwise
|
|
|
|
Note:
|
|
This only checks the format structure, not whether the data can be decrypted
|
|
"""
|
|
# Parse vault format
|
|
parts = vault.split(":", 2)
|
|
|
|
# Check basic structure: vault:v1:<data>
|
|
if len(parts) != 3 or parts[0] != "vault" or parts[1] != "v1":
|
|
return False
|
|
|
|
encrypted_data = parts[2]
|
|
|
|
# Check if data part is not empty
|
|
if not encrypted_data:
|
|
return False
|
|
|
|
# Check if data is valid base64
|
|
try:
|
|
decoded = base64.b64decode(encrypted_data)
|
|
except Exception:
|
|
return False
|
|
|
|
# Check if decoded data has at least nonce bytes (24 bytes for NaCl)
|
|
if len(decoded) < nacl.secret.SecretBox.NONCE_SIZE:
|
|
return False
|
|
|
|
return True
|