结合django的定时任务
应用场景:
1、异步执行,如耗时任务;
2、定时任务
broker:redis
执行任务:worker
celery_pro目录:
celeryf.py __init__.py config.py tasks1.py tasks2.py
celeryf.py
from __future__ import absolute_import,unicode_literals
from celery import Celery

# Celery application object shared by the task modules in this package.
app = Celery(
    main='mycelery',  # name of the main module
    broker='redis://:lyb@localhost:6379/0',  # message broker
    backend='redis://:lyb@localhost:6379/0',  # result storage
    # Modules that define the tasks for the worker (module paths, no ".py").
    include=['celery_pro.tasks1', 'celery_pro.tasks2'],
)

# Load the configuration; the argument may be an imported module object or
# the dotted-path string of the module.
app.config_from_object('celery_pro.config')

if __name__ == '__main__':
    # Allows running this file directly, e.g.
    # ``python celery_pro/celeryf.py worker -l debug``.
    app.start()
__init__.py #空
config.py #http://docs.celeryproject.org/en/latest/userguide/configuration.html
import os
from datetime import timedelta
from celery.schedules import crontab

# os.path.abspath('') resolves to the current working directory.
BASE_DIR = os.path.abspath('')
# PATH_TEST = 'path_test'

CELERY_ENABLE_UTC = False  # do not use UTC
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # how long task results are kept (one day, in seconds)

# CELERYD_LOG_FILE = BASE_DIR + "/log/celery/celery.log"  # worker log path
# CELERYBEAT_LOG_FILE = BASE_DIR + "/log/celery/beat.log"  # beat log path
# Accepted content types; msgpack (fast) and json (cross-platform) are the usual choices.
# CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
# CELERY_RESULT_SERIALIZER = 'json'  # format used to read task results
# CELERY_TASK_SERIALIZER = 'msgpack'  # tasks are serialized/deserialized with msgpack

# Periodic tasks dispatched by celery beat.
CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'celery_pro.tasks2.add_2',
        'schedule': 30.0,
        'args': (16, 16),
    },
    'add-every-10-seconds': {
        'task': 'celery_pro.tasks1.test',
        'schedule': 10.0,
        'args': ['nihao'],
    },
    # Executes every Monday morning at 7:30 a.m.
    # hour/minute must be given explicitly: crontab(day_of_week='monday')
    # alone matches every minute of every Monday.
    'add-every-monday-morning': {
        'task': 'celery_pro.tasks2.add_2',
        'schedule': crontab(hour=7, minute=30, day_of_week='monday'),
        'args': (16, 16),
    },
}
tasks1.py
from __future__ import absolute_import,unicode_literals
from celery.schedules import crontab
from .celeryf import app

# Option 1: register periodic tasks through the on_after_configure signal.
# @app.on_after_configure.connect
# def setup_periodic_tasks(sender, **kwargs):
#     # Calls test('hello') every 10 seconds.
#     sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
#
#     # Calls test('world') every 30 seconds
#     sender.add_periodic_task(30.0, test.s('world'), expires=10)
#
#     # Executes every Monday morning at 7:30 a.m.
#     sender.add_periodic_task(
#         crontab(hour=7, minute=30, day_of_week=1),
#         test.s('Happy Mondays!'),
#     )

# Option 2: assign the schedule directly on the app configuration.
# app.conf.beat_schedule = {
#     'add-every-30-seconds': {
#         'task': 'celery_pro.tasks2.add_2',
#         'schedule': 30.0,
#         'args': (16, 16)
#     },
#     'add-every-10-seconds': {
#         'task': 'celery_pro.tasks1.test',
#         'schedule': 10.0,
#         'args': ['nihao']
#     },
#     # Executes every Monday morning at 7:30 a.m.
#     'add-every-monday-morning': {
#         'task': 'celery_pro.tasks2.add_2',
#         'schedule': crontab(hour=7, minute=30, day_of_week=1),
#         'args': (16, 16),
#     },
# }


@app.task
def test(arg):
    """Print ``arg``; referenced by the beat schedule as ``celery_pro.tasks1.test``."""
    print(arg)
tasks2.py
from __future__ import unicode_literals,absolute_import
import time

# The application lives in celeryf.py (there is no celery.py in this package),
# so import it from there, as tasks1.py does.
from .celeryf import app


@app.task
def add_2(x, y):
    """Print a marker line and return ``x + y``."""
    print('x + y :')
    return x + y


@app.task
def mul(x, y):
    """Sleep for 20 seconds, then return the current timestamp.

    NOTE(review): despite the name, ``x`` and ``y`` are not multiplied; the
    function looks like a stand-in for a long-running job. Confirm whether
    ``x * y`` was intended before relying on the return value.
    """
    time.sleep(20)
    return time.time()
启动worker:
(venv) l@l:~/myfile/pycharm/py3$ python celery_pro/celeryf.py worker -l debug
启动celery beat即调度器:
celery -A celery_pro.tasks1 beat -l debug