阿里云 OSS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import oss2
import json
from PIL import Image
from celery_module.utils.logger import get_logger
from celery_module.config.config import *

# Obtain the logger shared by the helpers in this module.
logger = get_logger('ali_oss_logger')

# Reject unfilled "<...>" template placeholders BEFORE the client is built, so
# a misconfiguration surfaces with the explicit message below rather than with
# whatever error client construction or the first request would produce.
# An explicit raise (instead of `assert`) keeps the check active under
# `python -O`; AssertionError is retained so the exception type is unchanged.
for param in (access_key_id, access_key_secret, bucket_name, upload_endpoint):
    if '<' in param:
        raise AssertionError('请设置参数:' + param)

# Create the Bucket object; every object-level operation below goes through it.
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), upload_endpoint, bucket_name)


def get_local_image_info(image_file):
    """Return basic information about a local image.

    :param str image_file: local image, passed straight to ``Image.open``
    :return tuple: a 3-tuple (height, width, format).
    """
    # Image.open() keeps the underlying file open; the context manager
    # releases it once the attributes have been read.
    with Image.open(image_file) as im:
        return im.height, im.width, im.format


def upload_img(ali_oss_path, path, evidence_id, title, *args, **kwargs):
    """Upload a local image to OSS, log the outcome and describe the upload.

    :param ali_oss_path: destination object path in OSS, e.g. original/example.png
    :param path: local image path, e.g. example.png
    :param evidence_id: echoed back unchanged in the returned dict
    :param title: echoed back unchanged in the returned dict
    :param args: extra positional arguments forwarded to ``put_object_from_file``
    :param kwargs: extra keyword arguments forwarded to ``put_object_from_file``
    :return: dict with the keys ``code`` (status of the upload response),
        ``ali_oss_path``, ``path``, ``evidence_id`` and ``title``
    """
    response = bucket.put_object_from_file(ali_oss_path, path, *args, **kwargs)
    # Log success/failure before handing the summary back to the caller.
    record_log("图片上传", response)
    return dict(
        code=response.status,
        ali_oss_path=ali_oss_path,
        path=path,
        evidence_id=evidence_id,
        title=title,
    )


def get_img_info(ali_oss_path, *args, **kwargs):
    """Fetch the image metadata of an OSS object and return it parsed from JSON.

    The object is requested with ``process='image/info'`` and the response
    body is decoded and loaded as JSON.

    :param ali_oss_path: image path in OSS, e.g. original/example.png
    :param args: extra positional arguments forwarded to ``bucket.get_object``
    :param kwargs: extra keyword arguments forwarded to ``bucket.get_object``
    :return: the JSON-decoded image information
        example:{'FileSize': {'value': '346013'}, 'Format': {'value': 'png'}, 'ImageHeight': {'value': '4000'}, 'ImageWidth': {'value': '1920'}}
    """
    response = bucket.get_object(ali_oss_path, *args, process='image/info', **kwargs)
    # Read the raw body, normalise it to text, then parse it in one step.
    return json.loads(oss2.to_unicode(response.read()))


def delete_img(ali_oss_path):
    """Delete an object from OSS and log the outcome.

    :param ali_oss_path: path of the object in OSS, e.g. original/example.png
    """
    record_log("图片删除", bucket.delete_object(ali_oss_path))


def record_log(msg, result):
    """Log whether an OSS operation succeeded, based on its HTTP status.

    :param msg: operation label placed in front of the success/failure suffix
    :param result: response object exposing ``status`` and ``request_id``
    """
    # Treat every 2xx status as success: deleting an object answers with
    # 204 No Content, which a strict ``== 200`` check reported as a failure.
    if 200 <= result.status < 300:
        logger.info(f"{msg}成功")
    else:
        logger.warning(f"{msg}失败")
        # Include the request ID so the failed call can be traced.
        logger.warning(f"状态码:{result.status}-----------请求ID:{result.request_id}")


if __name__ == '__main__':
    # Manual smoke test: fetch and print the image info of a sample object.
    print(get_img_info('original/syb222.png'))