如果不想使用通用的 RabbitMQ 配置,可以使用 SimpleMessageListener 自定义消费者。
此时,对应队列采用该方法中的配置,而不是 RabbitMQ 的通用配置。
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class SimpleMessageListener {@AutowiredBusinessConfig businessConfig;@Beanpublic SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);// 添加队列 可以添加多个队列//container.setQueues(businessConfig.Queue());container.setExposeListenerChannel(true);// 设置当前的消费者数量// container.setMaxConcurrentConsumers(5);container.setConcurrentConsumers(1);// 设置确认模式手工确认container.setAcknowledgeMode(AcknowledgeMode.MANUAL);container.setMessageListener(new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {byte[] body = message.getBody();System.out.println("1 receive msg : " + JSONObject.parseObject(new String(body)));// 不读取消息并且将当前消息抛弃掉,消息队列中删除当前消息// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);// 不读取消息,消息队列中保留当前消息未被查看状态// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);// 确认消息成功消费,删除消息队列中的消息// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 确认消息成功消费,删除消息队列中的消息,他跟上面貌似一样channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);}});return container;}@Beanpublic SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);// 监听多个queue//container.setQueues(businessConfig.Queue());// 设置当前消费者数量container.setConcurrentConsumers(1);// 设置最大的消费者数量container.setMaxConcurrentConsumers(5);// 设置不要重回队列container.setDefaultRequeueRejected(false);// 设置自动签收container.setAcknowledgeMode(AcknowledgeMode.AUTO);// 设置消费端tag策略container.setConsumerTagStrategy(new ConsumerTagStrategy() {@Overridepublic String createConsumerTag(String queue) {return queue + "_" + System.currentTimeMillis();}});// 设置监听container.setMessageListener(new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, 
Channel channel) throws Exception {// 消息处理String msg = new String(message.getBody(), "UTF-8");System.out.println("---消费者---队列名:" + message.getMessageProperties().getConsumerQueue() + ",消息:" + msg+ ",deliveryTag:" + message.getMessageProperties().getDeliveryTag());channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);;}});return container;}
}