文章目录
- 1、定义
- 2、Celery异步任务框架特点
- 3、Celery架构
- 4、使用场景
- 5、Celery的安装配置
- 6、基本使用
- 7、celery多任务结构
- 8、延时任务
- 9、定时任务
- 10、django中使用celery()
1、定义
python中的一个分布式异步任务框架
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统
专注于实时处理的异步任务队列
同时也支持任务调度
(1) 执行异步任务(对立: 同步任务),解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等
(2) 执行延时任务(比如5分钟后干一件事): 解决延迟任务
(3) 执行定时任务: 每天隔几分钟干什么事,解决周期(周期)任务,比如每天数据统计Celery 官网: http://www.celeryproject.org/
Celery 官方文档英文版: http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版: http://docs.jinkan.org/docs/celery/
2、Celery异步任务框架特点
(1) 可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
(2) celery服务为为其他项目服务提供异步解决任务需求的注: 会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
人是一个独立运行的服务 | 医院也是一个独立运行的服务
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与,但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求
3、Celery架构
Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和任务执行结果存储(task result store)组成
1、消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,包括RabbitMQ、Redis等2、任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中3、任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
4、使用场景
异步执行: 解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等
延迟执行: 解决延迟任务
定时执行: 解决周期(周期)任务,比如每天数据统计关于秒杀系统可以使用celery不能秒超,使用锁机制(mysql悲观锁,乐观锁),redis锁提高并发量 ---> 把同步做成异步 ---> 使用celery前端点击秒杀按钮,向后端发送秒杀请求 ---> 同步操作同步操作请求来到后端,判断数量是否够,如果够要生成订单(mysql),订单状态是待支付状态 请求返回,告诉前端秒杀成功异步操作请求来到后端,提交一个celery任务 ---> celery任务异步的执行判断数量是否够,如果够,要生成订单(mysql)秒杀是否成功的结果还没有,直接返回了(返回任务id)前端启动一个定时任务,每隔5s,向后台发送一个查询请求,查询秒杀任务是否执行完成(带着任务id查)如果是未执行状态或者执行中 ---> 返回给前端,前端不处理,定时任务继续执行又隔了5s,发送查询,查询到秒杀成功的结果,返回给前端,秒杀成功
5、Celery的安装配置
pip install celery
消息中间件: RabbitMQ/Redis
app=Celery(‘任务名’, broker=’xxx’, backend=’xxx’)
6、基本使用
1、定义一个py文件(t_celery.py)
import celery
broker='redis://127.0.0.1:6379/1'
backend='redis://127.0.0.1:6379/2'
app=celery.Celery('test',broker=broker,backend=backend)
@app.task
def add(a,b):return a+b@app.task
def mul(a,b):return a*b2、提交任务(在其它文件中,task.py)
from t_celery import add
res=add.delay(100,4)
print(res) 3、启动worker
非windows平台: celery worker -A t_celery -l info
windows需要装eventlet模块: celery worker -A t_celery -l info -P eventlet4、查看执行结果
from t_celery import app
from celery.result import AsyncResult
id = '5331c70b-1b51-4a15-aa17-2fa0f7952c00'
if __name__ == '__main__':res = AsyncResult(id=id, app=app)if res.successful():result = res.get()print(result)elif res.failed():print('任务失败')elif res.status == 'PENDING':print('任务等待中被执行')elif res.status == 'RETRY':print('任务异常后正在重试')elif res.status == 'STARTED':print('任务已经开始被执行')
app参数
celery配置文件参数
CELERYD_FORCE_EXECV=True
CELERYD_CONCURRENCY=4
CELERY_ACKS_LATE=True
CELERYD_MAX_TASKS_PER_CHILD=100
CELERYD_TASK_TIME_LIMIT=12*30
7、celery多任务结构
目录结构:package_celery: celery_task __init__.py celery.py order_task.py user_task.py result.py submit_tast.py
celery.py
import celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'app = celery.Celery(broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task'
])
order_task.py
from .celery import app@app.task
def cannel_order(name):return '用户{}取消订单'.format(name)
user_task.py
from .celery import app@app.task
def send_msg(phone):return '{}发送短信成功'.format(phone)
result.py
from celery_task.celery import appfrom celery.result import AsyncResult
id = '6d1e5e91-236a-449c-ad32-eac093b240bd'
if __name__ == '__main__':res = AsyncResult(id=id, app=app)if res.successful():result = res.get()print(result)elif res.failed():print('任务失败')elif res.status == 'PENDING':print('任务等待中被执行')elif res.status == 'RETRY':print('任务异常后正在重试')elif res.status == 'STARTED':print('任务已经开始被执行')
submit_tast.py
from celery_task import order_task, user_taskres = order_task.cannel_order.delay('allen')
print(res)
res = user_task.send_msg.delay('13666666666')
print(res)
运行
celery worker -A celery_task -l info -P eventlet
8、延时任务
8.1、方式一
from datetime import datetime
from celery_task import order_task, user_task
v1 = datetime(2021, 1, 7, 21, 58, 55)
v2 = datetime.utcfromtimestamp(v1.timestamp())
print(v2)
res = user_task.send_msg.apply_async(args=['13666666666', ], eta=v2)
8.2、方式二
from datetime import datetime, timedeltactime = datetime.now()
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
res = user_task.send_msg.apply_async(args=['13666666666', ], eta=task_time)
print(res)
9、定时任务
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False
from datetime import timedelta
from celery.schedules import crontabapp.conf.beat_schedule = {'send-msg':{'task': 'celery_task.user_task.send_msg','schedule': timedelta(seconds=5), 'schedule': crontab(hour=8, day_of_month=1), 'args': ('13666666666',),}
}
celery beat -A celery_task -l info
celery worker -A celery_task -l info -P eventlet
10、django中使用celery()
1、celery是独立的,跟框架没有关系
2、django-celery第三方模块,兼容性不好不采用,使用通用方式
3、目录结构celery_task__init__.pycelery.pyhome_task.pyorder_task.pyuser_task.py
celery框架django项目工作流程
(1) 加载django配置环境
(2) 创建Celery框架对象app,配置broker和backend,得到的app就是worker
(3) 给worker对应的app添加可处理的任务函数,用include配置给worker的app
(4) 完成提供的任务的定时配置app.conf.beat_schedule
(5) 启动celery服务,运行worker,执行任务
(6) 启动beat服务,运行beat,添加任务重点: 由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下
路由
path('test_celery', views.test_celery),
视图函数
def test_celery(request):from celery_task.celery import appfrom celery_task.user_task import send_msgfrom celery.result import AsyncResultid = request.GET.get('id')if id:res = AsyncResult(id=id, app=app)if res.successful():result = res.get()return HttpResponse(result)id = send_msg.delay('13666666666')print(id)return HttpResponse('发送短信成功')