热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:celery

篇首语:本文由编程笔记#小编为大家整理,主要介绍了celery相关的知识,希望对你有一定的参考价值。CeleryCele

篇首语:本文由编程笔记#小编为大家整理,主要介绍了celery相关的知识,希望对你有一定的参考价值。



Celery

Celery是一个异步任务框架,是一个独立运行的服务.(内置socket)

相当于一个生产者消费者模型的任务队列.

拥有高可用,异步,简易,等特点.

celery是一个独立的socket


官网

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/


Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(backend)(task result store)组成。

技术图片

RabbitMQ 异步消息队列,可编程

Redis 数据库


消息中间件

Celery本身不提供消息服务,但是可以方便的于第三方提供的消息中间件集成.包括RabbitMQ,Redis等.


任务执行单元

Worker是Celery提供的任务执行的单元.worker并发的运行在系统节点中.


任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等,一般需要支持过期时间.


使用场景

异步执行delay():解决耗时任务

# 添加任务
# result = add.delay(20, 30)
# print(result.id)

1)创建Celery框架对象app,配置broker和backend,得到的app就是worker
2)给worker对应的app添加可处理的任务函数
3)启动celery服务,运行worker
4)书写添加任务的脚本,执行脚本添加任务到broker,worker会自己异步从broker中拿任务执行,执行结果放在backend中
5) 书写获取任务结果的脚本,明确任务id与执行的app,获取任务结果

延迟执行:解决延迟任务

# 添加延迟任务
from datetime import datetime, timedelta
result = add.apply_async(args=(10, 20), eta=datetime.utcnow() + timedelta(secOnds=10))
print(result)

1)创建Celery框架对象app,配置broker和backend,得到的app就是worker
2)给worker对应的app添加可处理的任务函数,用include配置给worker的app
3)启动celery服务,运行worker
4)书写添加任务的脚本,执行脚本添加任务到broker,worker会自己异步从broker中拿任务执行,执行结果放在backend中
5) 书写获取任务结果的脚本,明确任务id与执行的app,获取任务结果

定时执行:解决周期(周期)任务

# 时区
app.conf.timezOne= 'Asia/Shanghai'
# 是否使用
UTCapp.conf.enable_utc = False
# 定时任务配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'add-task': {
'task': 'celery_task.tasks.add',
'schedule': timedelta(secOnds=3),
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': (20, 10),
},
'low-task': {
'task': 'celery_task.tasks.low',
'schedule': timedelta(secOnds=6),
'args': (20, 10), },
}

1)创建Celery框架对象app,配置broker和backend,得到的app就是worker
2)给worker对应的app添加可处理的任务函数,用include配置给worker的app
3)完成提供的任务的定时配置app.conf.beat_schedule
4)启动celery服务,运行worker,执行任务
5)启动beat服务,运行beat,添加任务

Celery安装与配置

安装

pip install celery

消息中间件

RaddbiMQ/Redis


实例化对象

app = Celert('任务名',broker="xxx",backend="xxx")

Celery执行异步任务

包架构封装

project
├── celery_task # celery包
│ ├── __init__.py # 包文件
│ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py
│ └── tasks.py # 所有任务函数
├── add_task.py # 添加任务
└── get_result.py # 获取结果

异步执行


celery.py

# 1)创建app + 任务
# 2)启动celery(app)服务:
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet
# 3)添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本
# 4)获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本
from celery import Celery
#broker
broker = 'redis://127.0.0.1:6379/1'
#backend
backend = 'redis://127.0.0.1:6379/2'
#worker
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks']) #include是任务tasks

task.py

from .celery import app
import time
@app.task
def add(n, m):
print(n)
print(m)
time.sleep(10)
print('n+m的结果:%s' % (n + m))
return n + m
@app.task
def low(n, m):
print(n)
print(m)
print('n-m的结果:%s' % (n - m))
return n - m

add_task.py

from celery_task import tasks
# 添加立即执行任务
t1 = tasks.add.delay(10, 20)
t2 = tasks.low.delay(100, 50)
print(t1.id)
# 添加延迟任务
from datetime import datetime, timedelta
eta=datetime.utcnow() + timedelta(secOnds=10)
tasks.low.apply_async(args=(200, 50), eta=eta)

get_result.py

from celery_task.celery import app
from celery.result import AsyncResult
id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result)
elif async.failed():
print('任务失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')

定时任务


celery.py

# 1)创建app + 任务
# 2)启动celery(app)服务:
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet
# 3)添加任务:自动添加任务,所以要启动一个添加任务的服务
# 命令:celery beat -A celery_task -l info
# 4)获取结果
from celery import Celery
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
# 时区
app.conf.timezOne= 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False
# 任务的定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'low-task': {
'task': 'celery_task.tasks.low',
'schedule': timedelta(secOnds=3),
# 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点
'args': (300, 150),
}
}

