作者:mobiledu2502931987 | 来源:互联网 | 2023-09-10 21:20
Beanstalkd工作队列Beanstalkd是什么Beanstalkd是目前一个绝对可靠,易于安装的消息传递服务,主要用例是管理不同部分和工人之间的工作流应用程
Beanstalkd工作队列 Beanstalkd 是什么 Beanstalkd是目前一个绝对可靠,易于安装的消息传递服务,主要用例是管理不同部分和工人之间的工作流应用程序的部署通过工作队列和消息堆栈,类似于其他受欢迎的解决方案,比如RabbitMQ。然而,创建Beanstalkd使它有别于其他工作。Beanstalkd旨在成为一个工作队列,而不是一把雨伞工具来满足许多需求。为了实现这一目的,它作为一种轻量级的、快速有效的应用程序基于C编程语言。精益建筑还允许它是安装和使用非常简单,使它适合大多数用例。
Beanstalkd 的 Features(特性) 持久性:Beanstalkd运行使用内存,但也提供了持久性支持。 优先级:与大多数选择一样,Beanstalkd提供了不同的任务的优先级来处理紧急事情时需要。 分布 : 不同的服务器实例可以分布类似于Memcached是如何工作的。 掩盖 :有可能通过掩盖它无限期延期的作业(即任务)。 第三方工具:Beanstalkd附带各种第三方工具包括综合领先指标和基于web的管理控制台。 .过期 :工作可以设置为过期,auto-queue之后(TTR – Time To Run) Beanstalkd使用案例 允许web服务器快速响应请求,而不是被迫当场曾推高程序执行 在指定的时间间隔执行某些工作(即爬行web) 分发到多个工作人员进行处理 让离线客户端(例如一个断开连接的用户)获取数据在稍后的时间,而不是让它永久失去 极大地提高应用程序的可靠性和正常运行时间 Beanstalkd 的安装 ################# 第一种安装方法,但是要安装git yum install -y git git clone https://github.com /kr/beanstalkd cd beanstalkd make make install ################### 第二种源码安装 wget http://cloud.github .com /downloads/kr/beanstalkd/beanstalkd-1.4 .6 .tar .gz tar xzf beanstalkd-1.4 .6 .tar .gz cd beanstalkd-1.4 .6 ./configure make make instal############# 查看安装的版本 beanstalkd -v
Beanstalkd 的使用
# Usage: beanstalkd -l [ip address] -p [port #] beanstalkd -l 127.0 .0 .1 -p 11301 & # ip可以使用这台机器的ip,端口可以更改; #如果想切换到后台运行: nohup beanstalkd -l 127.0 .0 .1 -p 11301 & #----------------------------如果不适用源文件安装的启动 # To start the service: service beanstalkd start # To stop the service: service beanstalkd stop # To restart the service: service beanstalkd restart # To check the status: service beanstalkd status
python 调用 beanstalkd 安装beanstalkc pip install pyyaml pip install beanstalkc
基本操作 import beanstalkc beanstalk = beanstalkc.Connection(host='localhost' , port=11301 )
put(建立一个job) 将一个job加入到当前tube中、然后马上设置job状态为“Ready”比如 beanstalk.put ('hey!' ) beanstalk.put ('yes!' , delay=1 ) beanstalk.put ('foo' , priority=42 ) beanstalk.put ('bar' , priority=21 ) beanstalk.put ('qux' , priority=21 )
release(提前释放一个delay的job到Ready队列) 如果你使用了delay来延迟一个job、后来又反悔了、那么你可以使用release 来提前释放这个job、让它早日回归当前tube并设置为“Ready”状态、但迩不可以对非当前tube的job使用release 命令、要获取非当前tube的job可以通过peek命令beanstalk.put('yes !', delay=900 )job.release ()job.stats()['state '] ##################### 输出'ready '
put 是往tube里加入job、那么reserve指令则是从tube里取出job、默认情况下、reserve是按job的序号从小取到大、如果像上面使用put 设置了job的优先级那么是按优先级高的job取值、如果优先级同等级、那么再按序号从小取到大 beanstalk.reserve(timeout =0 ) is None True 另外reserve可以如上所示设置timeout 的时间参数、如果把时间设置为0 、则任何时候都直接报timeout 而造成获取job失败、迩可以设置任何一个非负数作为timeout 的时间、如果超过timeout 的时间、那么这次的获取job的动作就会结束、等待下次再次调用reserve指令来获取job、而没有被获取的job则将置于“Ready”状态等待下次再次被reserve所调用
任何时间job都可以调用delete 指令来删除这个job job.delete ()
和使用delay参数不同、如果你不使用kick把它踢醒、它将永远处于沉睡状态、但迩不可以对非当前tube的job使用bury命令、要获取非当前tube的job可以通过peek命令 job = beanstalk.reserve() job.bury() job.stats()['state' ]'buried'
默认情况下、先叫醒最早睡觉的那个job beanstalk.kick()#这个命令还可以加个参数设置唤醒的数量、比如加上42、一次过唤醒42个job、还按照最先睡觉的那个job开始、先就是先进先出原则 beanstalk.kick(42 )###################### 输出 1 0
peek(查找并切换成目标job) #查找序号为3的job、也就是第三个建立的job、如果该序号不存在、那么返回空值None job = beanstalk.peek(3 ) job.body###################### 输出 'yes!'
peek_ready (查找并切换当前tube最先进入准备状态的job) #使用beanstalk获取最早一个处于“Ready”状态的job、注意、只能获取当前tube的job job = beanstalk.peek_ready() job.body####################### 输出 'yes!'
peek_delayed(查找并切换当前tube延迟时间最短的job) beanstalk.put('o tempores' , delay=120 ) job = beanstalk.peek_delayed() job.stats()['state' ] 输出4 'delayed'
peek_buried(查找并切换当前tube最先睡觉的job) beanstalk.put('o mores!' ) job = beanstalk.reserve() job.bury() job = beanstalk.peek_buried() job.stats()['state' ] 输出5 'buried'
tube(队列)的操作 #tubes或watching(查看所有tube):查看所有的tube beanstalk.tubes() ['default' ]#默认情况下、只存在一个tube、而这个tube的名称叫作“default”、这个管道一定会存在、不能被删除、其它的tube你可以任意删除 #ignore(删除tube):你可以使用ignore命令来删除任意非“default”的管道 beanstalk.ignore('weibo' )################输出 1 # use或watch(添加或者切换一个tube):你可以使用use或watch来添加或者切换一个tube、如果tube名称不存在、则创建一个同名tube、如果存在就切换到该tube下 beanstalk.use('foo' )##############输出 'foo' #using(查看当前使用的tube) :你可以使用using来查看当前使用的tube是什么 beanstalk.using ()################输出 'default'
eanstalk.close ()
import beanstalkcimport jsonimport sys bean = beanstalkc.Connection('172.31.25.12' ,11300 ) bean.watch('test' ) job = bean.reserve()while job:print job.bodyjob.delete()job = bean.reserve(timeout = 60 )if job is None :sys.exit(0 )
beanstalkc的文档