热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

celery(芹菜)异步任务定时任务周期任务

什么是celeryCelery是一个简单、灵活且可靠的,处理大量消息的分布式系统专注于实时处理的异步任务队列同时也支持任务调度celery架构celer

什么是celery

  Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统

  专注于实时处理的异步任务队列

  同时也支持任务调度

 

celery架构

 

celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

 

消息中间件

  Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

 

任务执行单元

  Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

 

任务结果存储

  Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

 

celery能做什么  

  异步任务

  定时任务

  周期任务

 

使用场景  

  耗时操作(所有的耗时操作都可以)将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

  定时任务(每天定时推送微信服务号消息,)定时执行某件事情,比如每天数据统计

 

使用

pip install celery

 

 

celery执行异步任务:

项目结构

celery的简单实例

celery_task_s1.py代码

from celery import Celery
#不加密码
broker = 'redis://127.0.0.1:6379/0' #指定broker
backend = 'redis://127.0.0.1:6379/1'    #指定存储结果

# 加密码
# broker = 'redis://:123456@127.0.0.1:6379/2'
# backend = 'redis://:123456@127.0.0.1:6379/1'

# 一定要指定一个名字
app = Celery('test',broker=broker,backend=backend)

#任务其实就是个函数
# 需要用一个装饰器装饰,表示该任务是被celery管理的,并且可以用celery执行的
@app.task
def add(x,y) -> int:
    import time
    time.sleep(2)
    return x+y
View Code

 

 add_task.py

#用于提交任务的py文件

import celery_task_s1
#正常同步执行任务 会睡两秒在执行
# ret=celery_task_s1.add(3,4)
# print(ret)

#提交任务到消息队列中
#只是把任务提交到消息队列中,并没有执行
# ret=celery_task_s1.add.delay(3,4)
# print(ret)
#a5ea035f-0cc3-44ba-b334-f5d7c7ce681d  :任务的id号


#任务提交完成后,需要启动worker,可以用命令启动:
# celery worker -A celery_task_s1 -l info
#windows上:celery worker -A celery_task_s1 -l info -P eventlet  win上首次使用worker需要安装eventlet模块
View Code

 

 

 celery_result.py

from celery_task_s1 import app
from celery.result import AsyncResult


async  = AsyncResult(id='30314784-04bd-4ec5-acab-5cb22181cfcb',app=app)

if async.successful():
    #取出它return的值
    result = async .get()
    print(result)
    # result.forget()   将结果删除
elif async.failed():
    print('执行失败')
elif async.status =="PENDING":
    print('任务等待中被执行')
elif async.status == 'RETRY':
    print('任务异常后正在重试')
elif async.status == 'STARTED':
    print('任务已经开始被执行')
View Code

 

 

 写一个py文件:celery_task

  1. 指定brokey(消息中间件),指定backend(结果存储)
  2. 实例化产生一个Celery对象,app=Celery('起一个名字',broker,backend)
  3. 加装饰器绑定任务,在函数(add)上加装饰器app.task
  4. 其他程序提交任务,先导入add,  add.delay(参数,参数),会将该函数提交到消息中间件,但是并不会执行,有个返回值,直接print会打印出任务的id,以后用id去查询任务是否执行完成
  5. 启动worker去执行任务(worker可以先启动):
    1. celery worker -A celery_task_s1 -l info  (-l是艾欧)
    2. Windows下:celery worker -A celery_task_s1 -l info -P eventlet  (需要安装eventlet模块)
    3. 还有一种作为自执行文件启动,可以写在往里面放值的文件下面这样运行放值的模块的时候worker也启动了 但是win上好像不支持
      1. from celery_app_task import cel
        if __name__ == '__main__':
            cel.worker_main()
            # cel.worker_main(argv=['--loglevel=info')
  6. 查看结果:根据id去查询
    async = AsyncResult(id="a5ea035f-0cc3-44ba-b334-f5d7c7ce681d", app=app)
        if async.successful():
        #取出它return的值
            result = async.get()
            print(result)

 

 

 

 

多任务结构