tasks.py

from .celery import app
import time
@app.task
def add(n, m):
print(n)
print(m)
time.sleep(10)
print('n+m的结果:%s' % (n + m))
return n + m
@app.task
def low(n, m):
print(n)
print(m)
print('n-m的结果:%s' % (n - m))
return n - m

get_result.py

from celery_task.celery import app
from celery.result import AsyncResult
id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result)
elif async.failed():
print('任务失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')

推荐阅读
  • 深入解析Spring Cloud微服务架构与分布式系统实战
    本文详细介绍了Spring Cloud在微服务架构和分布式系统中的应用,结合实际案例和最新技术,帮助读者全面掌握微服务的实现与优化。 ... [详细]
  • 本文作者分享了在阿里巴巴获得实习offer的经历,包括五轮面试的详细内容和经验总结。其中四轮为技术面试,一轮为HR面试,涵盖了大量的Java技术和项目实践经验。 ... [详细]
  • 深入解析Redis内存对象模型
    本文详细介绍了Redis内存对象模型的关键知识点,包括内存统计、内存分配、数据存储细节及优化策略。通过实际案例和专业分析,帮助读者全面理解Redis内存管理机制。 ... [详细]
  • Java项目分层架构设计与实践
    本文探讨了Java项目中应用分层的最佳实践,不仅介绍了常见的三层架构(Controller、Service、DAO),还深入分析了各层的职责划分及优化建议。通过合理的分层设计,可以提高代码的可维护性、扩展性和团队协作效率。 ... [详细]
  • 并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性
    Java并发编程实践目录并发编程01——ThreadLocal并发编程02——ConcurrentHashMap并发编程03——阻塞队列和生产者-消费者模式并发编程04——闭锁Co ... [详细]
  • 优化SQL Server批量数据插入存储过程的实现
    本文介绍了一种改进的SQL Server存储过程,用于生成批量插入语句。该方法不仅提高了性能,还支持单行和多行模式,适用于SQL Server 2005及以上版本。 ... [详细]
  • 本文深入探讨了MySQL中常见的面试问题,包括事务隔离级别、存储引擎选择、索引结构及优化等关键知识点。通过详细解析,帮助读者在面对BAT等大厂面试时更加从容。 ... [详细]
  • 本文详细介绍了C语言的起源、发展及其标准化过程,涵盖了从早期的BCPL和B语言到现代C语言的演变,并探讨了其在操作系统和跨平台编程中的重要地位。 ... [详细]
  • Netflix利用Druid实现高效实时数据分析
    本文探讨了全球领先的在线娱乐公司Netflix如何通过采用Apache Druid,实现了高效的数据采集、处理和实时分析,从而显著提升了用户体验和业务决策的准确性。文章详细介绍了Netflix在系统架构、数据摄取、管理和查询方面的实践,并展示了Druid在大规模数据处理中的卓越性能。 ... [详细]
  • 深入理解Lucene搜索机制
    本文旨在帮助读者全面掌握Lucene搜索的编写步骤、核心API及其应用。通过详细解析Lucene的基本查询和查询解析器的使用方法,结合架构图和代码示例,带领读者深入了解Lucene搜索的工作流程。 ... [详细]
  • 嵌入式开发环境搭建与文件传输指南
    本文详细介绍了如何为嵌入式应用开发搭建必要的软硬件环境,并提供了通过串口和网线两种方式将文件传输到开发板的具体步骤。适合Linux开发初学者参考。 ... [详细]
  • PostgreSQL 最新动态 —— 2022年4月6日
    了解 PostgreSQL 社区的最新进展和技术分享 ... [详细]
  • 优化Flask应用的并发处理:解决Mysql连接过多问题
    本文探讨了在Flask应用中通过优化后端架构来应对高并发请求,特别是针对Mysql 'too many connections' 错误的解决方案。我们将介绍如何利用Redis缓存、Gunicorn多进程和Celery异步任务队列来提升系统的性能和稳定性。 ... [详细]
  • 对于许多初学者而言,遇到总线错误(bus error)或段错误(segmentation fault/core dump)是极其令人困扰的。本文详细探讨了这两种错误的成因、表现形式及解决方法,并提供了实用的调试技巧。 ... [详细]
  • 本文探讨了如何通过一系列技术手段提升Spring Boot项目的并发处理能力,解决生产环境中因慢请求导致的系统性能下降问题。 ... [详细]
author-avatar
南昌思锐
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有