热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

分布式异步任务框架之Celery定义、异步任务框架特点、架构、使用场景、安装配置、基本使用、多任务结构使用、延时任务、定时任务及django中使用celery

文章目录1、定义2、Celery异步任务框架特点3、Celery架构4、使用场景5、Celery的安装配置6、基本使用7、celery多任务结构8、延时任务8.1、方式一8.2、方

文章目录

      • 1、定义
      • 2、Celery异步任务框架特点
      • 3、Celery架构
      • 4、使用场景
      • 5、Celery的安装配置
      • 6、基本使用
      • 7、celery多任务结构
      • 8、延时任务
        • 8.1、方式一
        • 8.2、方式二
      • 9、定时任务
      • 10、django中使用celery()


1、定义

python中的一个分布式异步任务框架
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统
专注于实时处理的异步任务队列
同时也支持任务调度
(1) 执行异步任务(对立: 同步任务),解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等
(2) 执行延时任务(比如5分钟后干一件事): 解决延迟任务
(3) 执行定时任务: 每天隔几分钟干什么事,解决周期(周期)任务,比如每天数据统计Celery 官网: http://www.celeryproject.org/
Celery 官方文档英文版: http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版: http://docs.jinkan.org/docs/celery/

2、Celery异步任务框架特点

(1) 可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket)
(2) celery服务为为其他项目服务提供异步解决任务需求的注: 会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求
人是一个独立运行的服务 | 医院也是一个独立运行的服务
正常情况下,人可以完成所有健康情况的动作,不需要医院的参与,但当人生病时,就会被医院接收,解决人生病问题
人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求

3、Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和任务执行结果存储(task result store)组成
在这里插入图片描述

1、消息中间件
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,包括RabbitMQ、Redis等2、任务执行单元
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中3、任务结果存储
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

4、使用场景

异步执行: 解决耗时任务,将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等
延迟执行: 解决延迟任务
定时执行: 解决周期(周期)任务,比如每天数据统计关于秒杀系统可以使用celery不能秒超,使用锁机制(mysql悲观锁,乐观锁),redis锁提高并发量 ---> 把同步做成异步 ---> 使用celery前端点击秒杀按钮,向后端发送秒杀请求 ---> 同步操作同步操作请求来到后端,判断数量是否够,如果够要生成订单(mysql),订单状态是待支付状态 请求返回,告诉前端秒杀成功异步操作请求来到后端,提交一个celery任务 ---> celery任务异步的执行判断数量是否够,如果够,要生成订单(mysql)秒杀是否成功的结果还没有,直接返回了(返回任务id)前端启动一个定时任务,每隔5s,向后台发送一个查询请求,查询秒杀任务是否执行完成(带着任务id)如果是未执行状态或者执行中 ---> 返回给前端,前端不处理,定时任务继续执行又隔了5s,发送查询,查询到秒杀成功的结果,返回给前端,秒杀成功

5、Celery的安装配置

pip install celery
消息中间件: RabbitMQ/Redis
app=Celery(‘任务名’, broker=’xxx’, backend=’xxx’)

6、基本使用

1、定义一个py文件(t_celery.py)
import celery
# 消息中间件(redis)
broker='redis://127.0.0.1:6379/1' # 1 表示使用redis 1 这个db
# 结果存储(redis)
backend='redis://127.0.0.1:6379/2' # 2 表示使用redis 2 这个db# 实例化得到对象,指定中间件和结果存储
app=celery.Celery('test',broker=broker,backend=backend)# 定义任务(可以有多个)
@app.task
def add(a,b):return a+b@app.task
def mul(a,b):return a*b2、提交任务(在其它文件中,task.py)
from t_celery import add
res=add.delay(100,4)
print(res) # 任务id号3、启动worker
非windows平台: celery worker -A t_celery -l info
windows需要装eventlet模块: celery worker -A t_celery -l info -P eventlet4、查看执行结果
from t_celery import app
from celery.result import AsyncResult
# 关键字,变量不能定义为关键字
id = '5331c70b-1b51-4a15-aa17-2fa0f7952c00' # 定义任务的id号,第二步中res的值
if __name__ == '__main__':res = AsyncResult(id=id, app=app)if res.successful():result = res.get()print(result)elif res.failed():print('任务失败')elif res.status == 'PENDING':print('任务等待中被执行')elif res.status == 'RETRY':print('任务异常后正在重试')elif res.status == 'STARTED':print('任务已经开始被执行')

app参数

celery配置文件参数
# 有些情况可以防止死锁
CELERYD_FORCE_EXECV=True
# 设置并发worker数量
CELERYD_CONCURRENCY=4
# 允许重试
CELERY_ACKS_LATE=True
# 每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超时时间
CELERYD_TASK_TIME_LIMIT=12*30

