推荐阅读:我总结了72份面试题,累计3170页,斩获了30+互联网公司offer(含BATJM)2020首战告捷,这份Java面试神技Plus版,让我成功拿到了阿里、京东、字节跳动
推荐阅读:
我总结了72份面试题,累计3170页,斩获了30+互联网公司offer(含BATJM)
2020首战告捷,这份Java面试神技Plus版,让我成功拿到了阿里、京东、字节跳动等大厂offer
疫情之下,收到美团电话面试(成功拿下offer),附学习路线+刷题库
1. Exchange
先来放上RabbitMQ
架构图。
Exchange
是一个非常关键的组件,有了它才有了各种消息分发模式。
我先简单说说Exchange
有哪几种类型:
fanout:Fanout-Exchange
会将它接收到的消息发往所有与他绑定的Queue中。
direct:Direct-Exchange
会把它接收到的消息发往与它有绑定关系且Routingkey
完全匹配的Queue中(默认)。
topic:Topic-Exchange
与Direct-Exchange相似,不过Topic-Exchange不需要全匹配,可以部分匹配,它约定:Routingkey
为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。
header:Header-Exchange
不依赖于RoutingKey或绑定关系来分发消息,而是根据发送的消息内容中的headers属性进行匹配。此模式已经不再使用,本文中也不会去讲,大家知道即可。
本文中我们主要讲前三种Exchange
方式,相信凭借着我简练的文字和灵魂的画技给大家好好讲讲,争取老妪能解。
Tip:本文的代码演示直接使用SpringBoot+RabbitMQ的模式。
2. Fanout-Exchange
先来看看Fanout-Exchange
,Fanout-Exchange
又称扇形交换机,这个交换机应该是最容易理解的。
Exchange
和Queue
建立一个绑定关系,Exchange
会分发给所有和它有绑定关系的Queue
中,绑定了十个Queue
就把消息复制十份进行分发。
这种绑定关系为了效率肯定都会维护一张表,从算法效率上来说一般是O(1),所以Fanout-Exchange
是这几个交换机中查找需要被分发队列最快的交换机。
下面是一段代码演示:
@Beanpublic Queue fanout1() {return new Queue("fanout1");}@Beanpublic Queue fanout2() {return new Queue("fanout2");}@Beanpublic FanoutExchange fanoutExchange() {// 三个构造参数:name durable autoDeletereturn new FanoutExchange("fanoutExchange", false, false);}@Beanpublic Binding binding1() {return BindingBuilder.bind(fanout1()).to(fanoutExchange());}@Beanpublic Binding binding2() {return BindingBuilder.bind(fanout2()).to(fanoutExchange());}
为了清晰明了,我新建了两个演示用的队列,然后建了一个FanoutExchange
,最后给他们都设置上绑定关系,这样一组队列和交换机的绑定设置就算完成了。
紧接着编写一下生产者和消费者:
public void sendFanout() {Client client = new Client();// 应读者要求,以后代码打印的地方都会改成log方式,这是一种良好的编程习惯,用System.out.println一般是不推荐的。log.info("Message content : " + client);rabbitTemplate.convertAndSend("fanoutExchange",null,client);System.out.println("消息发送完毕。");}@Testpublic void sendFanoutMessage() {rabbitProduce.sendFanout();}
@Slf4j
@Component("rabbitFanoutConsumer")
public class RabbitFanoutConsumer {@RabbitListener(queues = "fanout1")public void onMessage1(Message message, Channel channel) throws Exception {log.info("Message content : " + message);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);log.info("消息已确认");}@RabbitListener(queues = "fanout2")public void onMessage2(Message message, Channel channel) throws Exception {log.info("Message content : " + message);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);log.info("消息已确认");}}
这两段代码都很好理解,不再赘述。
其中发送消息的代码有三个参数,第一个参数是Exchange
的名称,第二个参数是routingKey
的名称,这个参数在扇形交换机里面用不到,在其他两个交换机类型里面会用到。
代码的准备到此结束,我们可以运行发送方法之后run一下了~
项目启动后,我们可以先来观察一下队列与交换机的绑定关系有没有生效,我们在RabbitMQ控制台使用rabbitmqctl list_bindings
命令查看绑定关系。
关键部分我用红框标记了起来,这就代表着名叫fanoutExchange
的交换机绑定着两个队列,一个叫fanout1
,另一个叫fanout2
。
紧接着,我们来看控制台的打印情况:
可以看到,一条信息发送出去之后,两个队列都接收到了这条消息,紧接着由我们的两个消费者消费。
Tip: 如果你的演示应用启动之后没有消费信息,可以尝试重新运行一次生产者的方法发送消息。
3. Direct-Exchange
Direct-Exchange
是一种精准匹配的交换机,我们之前一直使用默认的交换机,其实默认的交换机就是Direct类型。
如果将Direct交换机都比作一所公寓的管理员,那么队列就是里面的住户。(绑定关系)
管理员每天都会收到各种各样的信件(消息),这些信件的地址不光要标明地址(ExchangeKey)还需要标明要送往哪一户(routingKey),不然消息无法投递。
以上图为例,准备一条消息发往名为SendService
的直接交换机中去,这个交换机主要是用来做发送服务,所以其绑定了两个队列,SMS队列和MAIL队列,用于发送短信和邮件。
我们的消息除了指定ExchangeKey
还需要指定routingKey
,routingKey
对应着最终要发送的是哪个队列,我们的示例中的routingKey
是sms,这里这条消息就会交给SMS队列。
听了上面这段,可能大家对routingKey
还不是很理解,我们上段代码实践一下,大家应该就明白了。
准备工作:
@Beanpublic Queue directQueue1() {return new Queue("directQueue1");}@Beanpublic Queue directQueue2() {return new Queue("directQueue2");}@Beanpublic DirectExchange directExchange() {// 三个构造参数:name durable autoDeletereturn new DirectExchange("directExchange", false, false);}@Beanpublic Binding directBinding1() {return BindingBuilder.bind(directQueue1()).to(directExchange()).with("sms");}@Beanpublic Binding directBinding2() {return BindingBuilder.bind(directQueue2()).to(directExchange()).with("mail");}
新建两个队列,新建了一个直接交换机,并设置了绑定关系。
这里的示例代码和上面扇形交换机的代码很像,唯一可以说不同的就是绑定的时候多调用了一个with
将routingKey
设置了上去。
所以是交换机和队列建立绑定关系的时候设置的routingKey
,一个消息到达交换机之后,交换机通过消息上带来的routingKey
找到自己与队列建立绑定关系时设置的routingKey
,然后将消息分发到这个队列去。
生产者:
public void sendDirect() {Client client = new Client();log.info("Message content : " + client);rabbitTemplate.convertAndSend("directExchange","sms",client);System.out.println("消息发送完毕。");}
消费者:
@Slf4j
@Component("rabbitDirectConsumer")
public class RabbitDirectConsumer {@RabbitListener(queues = "directQueue1")public void onMessage1(Message message, Channel channel) throws Exception {log.info("Message content : " + message);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);log.info("消息已确认");}@RabbitListener(queues = "directQueue2")public void onMessage2(Message message, Channel channel) throws Exception {log.info("Message content : " + message);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);log.info("消息已确认");}}
效果图如下:
只有一个消费者进行了消息,符合我们的预期。
4. Topic-Exchange
Topic-Exchange
是直接交换机的模糊匹配版本,Topic类型的交换器,支持使用"*“和”#"通配符定义模糊bindingKey,然后按照routingKey
进行模糊匹配队列进行分发。
*
:能够模糊匹配一个单词。
#
:能够模糊匹配零个或多个单词。
因为加入了两个通配定义符,所以Topic交换机的routingKey
也有些变化,routingKey
可以使用.
将单词分开。
这里我们直接来用一个例子说明会更加的清晰:
准备工作:
// 主题交换机示例@Beanpublic Queue topicQueue1() {return new Queue("topicQueue1");}@Beanpublic Queue topicQueue2() {return new Queue("topicQueue2");}@Beanpublic TopicExchange topicExchange() {// 三个构造参数:name durable autoDeletereturn new TopicExchange("topicExchange", false, false);}@Beanpublic Binding topicBinding1() {return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("sms.*");}@Beanpublic Binding topicBinding2() {return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("mail.#");}
新建两个队列,新建了一个Topic交换机,并设置了绑定关系。
这里的示例代码我们主要看设置routingKey
,这里的routingKey
用上了通配符,且中间用.
隔开,这就代表topicQueue1
消费sms
开头的消息,topicQueue2
消费mail
开头的消息,具体不同往下看。
生产者:
public void sendTopic() {Client client = new Client();log.info("Message content : " + client);rabbitTemplate.convertAndSend("topicExchange","sms.liantong",client);System.out.println("消息发送完毕。");}
消费者:
@Slf4j
@Component("rabbitTopicConsumer")
public class RabbitTopicConsumer {@RabbitListener(queues = "topicQueue1")public void onMessage1(Message message, Channel channel) throws Exception {log.info("Message content : " + message);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);log.info("消息已确认");}@RabbitListener(queues = "topicQueue2")public void onMessage2(Message message, Channel channel) throws Exception {log.info("Message content : " + message);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);log.info("消息已确认");}}
这里我们的生产者发送的消息routingKey
是sms.liantong
,它就会被发到topicQueue1
队列中去,这里消息的routingKey
也需要用.
隔离开,用其他符号无法正确识别。
如果我们的routingKey
是sms.123.liantong
,那么它将无法找到对应的队列,因为topicQueue1
的模糊匹配用的通配符是*
而不是#
,只有#
是可以匹配多个单词的。
Topic-Exchange
和Direct-Exchange
很相似,我就不再赘述了,通配符*
和#
的区别也很简单,大家可以自己试一下。