热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Celery从入门到放弃

1.定义:Celery是一个异步的任务队列(也叫做分布式任务队列)2.工作结构Celery分为3个部分(1)w

1.定义:

    Celery是一个异步的任务队列(也叫做分布式任务队列)




2.工作结构

    Celery分为3个部分

    (1)worker部分负责任务的处理,即工作进程(我的理解工作进程就是你写的python代码,当然还包括python调用系统工具功能)

    (2)broker部分负责任务消息的分发以及任务结果的存储,这部分任务主要由中间数据存储系统完成,比如消息队列服务器RabbitMQ、redis、

Amazon SQS、MongoDB、IronMQ等或者关系型数据库,使用关系型数据库依赖sqlalchemy或者django的ORM

    (3)Celery主类,进行任务最开始的指派与执行控制,他可以是单独的python脚本,也可以和其他程序结合,应用到django或者flask等web框架里面以及你能想到的任何应用




3.话不多说,用起来

    (1)安装Celery(要安装celery3版本,4版本改动较大没测试)


#pip install celery==3.1.17


    (2)broker部分此处使用安装好的redis服务6380端口的db0作为消息队列,普通redis服务的安装此处不做介绍

  (3)Celery的使用一(单独脚本调用,简单方便)

        #注:不考虑任务的结果存储情况

         <1>/tmp/tasks.py&#xff08;实际脚本中不要写中文注释&#xff09;


#!/usr/bin/env python

# -*- coding&#61;utf-8 -*-

from celery import Celery

from celery import platforms

#用于开启root也可以启动celery服务&#xff0c;默认是不允许root启动celery的

platforms.C_FORCE_ROOT &#61; True

#创建一个celery实例&#xff0c;传递进去的第一个参数tasks必须是本文件的文件名tasks&#xff0c;指定broker为本机redis6380服务

celery &#61; Celery(&#39;tasks&#39;, broker&#61;&#39;redis://localhost:6380/0&#39;)

#使用celery实例的task装饰器装饰add函数&#xff0c;此处的add函数可以当作后期的耗时任务对待

&#64;celery.task

def add(x,y):

    return x &#43; y


       <2>启动celery服务


#cd /tmp

#celery -A tasks worker --loglevel&#61;info


       <3>验证执行任务

       #导入模块,执行add函数,此处使用add.delay(3,4)而不是add(3,4)&#xff0c;因为被celery封装了&#xff0c;要异步执行需要额外使用add.delay(3,4)

       #需要注意&#xff0c;如果把返回值赋值给一个变量&#xff0c;那么原来的应用程序也会被阻塞&#xff0c;需要等待异步任务返回的结果。因此&#xff0c;实际使用中&#xff0c;不需要把结果赋值。


#cd /tmp

#python

>>>from tasks import add

>>>add.delay(3,4)


       #celery服务的窗口会刷出任务的信息&#xff0c;以及是否处理成功&#xff0c;以及结果

       #将来只要在别的程序中引入tasks中的add函数&#xff0c;就是异步的了&#xff0c;是不是有点屌。。。。。

       <4>扩展知识&#xff0c;指定队列名

       传入redis中的指定队列testq怎么玩&#xff1f;&#xff08;其他broker引擎也支持&#xff09;

       启动celery服务的时候添加额外参数-Q &#39;队列名&#39;


#cd /tmp

#celery -A tasks.tasks worker --loglevel&#61;info -Q &#39;testq&#39;


       跑任务的时候指定testq队列名


#cd /tmp

#python

>>>from tasks import add