pro_cel
    ├── celery_task# celery相关文件夹
    │   ├── celery.py   # celery连接和配置相关文件,必须叫这个名字
    │   └── order_task.py    #  所有任务函数
    │   └── user_task.py    #  所有任务函数
    ├── celery_result.py # 检查结果
    └── add_task.py    # 触发任务

 

celery.py文件

#这个文件名必须叫celery,生成celery对象

from celery import Celery
from datetime import timedelta
from celery.schedules import  crontab
broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery('test',broker=broker,backend=backend,
             #包含以下链各个任务文件,去响应的py文件中找任务,对多个任务做分类
             include=[
                 'celery_task.order_task',
                 'celery_task.user_task',
                    ]
             )

#时区
# app.conf.timezOne= 'Asia/Shanghai'
#是否使用UTC
# app.conf.enable_utc = False

 

 

order_task.py

from celery_task.celery import app
@app.task
def order_add(x,y):
    import time
    time.sleep(1)
    return x+y

 

user_task.py

from celery_task.celery import app
@app.task
def user_add(x,y):
    import time
    time.sleep(1)
    return x+y

 

add_task.py

from celery_task.order_task import order_add
from celery_task.user_task import user_add

# ret=order_add.delay(5,6)
ret=user_add.delay(10,60)
print(ret)

 

celery_result.py

from celery.result import  AsyncResult
from celery_task.celery import  app
from add_task import ret

async  = AsyncResult(id='c951c4f7-dcfc-46fc-b60e-ee67b61211ec',app=app)
if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 将结果删除,执行完成,结果不会自动删除
    # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
    # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
elif async.failed():
    print('执行失败')
elif async.status == 'PENDING':
    print('任务等待中被执行')
elif async.status == 'RETRY':
    print('任务异常后正在重试')
elif async.status == 'STARTED':
    print('任务已经开始被执行')
View Code

 

 

 

启动worker:  celery_task是包的名字(文件夹名字)

celery worker -A celery_task -l info -P eventlet    #放值的时候手动启动add_task文件

 

 

 

celery执行定时任务

设定时间让celery执行一个任务

add_task.py 就是上面的执行异步任务的那个文件(往里面放值的时候使用定时方式)

#用于提交任务的py文件
import celery_task_s1
#执行定时任务:3s钟以后执行add任务
from datetime import datetime
# v1 = datetime(2019, 7, 12, 11, 13, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# #取出要执行任务的时间对象,调用apply_async方法,args是参数,eta是执行的时间
# result = celery_task_s1.add.apply_async(args=[1, 3], eta=v2)
# print(result.id) #这里需要注意的是result是个对象,你可以直接打印因为内部写了__str__

#第二种获取时间的方法
ctime = datetime.now()
# 默认用utc时间
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
#取10s之后的时间对象
time_delay = timedelta(secOnds=3)
task_time = utc_ctime + time_delay
result = celery_task_s1.add.apply_async(args=[4, 3], eta=task_time)
print(result.id) #这里需要注意的是result是个对象,你可以直接打印因为内部写了__str__

#任务提交完成后,需要启动worker,可以用命令启动:
# celery worker -A celery_task_s1 -l info
#windows上:celery worker -A celery_task_s1 -l info -P eventlet

 

 

 

celery周期任务

多任务结构中celery.py修改如下

#这个文件名必须叫celery,生成celery对象

from celery import Celery
from datetime import timedelta
from celery.schedules import  crontab
broker = 'redis://127.0.0.1:6379/0'
backend = 'redis://127.0.0.1:6379/1'
app = Celery('test',broker=broker,backend=backend,
             #包含以下链各个任务文件,去响应的py文件中找任务,对多个任务做分类
             include=[
                 'celery_task.order_task',
                 'celery_task.user_task',
                    ]
             )

#时区问题  每天的什么时候去执行任务
#时区
# app.conf.timezOne= 'Asia/Shanghai'
#是否使用UTC
# app.conf.enable_utc = False

