热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

​08Rabbitmq从入门到精通

一消息队列介绍1.1介绍消息队列就是基础数据结构中的“先进先出”的一种数据机构。想一下,生活中买东西,需要排队,先排的人先买消费,就是典型的“先进先出”1.2MQ解决什么问题MQ是


一 消息队列介绍


1.1 介绍

消息队列就是基础数据结构中的“先进先出”的一种数据机构。想一下,生活中买东西,需要排队,先排的人先买消费,就是典型的“先进先出”

image-20200907000210863


1.2 MQ解决什么问题

MQ是一直存在,不过随着微服务架构的流行,成了解决微服务之间问题的常用工具。


应用解耦

以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。

当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障。提升系统的可用性

image-20200907000246573


流量消峰

举个栗子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。

使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这事有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。


消息分发

多个服务队数据感兴趣,只需要监听同一类消息即可处理。

image-20200907000338060

例如A产生数据,B对数据感兴趣。如果没有消息的队列A每次处理完需要调用一下B服务。过了一段时间C对数据也感性,A就需要改代码,调用B服务,调用C服务。只要有服务需要,A服务都要改动代码。很不方便。

image-20200907000411649

有了消息队列后,A只管发送一次消息,B对消息感兴趣,只需要监听消息。C感兴趣,C也去监听消息。A服务作为基础服务完全不需要有改动


异步消息

image-20200907000500012

有些服务间调用是异步的,例如A调用B,B需要花费很长时间执行,但是A需要知道B什么时候可以执行完,以前一般有两种方式,A过一段时间去调用B的查询api查询。或者A提供一个callback api,B执行完之后调用api通知A服务。这两种方式都不是很优雅

image-20200907000523720

使用消息总线,可以很方便解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送一条消息给MQ,MQ会将此消息转发给A服务。

这样A服务既不用循环调用B的查询api,也不用提供callback api。同样B服务也不用做这些操作。A服务还能及时的得到异步处理成功的消息


1.3 常见消息队列及比较

1

结论:

Kafka在于分布式架构,RabbitMQ基于AMQP协议来实现,RocketMQ/思路来源于kafka,改成了主从结构,在事务性可靠性方面做了优化。广泛来说,电商、金融等对事务性要求很高的,可以考虑RabbitMQ和RocketMQ,对性能要求高的可考虑Kafka


二 Rabbitmq安装

官网:https://www.rabbitmq.com/getstarted.html


2.1 服务端原生安装










1
2
3
4
5

# 安装配置epel源
# 安装erlang
yum -y install erlang
# 安装RabbitMQ
yum -y install rabbitmq-server



2.2 服务端Docker安装










1
2

docker pull rabbitmq:management
docker run -di --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:managemen



2.3 客户端安装










1

pip3 install pika



2.4 设置用户和密码










1
2
3
4
5
6
7
8
9
10

rabbitmqctl add_user lqz 123
# 设置用户为administrator角色
rabbitmqctl set_user_tags lqz administrator
# 设置权限
rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"
# 然后重启rabbiMQ服务
systemctl reatart rabbitmq-server

# 然后可以使用刚才的用户远程连接rabbitmq server了。



三 基于Queue实现生产者消费者模型










1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

import Queue
import threading
message = Queue.Queue(10)
def producer(i):
while True:
message.put(i)
def consumer(i):
while True:
msg = message.get()
for i in range(12):
t = threading.Thread(target=producer, args=(i,))
t.start()
for i in range(10):
t = threading.Thread(target=consumer, args=(i,))
t.start()



四 基本使用(生产者消费者模型)

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。


生产者










1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

import pika
# 无密码
# cOnnection= pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
# 有密码
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
channel.queue_declare(queue='lqz')
channel.basic_publish(exchange='',
routing_key='lqz', # 消息队列名称
body='hello world')
connection.close()



消费者










1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

import pika
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
channel.queue_declare(queue='lqz')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_consume(queue='lqz',on_message_callback=callback,auto_ack=True)
channel.start_consuming()



五 消息安全之ack


生产者










1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

import pika
# 无密码
# cOnnection= pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
# 有密码
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
channel.queue_declare(queue='lqz')
channel.basic_publish(exchange='',
routing_key='lqz', # 消息队列名称
body='hello world')
connection.close()



消费者










1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

import pika
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
channel.queue_declare(queue='lqz')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
# 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
# ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='lqz',on_message_callback=callback,auto_ack=False)
channel.start_consuming()



六 消息安全之durable持久化


生产者










1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

