update
Some checks failed
K8S Fission Deployment / Deployment fission functions (push) Failing after 22s
0
apps/__init__.py
Normal file
15
apps/build.sh
Executable file
@@ -0,0 +1,15 @@
#!/bin/sh
# Fission's builder runs this script with SRC_PKG and DEPLOY_PKG set to the
# source and deployment package directories.
ID=$( grep "^ID=" /etc/os-release | awk -F= '{print $2}' )

if [ "${ID}" = "debian" ]
then
    apt-get update && apt-get install -y gcc libpq-dev python3-dev
else
    apk update && apk add gcc postgresql-dev python3-dev
fi

if [ -f "${SRC_PKG}/requirements.txt" ]
then
    pip3 install -r "${SRC_PKG}/requirements.txt" -t "${SRC_PKG}"
fi
cp -r "${SRC_PKG}" "${DEPLOY_PKG}"
238
apps/filter_insert.py
Normal file
@@ -0,0 +1,238 @@
import dataclasses
import enum
import typing
import uuid

from flask import current_app, jsonify, request
from helpers import (
    CORS_HEADERS,
    db_row_to_dict,
    db_rows_to_array,
    init_db_connection,
    str_to_bool,
)
from psycopg2 import IntegrityError
from pydantic_core import ValidationError
from schemas import AiUserCreate


def main():
    """
    ```fission
    {
        "name": "ai-admin-filter-create-user",
        "fntimeout": 300,
        "http_triggers": {
            "ai-admin-filter-create-user-http": {
                "url": "/ai/admin/users",
                "methods": ["POST", "GET"]
            }
        }
    }
    ```
    """
    try:
        if request.method == "GET":
            return make_filter_request()
        elif request.method == "POST":
            return make_insert_request()
        else:
            return {"error": "Method not allowed"}, 405, CORS_HEADERS
    except Exception as err:
        print(f"ErrorType={type(err)}")
        return {"error": str(err)}, 500, CORS_HEADERS


def make_insert_request():
    try:
        body = AiUserCreate(**(request.get_json(silent=True) or {}))
    except ValidationError as e:
        return jsonify({"errorCode": "VALIDATION_ERROR", "details": e.errors()}), 400, CORS_HEADERS

    sql = """
        INSERT INTO public.ai_user (id, name, dob, email, gender)
        VALUES (%s, %s, %s, %s, %s)
        RETURNING id, name, dob, email, gender, created, modified
    """
    conn = None
    try:
        conn = init_db_connection()
        with conn:
            with conn.cursor() as cur:
                cur.execute(sql, (str(uuid.uuid4()), body.name,
                                  body.dob, body.email, body.gender))
                row = cur.fetchone()
                return jsonify(db_row_to_dict(cur, row)), 201, CORS_HEADERS
    except IntegrityError as e:
        # unique constraint violation (e.g. duplicate email)
        return jsonify({"errorCode": "DUPLICATE_USER", "details": str(e)}), 409, CORS_HEADERS
    except Exception as err:
        return jsonify({"error": str(err)}), 500, CORS_HEADERS
    finally:
        if conn:
            conn.close()


def make_filter_request():
    paging = UserPage.from_request_queries()

    conn = None
    try:
        conn = init_db_connection()
        with conn.cursor() as cursor:
            records = __filter_users(cursor, paging)
            return jsonify(records), 200, CORS_HEADERS
    finally:
        if conn is not None:
            conn.close()
            current_app.logger.info("Close DB connection")


def __filter_users(cursor, paging: "UserPage"):
    conditions = []
    values = {}

    if paging.filter.ids:
        conditions.append("id = ANY(%(ids)s)")
        values["ids"] = paging.filter.ids

    if paging.filter.keyword:
        conditions.append("(name ILIKE %(keyword)s OR email ILIKE %(keyword)s)")
        values["keyword"] = f"%{paging.filter.keyword.lower()}%"

    if paging.filter.name:
        conditions.append("LOWER(name) LIKE %(name)s")
        values["name"] = f"%{paging.filter.name.lower()}%"

    if paging.filter.email:
        conditions.append("LOWER(email) LIKE %(email)s")
        values["email"] = f"%{paging.filter.email.lower()}%"

    if paging.filter.gender:
        conditions.append("gender = %(gender)s")
        values["gender"] = paging.filter.gender

    if paging.filter.created_from:
        conditions.append("created >= %(created_from)s")
        values["created_from"] = paging.filter.created_from

    if paging.filter.created_to:
        conditions.append("created <= %(created_to)s")
        values["created_to"] = paging.filter.created_to

    if paging.filter.modified_from:
        conditions.append("modified >= %(modified_from)s")
        values["modified_from"] = paging.filter.modified_from

    if paging.filter.modified_to:
        conditions.append("modified <= %(modified_to)s")
        values["modified_to"] = paging.filter.modified_to

    if paging.filter.dob_from:
        conditions.append("dob >= %(dob_from)s")
        values["dob_from"] = paging.filter.dob_from

    if paging.filter.dob_to:
        conditions.append("dob <= %(dob_to)s")
        values["dob_to"] = paging.filter.dob_to

    where_clause = " AND ".join(conditions)
    if where_clause:
        where_clause = "WHERE " + where_clause

    order_clause = ""
    if paging.sortby:
        direction = "ASC" if paging.asc else "DESC"
        order_clause = f" ORDER BY {paging.sortby.value} {direction} "

    # "count" is the unfiltered table size; "total" is the filtered row count
    # (computed by the window function before LIMIT is applied).
    sql = f"""
        SELECT
            t.*,
            (
                SELECT COUNT(*)
                FROM ai_user r
            ) AS count,
            count(*) OVER() AS total
        FROM ai_user t
        {where_clause}
        {order_clause}
        LIMIT %(limit)s OFFSET %(offset)s
    """
    values["limit"] = paging.size
    values["offset"] = paging.page * paging.size

    cursor.execute(sql, values)
    rows = cursor.fetchall()
    return db_rows_to_array(cursor, rows)


@dataclasses.dataclass
class Page:
    page: typing.Optional[int] = None
    size: typing.Optional[int] = None
    asc: typing.Optional[bool] = None

    @classmethod
    def from_request_queries(cls) -> "Page":
        paging = Page()
        paging.page = int(request.args.get("page", 0))
        paging.size = int(request.args.get("size", 8))
        paging.asc = request.args.get("asc", type=str_to_bool)
        return paging


@dataclasses.dataclass
class UserFilter:
    ids: typing.Optional[typing.List[str]] = None
    keyword: typing.Optional[str] = None
    name: typing.Optional[str] = None
    email: typing.Optional[str] = None
    gender: typing.Optional[str] = None
    created_from: typing.Optional[str] = None
    created_to: typing.Optional[str] = None
    modified_from: typing.Optional[str] = None
    modified_to: typing.Optional[str] = None
    dob_from: typing.Optional[str] = None
    dob_to: typing.Optional[str] = None

    @classmethod
    def from_request_queries(cls) -> "UserFilter":
        filter = UserFilter()
        filter.ids = request.args.getlist("filter[ids]")
        filter.keyword = request.args.get("filter[keyword]")
        filter.name = request.args.get("filter[name]")
        filter.email = request.args.get("filter[email]")
        filter.gender = request.args.get("filter[gender]")
        filter.created_from = request.args.get("filter[created_from]")
        filter.created_to = request.args.get("filter[created_to]")
        filter.modified_from = request.args.get("filter[modified_from]")
        filter.modified_to = request.args.get("filter[modified_to]")
        filter.dob_from = request.args.get("filter[dob_from]")
        filter.dob_to = request.args.get("filter[dob_to]")
        return filter


class UserSortField(str, enum.Enum):
    CREATED = "created"
    MODIFIED = "modified"


@dataclasses.dataclass
class UserPage(Page):
    sortby: typing.Optional[UserSortField] = None
    filter: typing.Optional[UserFilter] = dataclasses.field(
        default_factory=UserFilter.from_request_queries
    )

    @classmethod
    def from_request_queries(cls) -> "UserPage":
        base = super(UserPage, cls).from_request_queries()
        paging = UserPage(**dataclasses.asdict(base))

        # Accept the sort field either by enum name ("CREATED") or value ("created").
        sortby = request.args.get("sortby")
        if sortby:
            try:
                paging.sortby = UserSortField[sortby.upper()]
            except KeyError:
                try:
                    paging.sortby = UserSortField(sortby)
                except ValueError:
                    paging.sortby = None

        return paging
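For a quick smoke test, here is a minimal sketch of how the two triggers above could be exercised once the function is routed. The router base URL and the inserted values are assumptions, not part of this commit:

```python
# Hypothetical smoke test for /ai/admin/users (POST + GET).
# ROUTER is an assumption: point it at your Fission router service.
import requests

ROUTER = "http://localhost:8080"  # assumed router address

# POST -> make_insert_request; returns 201 with the created row
created = requests.post(
    f"{ROUTER}/ai/admin/users",
    json={"name": "Alice", "email": "alice@example.com", "dob": "1990-01-01"},
)
print(created.status_code, created.json())

# GET -> make_filter_request; paging and filters come from the query string
listed = requests.get(
    f"{ROUTER}/ai/admin/users",
    params={"page": 0, "size": 8, "sortby": "created", "asc": "true",
            "filter[name]": "ali"},
)
print(listed.status_code, listed.json())
```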
106
apps/helpers.py
Normal file
@@ -0,0 +1,106 @@
import datetime
import logging
import socket

import psycopg2
from flask import current_app
from psycopg2.extras import LoggingConnection

CORS_HEADERS = {
    "Content-Type": "application/json",
}
SECRET_NAME = "fission-ai-work-env"
CONFIG_NAME = "fission-ai-work-config"
K8S_NAMESPACE = "default"


logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


def init_db_connection():
    db_host = get_secret("PG_HOST", "127.0.0.1")
    db_port = int(get_secret("PG_PORT", 5432))

    if not check_port_open(ip=db_host, port=db_port):
        raise Exception(f"Cannot establish a database connection to `{db_host}:{db_port}`")

    # options = get_secret("PG_DBSCHEMA")
    # if options:
    #     options = f"-c search_path={options}"  # if a specific db schema is needed
    conn = psycopg2.connect(
        database=get_secret("PG_DB", "postgres"),
        user=get_secret("PG_USER", "postgres"),
        password=get_secret("PG_PASS", "secret"),
        host=db_host,
        port=db_port,
        # options=options,
        # cursor_factory=NamedTupleCursor,
        connection_factory=LoggingConnection,
    )
    conn.initialize(logger)
    return conn


def db_row_to_dict(cursor, row):
    record = {}
    for i, column in enumerate(cursor.description):
        data = row[i]
        if isinstance(data, datetime.datetime):
            data = data.isoformat()
        record[column.name] = data
    return record


def db_rows_to_array(cursor, rows):
    return [db_row_to_dict(cursor, row) for row in rows]


def get_current_namespace() -> str:
    try:
        with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f:
            namespace = f.read().strip()
    except Exception as err:
        current_app.logger.error(err)
        namespace = K8S_NAMESPACE
    return str(namespace)


def get_secret(key: str, default=None):
    namespace = get_current_namespace()
    path = f"/secrets/{namespace}/{SECRET_NAME}/{key}"
    try:
        with open(path, "r") as f:
            return f.read()
    except Exception as err:
        current_app.logger.error("%s: %s", path, err)
        return default


def get_config(key: str, default=None):
    namespace = get_current_namespace()
    path = f"/configs/{namespace}/{CONFIG_NAME}/{key}"
    try:
        with open(path, "r") as f:
            return f.read()
    except Exception as err:
        current_app.logger.error("%s: %s", path, err)
        return default


def str_to_bool(input: str | None) -> bool | None:
    input = input or ""
    # Map recognized string values to booleans; anything else yields None.
    BOOL_MAP = {"true": True, "false": False}
    return BOOL_MAP.get(input.strip().lower(), None)


def check_port_open(ip: str, port: int, timeout: int = 30):
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            result = s.connect_ex((ip, port))
            return result == 0
    except Exception as err:
        current_app.logger.error(f"Check port open error: {err}")
        return False
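As a behavior sketch (assuming this module is importable as `helpers`), `str_to_bool` only recognizes "true"/"false" after trimming and lowercasing and returns None otherwise, which is what lets `request.args.get("asc", type=str_to_bool)` distinguish an absent or malformed flag from an explicit one:

```python
# Behavior sketch for str_to_bool; assumes helpers.py is on the import path.
from helpers import str_to_bool

assert str_to_bool("true") is True
assert str_to_bool(" FALSE ") is False   # whitespace and case are ignored
assert str_to_bool("yes") is None        # unrecognized values map to None
assert str_to_bool(None) is None         # missing input maps to None
```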
4
apps/requirements.txt
Normal file
@@ -0,0 +1,4 @@
psycopg2-binary==2.9.10
pydantic==2.11.7
PyNaCl==1.6.0
Flask==3.1.0
50
apps/schemas.py
Normal file
@@ -0,0 +1,50 @@
import re
from datetime import date
from typing import Optional

from pydantic import BaseModel, Field, field_validator


class AiUserCreate(BaseModel):
    id: Optional[str] = None
    name: str = Field(min_length=1, max_length=128)
    email: str = Field(..., max_length=256)
    dob: Optional[date] = None
    gender: Optional[str] = Field(default=None, max_length=10)

    @field_validator("email")
    def validate_email(cls, v):
        if not re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", v):
            raise ValueError("invalid email")
        return v


class AiUserUpdate(BaseModel):
    name: Optional[str] = Field(default=None, min_length=1, max_length=128)
    email: Optional[str] = Field(default=None, max_length=256)
    dob: Optional[date] = None
    gender: Optional[str] = Field(default=None, max_length=10)

    @field_validator("email")
    def validate_email(cls, v):
        if v is None:  # email is optional on update; skip the pattern check
            return v
        if not re.match(r"^[^@\s]+@[^@\s]+\.[^@\s]+$", v):
            raise ValueError("invalid email")
        return v


class AiUserFilter(BaseModel):
    q: Optional[str] = None
    name: Optional[str] = None
    email: Optional[str] = None
    gender: Optional[str] = None

    dob_from: Optional[date] = None
    dob_to: Optional[date] = None

    created_from: Optional[str] = None
    created_to: Optional[str] = None

    page: int = Field(default=0, ge=0)
    size: int = Field(default=20, ge=1, le=200)
    sortby: str = "modified"
    asc: bool = False
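A minimal sketch of how these models behave under pydantic 2.x (the version pinned in requirements.txt); the sample values are illustrative only:

```python
# Validation sketch for AiUserCreate; assumes schemas.py is importable.
from pydantic import ValidationError
from schemas import AiUserCreate

user = AiUserCreate(name="Alice", email="alice@example.com", dob="1990-01-01")
print(user.model_dump())  # dob is coerced to a datetime.date

try:
    AiUserCreate(name="Bob", email="not-an-email")
except ValidationError as e:
    print(e.errors()[0]["msg"])  # the "invalid email" ValueError surfaces here
```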
128
apps/update_delete.py
Normal file
@@ -0,0 +1,128 @@
from flask import current_app, jsonify, request
from helpers import CORS_HEADERS, db_row_to_dict, init_db_connection
from psycopg2 import IntegrityError
from pydantic_core import ValidationError
from schemas import AiUserUpdate


def main():
    """
    ```fission
    {
        "name": "ai-admin-update-delete-user",
        "fntimeout": 300,
        "http_triggers": {
            "ai-admin-update-delete-user-http": {
                "url": "/ai/admin/users/{UserID}",
                "methods": ["DELETE", "PUT"]
            }
        }
    }
    ```
    """
    try:
        if request.method == "DELETE":
            return make_delete_request()
        elif request.method == "PUT":
            return make_update_request()
        else:
            return {"error": "Method not allowed"}, 405, CORS_HEADERS
    except Exception as err:
        print(f"ErrorType={type(err)}")
        return {"error": str(err)}, 500, CORS_HEADERS


def make_update_request():
    # Fission passes the {UserID} path segment as a request header.
    user_id = request.headers.get("X-Fission-Params-UserID")
    if not user_id:
        return jsonify({"errorCode": "MISSING_USER_ID"}), 400, CORS_HEADERS

    try:
        body = AiUserUpdate(**(request.get_json(silent=True) or {}))
    except ValidationError as e:
        return (
            jsonify({"error": "Validation failed", "details": e.errors()}),
            400,
            CORS_HEADERS,
        )
    conn = None
    try:
        conn = init_db_connection()
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT * FROM ai_user WHERE id=%s FOR UPDATE", (user_id,))
                row = cur.fetchone()
                if not row:
                    return jsonify({"errorCode": "USER_NOT_FOUND"}), 404, CORS_HEADERS

                # Build the SET clause from only the fields that were provided.
                sets, params = [], {"id": user_id}
                if body.name is not None:
                    sets.append("name=%(name)s")
                    params["name"] = body.name

                if body.email is not None:
                    sets.append("email=%(email)s")
                    params["email"] = body.email

                if body.dob is not None:
                    sets.append("dob=%(dob)s")
                    params["dob"] = body.dob

                if body.gender is not None:
                    sets.append("gender=%(gender)s")
                    params["gender"] = body.gender

                sets.append("modified=CURRENT_TIMESTAMP")
                cur.execute(
                    f"UPDATE ai_user SET {', '.join(sets)} WHERE id=%(id)s RETURNING *",
                    params,
                )
                updated = db_row_to_dict(cur, cur.fetchone())
                return jsonify(updated), 200, CORS_HEADERS
    except IntegrityError as e:
        return (
            jsonify({"errorCode": "DUPLICATE_USER", "details": str(e)}),
            409,
            CORS_HEADERS,
        )
    finally:
        if conn:
            conn.close()


def __delete_user(cursor, id: str):
    cursor.execute("SELECT 1 FROM ai_user WHERE id = %(id)s", {"id": id})
    if not cursor.fetchone():
        return "USER_NOT_FOUND"

    cursor.execute("DELETE FROM ai_user WHERE id = %(id)s RETURNING *", {"id": id})
    row = cursor.fetchone()
    return db_row_to_dict(cursor, row)


def make_delete_request():
    user_id = request.headers.get("X-Fission-Params-UserID")
    if not user_id:
        return jsonify({"errorCode": "MISSING_USER_ID"}), 400, CORS_HEADERS

    conn = None
    try:
        conn = init_db_connection()
        with conn.cursor() as cursor:
            result = __delete_user(cursor, id=user_id)
            if result == "USER_NOT_FOUND":
                return jsonify({"errorCode": "USER_NOT_FOUND"}), 404, CORS_HEADERS
            conn.commit()
            return jsonify(result), 200, CORS_HEADERS
    except Exception as ex:
        return jsonify({"error": str(ex)}), 500, CORS_HEADERS
    finally:
        if conn is not None:
            conn.close()
            current_app.logger.info("Close DB connection")
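As with the create/filter function, a hedged sketch of calling these triggers through the router: Fission rewrites the `{UserID}` path segment into the `X-Fission-Params-UserID` header that the handlers read. The base URL and id below are placeholders:

```python
# Hypothetical calls against /ai/admin/users/{UserID}; ROUTER and user_id are assumptions.
import requests

ROUTER = "http://localhost:8080"  # assumed router address
user_id = "00000000-0000-0000-0000-000000000000"  # placeholder id from the create endpoint

# PUT -> make_update_request; only the provided fields are updated
resp = requests.put(f"{ROUTER}/ai/admin/users/{user_id}", json={"name": "Alice Updated"})
print(resp.status_code, resp.json())

# DELETE -> make_delete_request; returns the deleted row on success
resp = requests.delete(f"{ROUTER}/ai/admin/users/{user_id}")
print(resp.status_code, resp.json())
```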
142
apps/vault.py
Normal file
@@ -0,0 +1,142 @@
import base64

import nacl.secret


def string_to_hex(text: str) -> str:
    """
    Convert a string to its hexadecimal representation.

    Args:
        text: Input string to convert

    Returns:
        Hexadecimal string representation
    """
    return text.encode("utf-8").hex()


def hex_to_string(hex_string: str) -> str:
    """
    Convert a hexadecimal string back to a regular string.

    Args:
        hex_string: Hexadecimal string to convert

    Returns:
        Decoded string

    Raises:
        ValueError: If hex_string is not valid hexadecimal
    """
    return bytes.fromhex(hex_string).decode("utf-8")


def decrypt_vault(vault: str, key: str) -> str:
    """
    Decrypt a vault string encrypted with PyNaCl SecretBox.

    Vault format: "vault:v1:<base64_encrypted_data>"

    Args:
        vault: Vault-formatted string (e.g., "vault:v1:eW91cl9lbmNyeXB0ZWRfZGF0YQ==")
        key: Hex string representation of a 32-byte encryption key

    Returns:
        Decrypted string

    Raises:
        ValueError: If the vault format is invalid or the key is not valid hex
        nacl.exceptions.CryptoError: If decryption fails (wrong key or corrupted data)
    """
    # Parse vault format
    parts = vault.split(":", 2)
    if len(parts) != 3 or parts[0] != "vault" or parts[1] != "v1":
        raise ValueError("Invalid vault format. Expected 'vault:v1:<encrypted_data>'")

    encrypted_string = parts[2]
    # Convert the hex string key to bytes
    key_bytes = bytes.fromhex(key)

    # Create a SecretBox instance with the key
    box = nacl.secret.SecretBox(key_bytes)

    # Decode the base64-encoded encrypted string
    encrypted_data = base64.b64decode(encrypted_string)

    # Decrypt the data
    decrypted_bytes = box.decrypt(encrypted_data)

    # Convert bytes to string
    return decrypted_bytes.decode("utf-8")


def encrypt_vault(plaintext: str, key: str) -> str:
    """
    Encrypt a string and return it in vault format.

    Args:
        plaintext: String to encrypt
        key: Hex string representation of a 32-byte encryption key

    Returns:
        Vault-formatted encrypted string (e.g., "vault:v1:<encrypted_data>")

    Raises:
        ValueError: If the key is not a valid hex string
    """
    # Convert the hex string key to bytes
    key_bytes = bytes.fromhex(key)

    # Create a SecretBox instance with the key
    box = nacl.secret.SecretBox(key_bytes)

    # Encrypt the data (a random nonce is generated and prepended to the ciphertext)
    encrypted = box.encrypt(plaintext.encode("utf-8"))

    # Encode to base64
    encrypted_string = base64.b64encode(encrypted).decode("utf-8")

    # Return in vault format
    return f"vault:v1:{encrypted_string}"


def is_valid_vault_format(vault: str) -> bool:
    """
    Check if a string is in valid vault format.

    Vault format: "vault:v1:<base64_encrypted_data>"

    Args:
        vault: String to validate

    Returns:
        True if the string matches the vault format structure, False otherwise

    Note:
        This only checks the format structure, not whether the data can be decrypted
    """
    # Parse vault format
    parts = vault.split(":", 2)

    # Check basic structure: vault:v1:<data>
    if len(parts) != 3 or parts[0] != "vault" or parts[1] != "v1":
        return False

    encrypted_data = parts[2]

    # Check that the data part is not empty
    if not encrypted_data:
        return False

    # Check that the data is valid base64 (validate=True rejects stray characters)
    try:
        decoded = base64.b64decode(encrypted_data, validate=True)
    except Exception:
        return False

    # Check that the decoded data has at least the nonce bytes (24 bytes for NaCl)
    if len(decoded) < nacl.secret.SecretBox.NONCE_SIZE:
        return False

    return True
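A round-trip sketch of the vault helpers (assuming this module is importable as `vault`); the key is a freshly generated 32-byte SecretBox key in hex:

```python
# Round-trip sketch: generate a key, encrypt, validate the format, decrypt.
import nacl.secret
import nacl.utils

from vault import decrypt_vault, encrypt_vault, is_valid_vault_format

# 32-byte key, hex-encoded, matching what encrypt_vault/decrypt_vault expect
key_hex = nacl.utils.random(nacl.secret.SecretBox.KEY_SIZE).hex()

token = encrypt_vault("s3cret-value", key_hex)
print(token)  # vault:v1:<base64 of nonce + ciphertext>

assert is_valid_vault_format(token)
assert decrypt_vault(token, key_hex) == "s3cret-value"
```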