MessageListenerAdapter即消息监听适配器
代码实现
package com.cx.temp.common.rabbitmq.spring;
import com.cx.temp.common.rabbitmq.spring.adapter.MessageDelegate;
import com.cx.temp.common.rabbitmq.spring.convert.TextMessageConverter;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import java.util.UUID;
/**
*
*/
@Configuration
@ComponentScan({"com.cx.temp.*"})
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/test001");
return connectionFactory;
}
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
/**
* 针对消费者配置
* 1.设置交换机类型
* 2.将队列绑定到交换机
* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
* HeadersExchange: 通过添加属性key-value匹配
* DirectExchange: 按照routingkey分发到指定队列
* TopicExchange: 多关键字匹配
* @return
*/
@Bean
public TopicExchange exchange001(){
return new TopicExchange("topic001", true, false);
}
@Bean
public Queue queue001() {
return new Queue("queue001", true); //队列持久
}
@Bean
public Binding binding001(){
return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
}
@Bean
public TopicExchange exchange002(){
return new TopicExchange("topic002", true, false);
}
@Bean
public Queue queue002() {
return new Queue("queue002", true); //队列持久
}
@Bean
public Queue queue003() {
return new Queue("queue003", true); //队列持久
}
@Bean
public Binding binding002(){
return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");
}
@Bean
public Binding binding003(){
return BindingBuilder.bind(queue003()).to(exchange002()).with("spring.*");
}
@Bean
public Queue queue_image() {
return new Queue("image_queue", true); //队列持久
}
@Bean
public Queue queue_pdf() {
return new Queue("pdf_queue", true); //队列持久
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
container.setDefaultRequeueRejected(false); //不进行重回队列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); //自动签收机制
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
// container.setMessageListener(new ChannelAwareMessageListener() {
// @Override
// public void onMessage(Message message, Channel channel) throws Exception {
// String msg = new String(message.getBody());
// System.err.println("-----------消费者:" + msg);
// }
// });
//第一种
// MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// container.setMessageListener(adapter);
//第二种
// MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// adapter.setDefaultListenerMethod("consumeMessage"); //也可以通过这个方法设置默认查找适配器对象的方法,默认的方法名是handleMessage
//第三种
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);
return container;
}
}
字节转换器
package com.cx.temp.common.rabbitmq.spring.convert;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class TextMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
return new Message(o.toString().getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if(null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
}
自定义输出设置
package com.cx.temp.common.rabbitmq.spring.adapter;
/**
*/
public class MessageDelegate {
//方法名是固定的,通过查看MessageListenerAdapter的源码,里面有句【public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";】
//表示他是通过反射查找handleMessage这个方法名进行处理的
//第一种 旧版的RabbitMQ默认是字节数据
// public void handleMessage(byte[] messageBody) {
// System.out.println("默认方法,消息内容:" + new String(messageBody));
// }
//第一种 现演示的版本默认走字符串
// public void handleMessage(String messageBody) {
// System.out.println("默认方法,消息内容:" + messageBody);
// }
//第二种
public void consumeMessage(byte[] messageBody) {
System.out.println("字节数组方法,消息内容:" + new String(messageBody));
}
//
// public void consumeMessage(String messageBody) {
// System.out.println("字符串方法,消息内容:" + messageBody);
// }
//
// public void method1(String messageBody) {
//
// }
}
测试类
package com.cx.temp.rabbitmq;
import com.cx.temp.admin.AdminApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.HashMap;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = AdminApplication.class)
public class RabbitMQTest {
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void testAdmin() throws Exception {
//第一种声明与绑定方式
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE,
"test.direct", "direct", new HashMap<>()));
//第二种 支持链式声明与绑定
rabbitAdmin.declareBinding(BindingBuilder
.bind(new Queue("test.topic.queue", false))
.to(new TopicExchange("test.topic", false, false))
.with("user.#"));
rabbitAdmin.declareBinding(BindingBuilder
.bind(new Queue("test.fanout.queue", false))
.to(new FanoutExchange("test.fanout", false, false)));
//清空队列数据
rabbitAdmin.purgeQueue("test.topic.queue", false);
}
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "信息描述..");
messageProperties.getHeaders().put("type", "自定义消息类型..");
Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
//消息发送之后在对这个message进行设置
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.err.println("----------添加额外的设置-----------");
message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
return message;
}
});
}
@Test
public void testSendMessage2() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq 消息1234".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc",message);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
rabbitTemplate.convertAndSend("topic002", "rabbit.amqp", "hello object message send!");
}
@Test
public void testSendMessage4Text() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq 消息1234".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc",message);
}
}
执行测试类
testSendMessage4Text方法
package com.cx.temp.common.rabbitmq.spring; package com.cx.temp.common.rabbitmq.spring.convert; package com.cx.temp.common.rabbitmq.spring.adapter; package com.cx.temp.rabbitmq; 启动Appliciton后执行 testSendMessage4Text控制台输出
import com.cx.temp.common.rabbitmq.spring.adapter.MessageDelegate;
import com.cx.temp.common.rabbitmq.spring.convert.TextMessageConverter;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
*
*/
@Configuration
@ComponentScan({"com.cx.temp.*"})
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/test001");
return connectionFactory;
}
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
/**
* 针对消费者配置
* 1.设置交换机类型
* 2.将队列绑定到交换机
* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
* HeadersExchange: 通过添加属性key-value匹配
* DirectExchange: 按照routingkey分发到指定队列
* TopicExchange: 多关键字匹配
* @return
*/
@Bean
public TopicExchange exchange001(){
return new TopicExchange("topic001", true, false);
}
@Bean
public Queue queue001() {
return new Queue("queue001", true); //队列持久
}
@Bean
public Binding binding001(){
return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
}
@Bean
public TopicExchange exchange002(){
return new TopicExchange("topic002", true, false);
}
@Bean
public Queue queue002() {
return new Queue("queue002", true); //队列持久
}
@Bean
public Queue queue003() {
return new Queue("queue003", true); //队列持久
}
@Bean
public Binding binding002(){
return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");
}
@Bean
public Binding binding003(){
return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*");
}
@Bean
public Queue queue_image() {
return new Queue("image_queue", true); //队列持久
}
@Bean
public Queue queue_pdf() {
return new Queue("pdf_queue", true); //队列持久
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
container.setDefaultRequeueRejected(false); //不进行重回队列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); //自动签收机制
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
// container.setMessageListener(new ChannelAwareMessageListener() {
// @Override
// public void onMessage(Message message, Channel channel) throws Exception {
// String msg = new String(message.getBody());
// System.err.println("-----------消费者:" + msg);
// }
// });
//第一种
// MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// container.setMessageListener(adapter);
//第二种
// MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// adapter.setDefaultListenerMethod("consumeMessage"); //也可以通过这个方法设置默认查找适配器对象的方法,默认的方法名是handleMessage
//第三种
/** 1.适配器方式,默认是有自己的方法名称的:handleMessage
//可以自己指定一个方法的名字:consumeMessage
//也可以添加一个转换器:从字节数组转换String
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);
*/
/**
* 2.适配器方式:我们的队列名称和方法名称也可以也进行一一匹配
*/
Map
queueOrTagToMethodName.put("queue001", "method1");
queueOrTagToMethodName.put("queue002", "method2");
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);
return container;
}
}
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
public class TextMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
return new Message(o.toString().getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if(null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
}
/**
*/
public class MessageDelegate {
//方法名是固定的,通过查看MessageListenerAdapter的源码,里面有句【public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";】
//表示他是通过反射查找handleMessage这个方法名进行处理的
//第一种 旧版的RabbitMQ默认是字节数据
// public void handleMessage(byte[] messageBody) {
// System.out.println("默认方法,消息内容:" + new String(messageBody));
// }
//第一种 现演示的版本默认走字符串
// public void handleMessage(String messageBody) {
// System.out.println("默认方法,消息内容:" + messageBody);
// }
//第二种 旧版
// public void consumeMessage(byte[] messageBody) {
// System.out.println("字节数组方法,消息内容:" + new String(messageBody));
// }
//第二种 新版
public void consumeMessage(String messageBody) {
System.out.println("字符串方法,消息内容:" + messageBody);
}
public void method1(String messageBody) {
System.out.println("method1收到消息内容:" + messageBody);
}
public void method2(String messageBody) {
System.out.println("method2收到消息内容:" + messageBody);
}
}
import com.cx.temp.admin.AdminApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.HashMap;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = AdminApplication.class)
public class RabbitMQTest {
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void testAdmin() throws Exception {
//第一种声明与绑定方式
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE,
"test.direct", "direct", new HashMap<>()));
//第二种 支持链式声明与绑定
rabbitAdmin.declareBinding(BindingBuilder
.bind(new Queue("test.topic.queue", false))
.to(new TopicExchange("test.topic", false, false))
.with("user.#"));
rabbitAdmin.declareBinding(BindingBuilder
.bind(new Queue("test.fanout.queue", false))
.to(new FanoutExchange("test.fanout", false, false)));
//清空队列数据
rabbitAdmin.purgeQueue("test.topic.queue", false);
}
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "信息描述..");
messageProperties.getHeaders().put("type", "自定义消息类型..");
Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
//消息发送之后在对这个message进行设置
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.err.println("----------添加额外的设置-----------");
message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
return message;
}
});
}
@Test
public void testSendMessage2() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq 消息1234".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc",message);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
rabbitTemplate.convertAndSend("topic002", "rabbit.amqp", "hello object message send!");
}
@Test
public void testSendMessage4Text() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq 消息1234".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc",message);
rabbitTemplate.send("topic002", "rabbit.abc",message);
}
}