import logging import boto3 SECRET_NAME = "fission-ailbl-user-avatar-env" K8S_NAMESPACE = "default" def get_current_namespace() -> str: try: with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f: namespace = f.read() except: namespace = K8S_NAMESPACE return str(namespace) def get_secret(key: str, default=None) -> str: namespace = get_current_namespace() path = f"/secrets/{namespace}/{SECRET_NAME}/{key}" try: with open(path, "r") as f: return f.read() except: return default S3_BUCKET = get_secret("S3_BUCKET") S3_PREFIX = get_secret("S3_PREFIX") s3_client = boto3.client( "s3", endpoint_url=get_secret("S3_ENDPOINT_URL"), aws_access_key_id=get_secret("S3_ACCESS_KEY_ID"), aws_secret_access_key=get_secret("S3_SECRET_ACCESS_KEY"), config=boto3.session.Config(signature_version="s3v4"), )