背景:
如果你想在springboot中使用activemq,你可以有两个选择,第一个就是使用内置的activemq,还可以使用外部的activemq,我在这里主要使用了外部的activemq,我是在windows下部署的activemq,不要忘记先启动它。
第一步:添加maven依赖
org.springframework.bootspring-boot-starter-activemq
org.apache.activemqactivemq-pool
第二步:配置applicaiton.properties配置文件
#配置activemq的连接地址
spring.activemq.broker-url=tcp://127.0.0.1:61616
#配置用户名
spring.activemq.user=admin
#配置密码
spring.activemq.password=admin
#true表示使用连接池,false表示不使用连接池;当为false时,每发送一条数据创建一个连接
spring.activemq.pool.enabled=true
#连接池最大连接数
spring.activemq.pool.max-connections=10
#空闲的连接过期时间,默认为30s
spring.activemq.pool.idle-timeout=30000
#强制的连接过期时间,与idleTimeout的区别在于:idleTimeout是在连接空闲一段时间失效,而expiryTimeout不管当前连接的情况,只要达到指定时间就失效。默认为0,never
spring.activemq.pool.expiry-timeout=0
第三步:编写相应代码
1、我把生产者和消费者都集成在了一个项目中,项目结构图如下所示:
2、定义activemq的主题类型和名称,我们这里定义了主题模式(名称为activemq.topic)和队列模式(名称为activemq.queue)
import javax.jms.Queue;
import javax.jms.Topic;import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ActivemqConfig {// 定义队列模式@Beanpublic Queue queue1() {return new ActiveMQQueue("activemq.queue");}//定义主题模式@Beanpublic Topic topic() {return new ActiveMQTopic("activemq.topic");}}
3、定义一个消息的提供者MessageProvider,使用一个定时任务来持续的发送数据,这里提供两种消息的发送方式,使用哪一种方式都可以,它们之间的差异性我们后续会分析,方式一(强烈不推荐):
@Configuration
public class MessageProvider1 {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Scheduled(fixedRate=5000,initialDelay=3000)public void send() {// 向队列发送消息jmsMessagingTemplate.convertAndSend("activemq.queue", Math.random());// 向主题发送消息jmsMessagingTemplate.convertAndSend("activemq.topic", Math.random());}
}
下面是方式二,此时可以发现系统并没有指定queue的名称和topic的名称。
@Configuration
public class MessageProvider2 {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Autowiredprivate Queue queue;@Autowiredprivate Topic topic;@Scheduled(fixedRate=5000,initialDelay=3000)public void send() {// 向队列发送消息jmsMessagingTemplate.convertAndSend(queue, Math.random()); // 向主题发送消息jmsMessagingTemplate.convertAndSend(topic, Math.random());}
}
4、定义一个消息的消费者MessageCustomer
@Component
public class MessageCustomer {@JmsListener(destination = "activemq.queue")public void q1(String text) {System.out.println("消费者consumer0消费的消息为:"+text);}@JmsListener(destination = "activemq.queue")public void q2(String text) {System.out.println("消费者consumer1消费的消息为:"+text);}@JmsListener(destination = "activemq.topic")public void t1(String text) {System.out.println("消费者consumer2消费的消息为:"+text);}@JmsListener(destination = "activemq.topic")public void t2(String text) {System.out.println("消费者consumer3消费的消息为:"+text);}
}
5、定义启动类Application
@SpringBootApplication
@EnableScheduling
// 启动消息队列
@EnableJms
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}
第四步:使用MessageProvider2进行测试
1、我们首先采用MessageProvider2的方式进行测试,启动程序,输出结果如下所示,我们发现输出结果并不是我们想要的那种结果,如果是我们想要的结果输出结果还得打印消费者consumer2和消费者consumer3消费的消息。
2、结果分析:原来springboot默认采用的是p2p模式(队列)进行消息的监听。所以无法监听到topic模式的消息,那么如何对topic模式的消息进行监听呢?只需要在applicaiton.properties里面加上这样一个属性:
spring.jms.pub-sub-domain=true
3、重启项目,输出结果如下图所示,此时topic模式的消息正常消费,但是队列模式的消息又无法正常消费,原因是springboot默认只支持一种类型消息的消费,那么如何支持两种呢?我们后面会说。
第五步:使用MessageProvider1进行测试
1、我们首先注释掉刚才加在application.properties里面加的spring.jms.pub-sub-domain=true,启动项目,输出结果如下所示:
2、我们发现输出结果并不是我们想要的,springboot默认支持队列模式的消息监听,但是当我门发送消息时,只是指定了消息接收方的名字,并没有指定消息接收方的类型,所以springboot默认将这两个消息提供者都看成了队列类型。所以才会出现现在的结果。
3、在application.properties里面加上spring.jms.pub-sub-domain=true,重启项目,输出结果如下所示:
4、我们发现只是将队列类型改成了主题类型,从根本上还是没有解决我们的问题,我们想让springboot既支持队列模式又支持主题模式。
第六步:springboot同时支持topic和queue
1、注释掉在application.properties里面加的spring.jms.pub-sub-domain=true
2、在ActivemqConfig这个类里面重新定义两个bean
@Configuration
public class ActivemqConfig {// 定义队列模式@Beanpublic Queue queue() {return new ActiveMQQueue("activemq.queue");}//定义主题模式@Beanpublic Topic topic() {return new ActiveMQTopic("activemq.topic");}@Beanpublic JmsListenerContainerFactory> topicListenerContainerFactory(ConnectionFactory connectionFactory){DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setPubSubDomain(true);factory.setConnectionFactory(connectionFactory);return factory;}@Beanpublic JmsListenerContainerFactory> queueListenerContainerFactory(ConnectionFactory connectionFactory){DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setPubSubDomain(false);factory.setConnectionFactory(connectionFactory);return factory;}
}
3、在MessageCustomer的方法注解JmsListener上添加属性containerFactory属性
@Component
public class MessageCustomer {@JmsListener(destination = "activemq.queue",containerFactory = "queueListenerContainerFactory")public void q1(String text) {System.out.println("消费者consumer0消费的消息为:"+text); }@JmsListener(destination = "activemq.queue",containerFactory = "queueListenerContainerFactory") public void q2(String text) {System.out.println("消费者consumer1消费的消息为:"+text); }@JmsListener(destination = "activemq.topic",containerFactory = "topicListenerContainerFactory")public void t1(String text) {System.out.println("消费者consumer2消费的消息为:"+text);}@JmsListener(destination = "activemq.topic",containerFactory = "topicListenerContainerFactory")public void t2(String text) {System.out.println("消费者consumer3消费的消息为:"+text);}
}
4、重启项目,输出结果如下,我们发现现在的结果就是同时支持了queue模式和topic模式。
第七步:遇坑分析
1、一直以为在activemq的管理界面中点击Queues里面显示的创建了多少个queue(队列)模式的主题,这个理解是错误的。其实这里面显示的信息是“MessageCustomer里面监听(destination)的(所有监听的主题和类型都会出现在这里,不论它是队列模式还是主题模式)+你自己创建的队列类型的主题”。
我们在destination中监听了activemq.queue和activemq.topic这两个,但我们创建了activemq.queue2队列和activemq.topic2主题,并向它们分别发送了消息,注意:如果不向activemq.queue2和activemq.topic2发送消息,仅仅是定义了一个bean,那么activemq就发现不了它们两个。此时的Queues的组成就是监听的activemq.queue和activemq.topic+创建的activemq.queue2。
2、如果我们想看我们定义的topic,进入到Topics下查看即可。