#beat_schedule 定义定时任务的
app.conf.beat_schedule = {
    #名字随意命令
    'add-every-2-seconds':{
        #执行tasks1下的test_celery函数
        'task':'celery_task.order_task.order_add',
        #每隔2秒执行一次
        # 'schedule':1.0,  几秒钟几分钟几小时后schedule对应的参数
        # 'schedule':crontab(minute='*/1'),
        'schedule':timedelta(secOnds=10),
        #传递参数
        'args':(5,6)
    },
    # 'add-every-12-seconds':{
    #     'task':'celery_task.order_task.user_add',
    #     #每年4月11号,8点42分执行
    #     # 'schedule':crontab(minute=42,hour=8,day_of_mOnth=11,month_of_year=4),
    #     'schedule':crontab(minute=42,hour=8,day_of_mOnth=11,month_of_year=4),
    #     'args':(16,16)
    # }
}

 

创建worker的方式并没有发生变化,但是这里要注意的是,每间隔一定时间后需要生产出来任务给worker去执行,这里需要一个生产者beat

 

启动一个beat:(这个负责定时周期往里面放值)

celery beat -A celery_task -l info

 

启动worker执行(winds)

celery worker -A celery_task -l info -P  eventlet

 

 

django中使用celery

需要安装

celery==3.1.25
django-celery==3.1.20 

在项目目录下创建celeryconfig.py

import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
    'app01.tasks',
)
#有些情况可以防止死锁
CELERYD_FORCE_EXECV=True
# 设置并发worker数量
CELERYD_COnCURRENCY=4
#允许重试
CELERY_ACKS_LATE=True
# 每个worker最多执行100个任务被销毁,可以防止内存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超时时间
CELERYD_TASK_TIME_LIMIT=12*30

 

第一种 直接把多任务结构文件直接拷贝在项目目录下

  注意在celery的任务函数中不能直接调用django的环境,需要手动添加

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "untitled15.settings")
    import django
    django.setup()

 

第二种 在app01目录下创建tasks.py

from celery import task
@task
def add(a,b):
    with open('a.text', 'a', encoding='utf-8') as f:
        f.write('a')
    print(a+b)

视图函数views.py(定时任务)

from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
    # result=add.delay(2,3)
    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(secOnds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')

 

settings.py

INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]

...

from djagocele import celeryconfig    #from 这个django项目 import celeryconfig就是上面配置的文件
BROKER_BACKEND='redis'
BOOKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'

 

https://www.cnblogs.com/znicy/p/5626040.html

http://www.manongjc.com/article/7864.html

 

 