7、celery多任务结构

目录结构:package_celery: # 项目名celery_task # celery包名__init__.py celery.py # celery 的app,必须叫celeryorder_task.py # 任务user_task.py # 任务result.py # 结果查询submit_tast.py # 提交任务

celery.py

import celerybroker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'app = celery.Celery(broker=broker, backend=backend, include=['celery_task.order_task', 'celery_task.user_task' # 一定要记得注册一下
])

order_task.py

from .celery import app@app.task
def cannel_order(name):return '用户{}取消订单'.format(name)

user_task.py

from .celery import app@app.task
def send_msg(phone):return '{}发送短信成功'.format(phone)

result.py

from celery_task.celery import appfrom celery.result import AsyncResult# 关键字,变量不能定义为关键字
# 发短信任务: 39744a3f-02ec-4b8b-b855-111025e4abe4
# 取消订单任务:6d1e5e91-236a-449c-ad32-eac093b240bdid = '6d1e5e91-236a-449c-ad32-eac093b240bd'
if __name__ == '__main__':res = AsyncResult(id=id, app=app)if res.successful():result = res.get()print(result)elif res.failed():print('任务失败')elif res.status == 'PENDING':print('任务等待中被执行')elif res.status == 'RETRY':print('任务异常后正在重试')elif res.status == 'STARTED':print('任务已经开始被执行')

submit_tast.py

from celery_task import order_task, user_taskres = order_task.cannel_order.delay('allen')
print(res) # 返回任务id
res = user_task.send_msg.delay('13666666666')
print(res) # 返回任务id

运行

# 运行worker(在package_celery目录下执行)
celery worker -A celery_task -l info -P eventlet

8、延时任务


8.1、方式一

# 发送短信为例: 2021年1月7日21点58分55秒发送短信
from datetime import datetime
from celery_task import order_task, user_task# eta: 延迟多长时间执行,eta需要传时间对象,并且是utc时间
v1 = datetime(2021, 1, 7, 21, 58, 55)
v2 = datetime.utcfromtimestamp(v1.timestamp()) # 转成utc时间,比正常时间慢8个小时
print(v2)
res = user_task.send_msg.apply_async(args=['13666666666', ], eta=v2)

8.2、方式二

# 发送短信为例: 以当前时间为基准,过10秒后发送短信(隔几秒后执行)
from datetime import datetime, timedeltactime = datetime.now()
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
res = user_task.send_msg.apply_async(args=['13666666666', ], eta=task_time)
print(res)

9、定时任务

