Small knowledge, big challenge! This article is participating in the creation activity of “Essential Tips for Programmers”

This article also participated in the “Digitalstar Project” to win a creative gift package and creative incentive money

Minio_s3 official documentation: docs.minio.org.cn/docs/master…

  • Install related configuration packages
pip install  minio
Copy the code
  • Add the base configuration to the YAMl file (filename:minio.yaml)
Base: POINT_URL: xxx.xxx.xxx. XXX ACCESS_KEY: your own key SECRET_KEY REGION: your own REGION SCHEME: HTTPCopy the code
  • Get the configuration code (only part of the main code is given)
from utils.security.aes import aes_decrypt

_stream = open('minio.yaml', 'r')

class BaseMinioConfig(DBConfig):
    point_url = _data['base']['POINT_URL']
    access_key = aes_decrypt(_data['base']['ACCESS_KEY'])
    secret_key = aes_decrypt(_data['base']['SECRET_KEY'])
    region = _data['base']['REGION']
    scheme = _data['base']['SCHEME']
    
minio_conf = {
    EnvEnum.base.value: MiniobaseConfig,
}
Copy the code
  • Configuring the Settings file
Minio import minio_conf # MinioS3 Configure minio_conf = minio_conf['base'] PUBLIC_BUCKET_NAME = f"demo_base_public_bucket".replace('_', '-') BUCKET_NAME = f"demo_base_bucket" MINIO_ACCESS_KEY_ID = MINIO_CONF.access_key MINIO_SECRET_ACCESS_KEY = MINIO_CONF.secret_key MINIO_ENDPOINT_URL = MINIO_CONF.point_url MINIO_REGION = MINIO_CONF.region MINIO_SCHEME = MINIO_CONF.schemeCopy the code
  • Minio_s3 invokes the relevant code

base.py

Import uuid def make_s3_key_with_part(prefix: STR, suffix: STR, dir: STR = '/') -> STR: "" :param suffix: :param dir: :return: """ if not isinstance(dir, str): raise ValueError('Please enter a valid dir') if dir and not dir.endswith('/'): dir = f'{dir}/' return '{}{}-{}.{}'.format(dir, prefix, uuid.uuid4().hex, suffix) def make_s3_key(file_name: str, dir: STR = '/') -> STR: """ build key(complete) :param file_name: :param dir: :return: """ from utils.basic.file import split_file_path prefix, suffix = split_file_path(file_name) if not all([prefix, suffix]): raise ValueError('Please enter a valid file name') key = make_s3_key_with_part(prefix, suffix, dir) return key def is_false(value): return value == False def is_false_f(value): return value[0] == False def is_none(value): return not value class BaseOssclient(object): def __init__(self, bucket_name='', access_key_id='', secret_access_key='', endpoint_url='', region='' ): self.bucket_name = bucket_name.replace("_", "-") self.access_key_id = access_key_id self.secret_access_key = secret_access_key self.endpoint_url = endpoint_url self.region = region @property def client(self): Raise NotImplementedError() # def upload_file(self, file_path, upload_path, add_suffix=True, **kwargs): """ :param file_path: s3 storage location :param upload_path: file upload location :return: """ raise NotImplementedError() def upload_file_obj(self, file, upload_path, **kwargs): """ :param file_path: Return: """ raise NotImplementedError() def get_signed_url(self, key): """ Signature :param key: :return: """ raise NotImplementedError() def download_file(self, download_path, key): """ Param download_path: Return: """ raise NotImplementedError() def download_file_obj(self, key): """ :return: """ raise NotImplementedError() def delete_file(self, key): """ Param key: :return: "" raise NotImplementedError() def check_key_is_exist(self, key): "" param key: :return: Raise NotImplementedError() def get_pages(self, delimiter='/', prefix=None): :param prefix: :return: """ raise NotImplementedError() def get_object_list(self, delimiter='/', prefix=None): """ Get all files :param delimiter: :param prefix: :return: """ raise NotImplementedError() def download_dir(self, delimiter='/', prefix=None, local='/tmp'): Download the entire folder :param Delimiter: :param prefix: :param local: :return: "" raise NotImplementedError() def creat_bucket(self): """ return NotImplementedError()Copy the code

minio_s3.py

