热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

django使用celery异步任务队列

celery官方文档:Celery-DistributedTaskQueue—Celery5.3.0b1documentationDjango使用celery官方文

celery官方文档:Celery - Distributed Task Queue — Celery 5.3.0b1 documentation

Django使用celery官方文档:First steps with Django — Celery 5.3.0b1 documentation 

本次使用的是python3.9环境,以这个项目为例 GitHub - nineaiyu/xshare: 基于阿里云盘的文件分享平台


1.安装Django和celery


可以不用填写Django,安装celery的时候,会自动安装对应版本的Django

pip install celery django-celery-beat django-celery-results celery[redis]

 2.配置celery


编辑Django配置setting.py添加如下内容

# https://docs.celeryq.dev/en/stable/userguide/configuration.html?
CELERY_TIMEZOnE= "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'default'# broker redis
DJANGO_DEFAULT_CACHES = CACHES['default']
CELERY_BROKER_URL = 'redis://:%s@%s/2' % (DJANGO_DEFAULT_CACHES["OPTIONS"]["PASSWORD"], DJANGO_DEFAULT_CACHES["LOCATION"].split("/")[2])CELERY_WORKER_COnCURRENCY= 10 # worker并发数
CELERYD_FORCE_EXECV = True # 非常重要,有些情况下可以防止死
CELERY_RESULT_EXPIRES = 3600 # 任务结果过期时间CELERY_WORKER_DISABLE_RATE_LIMITS = True # 任务发出后,经过一段时间还未收到acknowledge , 就将任务重新交给其他worker执行
CELERY_WORKER_PREFETCH_MULTIPLIER = 60 # celery worker 每次去redis取任务的数量CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000 # 每个worker执行了多少任务就会死掉,我建议数量可以大一些,比如200CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = True# CELERY_ACCEPT_COnTENT= ['json']
# CELERY_TASK_SERIALIZER = 'json'# celery消息的序列化方式,由于要把对象当做参数所以使用pickle,使用pickle,worker必须非root用户启动
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_COnTENT= ['pickle']
CELERY_TASK_SERIALIZER = 'pickle'

更加详细的配置参考文档:Configuration and defaults — Celery 5.2.7 documentation 


3,创建celery实例


需要在项目和settings.py同级文件 celery.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# project : server
# filename : celery
# author : ly_13
# date : 2022/9/23import osfrom celery import Celery# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'xshare.settings')app = Celery('xshare')# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')# Load task modules from all registered Django apps.
app.autodiscover_tasks()@app.task(bind=True, ignore_result=True)
def debug_task(self):print(f'Request: {self.request!r}')

 还需要在xshare/__init__.py添加如下内容

from .celery import app as celery_app__all__ = ('celery_app',)

4,在任意一个app下创建一个 tasks.py文件,目录结构类似如下

- app1/- tasks.py- models.py
- app2/- tasks.py- models.py

5,创建一个定时任务,定时任务需要用到 app.task装饰器


在tasks.py文件添加如下内容

from xshare.celery import app
from datetime import datetime, timedelta
from django.utils import timezone
from api.models import ShareCode@app.task
def auth_clean_invalid_share():default_timezOne= timezone.get_default_timezone()value = timezone.make_aware(datetime.now(), default_timezone)deleted, _ = ShareCode.objects.filter(file_id__isnull=True, expired_time__lt=value).delete()

在settings.py 增加如下内容,意思每天凌晨3点3分执行该任务

from celery.schedules import crontabCELERY_BEAT_SCHEDULE = {
'auth_clean_invalid_share_job': {'task': 'api.tasks.auth_clean_invalid_share','schedule': crontab(hour=3, minute=3),'args': ()}
}

6,定义一个异步任务,需要用到shared_task


同样在tasks.py文件添加内容如下

@shared_task
def sync_drive_size(batch_queryset):for drive_obj in batch_queryset:try:ali_obj = get_aliyun_drive(drive_obj)default_drive_obj = ali_obj.get_default_drive()drive_obj.total_size = default_drive_obj.total_sizedrive_obj.used_size = default_drive_obj.used_sizedrive_obj.active = Truedrive_obj.save(update_fields=['total_size', 'used_size', 'active', 'updated_time'])logger.info(f'{drive_obj} update size success')except Exception as e:logger.warning(f'{drive_obj} update drive size failed:{e}')

使用的时候,直接导包调用,调用方式如下

sync_drive_size.apply_async(args=(batch_queryset,))

如果需要延时执行,比如延迟600秒执行,可以这样定义

def eta_second(second):ctime = datetime.now()utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())time_delay = timedelta(secOnds=second)return utc_ctime + time_delaytask = sync_drive_size.apply_async(args=([drive_obj],), eta=eta_second(60 * 10))