import pika
# 无密码
# cOnnection= pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
# 有密码
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列),durable=True支持持久化,队列必须是新的才可以
channel.queue_declare(queue='lqz1',durable=True)
channel.basic_publish(exchange='',
routing_key='lqz1', # 消息队列名称
body='111',
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent,消息也持久化
)
)
connection.close()



消费者










1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

import pika
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
channel.queue_declare(queue='lqz1')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
# 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
# ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='lqz1',on_message_callback=callback,auto_ack=False)
channel.start_consuming()



七 闲置消费

正常情况如果有多个消费者,是按照顺序第一个消息给第一个消费者,第二个消息给第二个消费者

但是可能第一个消息的消费者处理消息很耗时,一直没结束,就可以让第二个消费者优先获得闲置的消息


生产者










1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

import pika
# 无密码
# cOnnection= pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
# 有密码
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列),durable=True支持持久化,队列必须是新的才可以
channel.queue_declare(queue='lqz123',durable=True)
channel.basic_publish(exchange='',
routing_key='lqz123', # 消息队列名称
body='111',
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent,消息也持久化
)
)
connection.close()



消费者










1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

import pika
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
# 声明一个队列(创建一个队列)
# channel.queue_declare(queue='lqz123')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
# 通知服务端,消息取走了,如果auto_ack=False,不加下面,消息会一直存在
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
channel.basic_consume(queue='lqz123',on_message_callback=callback,auto_ack=False)
channel.start_consuming()



八 发布订阅


发布者










1
2
3
4
5
6
7
8
9
10
11
12

import pika
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='m1',exchange_type='fanout')
channel.basic_publish(exchange='m1',
routing_key='',
body='lqz nb')
connection.close()



订阅者(启动几次订阅者会生成几个队列)










1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

import pika
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='fanout' , 秘书工作方式将消息发送给所有的队列
channel.exchange_declare(exchange='m1',exchange_type='fanout')
# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m1',queue=queue_name)
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)
channel.start_consuming()



九 发布订阅高级之Routing(按关键字匹配)


发布者










1
2
3
4
5
6
7
8
9
10
11
12
13


import pika
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='m2',exchange_type='direct')
channel.basic_publish(exchange='m2',
routing_key='bnb', # 多个关键字,指定routing_key
body='lqz nb')
connection.close()



订阅者1










1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

import pika
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='direct' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange='m2',exchange_type='direct')
# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='nb')
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='bnb')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)
channel.start_consuming()



订阅者2










1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

import pika
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='direct' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange='m2',exchange_type='direct')
# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='nb')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)
channel.start_consuming()



九 发布订阅高级之Topic(按关键字模糊匹配)


发布者










1
2
3
4
5
6
7
8
9
10
11
12
13

import pika
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='m3',exchange_type='topic')
channel.basic_publish(exchange='m3',
# routing_key='lqz.handsome', #都能收到
routing_key='lqz.handsome.xx', #只有lqz.#能收到
body='lqz nb')
connection.close()



订阅者1

*只能加一个单词

#可以加任意单词字符










1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

import pika
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='direct' , 秘书工作方式将消息发送给不同的关键字
channel.exchange_declare(exchange='m3',exchange_type='topic')
# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m3',queue=queue_name,routing_key='lqz.#')
def callback(ch, method, properties, body):
print("消费者接受到了任务: %r" % body)
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)
channel.start_consuming()



订阅者2










1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

import pika
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
# exchange='m1',exchange(秘书)的名称
# exchange_type='topic' , 模糊匹配
channel.exchange_declare(exchange='m3',exchange_type='topic')
# 随机生成一个队列
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
# 让exchange和queque进行绑定.
channel.queue_bind(exchange='m3',queue=queue_name,routing_key='lqz.*')
def callback(ch, method, properties, body):
queue_name = result.method.queue # 发送的routing_key是什么
print("消费者接受到了任务: %r" % body)
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)
channel.start_consuming()



十 基于rabbitmq实现rpc


服务端










1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

import pika
credentials = pika.PlainCredentials("admin","admin")
cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
channel = connection.channel()
# 起翰监听任务队列
channel.queue_declare(queue='rpc_queue')
def on_request(ch, method, props, body):
n = int(body)
respOnse= n + 100
# props.reply_to 要放结果的队列.
# props.correlation_id 任务
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id= props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume( queue='rpc_queue',on_message_callback=on_request,)
channel.start_consuming()



客户端










