热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

利用Python学习RabbitMQ消息队列

RabbitMQ和邮局的主要区别就是RabbitMQ接收、存储和发送的是二进制数据----消息,本篇文章给大家介绍利用Python学习RabbitMQ消息队列,对python消息队列相关知识感兴趣的朋友参考下
RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱、邮局、投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处。

RabbitMQ和邮局的主要区别就是RabbitMQ接收、存储和发送的是二进制数据----消息。

rabbitmq基本管理命令:

一步启动Erlang node和Rabbit应用:sudo rabbitmq-server

在后台启动Rabbit node:sudo rabbitmq-server -detached

关闭整个节点(包括应用):sudo rabbitmqctl stop

add_user  
delete_user 
change_password  
list_users
add_vhost 
delete_vhost 
list_vhosts
set_permissions [-p ]    
clear_permissions [-p ] 
list_permissions [-p ]
list_user_permissions 
list_queues [-p ] [ ...]
list_exchanges [-p ] [ ...]
list_bindings [-p ]
list_connections [ ...]

Demo:

producer.py

 #!/usr/bin/env python
 # -*- coding: utf_ -*-
 # Date: 年月日
 # Author:蔚蓝行
 # 博客 http://www.cnblogs.com/duanv/
 import pika
 import sys
 #创建连接connection到localhost
 con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 #创建虚拟连接channel
 cha = con.channel()
 #创建队列anheng,durable参数为真时,队列将持久化;exclusive为真时,建立临时队列
 result=cha.queue_declare(queue='anheng',durable=True,exclusive=False)
 #创建名为yanfa,类型为fanout的exchange,其他类型还有direct和topic,如果指定durable为真,exchange将持久化
 cha.exchange_declare(durable=False,
           exchange='yanfa',
           type='direct',)
 #绑定exchange和queue,result.method.queue获取的是队列名称
 cha.queue_bind(exchange='yanfa', 
        queue=result.method.queue,
        routing_key='',) 
 #公平分发,使每个consumer在同一时间最多处理一个message,收到ack前,不会分配新的message
 cha.basic_qos(prefetch_count=)
 #发送信息到队列‘anheng'
 message = ' '.join(sys.argv[:])
 #消息持久化指定delivery_mode=;
 cha.basic_publish(exchange='',
          routing_key='anheng',
          body=message,
          properties=pika.BasicProperties(
           delivery_mode = ,
         ))
 print '[x] Sent %r' % (message,)
 #关闭连接
 con.close()

consumer.py

 #!/usr/bin/env python
 # -*- coding: utf_ -*-
 # Date: 年月日
 # Author:蔚蓝行
 # 博客 http://www.cnblogs.com/duanv/
 import pika
 #建立连接connection到localhost
 con = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 #创建虚拟连接channel
 cha = con.channel()
 #创建队列anheng
 result=cha.queue_declare(queue='anheng',durable=True)
 #创建名为yanfa,类型为fanout的交换机,其他类型还有direct和topic
 cha.exchange_declare(durable=False,
           exchange='yanfa', 
           type='direct',)
 #绑定exchange和queue,result.method.queue获取的是队列名称
 cha.queue_bind(exchange='yanfa',
        queue=result.method.queue,
        routing_key='',)
 #公平分发,使每个consumer在同一时间最多处理一个message,收到ack前,不会分配新的message
 cha.basic_qos(prefetch_count=)
 print ' [*] Waiting for messages. To exit press CTRL+C'
 #定义回调函数
 def callback(ch, method, properties, body):
   print " [x] Received %r" % (body,)
   ch.basic_ack(delivery_tag = method.delivery_tag)
 cha.basic_consume(callback,
          queue='anheng',
          no_ack=False,)
 cha.start_consuming()

一、概念:

Connection: 一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。程序的起始处就是建立这个TCP连接。

Channels: 虚拟连接。建立在上述的TCP连接中。数据流动都是在Channel中进行的。一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。

二、队列:

首先建立一个Connection,然后建立Channels,在channel上建立队列

建立时指定durable参数为真,队列将持久化;指定exclusive为真,队列为临时队列,关闭consumer后该队列将不再存在,一般情况下建立临时队列并不指定队列名称,rabbitmq将随机起名,通过result.method.queue来获取队列名:

result = channel.queue_declare(exclusive=True)

result.method.queue

