热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Python云计算框架Openstack源码分析之RabbitMQ(一)

在Openstack中,各个组件内部使用消息队列进行通信,其中,RabbitMQ是常用的一种开源消息代理软件。这里作一个简要介绍。RabbitMQ介绍RabbitMQ实现了高级消息


在Openstack中,各个组件内部使用消息队列进行通信,其中,RabbitMQ是常用的一种开源消息代理软件。这里作一个简要介绍。


RabbitMQ介绍


RabbitMQ实现了高级消息队列协议(AMQP)。


AMQP


AMQP是一个定义了应用之间消息传送协议的开放标准. AMQP旨在解决在两个应用之间传送消息存在的以下问题:



  • 网络是不可靠的 -> 消息需要保存后再转发并有出错处理机制


  • 与本地调用相比,网络速度慢 -> 得支持异步调用


  • 应用之间是不同的(比如实现语言不同, 操作系统不同),且应用会经常变化 -> 得与应用无关



AMQP 使用异步的、应用对应用的、二进制数据通信来解决这些问题。


基本组件


RabbitMQ 是 AMQP 的一种实现, 其基本组件包括: 
- Producer:Message的生产者, 负责产生消息并把消息发到Exchange。 
- Message:RabbitMQ 转发的二进制对象,包括Headers、Properties和 Data。其中Data不是必要的。 
- Exchange:负责接收Producer的Message, 并把它转发到合适的Queue. 
- Binding:标识Queue和Exchange之间的关系。Exchange根据Message的Properties和Binding的Properties来确定将消息转发到哪些Queue。一个重要的Properties是binding_key。 
- Queue:缓存Exchange发来的消息,并将消息主动发给Consumer或者由Consumer主动来获取消息。 
- Consumer:使用Queue从Exchange中获取Message。


Message和Exchange


Message


消息结构包括:Headers、Properties和data。


其中,Properties包括几个重要的属性:



  • routing_key:Direct和Topic类型的exchange会根据本属性来转发消息。


  • delivery_mode:将其值设置为2将使用消息持久化。持久化的消息会被保存到磁盘。


  • reply_to:客户端回调队列的名字。


  • correlation_id


  • content_type



Exchange


Exchange有几个重要的属性:



  • name:exchange名字。空字符串名字的exchange为默认的exchange。


  • type:决定了exchange的消息转发方式, 包括direct、fanout、topic和headers。


  • durable:值为True的exchange会在rabbitmq重启会自动创建。Openstack使用的exchange该值都为False。


  • auto_delete:值为True的exchange当消费者的连接都关闭后会被自动删除。 Openstack使用的exchange该值都为False。


  • exclusive:设置为True的话,该exchange只允许被创建的connection使用,且在该connection关闭后它会自动删除。



RabbitMQ消息路由机制


决定Exchange消息路由的属性有:



  • Exchange的type


  • Message的routing_key


  • Binding的binding_key



具体规则如下:



RabbitMQ有多种版本的客户端,本文使用Pika,安装如下。


$ pip install pika

RabbitMQ扩展插件


Management Plugin


提供GUI来管理RabbitMQ。官方地址: https://www.rabbitmq.com/management.html


RabbitMQ用户密码可以在 /etc/rabbitmq/abbitmq.config 查看:


打开图形界面:


# rabbitmq-plugins enable rabbitmq_management

然后通过端口15672就可以访问web管理界面。


Hello World!


首先,我们尝试从Publisher发送一条消息“Hello World”到Consumer。


Publisher




发送消息主要包括以下几个操作: 

1. 与RabbitMQ建立连接。 

2. 声明要使用的queue。 

3. RabbitMQ中,消息不会直接发到queue,而是发到exchange,由exchange转发到相应的queue。下面的例子中使用了默认的exchange,它会进行定向转发,也就是将message发到routing_key所指定的queue中。 

4. 最后,为了保证网络缓存flushed(也就是消息被发出去了),手动关闭连接。