1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
credentials = pika.PlainCredentials("admin", "admin")
self.cOnnection= pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
self.channel = self.connection.channel()
# 随机生成一个消息队列(用于接收结果)
result = self.channel.queue_declare(queue='',exclusive=True)
self.callback_queue = result.method.queue
# 监听消息队列中是否有值返回,如果有值则执行 on_response 函数(一旦有结果,则执行on_response)
self.channel.basic_consume(queue=self.callback_queue,on_message_callback=self.on_response, auto_ack=True)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.respOnse= body
def call(self, n):
self.respOnse= None
self.corr_id = str(uuid.uuid4())
# 客户端 给 服务端 发送一个任务: 任务id = corr_id / 任务内容 = '30' / 用于接收结果的队列名称
self.channel.basic_publish(exchange='',
routing_key='rpc_queue', # 服务端接收任务的队列名称
properties=pika.BasicProperties(
reply_to = self.callback_queue, # 用于接收结果的队列
correlation_id = self.corr_id, # 任务ID
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return self.response
fibonacci_rpc = FibonacciRpcClient()
respOnse= fibonacci_rpc.call(50)
print('返回结果:',response)





推荐阅读
  • CEPH LIO iSCSI Gateway及其使用参考文档
    本文介绍了CEPH LIO iSCSI Gateway以及使用该网关的参考文档,包括Ceph Block Device、CEPH ISCSI GATEWAY、USING AN ISCSI GATEWAY等。同时提供了多个参考链接,详细介绍了CEPH LIO iSCSI Gateway的配置和使用方法。 ... [详细]
  • GetWindowLong函数
    今天在看一个代码里头写了GetWindowLong(hwnd,0),我当时就有点费解,靠,上网搜索函数原型说明,死活找不到第 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • Windows下配置PHP5.6的方法及注意事项
    本文介绍了在Windows系统下配置PHP5.6的步骤及注意事项,包括下载PHP5.6、解压并配置IIS、添加模块映射、测试等。同时提供了一些常见问题的解决方法,如下载缺失的msvcr110.dll文件等。通过本文的指导,读者可以轻松地在Windows系统下配置PHP5.6,并解决一些常见的配置问题。 ... [详细]
  • phpcomposer 那个中文镜像是不是凉了 ... [详细]
  • Sleuth+zipkin链路追踪SpringCloud微服务的解决方案
    在庞大的微服务群中,随着业务扩展,微服务个数增多,系统调用链路复杂化。Sleuth+zipkin是解决SpringCloud微服务定位和追踪的方案。通过TraceId将不同服务调用的日志串联起来,实现请求链路跟踪。通过Feign调用和Request传递TraceId,将整个调用链路的服务日志归组合并,提供定位和追踪的功能。 ... [详细]
  • Jquery 跨域问题
    为什么80%的码农都做不了架构师?JQuery1.2后getJSON方法支持跨域读取json数据,原理是利用一个叫做jsonp的概念。当然 ... [详细]
  • 云原生应用最佳开发实践之十二原则(12factor)
    目录简介一、基准代码二、依赖三、配置四、后端配置五、构建、发布、运行六、进程七、端口绑定八、并发九、易处理十、开发与线上环境等价十一、日志十二、进程管理当 ... [详细]
  • 工作经验谈之-让百度地图API调用数据库内容 及详解
    这段时间,所在项目中要用到的一个模块,就是让数据库中的内容在百度地图上展现出来,如经纬度。主要实现以下几点功能:1.读取数据库中的经纬度值在百度上标注出来。2.点击标注弹出对应信息。3 ... [详细]
  • [翻译]微服务设计模式5. 服务发现服务端服务发现
    服务之间需要互相调用,在单体架构中,服务之间的互相调用直接通过编程语言层面的方法调用就搞定了。在传统的分布式应用的部署中,服务地 ... [详细]
  • 服务网关与流量网关
    一、为什么需要服务网关1、什么是服务网关传统的单体架构中只需要开放一个服务给客户端调用,但是微服务架构中是将一个系统拆分成多个微服务,如果没有网关& ... [详细]
  • DockerDataCenter系列(四)-离线安装UCP和DTR,Go语言社区,Golang程序员人脉社 ... [详细]
  • k8s+springboot+Eureka如何平滑上下线服务
    k8s+springboot+Eureka如何平滑上下线服务目录服务平滑上下线-k8s版本目录“上篇介绍了springboot+Euraka服务平滑上下线的方式,有部分小伙伴反馈k ... [详细]
  • 由于同源策略的限制,满足同源的脚本才可以获取资源。虽然这样有助于保障网络安全,但另一方面也限制了资源的使用。那么如何实现跨域呢,以下是实现跨域的一些方法。 ... [详细]
  • BPM是什么软件?1、BPM是BusinessProcessManagement的简称,译为业务流程管理,它是一种以规范化的构造端到端的卓越业务流程为中心以持续的提高组织业务绩效为 ... [详细]
author-avatar
雅茹敬俐6999
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有