import os from datetime import timedelta from django.conf import settings from minio import Minio from tenacity import stop_after_delay, stop_after_attempt, wait_random, retry_if_result, retry_if_exception_type, Retry from base import make_s3_key,BaseOssclient, is_false, is_FALSE_f # Obtain S3 URL def get_private_URL (key: STR) -> STR: """ :param key: minios3 store name :return: url """ client = MinioS3() return client.get_signed_url(key=key) def get_public_url(key: str) -> str: url = f"{settings.MINIO_SCHEME}://{settings.MINIO_ENDPOINT_URL}/" \ f"{settings.PUBLIC_BUCKET_NAME.replace('_', '-')}/{key}" return url class MinioS3(BaseOssclient): def __init__(self, bucket_name='', access_key_id='', secret_access_key='', endpoint_url='', region='', is_secure: bool = None ): self.bucket_name = bucket_name.replace("_", "-") or settings.BUCKET_NAME.replace("_", "-") self.access_key_id = access_key_id or settings.MINIO_ACCESS_KEY_ID self.secret_access_key = secret_access_key or settings.MINIO_SECRET_ACCESS_KEY self.endpoint_url = endpoint_url or settings.MINIO_ENDPOINT_URL self.region = region or  settings.MINIO_REGION if is_secure is None: self.is_secure = False if settings.MINIO_SCHEME == "http" else True else: self.is_secure = is_secure super().__init__(bucket_name=self.bucket_name, access_key_id=self.access_key_id, secret_access_key=self.secret_access_key, endpoint_url=self.endpoint_url, region=self.region) @property def client(self): client = Minio( self.endpoint_url, access_key=self.access_key_id, secret_key=self.secret_access_key, Secure =self.is_secure, region=self.region) return client def creat_bucket(self): """ Create bucket :return: """ if not self.client.bucket_exists(self.bucket_name): Self.client.make_bucket (self.bucket_name) return True return False # Retry (reraise=True, Stop = (stop_after_delay (10) | stop_after_attempt (5)), wait = wait_random (min = 0.01, Max = 0.03). retry=(retry_if_result(is_false) | retry_if_exception_type(Exception))) def upload_file(self, file_path, upload_path, Add_suffix =True, **kwargs): """ :param file_path: local :param upload_path: remote :return: """ if os.path.exists(file_path): self.client.fput_object(bucket_name=self.bucket_name, object_name=make_s3_key(upload_path) if add_suffix else upload_path, file_path=file_path, num_parallel_uploads=5 ) return True else: Return False @ retry (reraise = True, stop = (stop_after_delay (10) | stop_after_attempt (5)), wait = wait_random (min = 0.01, Max = 0.03), retry = (retry_if_result (is_false) | retry_if_exception_type (Exception))) def upload_file_obj (self, the file, Upload_path, **kwargs): """ :param upload_path: :param upload_path: :param upload_path: :param upload_path: :param upload_path: """ self.client.put_object(bucket_name=self.bucket_name, object_name=upload_path, data=file, length=file.size, content_type=file.content_type, num_parallel_uploads=5) return True @retry(reraise=True, Stop = (stop_after_delay (10) | stop_after_attempt (5)), wait = wait_random (min = 0.01, Max = 0.03). retry=(retry_if_result(is_false_f) | retry_if_exception_type(Exception))) def download_file(self, download_path, key): self.client.fget_object(self.bucket_name, key, download_path) return True, download_path @retry(reraise=True, Stop = (stop_after_delay (10) | stop_after_attempt (5)), wait = wait_random (min = 0.01, Max = 0.03). retry=(retry_if_result(is_false_f) | retry_if_exception_type(Exception))) def download_file_obj(self, key): data = self.client.get_object(self.bucket_name, key) from utils.basic.file import content_to_file buf = content_to_file(data.read()) return True, Buf @ retry (reraise = True, stop = (stop_after_delay (10) | stop_after_attempt (5)), wait = wait_random (min = 0.01, Max = 0.03). retry=(retry_if_result(is_false) | retry_if_exception_type(Exception))) def delete_file(self, key): Delete file :param key: :return: """ self.client.remove_object(self.bucket_name, key) return True @retry(reraise=True, Stop = (stop_after_delay (10) | stop_after_attempt (5)), wait = wait_random (min = 0.01, Max = 0.03). retry=(retry_if_result(is_false) | retry_if_exception_type(Exception))) def get_signed_url(self, key): if not key: return '' url = self.client.presigned_get_object(self.bucket_name, key, expires=timedelta(hours=24)) return url def public_policy(self, bucket_name): policy = '{"Version":"","Statement":[{"Effect":"Allow","Principal":' \ '{"AWS": ["*"]},"Action":["s3:GetBucketLocation","s3:ListBucket"],"Resource":' \ '["arn:aws:s3:::%s"]},{"Effect":"Allow","Principal":{"AWS":["*"]},' \ '"Action": ["s3:GetObject"],"Resource":["arn:aws:s3:::%s/*"]}]}' % ( bucket_name, bucket_name) return policyCopy the code