# 在celery.py中配置
# 时区
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontabapp.conf.beat_schedule = {'send-msg':{'task': 'celery_task.user_task.send_msg','schedule': timedelta(seconds=5), # 这里是timedelta# 'schedule': timedelta(hours=24*10), # # 这里是timedelta# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点,这里是crontab'schedule': crontab(hour=8, day_of_month=1), # 每月一号早八点,这里是crontab'args': ('13666666666',),}
}# 添加任务: 自动添加任务,所以要启动一个添加任务的服务
# 启动beat,负责每隔3s提交一个任务
celery beat -A celery_task -l info# 启动worker
celery worker -A celery_task -l info -P eventlet

10、django中使用celery()

1、celery是独立的,跟框架没有关系
2、django-celery第三方模块,兼容性不好不采用,使用通用方式
3、目录结构celery_task__init__.pycelery.pyhome_task.pyorder_task.pyuser_task.py

celery框架django项目工作流程
(1) 加载django配置环境
(2) 创建Celery框架对象app,配置broker和backend,得到的app就是worker
(3) 给worker对应的app添加可处理的任务函数,用include配置给worker的app
(4) 完成提供的任务的定时配置app.conf.beat_schedule
(5) 启动celery服务,运行worker,执行任务
(6) 启动beat服务,运行beat,添加任务重点: 由于采用了django的反射机制,使用celery.py所在的celery_task包必须放置项目的根目录下

路由

path('test_celery', views.test_celery),

视图函数

def test_celery(request):from celery_task.celery import appfrom celery_task.user_task import send_msgfrom celery.result import AsyncResultid = request.GET.get('id')if id:res = AsyncResult(id=id, app=app)if res.successful():result = res.get()return HttpResponse(result)id = send_msg.delay('13666666666')print(id)return HttpResponse('发送短信成功')


推荐阅读
  • 安装mysqlclient失败解决办法
    本文介绍了在MAC系统中,使用django使用mysql数据库报错的解决办法。通过源码安装mysqlclient或将mysql_config添加到系统环境变量中,可以解决安装mysqlclient失败的问题。同时,还介绍了查看mysql安装路径和使配置文件生效的方法。 ... [详细]
  • 31.项目部署
    目录1一些概念1.1项目部署1.2WSGI1.3uWSGI1.4Nginx2安装环境与迁移项目2.1项目内容2.2项目配置2.2.1DEBUG2.2.2STAT ... [详细]
  • Python操作MySQL(pymysql模块)详解及示例代码
    本文介绍了使用Python操作MySQL数据库的方法,详细讲解了pymysql模块的安装和连接MySQL数据库的步骤,并提供了示例代码。内容涵盖了创建表、插入数据、查询数据等操作,帮助读者快速掌握Python操作MySQL的技巧。 ... [详细]
  • Python已成为全球最受欢迎的编程语言之一,然而Python程序的安全运行存在一定的风险。本文介绍了Python程序安全运行需要满足的三个条件,即系统路径上的每个条目都处于安全的位置、"主脚本"所在的目录始终位于系统路径中、若python命令使用-c和-m选项,调用程序的目录也必须是安全的。同时,文章还提出了一些预防措施,如避免将下载文件夹作为当前工作目录、使用pip所在路径而不是直接使用python命令等。对于初学Python的读者来说,这些内容将有所帮助。 ... [详细]
  • Django学习笔记之djangodebugtoolbar使用指南
    介绍django-debug-toolbar是一组可配置的面板,可显示有关当前请求响应的各种调试信息,并在单击时显示有关面板内容的更多详细信息。github地址文档地址安装配置1. ... [详细]
  • Python实现变声器功能(萝莉音御姐音)的方法及步骤
    本文介绍了使用Python实现变声器功能(萝莉音御姐音)的方法及步骤。首先登录百度AL开发平台,选择语音合成,创建应用并填写应用信息,获取Appid、API Key和Secret Key。然后安装pythonsdk,可以通过pip install baidu-aip或python setup.py install进行安装。最后,书写代码实现变声器功能,使用AipSpeech库进行语音合成,可以设置音量等参数。 ... [详细]
  • 本文介绍了如何通过conda安装Selenium的wheel文件,包括查看环境、卸载旧版本、下载新版本的wheel文件以及安装操作的步骤。同时提供了使用清华源的方法。 ... [详细]
  • 基于dlib的人脸68特征点提取(眨眼张嘴检测)python版本
    文章目录引言开发环境和库流程设计张嘴和闭眼的检测引言(1)利用Dlib官方训练好的模型“shape_predictor_68_face_landmarks.dat”进行68个点标定 ... [详细]
  • EzPP 0.2发布,新增YAML布局渲染功能
    EzPP发布了0.2.1版本,新增了YAML布局渲染功能,可以将YAML文件渲染为图片,并且可以复用YAML作为模版,通过传递不同参数生成不同的图片。这个功能可以用于绘制Logo、封面或其他图片,让用户不需要安装或卸载Photoshop。文章还提供了一个入门例子,介绍了使用ezpp的基本渲染方法,以及如何使用canvas、text类元素、自定义字体等。 ... [详细]
  • 开源Keras Faster RCNN模型介绍及代码结构解析
    本文介绍了开源Keras Faster RCNN模型的环境需求和代码结构,包括FasterRCNN源码解析、RPN与classifier定义、data_generators.py文件的功能以及损失计算。同时提供了该模型的开源地址和安装所需的库。 ... [详细]
  • Python使用Pillow包生成验证码图片的方法
    本文介绍了使用Python中的Pillow包生成验证码图片的方法。通过随机生成数字和符号,并添加干扰象素,生成一幅验证码图片。需要配置好Python环境,并安装Pillow库。代码实现包括导入Pillow包和随机模块,定义随机生成字母、数字和字体颜色的函数。 ... [详细]
  • 本文介绍了在Windows系统下安装Python、setuptools、pip和virtualenv的步骤,以及安装过程中需要注意的事项。详细介绍了Python2.7.4和Python3.3.2的安装路径,以及如何使用easy_install安装setuptools。同时提醒用户在安装完setuptools后,需要继续安装pip,并注意不要将Python的目录添加到系统的环境变量中。最后,还介绍了通过下载ez_setup.py来安装setuptools的方法。 ... [详细]
  • 本文总结了使用不同方式生成 Dataframe 的方法,包括通过CSV文件、Excel文件、python dictionary、List of tuples和List of dictionary。同时介绍了一些注意事项,如使用绝对路径引入文件和安装xlrd包来读取Excel文件。 ... [详细]
  • python3 logging
    python3logginghttps:docs.python.org3.5librarylogging.html,先3.5是因为我当前的python版本是3.5之所 ... [详细]
  • 一:跨域问题1、同源策略(浏览器的安全策略)    只允许当前页面朝当前域下发请求,如果向其他域发请求,请求可以正常发送,数据也可以拿回,但是被浏览器拦截了  2、c ... [详细]
author-avatar
ndo2205188
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有