区别:durable是队列持久化与否,如果为真,队列将在rabbitmq服务重启后仍存在,如果为假,rabbitmq服务重启前不会消失,与consumer关闭与否无关;

而exclusive是建立临时队列,当consumer关闭后,该队列就会被删除

三、exchange和bind

Exchange中durable参数指定exchange是否持久化,exchange参数指定exchange名称,type指定exchange类型。Exchange类型有direct,fanout和topic。

Bind是将exchange与queue进行关联,exchange参数和queue参数分别指定要进行bind的exchange和queue,routing_key为可选参数。

Exchange的三种模式:

Direct:

任何发送到Direct Exchange的消息都会被转发到routing_key中指定的Queue

1.一般情况可以使用rabbitMQ自带的Exchange:””(该Exchange的名字为空字符串);

2.这种模式下不需要将Exchange进行任何绑定(bind)操作;

3.消息传递时需要一个“routing_key”,可以简单的理解为要发送到的队列名字;

4.如果vhost中不存在routing_key中指定的队列名,则该消息会被抛弃。

Demo中虽然声明了一个exchange='yanfa'和queue='anheng'的bind,但是在后面发送消息时并没有使用该exchange和bind,而是采用了direct的模式,没有指定exchange,而是指定了routing_key的名称为队列名,消息将发送到指定队列。

如果一个exchange 声明为direct,并且bind中指定了routing_key,那么发送消息时需要同时指明该exchange和routing_key.

Fanout:

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上

1.可以理解为路由表的模式

2.这种模式不需要routing_key

3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。

4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。

Demo中创建了一个将一个exchange和一个queue进行fanout类型的bind.但是发送信息时没有用到它,如果要用到它,只要在发送消息时指定该exchange的名称即可,该exchange就会将消息发送到所有和它bind的队列中。在fanout模式下,指定的routing_key是无效的 。

Topic:

任何发送到Topic Exchange的消息都会被转发到所有关心routing_key中指定话题的Queue上

1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(routing_key),Exchange会将消息转发到所有关注主题能与routing_key模糊匹配的队列。

2.这种模式需要routing_key,也许要提前绑定Exchange与Queue。

3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个routing_key为”MQ.log.error”的消息会被转发到该队列)。

4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。

5.同样,如果Exchange没有发现能够与routing_key匹配的Queue,则会抛弃此消息。

四、任务分发

1.Rabbitmq的任务是循环分发的,如果开启两个consumer,producer发送的信息是轮流发送到两个consume的。

2.在producer端使用cha.basic_publish()来发送消息,其中body参数就是要发送的消息,properties=pika.BasicProperties(delivery_mode = 2,)启用消息持久化,可以防止RabbitMQ Server 重启或者crash引起的数据丢失。

3.在接收端使用cha.basic_consume()无限循环监听,如果设置no-ack参数为真,每次Consumer接到数据后,而不管是否处理完成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack。而应该是在处理完数据后发送ack。

在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了。如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。

这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。

Demo的callback方法中ch.basic_ack(delivery_tag = method.delivery_tag)告诉rabbitmq消息已经正确处理。如果没有这条代码,Consumer退出时,Message会重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个“内存泄漏”是致命的。去调试这种错误,可以通过一下命令打印un-acked Messages:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

4.公平分发:设置cha.basic_qos(prefetch_count=1),这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。

五、注意:

生产者和消费者都应该声明建立队列,网上教程上说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是queue的属性并不会被修改。

可能因为版本问题,在我的测试中如果第二次声明建立的队列属性和第一次不完全相同,将报类似这种错406, "PRECONDITION_FAILED - parameters for queue 'anheng' in vhost '/' not equivalent"

如果是exchange第二次创建属性不同,将报这种错406, "PRECONDITION_FAILED - cannot redeclare exchange 'yanfa' in vhost '/' with different type, durable, internal or autodelete value"

如果第一次声明建立队列也出现这个错误,说明之前存在名字相同的队列且本次声明的某些属性和之前声明不同,可通过命令sudo rabbitmqctl list_queues查看当前有哪些队列。解决方法是声明建立另一名称的队列或删除原有队列,如果原有队列是非持久化的,可通过重启rabbitmq服务删除原有队列,如果原有队列是持久化的,只能删除它所在的vhost,然后再重建vhost,再设置vhost的权限(先确认该vhost中没有其他有用队列)。

