1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
|
import oss2 import json from PIL import Image from celery_module.utils.logger import get_logger from celery_module.config.config import *
# Module-level logger used by record_log below.
logger = get_logger('ali_oss_logger')

# Fail fast, before any value is handed to oss2, when a setting still contains
# '<' (presumably an unfilled "<...>" template placeholder -- confirm against
# the config module). The names are expected to come from the star-import of
# the project config.
# NOTE(review): `assert` is stripped under `python -O`, so this guard is
# skipped in optimized mode; kept as an assert so the exception type seen at
# import time is unchanged.
for param in (access_key_id, access_key_secret, bucket_name, upload_endpoint):
    assert '<' not in param, '请设置参数:' + param

# Shared bucket client used by every helper in this module.
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), upload_endpoint, bucket_name)
def get_local_image_info(image_file):
    """Return basic information about a local image.

    The image is opened through a context manager so the file handle is
    released as soon as the three attributes have been read, instead of
    staying open until garbage collection.

    :param str image_file: local image
    :return tuple: a 3-tuple ``(height, width, format)``
    """
    with Image.open(image_file) as im:
        return im.height, im.width, im.format
def upload_img(ali_oss_path, path, evidence_id, title, *args, **kwargs):
    """Upload a local image to OSS, log the outcome and describe the upload.

    :param ali_oss_path: destination path of the file in OSS, e.g. original/example.png
    :param path: local image path, e.g. example.png
    :param evidence_id: echoed back unchanged in the returned dict
    :param title: echoed back unchanged in the returned dict
    :param args: extra positional arguments forwarded to ``bucket.put_object_from_file``
    :param kwargs: extra keyword arguments forwarded to ``bucket.put_object_from_file``
    :return: dict with the keys ``code`` (status of the upload result),
        ``ali_oss_path``, ``path``, ``evidence_id`` and ``title``
    """
    upload_result = bucket.put_object_from_file(ali_oss_path, path, *args, **kwargs)
    record_log("图片上传", upload_result)

    summary = dict(
        code=upload_result.status,
        ali_oss_path=ali_oss_path,
        path=path,
        evidence_id=evidence_id,
        title=title,
    )
    return summary
def get_img_info(ali_oss_path, *args, **kwargs):
    """Fetch the metadata of an image stored in OSS.

    The object is requested with ``process='image/info'`` and the response
    body is decoded and parsed as JSON.

    :param ali_oss_path: image path in Aliyun OSS, e.g. original/example.png
    :param args: extra positional arguments forwarded to ``bucket.get_object``
    :param kwargs: extra keyword arguments forwarded to ``bucket.get_object``
    :return: the JSON-decoded image information, for example:
        {'FileSize': {'value': '346013'},
         'Format': {'value': 'png'},
         'ImageHeight': {'value': '4000'},
         'ImageWidth': {'value': '1920'}}
    """
    raw_payload = bucket.get_object(
        ali_oss_path, *args, process='image/info', **kwargs
    ).read()
    return json.loads(oss2.to_unicode(raw_payload))
def delete_img(ali_oss_path):
    """Delete an object from the bucket and log the outcome.

    :param ali_oss_path: path of the object in OSS
    """
    # The delete call is evaluated first; its result is then passed to the logger helper.
    record_log("图片删除", bucket.delete_object(ali_oss_path))
def record_log(msg, result):
    """Log whether an OSS request succeeded.

    Any 2xx status is treated as success. Checking for exactly 200 would
    misreport successful requests that answer with another 2xx code; for
    example, the OSS DeleteObject API documents 204 No Content as its success
    status.

    :param msg: label of the operation, used as the prefix of the log line
    :param result: request result exposing ``status`` and ``request_id``
    """
    if 200 <= result.status < 300:
        logger.info(f"{msg}成功")
        return

    # Failure path: log the label, then the details needed to trace the request.
    logger.warning(f"{msg}失败")
    logger.warning(f"状态码:{result.status}-----------请求ID:{result.request_id}")
if __name__ == '__main__':
    # Manual check when run as a script: fetch and print the info of a sample object.
    sample_info = get_img_info('original/syb222.png')
    print(sample_info)
|