热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

6(自)、交换机之关键字模式

上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(error),而不存储那些警告(warnning)或者


  • 上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(error),而不存储那些警告(warnning)或者信息(info)日志信息避免浪费磁盘空间。Fanout这种交换类型并不能给我们带来很大的灵活性,它只能进行无意识的广播,在这里我们将使用direct这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routing_key队列中去.



  • 在上面这张图中,我们可以看到X绑定了两个队列,绑定类型是direct。队列Q1绑定键为orange,队列Q2绑定键有两个:一个绑定键为blank,另一个绑定键为green

  • 在这种绑定情况下,生产者发布消息到exchange上,绑定键为orange的消息会被发布到队列Q1。绑定键为blank和green的消息会被发布到队列Q2,其他消息类型的消息将被丢弃。


2、多重绑定



  • 当然如果exchange的绑定类型是direct,但是它绑定的多个队列的key如果都相同,在这种情况下虽然绑定类型是direct但是她表现的就和fanout有点类似了。就跟广播差不多,如上图所示。


3、实战


  • 生产者代码

import pika
credentials = pika.PlainCredentials(username='root', password='root')
cOnnection= pika.BlockingConnection(pika.ConnectionParameters(
host='47.94.132.145',
virtual_host='/',
port=5672,
credentials=credentials
))
channel = connection.channel()
# 声明一个名为logs2类型为direct的交换机
channel.exchange_declare(exchange='logs2',
exchange_type='direct') # direct:关键字模式参数
message = b"info: Hello World"
# 向logs2交换机插入数据,routing_key设置关键字,谁绑定了关键字谁就可以收到
channel.basic_publish(
exchange='logs2',
routing_key='info',
body=message
)
print("[x] send %r" %message)
connection.close()


  • 消费着代码1:

import pika
credentials = pika.PlainCredentials(username='root', password='root')
cOnnection= pika.BlockingConnection(pika.ConnectionParameters(
host='47.94.132.145',
virtual_host='/',
port=5672,
credentials=credentials
))
channel = connection.channel()
# 声明一个名为logs2类型为direct的交换机
channel.exchange_declare(
exchange='logs2',
exchange_type='direct'
)
# 创建队列
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
# 将队列绑定到交换机上, routing_key绑定关键字
# 绑定3个关键字;注意一个queue_bind只能绑定一个关键字
channel.queue_bind(
exchange='logs2',
queue=queue_name,
routing_key='error'
)
channel.queue_bind(
exchange='logs2',
queue=queue_name,
routing_key='info'
)
channel.queue_bind(
exchange='logs2',
queue=queue_name,
routing_key='warning'
)
# 确定回调函数
def callback(ch, method, properties, body):
print("[x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 确定监听队列参数
channel.basic_consume(
queue=queue_name, # 队列
auto_ack=False, # 手动应答
on_message_callback=callback # 回调函数
)
print("[*] waiting for message. to exit press CTRL+C")
# 正式监听
channel.start_consuming()


  • 消费者代码2:

import pika
credentials = pika.PlainCredentials(username='root', password='root')
cOnnection= pika.BlockingConnection(pika.ConnectionParameters(
host='47.94.132.145',
virtual_host='/',
port=5672,
credentials=credentials
))
channel = connection.channel()
# 声明一个名为logs2类型为direct的交换机
channel.exchange_declare(
exchange='logs2',
exchange_type='direct'
)
# 创建队列
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
# 将队列绑定到交换机上, routing_key绑定关键字
# 绑定3个关键字;注意一个queue_bind只能绑定一个关键字
channel.queue_bind(
exchange='logs2',
queue=queue_name,
routing_key='error'
)
# 确定回调函数
def callback(ch, method, properties, body):
print("[x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 确定监听队列参数
channel.basic_consume(
queue=queue_name, # 队列
auto_ack=False, # 手动应答
on_message_callback=callback # 回调函数
)
print("[*] waiting for message. to exit press CTRL+C")
# 正式监听
channel.start_consuming()


推荐阅读
author-avatar
zc43pml
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有