# filename: send.py
#!/usr/bin/env python
import pika


cOnnection= pika.BlockingConnection(pika.ConnectionParameters(
   host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
                     routing_key='hello',
                     body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

执行完以上指令,通过命令行你可以看到queue已经被建立且包含了我们发出的信息:


Consumer




接收消息主要包括以下几个操作: 

1. 与RabbitMQ建立连接。 

2. 声明监听的queue。 

3. 建立consumer。comsumer需要一个回调函数来负责处理接收到的消息。 

4. start_consuming(),其本质是一个while循环,不断取出队列中的消息。



# filename: receive.py
#!/usr/bin/env python
import pika


cOnnection= pika.BlockingConnection(pika.ConnectionParameters(
   host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
   print(" [x] Received %r" % body)
channel.basic_consume(callback,
                     queue='hello',
                     no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

Work Queues


在上一个例子中,consumer只是简单地打印信息,在这个例子中,我们将consumer改为一个worker,它将根据消息完成一些任务。其本质和print("hello world")并没有什么区别,但是为了保证任务能正确完成,需要一些额外的操作,使workder更健壮。


consumer挂了怎么办


改写以上代码。


publisher发送的消息可以从命令参数中读取。参数包括若干个点,点的数量决定了consumer需要花多少秒来完成任务:


# send.py
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                     routing_key='hello',
                     body=message)
print(" [x] Sent %r" % message)

consumer处理消息的回调函数,将根据message进行sleep():


# receive.py
def callback(ch, method, properties, body):
   print(" [x] Received %r" % body)
   time.sleep(body.count(b'.'))
   print(" [x] Done")
   ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback,
                     queue='hello')

当consumer处理完任务,会回复ack。如果没有ack,这个消息将在queue中处于unacknowledged状态。如果这个consumer处理过程中挂了,这个message将被分发给其它consumer。这个机制 保证了所有的消息都可以被处理 。(一种很坏的情况是,consumer处理了message但没有返回ack,但这个consumer又一直不挂,那么这些被它处理的message就会一直以unack的状态保存在queue中。)


rabbitmq挂了怎么办


为了保证rabbitmq挂了都不会使message消失,我们必须保证: 
- queue持久化 
- message持久化


由于rabbitmq不允许两个队列重名,下面的代码改用task_queue作为队列名。修改代码如下:


# filename: send.py
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(exchange='',
                     routing_key="task_queue",
                     body=message,
                     properties=pika.BasicProperties(
                         delivery_mode=2, )
                     )

# filename: receive.py
channel.queue_declare(queue='task_queue', durable=True)

任务平均分配


目前的情况是,任务将被平均分配给每一个consumer。比如,如果有两个consumer,那么任务将你一个我一个来分配,而不会根据任务的复杂度来分配。一种极端情况是,奇数任务复杂度很高,偶数任务复杂度很低,那么就会导致一个consumer一直很忙,而另一个一直很闲。为此,进一步修改代码:


# filename: receive.py
channel.basic_qos(prefetch_count=1)

这个参数限制了consumer手上的message数量。如果consumer手上已经有一个unack的message,那么后续的message就不会发给它了。




完整代码


# filename: send.py
# !/usr/bin/env python
import pika
import sys


cOnnection= pika.BlockingConnection(pika.ConnectionParameters(
   host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                     routing_key="task_queue",
                     body=message,
                     properties=pika.BasicProperties(
                         delivery_mode=2, )
                     )
print(" [x] Sent %r" % message)
connection.close()

# filename: receive.py
#!/usr/bin/env python
import pika
import time


cOnnection= pika.BlockingConnection(pika.ConnectionParameters(
   host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)


def callback(ch, method, properties, body):
   print(" [x] Received %r" % body)
   time.sleep(body.count(b'.'))
   print(" [x] Done")
   ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback,
                     queue='task_queue')
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

再回顾一下上面的代码。首先明确,这种情况使用的是默认exchange。对于producer,它将消息交给exchange,然后exchange通过routing key来判断要将消息交到哪个queue。实际上相当于将消息直接发送到queue中。而consumer直接指定queue的名字,也就是它直接绑定到这个queue。这个过程中exchange其实没什么存在感。


ARTICLES


近期热门文章



生成器


关于生成器的那些事儿


爬虫代理


如何构建爬虫代理服务


地理编码


怎样用Python实现地理编码


nginx日志


使用Python分析nginx日志


淘宝女郎


一个批量抓取淘女郎写真图片的爬虫


IP代理池


突破反爬虫的利器——开源IP代理池


布隆去重


基于Redis的Bloomfilter去重(附代码)


内建函数


Python中内建函数的用法


QQ空间爬虫


QQ空间爬虫最新分享,一天 400 万条数据


对象


Python教你找到最心仪对象


线性回归


Python机器学习算法入门之梯度下降法实现线性回归


匿名代理池


进击的爬虫:用Python搭建匿名代理池


发射导弹


Python发射导弹的正确姿势


在公众号底部回复上述关键词可直接打开相应文章



Python 中 文 开 发 者 的 精 神 家 园




Python中文社区


www.python-cn.com





致力于成为


国内最好的Python社区





QQ群:152745094


专栏作者申请邮箱


sinoandywong@gmail.com


— Life is short,we use Python —






推荐阅读
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 在《PHP应用性能优化实战指南:从理论到实践的全面解析》一文中,作者分享了一次实际的PHP应用优化经验。文章回顾了先前进行的一次优化项目,指出即使系统运行时间较长后出现的各种问题和性能瓶颈,通过采用一些通用的优化策略仍然能够有效解决。文中不仅详细阐述了优化的具体步骤和方法,还结合实例分析了优化前后的性能对比,为读者提供了宝贵的参考和借鉴。 ... [详细]
  • 深入解析Gradle中的Project核心组件
    在Gradle构建系统中,`Project` 是一个核心组件,扮演着至关重要的角色。通过使用 `./gradlew projects` 命令,可以清晰地列出当前项目结构中包含的所有子项目,这有助于开发者更好地理解和管理复杂的多模块项目。此外,`Project` 对象还提供了丰富的配置选项和生命周期管理功能,使得构建过程更加灵活高效。 ... [详细]
  • MySQL性能优化与调参指南【数据库管理】
    本文详细探讨了MySQL数据库的性能优化与参数调整技巧,旨在帮助数据库管理员和开发人员提升系统的运行效率。内容涵盖索引优化、查询优化、配置参数调整等方面,结合实际案例进行深入分析,提供实用的操作建议。此外,还介绍了常见的性能监控工具和方法,助力读者全面掌握MySQL性能优化的核心技能。 ... [详细]
  • 深入解析:RKHunter与AIDE在入侵检测中的应用与优势
    本文深入探讨了RKHunter与AIDE在入侵检测领域的应用及其独特优势。通过对比分析,详细阐述了这两种工具在系统完整性验证、恶意软件检测及日志文件监控等方面的技术特点和实际效果,为安全管理人员提供了有效的防护策略建议。 ... [详细]
  • 本文详细探讨了Java集合框架的使用方法及其性能特点。首先,通过关系图展示了集合接口之间的层次结构,如`Collection`接口作为对象集合的基础,其下分为`List`、`Set`和`Queue`等子接口。其中,`List`接口支持按插入顺序保存元素且允许重复,而`Set`接口则确保元素唯一性。此外,文章还深入分析了不同集合类在实际应用中的性能表现,为开发者选择合适的集合类型提供了参考依据。 ... [详细]
  • BZOJ4240 Gym 102082G:贪心算法与树状数组的综合应用
    BZOJ4240 Gym 102082G 题目 "有趣的家庭菜园" 结合了贪心算法和树状数组的应用,旨在解决在有限时间和内存限制下高效处理复杂数据结构的问题。通过巧妙地运用贪心策略和树状数组,该题目能够在 10 秒的时间限制和 256MB 的内存限制内,有效处理大量输入数据,实现高性能的解决方案。提交次数为 756 次,成功解决次数为 349 次,体现了该题目的挑战性和实际应用价值。 ... [详细]
  • 如何将PHP文件上传至服务器及正确配置服务器地址 ... [详细]
  • 在处理高并发场景时,确保业务逻辑的正确性是关键。本文深入探讨了Java原生锁机制的多种细粒度实现方法,旨在通过使用数据的时间戳、ID等关键字段进行锁定,以最小化对系统性能的影响。文章详细分析了不同锁策略的优缺点,并提供了实际应用中的最佳实践,帮助开发者在高并发环境下高效地实现锁机制。 ... [详细]
  • 题目描述:小K不幸被LL邪教洗脑,洗脑程度之深使他决定彻底脱离这个邪教。在最终离开前,他计划再进行一次亚瑟王游戏。作为最后一战,他希望这次游戏能够尽善尽美。众所周知,亚瑟王游戏的结果很大程度上取决于运气,但通过合理的策略和算法优化,可以提高获胜的概率。本文将详细解析洛谷P3239 [HNOI2015] 亚瑟王问题,并提供具体的算法实现方法,帮助读者更好地理解和应用相关技术。 ... [详细]
  • Django框架下的对象关系映射(ORM)详解
    在Django框架中,对象关系映射(ORM)技术是解决面向对象编程与关系型数据库之间不兼容问题的关键工具。通过将数据库表结构映射到Python类,ORM使得开发者能够以面向对象的方式操作数据库,从而简化了数据访问和管理的复杂性。这种技术不仅提高了代码的可读性和可维护性,还增强了应用程序的灵活性和扩展性。 ... [详细]
  • 优化后的标题:PHP分布式高并发秒杀系统设计与实现
    PHPSeckill是一个基于PHP、Lua和Redis构建的高效分布式秒杀系统。该项目利用php_apcu扩展优化性能,实现了高并发环境下的秒杀功能。系统设计充分考虑了分布式架构的可扩展性和稳定性,适用于大规模用户同时访问的场景。项目代码已开源,可在Gitee平台上获取。 ... [详细]
  • 题目旨在解决树上的路径最优化问题,具体为在给定的树中寻找一条长度介于L到R之间的路径,使该路径上的边权平均值最大化。通过点分治策略,可以有效地处理此类问题。若无长度限制,可采用01分数规划模型,将所有边权减去一个常数m,从而简化计算过程。此外,利用单调队列优化动态规划过程,进一步提高算法效率。 ... [详细]
  • 欢迎来到Netgen新时代:探索网络生成技术的无限可能
    欢迎进入Netgen的新时代:探索网络生成技术的无限潜力。本文将详细介绍如何编译下载的Netgen源代码,生成Netgen程序,并提供开发所需的库nglib。此外,还将探讨Netgen在现代网络设计与仿真中的应用前景,以及其在提高网络性能和可靠性方面的关键作用。 ... [详细]
  • IIS 7及7.5版本中应用程序池的最佳配置策略与实践
    在IIS 7及7.5版本中,优化应用程序池的配置是提升Web站点性能的关键步骤。具体操作包括:首先定位到目标Web站点的应用程序池,然后通过“应用程序池”菜单找到对应的池,右键选择“高级设置”。在一般优化方案中,建议调整以下几个关键参数:1. **基本设置**: - **队列长度**:默认值为1000,可根据实际需求调整队列长度,以提高处理请求的能力。此外,还可以进一步优化其他参数,如处理器使用限制、回收策略等,以确保应用程序池的高效运行。这些优化措施有助于提升系统的稳定性和响应速度。 ... [详细]
author-avatar
真实的阿凯123
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有