Celery official documentation: Celery - Distributed Task Queue — Celery 5.3.0b1 documentation
Official documentation for using Celery with Django: First steps with Django — Celery 5.3.0b1 documentation
This walkthrough uses a Python 3.9 environment and takes this project as its example: GitHub - nineaiyu/xshare: a file-sharing platform built on Aliyun Drive.
1. Install Django and Celery
You don't need to list Django explicitly; installing the packages below (django-celery-beat, django-celery-results) pulls in a compatible Django version automatically.
pip install celery django-celery-beat django-celery-results celery[redis]
2. Configure Celery
Edit the Django settings file settings.py and add the following:
# https://docs.celeryq.dev/en/stable/userguide/configuration.html
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'default'

# broker: redis, reusing the connection info from the Django cache settings
DJANGO_DEFAULT_CACHES = CACHES['default']
CELERY_BROKER_URL = 'redis://:%s@%s/2' % (DJANGO_DEFAULT_CACHES["OPTIONS"]["PASSWORD"], DJANGO_DEFAULT_CACHES["LOCATION"].split("/")[2])

CELERY_WORKER_CONCURRENCY = 10  # worker concurrency
CELERYD_FORCE_EXECV = True  # very important; in some situations this prevents deadlocks
CELERY_RESULT_EXPIRES = 3600  # task results expire after this many seconds
CELERY_WORKER_DISABLE_RATE_LIMITS = True  # if a task is not acknowledged after a while, hand it to another worker
CELERY_WORKER_PREFETCH_MULTIPLIER = 60  # how many tasks a worker prefetches from redis at a time
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000  # a worker child process is replaced after executing this many tasks; I suggest a fairly large value, e.g. 200
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = True

# CELERY_ACCEPT_CONTENT = ['json']
# CELERY_TASK_SERIALIZER = 'json'
# Serialization format for celery messages: because model objects are passed as task
# arguments, pickle is used; with pickle the worker must be started as a non-root user.
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle']
CELERY_TASK_SERIALIZER = 'pickle'
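The CELERY_BROKER_URL above derives the Redis host, port and password from the project's CACHES setting. As a rough sketch (the backend, location and password shown here are assumptions, not taken from the project), it expects a cache entry along these lines:

# Hypothetical CACHES entry that the broker URL above could be built from;
# adjust backend, host, port and password to your own deployment.
CACHES = {
    'default': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/1',
        'OPTIONS': {
            'PASSWORD': 'your-redis-password',
        },
    }
}
# 'redis://127.0.0.1:6379/1'.split('/')[2] -> '127.0.0.1:6379', so the broker URL
# becomes 'redis://:your-redis-password@127.0.0.1:6379/2'.

Note also that the 'django-db' result backend and the database scheduler used in step 7 require the corresponding apps to be registered in settings.py (and their migrations applied with python manage.py migrate):

# Register the apps providing the 'django-db' result backend and the database
# scheduler, then run `python manage.py migrate` to create their tables.
INSTALLED_APPS += [
    'django_celery_results',
    'django_celery_beat',
]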
For more detailed configuration options, see the documentation: Configuration and defaults — Celery 5.2.7 documentation
3. Create the Celery instance
Create a celery.py file in the project package, next to settings.py:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# project : server
# filename : celery
# author : ly_13
# date : 2022/9/23
import os

from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'xshare.settings')

app = Celery('xshare')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()


@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
You also need to add the following to xshare/__init__.py:
from .celery import app as celery_app

__all__ = ('celery_app',)
4. Create a tasks.py file under any app; the directory structure looks roughly like this:
- app1/
    - tasks.py
    - models.py
- app2/
    - tasks.py
    - models.py
5. Create a periodic task; periodic tasks use the app.task decorator.
Add the following to tasks.py:
from xshare.celery import app
from datetime import datetime, timedelta
from django.utils import timezone
from api.models import ShareCode


@app.task
def auth_clean_invalid_share():
    default_timezone = timezone.get_default_timezone()
    value = timezone.make_aware(datetime.now(), default_timezone)
    deleted, _ = ShareCode.objects.filter(file_id__isnull=True, expired_time__lt=value).delete()
Add the following to settings.py; it schedules the task to run every day at 03:03:
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'auth_clean_invalid_share_job': {
        'task': 'api.tasks.auth_clean_invalid_share',
        'schedule': crontab(hour=3, minute=3),
        'args': ()
    }
}
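Since the beat process in step 7 uses django_celery_beat's DatabaseScheduler, the same schedule can also be created at runtime through the django_celery_beat models instead of settings.py. A minimal sketch (the entry mirrors the schedule above; this is not part of the original project):

from django_celery_beat.models import CrontabSchedule, PeriodicTask

# Sketch: create the same cron entry (03:03 every day) in the scheduler database.
schedule, _ = CrontabSchedule.objects.get_or_create(
    minute='3', hour='3',
    day_of_week='*', day_of_month='*', month_of_year='*',
)
PeriodicTask.objects.get_or_create(
    name='auth_clean_invalid_share_job',
    defaults={'task': 'api.tasks.auth_clean_invalid_share', 'crontab': schedule},
)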
6. Define an asynchronous task; this uses the shared_task decorator.
Again in tasks.py, add the following:
from celery import shared_task

# get_aliyun_drive and logger are helpers defined elsewhere in the project.


@shared_task
def sync_drive_size(batch_queryset):
    for drive_obj in batch_queryset:
        try:
            ali_obj = get_aliyun_drive(drive_obj)
            default_drive_obj = ali_obj.get_default_drive()
            drive_obj.total_size = default_drive_obj.total_size
            drive_obj.used_size = default_drive_obj.used_size
            drive_obj.active = True
            drive_obj.save(update_fields=['total_size', 'used_size', 'active', 'updated_time'])
            logger.info(f'{drive_obj} update size success')
        except Exception as e:
            logger.warning(f'{drive_obj} update drive size failed:{e}')
To use it, simply import it and call it like this:
sync_drive_size.apply_async(args=(batch_queryset,))
If you need to delay execution, e.g. by 600 seconds, you can do it like this:
def eta_second(second):
    ctime = datetime.now()
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=second)
    return utc_ctime + time_delay

task = sync_drive_size.apply_async(args=([drive_obj],), eta=eta_second(60 * 10))
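Celery's apply_async also accepts a countdown argument (a delay in seconds), which achieves the same thing without computing a UTC eta by hand:

# Equivalent delayed call: run roughly 600 seconds from now.
task = sync_drive_size.apply_async(args=([drive_obj],), countdown=60 * 10)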
To fetch the result of an asynchronous task with a 1-second timeout; if no result arrives within that window, a TimeoutError is raised:
result = c_task.get(propagate=False, timeout=1)
If the task has already finished, clean up its stored result:
if c_task.successful():
    c_task.forget()
A complete example combining the above:
from celery.exceptions import TimeoutError

c_task = sync_drive_size.apply_async(args=(batch_queryset,))
task_id = c_task.id
try:
    result = c_task.get(propagate=False, timeout=1)
    logger.info(f"task_id:{task_id} result:{result}")
except TimeoutError:
    logger.error(f"task_id:{task_id} result timeout.")
    result = {'task_id': task_id}

if c_task.successful():
    c_task.forget()
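If only the task id is kept (for example stored in the database or returned to a client), the result can be re-attached to later through the app instance from xshare/celery.py. A minimal sketch; task_id here stands for an id saved earlier, e.g. from c_task.id:

from xshare.celery import app

# Re-attach to a previously submitted task by its id.
c_task = app.AsyncResult(task_id)
print(c_task.status)      # PENDING, STARTED, SUCCESS, FAILURE, ...
if c_task.ready():
    print(c_task.result)  # return value of the task (or the raised exception)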
7. Start the Celery worker and beat
## Development: start the worker and beat together
celery -A xshare worker --uid=nginx --beat --scheduler django --loglevel=debug

## Start only the worker; because the message serializer is pickle, the worker must,
## for safety, be started under a dedicated non-root uid
celery -A xshare worker -l INFO --uid=nginx

## Start only beat
celery -A xshare beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
That covers the basic operations; for more advanced usage, see the official documentation: Celery - Distributed Task Queue — Celery 5.3.0b1 documentation