获取异步执行结果,超时1秒,若在超时范围内未获取到结果,则会抛 TimeoutError异常

result = c_task.get(propagate=False, timeout=1)

如果任务状态已经完成,清理任务数据

if c_task.successful():c_task.forget()

from celery.exceptions import TimeoutErrorc_task = sync_drive_size.apply_async(args=(batch_queryset,))
try:result = c_task.get(propagate=False, timeout=1)logger.info(f"task_id:{task_id} result:{result}")
except TimeoutError:logger.error(f"task_id:{task_id} result timeout.")result = {'task_id': task_id}
if c_task.successful():c_task.forget()

7,启动celery-work和beat


## 开发环境全部启动work和beat
celery -A xshare worker --uid=nginx --beat --scheduler django --loglevel=debug## 只启动worker,配置使用 celery消息的序列化方式为pickle, 因此启动的使用,为了安全,必须指定uid
celery -A xshare worker -l INFO --uid=nginx ## 只启动beat
celery -A xshare beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler

 

基础操作大概就这些,更进阶操作请查看官方文档 Celery - Distributed Task Queue — Celery 5.3.0b1 documentation


推荐阅读
  • Jenkins API当前未直接提供获取任务构建队列长度的功能,因此需要通过解析HTML页面来间接实现这一需求。 ... [详细]
  • 关于进程的复习:#管道#数据的共享Managerdictlist#进程池#cpu个数1#retmap(func,iterable)#异步自带close和join#所有 ... [详细]
  • Spring Boot + RabbitMQ 消息确认机制详解
    本文详细介绍如何在 Spring Boot 项目中使用 RabbitMQ 的消息确认机制,包括消息发送确认和消息接收确认,帮助开发者解决在实际操作中可能遇到的问题。 ... [详细]
  • SDWebImage第三方库学习
    1、基本使用方法异步下载并缓存-(void)sd_setImageWithURL:(nullableNSURL*)urlNS_REFINED_FOR_SWIFT;使用占位图片& ... [详细]
  • 使用Tkinter构建51Ape无损音乐爬虫UI
    本文介绍了如何使用Python的内置模块Tkinter来构建一个简单的用户界面,用于爬取51Ape网站上的无损音乐百度云链接。虽然Tkinter入门相对简单,但在实际开发过程中由于文档不足可能会带来一些不便。 ... [详细]
  • 本文详细介绍了Linux系统中用于管理IPC(Inter-Process Communication)资源的两个重要命令:ipcs和ipcrm。通过这些命令,用户可以查看和删除系统中的消息队列、共享内存和信号量。 ... [详细]
  • 我的读书清单(持续更新)201705311.《一千零一夜》2006(四五年级)2.《中华上下五千年》2008(初一)3.《鲁滨孙漂流记》2008(初二)4.《钢铁是怎样炼成的》20 ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • 管理UINavigationController中的手势返回 - Managing Swipe Back Gestures in UINavigationController
    本文介绍了如何在一个简单的闪存卡片应用中实现平滑的手势返回功能,以增强用户体验。 ... [详细]
  • 题目描述:计算从起点到终点的最小能量消耗。如果下一个单元格的风向与当前单元格相同,则消耗为0,否则为1。共有8个可能的方向。 ... [详细]
  • 深入解析Dubbo:使用与源码分析
    本文详细介绍了Dubbo的使用方法和源码分析,涵盖其架构设计、核心特性和调用流程。 ... [详细]
  • 在运行于MS SQL Server 2005的.NET 2.0 Web应用中,我偶尔会遇到令人头疼的SQL死锁问题。过去,我们主要通过调整查询来解决这些问题,但这既耗时又不可靠。我希望能找到一种确定性的查询模式,确保从设计上彻底避免SQL死锁。 ... [详细]
  • centos 7.0 lnmp成功安装过程(很乱)
    下载nginx[rootlocalhostsrc]#wgethttp:nginx.orgdownloadnginx-1.7.9.tar.gz--2015-01-2412:55:2 ... [详细]
  • 本文介绍了多种开源数据库及其核心数据结构和算法,包括MySQL的B+树、MVCC和WAL,MongoDB的tokuDB和cola,boltDB的追加仅树和mmap,levelDB的LSM树,以及内存缓存中的一致性哈希。 ... [详细]
  • RocketMQ在秒杀时的应用
    目录一、RocketMQ是什么二、broker和nameserver2.1Broker2.2NameServer三、MQ在秒杀场景下的应用3.1利用MQ进行异步操作3. ... [详细]
author-avatar
哪来的咸鱼
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有