sudo rabbitmqctl delete_vhost /
sudo rabbitmqctl add_vhost /
sudo rabbitmqctl set_permissions -p / username '.*' '.*' '.*'

以上内容是小编给大家介绍的利用Python学习RabbitMQ消息队列,希望大家喜欢。

推荐阅读
  • Windows下安装RaibbitMQ
    1.软件准备1.1erlang语言包到http:www.erlang.orgdownload.html下载,并且运行!安装目录C:\ProgramFil ... [详细]
  • 目录一、salt-job管理#job存放数据目录#缓存时间设置#Others二、returns模块配置job数据入库#配置returns返回值信息#mysql安全设置#创建模块相关 ... [详细]
  • .Net下RabbitMQ发布订阅模式实践
    一、概念AMQP,即AdvancedMessageQueuingProtocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的 ... [详细]
  • RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP,SMTP,STOMP,也 ... [详细]
  • 雨林木风 GHOST XP SP3 经典珍藏版 V2017.11
    雨林木风 GHOST XP SP3 经典珍藏版 V2017.11 ... [详细]
  • 本文详细介绍了浏览器的同源策略及其重要性,并探讨了多种实现跨域访问的方法。同源策略是浏览器的一项核心安全机制,确保不同源的客户端脚本无法在未经授权的情况下读取或修改其他来源的资源。例如,a.com下的JavaScript代码通过AJAX请求获取b.com的数据将被阻止。 ... [详细]
  • 本文详细介绍了Flask项目的配置方法,包括DEBUG模式的设置和配置文件的使用,帮助开发者更好地理解和应用Flask框架。 ... [详细]
  • Hadoop发行版本选择指南:技术解析与应用实践
    本文详细介绍了Hadoop的不同发行版本及其特点,帮助读者根据实际需求选择最合适的Hadoop版本。内容涵盖Apache Hadoop、Cloudera CDH等主流版本的特性及应用场景。 ... [详细]
  • 采用IKE方式建立IPsec安全隧道
    一、【组网和实验环境】按如上的接口ip先作配置,再作ipsec的相关配置,配置文本见文章最后本文实验采用的交换机是H3C模拟器,下载地址如 ... [详细]
  • 本文介绍了数据库体系的基础知识,涵盖关系型数据库(如MySQL)和非关系型数据库(如MongoDB)的基本操作及高级功能。通过三个阶段的学习路径——基础、优化和部署,帮助读者全面掌握数据库的使用和管理。 ... [详细]
  • 福克斯新闻数据库配置失误导致1300万条敏感记录泄露
    由于数据库配置错误,福克斯新闻暴露了一个58GB的未受保护数据库,其中包含约1300万条网络内容管理记录。任何互联网用户都可以访问这些数据,引发了严重的安全风险。 ... [详细]
  • ZooKeeper集群脑裂问题及其解决方案
    本文深入探讨了ZooKeeper集群中可能出现的脑裂问题,分析其成因,并提供了多种有效的解决方案,确保集群在高可用性环境下的稳定运行。 ... [详细]
  • 本文详细介绍了优化DB2数据库性能的多种方法,涵盖统计信息更新、缓冲池调整、日志缓冲区配置、应用程序堆大小设置、排序堆参数调整、代理程序管理、锁机制优化、活动应用程序限制、页清除程序配置、I/O服务器数量设定以及编入组提交数调整等方面。通过这些技术手段,可以显著提升数据库的运行效率和响应速度。 ... [详细]
  • Python 工具推荐 | PyHubWeekly 第二十一期:提升命令行体验的五大工具
    本期 PyHubWeekly 为大家精选了 GitHub 上五个优秀的 Python 工具,涵盖金融数据可视化、终端美化、国际化支持、图像增强和远程 Shell 环境配置。欢迎关注并参与项目。 ... [详细]
  • 本文深入解析了 Golang 中的异步 API,并通过具体的应用实例展示了其强大功能。文章不仅探讨了 Golang 和 Erlang 在并行处理方面的核心理念,还详细介绍了如何利用通道(channel)和 goroutine 实现高效的并发编程。例如,通过 `ch := make(chan int)` 创建通道,并使用 `go func(ch chan int)` 启动 goroutine 来处理异步任务。此外,文章还提供了多个实际案例,帮助读者更好地理解和应用这些概念。 ... [详细]
author-avatar
手机用户2502939965
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有