什么是celery
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统
专注于实时处理的异步任务队列
同时也支持任务调度
celery架构
celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
celery能做什么
异步任务
定时任务
周期任务
使用场景
耗时操作(所有的耗时操作都可以)将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
定时任务(每天定时推送微信服务号消息,)定时执行某件事情,比如每天数据统计
使用
pip install celery
项目结构
celery的简单实例
celery_task_s1.py代码
from celery import Celery #不加密码 broker = 'redis://127.0.0.1:6379/0' #指定broker backend = 'redis://127.0.0.1:6379/1' #指定存储结果 # 加密码 # broker = 'redis://:123456@127.0.0.1:6379/2' # backend = 'redis://:123456@127.0.0.1:6379/1' # 一定要指定一个名字 app = Celery('test',broker=broker,backend=backend) #任务其实就是个函数 # 需要用一个装饰器装饰,表示该任务是被celery管理的,并且可以用celery执行的 @app.task def add(x,y) -> int: import time time.sleep(2) return x+y
add_task.py
#用于提交任务的py文件 import celery_task_s1 #正常同步执行任务 会睡两秒在执行 # ret=celery_task_s1.add(3,4) # print(ret) #提交任务到消息队列中 #只是把任务提交到消息队列中,并没有执行 # ret=celery_task_s1.add.delay(3,4) # print(ret) #a5ea035f-0cc3-44ba-b334-f5d7c7ce681d :任务的id号 #任务提交完成后,需要启动worker,可以用命令启动: # celery worker -A celery_task_s1 -l info #windows上:celery worker -A celery_task_s1 -l info -P eventlet win上首次使用worker需要安装eventlet模块
celery_result.py
from celery_task_s1 import app from celery.result import AsyncResult async = AsyncResult(id='30314784-04bd-4ec5-acab-5cb22181cfcb',app=app) if async.successful(): #取出它return的值 result = async .get() print(result) # result.forget() 将结果删除 elif async.failed(): print('执行失败') elif async.status =="PENDING": print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行')
写一个py文件:celery_task
from celery_app_task import cel if __name__ == '__main__': cel.worker_main() # cel.worker_main(argv=['--loglevel=info')
async = AsyncResult(id="a5ea035f-0cc3-44ba-b334-f5d7c7ce681d", app=app) if async.successful(): #取出它return的值 result = async.get() print(result)
pro_cel ├── celery_task# celery相关文件夹 │ ├── celery.py # celery连接和配置相关文件,必须叫这个名字 │ └── order_task.py # 所有任务函数 │ └── user_task.py # 所有任务函数 ├── celery_result.py # 检查结果 └── add_task.py # 触发任务
celery.py文件
#这个文件名必须叫celery,生成celery对象 from celery import Celery from datetime import timedelta from celery.schedules import crontab broker = 'redis://127.0.0.1:6379/0' backend = 'redis://127.0.0.1:6379/1' app = Celery('test',broker=broker,backend=backend, #包含以下链各个任务文件,去响应的py文件中找任务,对多个任务做分类 include=[ 'celery_task.order_task', 'celery_task.user_task', ] ) #时区 # app.conf.timezOne= 'Asia/Shanghai' #是否使用UTC # app.conf.enable_utc = False
order_task.py
from celery_task.celery import app @app.task def order_add(x,y): import time time.sleep(1) return x+y
user_task.py
from celery_task.celery import app @app.task def user_add(x,y): import time time.sleep(1) return x+y
add_task.py
from celery_task.order_task import order_add from celery_task.user_task import user_add # ret=order_add.delay(5,6) ret=user_add.delay(10,60) print(ret)
celery_result.py
from celery.result import AsyncResult from celery_task.celery import app from add_task import ret async = AsyncResult(id='c951c4f7-dcfc-46fc-b60e-ee67b61211ec',app=app) if async.successful(): result = async.get() print(result) # result.forget() # 将结果删除,执行完成,结果不会自动删除 # async.revoke(terminate=True) # 无论现在是什么时候,都要终止 # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。 elif async.failed(): print('执行失败') elif async.status == 'PENDING': print('任务等待中被执行') elif async.status == 'RETRY': print('任务异常后正在重试') elif async.status == 'STARTED': print('任务已经开始被执行')
启动worker: celery_task是包的名字(文件夹名字)
celery worker -A celery_task -l info -P eventlet #放值的时候手动启动add_task文件
设定时间让celery执行一个任务
add_task.py 就是上面的执行异步任务的那个文件(往里面放值的时候使用定时方式)
#用于提交任务的py文件 import celery_task_s1 #执行定时任务:3s钟以后执行add任务 from datetime import datetime # v1 = datetime(2019, 7, 12, 11, 13, 56) # print(v1) # v2 = datetime.utcfromtimestamp(v1.timestamp()) # print(v2) # #取出要执行任务的时间对象,调用apply_async方法,args是参数,eta是执行的时间 # result = celery_task_s1.add.apply_async(args=[1, 3], eta=v2) # print(result.id) #这里需要注意的是result是个对象,你可以直接打印因为内部写了__str__ #第二种获取时间的方法 ctime = datetime.now() # 默认用utc时间 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta #取10s之后的时间对象 time_delay = timedelta(secOnds=3) task_time = utc_ctime + time_delay result = celery_task_s1.add.apply_async(args=[4, 3], eta=task_time) print(result.id) #这里需要注意的是result是个对象,你可以直接打印因为内部写了__str__ #任务提交完成后,需要启动worker,可以用命令启动: # celery worker -A celery_task_s1 -l info #windows上:celery worker -A celery_task_s1 -l info -P eventlet
多任务结构中celery.py修改如下
#这个文件名必须叫celery,生成celery对象 from celery import Celery from datetime import timedelta from celery.schedules import crontab broker = 'redis://127.0.0.1:6379/0' backend = 'redis://127.0.0.1:6379/1' app = Celery('test',broker=broker,backend=backend, #包含以下链各个任务文件,去响应的py文件中找任务,对多个任务做分类 include=[ 'celery_task.order_task', 'celery_task.user_task', ] ) #时区问题 每天的什么时候去执行任务 #时区 # app.conf.timezOne= 'Asia/Shanghai' #是否使用UTC # app.conf.enable_utc = False #beat_schedule 定义定时任务的 app.conf.beat_schedule = { #名字随意命令 'add-every-2-seconds':{ #执行tasks1下的test_celery函数 'task':'celery_task.order_task.order_add', #每隔2秒执行一次 # 'schedule':1.0, 几秒钟几分钟几小时后schedule对应的参数 # 'schedule':crontab(minute='*/1'), 'schedule':timedelta(secOnds=10), #传递参数 'args':(5,6) }, # 'add-every-12-seconds':{ # 'task':'celery_task.order_task.user_add', # #每年4月11号,8点42分执行 # # 'schedule':crontab(minute=42,hour=8,day_of_mOnth=11,month_of_year=4), # 'schedule':crontab(minute=42,hour=8,day_of_mOnth=11,month_of_year=4), # 'args':(16,16) # } }
创建worker的方式并没有发生变化,但是这里要注意的是,每间隔一定时间后需要生产出来任务给worker去执行,这里需要一个生产者beat
启动一个beat:(这个负责定时周期往里面放值)
celery beat -A celery_task -l info
启动worker执行(winds)
celery worker -A celery_task -l info -P eventlet
需要安装
celery==3.1.25
django-celery==3.1.20
在项目目录下创建celeryconfig.py
import djcelery djcelery.setup_loader() CELERY_IMPORTS=( 'app01.tasks', ) #有些情况可以防止死锁 CELERYD_FORCE_EXECV=True # 设置并发worker数量 CELERYD_COnCURRENCY=4 #允许重试 CELERY_ACKS_LATE=True # 每个worker最多执行100个任务被销毁,可以防止内存泄漏 CELERYD_MAX_TASKS_PER_CHILD=100 # 超时时间 CELERYD_TASK_TIME_LIMIT=12*30
第一种 直接把多任务结构文件直接拷贝在项目目录下
注意在celery的任务函数中不能直接调用django的环境,需要手动添加
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "untitled15.settings") import django django.setup()
第二种 在app01目录下创建tasks.py
from celery import task @task def add(a,b): with open('a.text', 'a', encoding='utf-8') as f: f.write('a') print(a+b)
视图函数views.py(定时任务)
from django.shortcuts import render,HttpResponse from app01.tasks import add from datetime import datetime def test(request): # result=add.delay(2,3) ctime = datetime.now() # 默认用utc时间 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(secOnds=5) task_time = utc_ctime + time_delay result = add.apply_async(args=[4, 3], eta=task_time) print(result.id) return HttpResponse('ok')
settings.py
INSTALLED_APPS = [ ... 'djcelery', 'app01' ] ... from djagocele import celeryconfig #from 这个django项目 import celeryconfig就是上面配置的文件 BROKER_BACKEND='redis' BOOKER_URL='redis://127.0.0.1:6379/1' CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'
https://www.cnblogs.com/znicy/p/5626040.html
http://www.manongjc.com/article/7864.html