celery多任务结构
上一篇博客我们简单的介绍了celery的简单使用,但是为了方便管理,我们一般不采用单文件的形式去使用celery,而是采用包的形式去管理clery任务
1 2 3 4 5 6 7 8 9
| celery多任务结构
project ├── celery_task │ ├── __init__.py │ ├── celery.py │ └── tasks.py ├── add_task.py └── get_result.py
|
celery.py
1 2 3 4 5 6 7
| from celery import Celery broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
ps:include参数的内容写的是celery任务的文件位置,可以填写多个
|
tasks.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| from .celery import app import time @app.task def add(n, m): print(n) print(m) time.sleep(10) print('n+m的结果:%s' % (n + m)) return n + m
@app.task def low(n, m): print(n) print(m) print('n-m的结果:%s' % (n - m)) return n - m
|
add_task.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| from celery_task import tasks
t1 = tasks.add.delay(10, 20) t2 = tasks.low.delay(100, 50) print(t1.id)
from datetime import datetime, timedelta eta=datetime.utcnow() + timedelta(seconds=10) tasks.low.apply_async(args=(200, 50), eta=eta)
|
get_result.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| from celery_task.celery import app
from celery.result import AsyncResult
id = '21325a40-9d32-44b5-a701-9a31cc3c74b5' if __name__ == '__main__': async = AsyncResult(id=id, app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print('任务失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行')
|