推荐阅读
  • 修复一个 Bug 竟耗时两天?真的有那么复杂吗?
    修复一个 Bug 竟然耗费了两天时间?这背后究竟隐藏着怎样的复杂性?本文将深入探讨这个看似简单的 Bug 为何会如此棘手,从代码层面剖析问题根源,并分享解决过程中遇到的技术挑战和心得。 ... [详细]
  • 在C语言中,定义一个包含学号、姓名和年龄的学生信息结构体,并遵循严格的命名规范。首先,初始化结构体变量的所有成员为默认值,然后将其学号设为88,姓名设为“liming”,年龄设为25。最后,在控制台上输出该结构体变量的详细信息,以验证数据的正确性。例如,使用 `typedef struct Student` 定义结构体类型。 ... [详细]
  • 【并发编程】全面解析 Java 内存模型,一篇文章带你彻底掌握
    本文深入解析了 Java 内存模型(JMM),从基础概念到高级特性进行全面讲解,帮助读者彻底掌握 JMM 的核心原理和应用技巧。通过详细分析内存可见性、原子性和有序性等问题,结合实际代码示例,使开发者能够更好地理解和优化多线程并发程序。 ... [详细]
  • Go语言实现Redis客户端与服务器的交互机制深入解析
    在前文对Godis v1.0版本的基础功能进行了详细介绍后,本文将重点探讨如何实现客户端与服务器之间的交互机制。通过具体代码实现,使客户端与服务器能够顺利通信,赋予项目实际运行的能力。本文将详细解析Go语言在实现这一过程中的关键技术和实现细节,帮助读者深入了解Redis客户端与服务器的交互原理。 ... [详细]
  • 优化后的标题:PHP分布式高并发秒杀系统设计与实现
    PHPSeckill是一个基于PHP、Lua和Redis构建的高效分布式秒杀系统。该项目利用php_apcu扩展优化性能,实现了高并发环境下的秒杀功能。系统设计充分考虑了分布式架构的可扩展性和稳定性,适用于大规模用户同时访问的场景。项目代码已开源,可在Gitee平台上获取。 ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • ZeroMQ在云计算环境下的高效消息传递库第四章学习心得
    本章节深入探讨了ZeroMQ在云计算环境中的高效消息传递机制,涵盖客户端请求-响应模式、最近最少使用(LRU)队列、心跳检测、面向服务的队列、基于磁盘的离线队列以及主从备份服务等关键技术。此外,还介绍了无中间件的请求-响应架构,强调了这些技术在提升系统性能和可靠性方面的应用价值。个人理解方面,ZeroMQ通过这些机制有效解决了分布式系统中常见的通信延迟和数据一致性问题。 ... [详细]
  • 工程项目管理系统源码简洁+好用+全面工程项目管理系统
    ​​工程项目管理系统是指从事工程项目管理的企业(以下简称工程项目管理企业)受业主委托,按照合同约定,代表业主对工程项目的组织 ... [详细]
  • 分布式一致性算法:Paxos 的企业级实战
    一、简介首先我们这个平台是ES专题技术的分享平台,众所周知,ES是一个典型的分布式系统。在工作和学习中,我们可能都已经接触和学习过多种不同的分布式系统了,各 ... [详细]
  • 本文详细解析了使用C++实现的键盘输入记录程序的源代码,该程序在Windows应用程序开发中具有很高的实用价值。键盘记录功能不仅在远程控制软件中广泛应用,还为开发者提供了强大的调试和监控工具。通过具体实例,本文深入探讨了C++键盘记录程序的设计与实现,适合需要相关技术的开发者参考。 ... [详细]
  • 本文介绍了一种自定义的Android圆形进度条视图,支持在进度条上显示数字,并在圆心位置展示文字内容。通过自定义绘图和组件组合的方式实现,详细展示了自定义View的开发流程和关键技术点。示例代码和效果展示将在文章末尾提供。 ... [详细]
  • 本文深入探讨了URAL 1297问题,重点分析了使用后缀数组求解最长回文子串的方法。通过详细解析算法的实现步骤和优化策略,本文提供了高效的解决方案,并结合实际案例进行了验证。此外,文章还讨论了后缀数组在字符串处理中的广泛应用及其性能优势。 ... [详细]
  • 通过使用七牛云存储服务,本文详细介绍了如何将本地图片高效上传至云端,并实现了内容的便捷管理。借助七牛云的 Python SDK,文章提供了从认证到文件上传的具体代码示例,包括导入必要的库、生成上传凭证以及处理文件路径等关键步骤。此外,还探讨了如何利用七牛云的 URL 安全编码功能,确保数据传输的安全性和可靠性。 ... [详细]
  • 数据结构与算法:HyperLogLog 统计、布隆过滤器应用、缓存机制挑战及解决方案、Redis 性能优化与监控、哨兵模式、版本控制工具 Git
    本文探讨了数据结构与算法在实际应用中的多个方面。首先介绍了HyperLogLog算法,用于高效地进行基数统计,能够准确估算大规模数据集中的唯一元素数量。接着讨论了布隆过滤器的应用,该过滤器在空间效率和查询速度上具有显著优势,适用于大数据场景下的快速成员检测。此外,文章分析了缓存机制面临的挑战及其解决方案,包括LRU和LFU等策略,并详细阐述了Redis的性能优化与监控方法,如使用哨兵模式实现高可用性。最后,介绍了版本控制工具Git的基本操作和最佳实践,帮助开发者有效管理代码版本。 ... [详细]
  • 背景最近面试面得心力交瘁,由于没有高并发架构的实际项目经验,经常是在场景设计的面试题目上面栽跟头。上次就被问到了关于秒杀系统的设计,竟无 ... [详细]
author-avatar
手机电视2602907765
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有