>>>add.delay(3,4,queue&#61;&#39;testq&#39;)


       <5>扩展知识&#xff0c;指定开启的worker进程数(底层是调用的Python的multiprocessing模块中的Pool进程池思想来做)

       -c 5 开启5个worker进程来同时抢任务&#xff0c;跑任务


#cd /tmp

#celery -A tasks.tasks worker --loglevel&#61;info -c 5


       <6>扩展知识&#xff0c;管理broker里面的数据&#xff0c;查看任务状态&#xff0c;以及任务的详细信息

       安装一个叫flower的webui&#xff0c;提供任务查询&#xff0c;worker生命管理&#xff0c;以及路由管理等(底层是通过tornado框架封装的)


#pip install flower

#任意目录下执行都可以

#celery flower --port&#61;5555 --broker&#61;redis://localhost:6380/0


       #里面可以看到任务参数&#xff0c;结果&#xff0c;接受任务时间&#xff0c;任务开始时间&#xff0c;任务状态&#xff0c;Started是任务进行中&#xff0c;Success是任务跑完执行成功

    &#xff08;4&#xff09;Celery的使用二&#xff08;项目方式&#xff0c;也叫做Python包方式&#xff0c;结构清晰&#xff0c;低耦合&#xff1b;相比纯脚本方式略复杂&#xff0c;用不用由你&#xff09;

    创建一个叫做proj的Python包&#xff08;创建Python包的操作此处不做详细说明,tree /tmp/proj&#xff09;

 

       <1>proj/celery.py

      #from __future__ import absolute_import据说添加此行可以低降低出错的概率哦&#xff08;阿门保佑&#xff1b;其实就是兼容Python版本的一个东东&#xff09;

      #创建一个celery的实例&#xff0c;名字叫做app&#xff0c;传递进去的第一个参数是Python包的名字&#xff0c;include加载任务文件&#xff0c;config_from_object指定celery的配置文件(好吧&#xff0c;看起来比单纯使用脚本方式麻烦点&#xff0c;请继续往下看)


#!/usr/bin/env python

# -*- coding&#61;utf-8 -*-

from __future__ import absolute_import

from celery import Celery

app &#61; Celery(&#39;proj&#39;, include&#61;[&#39;proj.tasks&#39;])

app.config_from_object(&#39;proj.config&#39;)

if __name__ &#61;&#61; &#39;__main__&#39;:

    app.start()


       <2>proj/config.py

       #配置文件里指定broker


#!/usr/bin/env python

# -*- coding:utf-8 -*-

from __future__ import absolute_import

BROKER_URL &#61; &#39;redis://127.0.0.1:6380/0&#39;


       <3>proj/tasks.py

       #导入celery实例&#xff0c;实例绑定任务


#!/usr/bin/env python

# -*- coding:utf-8 -*-

from __future__ import absolute_import

from proj.celery import app

&#64;app.task

def add(x, y):

    return x &#43; y


       <4>开启celery服务(特定目录指定包名字启动)


#cd /tmp/

#celery -A proj worker -l info


       <5>扩展功能&#xff0c;指定队列名&#xff0c;调整worker进程数&#xff0c;页面管理celery同上&#xff0c;不再做说明

 &#xff08;5&#xff09;Celery的使用三&#xff08;django-celery模式;#反正我喜欢用这种&#xff09;

       django调用celery跑异步任务&#xff0c;常见场景有注册成功&#xff0c;发送邮件可以异步来防止网络IO阻塞&#xff0c;以及耗时间的任务&#xff0c;例如需要去跑9000台IP的某些配置参数任务&#xff0c;或者下发任务执行&#xff0c;可能需要10几分钟才能跑完&#xff0c;就可以WEB应用中使用这种异步方式

       <1>安装django-celery软件包

       #一定要注意celery的版本和django-celery的小版本要保持一致&#xff0c;否则会有各种杂七杂八的小问题&#xff08;都是泪.......&#xff09;


#pip install celery&#61;&#61;3.1.17

#pip install django-celery&#61;&#61;3.1.17


       <2>创建celery必须的数据库表结构


#cd Python_20161203

#python manage.py migrate


       <3>django项目的settings.py文件中追加如下内容&#xff1b;app呢是django项目里面的应用名字

       settings.py


import djcelery

djcelery.setup_loader()

BROKER_URL &#61; &#39;redis://127.0.0.1:6380/0&#39;

CELERY_RESULT_BACKEND &#61; &#39;redis://127.0.0.1:6380/1&#39;

CELERY_TASK_SERIALIZER &#61; &#39;json&#39;

CELERY_RESULT_SERIALIZER &#61; &#39;json&#39;

CELERY_ACCEPT_CONTENT &#61; [&#39;json&#39;]

CELERY_IMPORTS &#61; (&#39;app.tasks&#39;, )

CELERYBEAT_SCHEDULER &#61; &#39;djcelery.schedulers.DatabaseScheduler&#39;

CELERYD_CONCURRENCY &#61; 20


参数说明(可以根据自己的需求添加自己的参数)&#xff1a;

CELERY_RESULT_BACKEND &#61; "redis://127.0.0.1:6380/1&#39;" #结果存储

CELERY_TASK_RESULT_EXPIRES &#61; 1200 # celery任务执行结果的超时时间&#xff0c;我的任务都不需要返回结果,只需要正确执行就行

CELERYD_CONCURRENCY &#61; 20 # celery worker的并发数 也是命令行-c指定的数目,事实上实践发现并不是worker也多越好,保证任务不堆积,加上一定新增任务的预留就可以

CELERYD_PREFETCH_MULTIPLIER &#61; 4 # celery worker 每次去redis取任务的数量&#xff0c;我这里预取了4个慢慢执行,因为任务有长有短没有预取太多

CELERYD_MAX_TASKS_PER_CHILD &#61; 200 # 每个worker执行了多少任务就会死掉&#xff0c;我建议数量可以大一些&#xff0c;比如200

CELERYBEAT_SCHEDULER &#61; &#39;djcelery.schedulers.DatabaseScheduler&#39; # 这是使用了django-celery默认的数据库调度模型,任务执行周期都被存在你指定的orm数据库中

 

       <4>app/tasks.py&#xff08;在django的app应用目录下创建tasks.py任务文件&#xff0c;里面调用复杂的任务函数&#xff09;


#!/usr/bin/env python

# -*- coding&#61;utf-8 -*-

###############################

from __future__ import absolute_import

from celery import task

import time

#task装饰器封装了celery函数&#xff0c;为耗时的操作

&#64;task

def add(x,y):

    for i in range(30):

       print i

       time.sleep(1)

    return x &#43; y


       <5>添加验证功能&#xff0c;查看实际效果

       app/urls.py


urlpatterns &#61; [

url(r&#39;^celery_test/,views.celery_test),

]


       app/views.py


def celery_test(request):

    from tasks import add

    add.delay(4,8)

    return HttpResponse(&#39;Celery testing666&#39;)


       <6>开启djanog服务和celery服务&#xff08;虽然耦合了&#xff0c;但是还是需要额外开启&#xff09;


#python manage.py runserver 0.0.0.0:8000

#另一个窗口开启celery服务

#python manage.py celery worker --loglevel&#61;info

 


       <7>发送http的GET请求&#xff0c;调用celery去执行异步任务&#xff08;大功告成&#xff09;


curl http://127.0.0.1:8000/index/celery_test/


        celery那端的屏幕输出如下&#xff1a;

[2016-12-01 16:16:00,940: INFO/MainProcess] Received task: app.tasks.add[06a8d603-a7d3-4732-b8f3-ad010d531200]

[2016-12-01 16:16:00,941: WARNING/Worker-1] 0

[2016-12-01 16:16:01,943: WARNING/Worker-1] 1

[2016-12-01 16:16:02,945: WARNING/Worker-1] 2

[2016-12-01 16:16:03,947: WARNING/Worker-1] 3

[2016-12-01 16:16:04,948: WARNING/Worker-1] 4

[2016-12-01 16:16:05,950: WARNING/Worker-1] 5

[2016-12-01 16:16:06,952: WARNING/Worker-1] 6

[2016-12-01 16:16:07,954: WARNING/Worker-1] 7

[2016-12-01 16:16:08,955: WARNING/Worker-1] 8

[2016-12-01 16:16:09,957: WARNING/Worker-1] 9

[2016-12-01 16:16:10,958: WARNING/Worker-1] 10

[2016-12-01 16:16:11,959: WARNING/Worker-1] 11

[2016-12-01 16:16:12,961: WARNING/Worker-1] 12

[2016-12-01 16:16:13,962: WARNING/Worker-1] 13

[2016-12-01 16:16:14,964: WARNING/Worker-1] 14

[2016-12-01 16:16:15,964: WARNING/Worker-1] 15

[2016-12-01 16:16:16,966: WARNING/Worker-1] 16

[2016-12-01 16:16:17,968: WARNING/Worker-1] 17

[2016-12-01 16:16:18,969: WARNING/Worker-1] 18

[2016-12-01 16:16:19,971: WARNING/Worker-1] 19

[2016-12-01 16:16:20,973: WARNING/Worker-1] 20

[2016-12-01 16:16:21,974: WARNING/Worker-1] 21

[2016-12-01 16:16:22,976: WARNING/Worker-1] 22

[2016-12-01 16:16:23,978: WARNING/Worker-1] 23

[2016-12-01 16:16:24,979: WARNING/Worker-1] 24

[2016-12-01 16:16:25,981: WARNING/Worker-1] 25

[2016-12-01 16:16:26,982: WARNING/Worker-1] 26

[2016-12-01 16:16:27,984: WARNING/Worker-1] 27

[2016-12-01 16:16:28,986: WARNING/Worker-1] 28

[2016-12-01 16:16:29,987: WARNING/Worker-1] 29

[2016-12-01 16:16:30,990: INFO/MainProcess] Task app.tasks.add[06a8d603-a7d3-4732-b8f3-ad010d531200] succeeded in 30.049149203s: 12

    &#xff08;6&#xff09;Celery的使用四&#xff08;celery模式;#反正我不喜欢用&#xff0c;那索性不写了&#xff0c;想学习的点击下面的链接去自助学习吧&#xff0c;加油少年&#xff09;

不安装django-celery版本使用方法



 


推荐阅读
  • 31.项目部署
    目录1一些概念1.1项目部署1.2WSGI1.3uWSGI1.4Nginx2安装环境与迁移项目2.1项目内容2.2项目配置2.2.1DEBUG2.2.2STAT ... [详细]
  • Python操作MySQL(pymysql模块)详解及示例代码
    本文介绍了使用Python操作MySQL数据库的方法,详细讲解了pymysql模块的安装和连接MySQL数据库的步骤,并提供了示例代码。内容涵盖了创建表、插入数据、查询数据等操作,帮助读者快速掌握Python操作MySQL的技巧。 ... [详细]
  • python中安装并使用redis相关的知识
    本文介绍了在python中安装并使用redis的相关知识,包括redis的数据缓存系统和支持的数据类型,以及在pycharm中安装redis模块和常用的字符串操作。 ... [详细]
  • 1.淘宝模拟登录2.天猫商品数据爬虫3.爬取淘宝我已购买的宝贝数据4.每天不同时间段通过微信发消息提醒女友5.爬取5K分辨率超清唯美壁纸6.爬取豆瓣排行榜电影数据(含GUI界面版) ... [详细]
  • 安装mysqlclient失败解决办法
    本文介绍了在MAC系统中,使用django使用mysql数据库报错的解决办法。通过源码安装mysqlclient或将mysql_config添加到系统环境变量中,可以解决安装mysqlclient失败的问题。同时,还介绍了查看mysql安装路径和使配置文件生效的方法。 ... [详细]
  • Python实现变声器功能(萝莉音御姐音)的方法及步骤
    本文介绍了使用Python实现变声器功能(萝莉音御姐音)的方法及步骤。首先登录百度AL开发平台,选择语音合成,创建应用并填写应用信息,获取Appid、API Key和Secret Key。然后安装pythonsdk,可以通过pip install baidu-aip或python setup.py install进行安装。最后,书写代码实现变声器功能,使用AipSpeech库进行语音合成,可以设置音量等参数。 ... [详细]
  • EzPP 0.2发布,新增YAML布局渲染功能
    EzPP发布了0.2.1版本,新增了YAML布局渲染功能,可以将YAML文件渲染为图片,并且可以复用YAML作为模版,通过传递不同参数生成不同的图片。这个功能可以用于绘制Logo、封面或其他图片,让用户不需要安装或卸载Photoshop。文章还提供了一个入门例子,介绍了使用ezpp的基本渲染方法,以及如何使用canvas、text类元素、自定义字体等。 ... [详细]
  • 开源Keras Faster RCNN模型介绍及代码结构解析
    本文介绍了开源Keras Faster RCNN模型的环境需求和代码结构,包括FasterRCNN源码解析、RPN与classifier定义、data_generators.py文件的功能以及损失计算。同时提供了该模型的开源地址和安装所需的库。 ... [详细]
  • Python使用Pillow包生成验证码图片的方法
    本文介绍了使用Python中的Pillow包生成验证码图片的方法。通过随机生成数字和符号,并添加干扰象素,生成一幅验证码图片。需要配置好Python环境,并安装Pillow库。代码实现包括导入Pillow包和随机模块,定义随机生成字母、数字和字体颜色的函数。 ... [详细]
  • Python已成为全球最受欢迎的编程语言之一,然而Python程序的安全运行存在一定的风险。本文介绍了Python程序安全运行需要满足的三个条件,即系统路径上的每个条目都处于安全的位置、"主脚本"所在的目录始终位于系统路径中、若python命令使用-c和-m选项,调用程序的目录也必须是安全的。同时,文章还提出了一些预防措施,如避免将下载文件夹作为当前工作目录、使用pip所在路径而不是直接使用python命令等。对于初学Python的读者来说,这些内容将有所帮助。 ... [详细]
  • 本文介绍了协程的概念和意义,以及使用greenlet、yield、asyncio、async/await等技术实现协程编程的方法。同时还介绍了事件循环的作用和使用方法,以及如何使用await关键字和Task对象来实现异步编程。最后还提供了一些快速上手的示例代码。 ... [详细]
  • 在本教程中,我们将看到如何使用FLASK制作第一个用于机器学习模型的RESTAPI。我们将从创建机器学习模型开始。然后,我们将看到使用Flask创建AP ... [详细]
  • Django学习笔记之djangodebugtoolbar使用指南
    介绍django-debug-toolbar是一组可配置的面板,可显示有关当前请求响应的各种调试信息,并在单击时显示有关面板内容的更多详细信息。github地址文档地址安装配置1. ... [详细]
  • 都说Python处理速度慢,为何月活7亿的 Instagram依然在使用Python?
    点击“Python编程与实战”,选择“置顶公众号”第一时间获取Python技术干货!来自|简书作者|我爱学python链接|https:www.jian ... [详细]
  • 【云计算】Dockerfile、镜像、容器快速入门 ... [详细]
author-avatar
波Z-
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有