Some checks failed
K8S Fission Deployment / Deployment fission functions (push) Failing after 21s
36 lines
927 B
Python
36 lines
927 B
Python
import logging
|
|
import boto3
|
|
SECRET_NAME = "fission-ailbl-user-avatar-env"
|
|
K8S_NAMESPACE = "default"
|
|
|
|
|
|
def get_current_namespace() -> str:
|
|
try:
|
|
with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f:
|
|
namespace = f.read()
|
|
except:
|
|
namespace = K8S_NAMESPACE
|
|
return str(namespace)
|
|
|
|
|
|
def get_secret(key: str, default=None) -> str:
|
|
namespace = get_current_namespace()
|
|
path = f"/secrets/{namespace}/{SECRET_NAME}/{key}"
|
|
try:
|
|
with open(path, "r") as f:
|
|
return f.read()
|
|
except:
|
|
return default
|
|
|
|
|
|
S3_BUCKET = get_secret("S3_BUCKET")
|
|
S3_PREFIX = get_secret("S3_PREFIX")
|
|
|
|
s3_client = boto3.client(
|
|
"s3",
|
|
endpoint_url=get_secret("S3_ENDPOINT_URL"),
|
|
aws_access_key_id=get_secret("S3_ACCESS_KEY_ID"),
|
|
aws_secret_access_key=get_secret("S3_SECRET_ACCESS_KEY"),
|
|
config=boto3.session.Config(signature_version="s3v4"),
|
|
)
|