Spring 集成RabbitMq
一、基本配置
1、pom添加以下jar
2、spring配置文件springContext.xml添加以下配置
3、rabbitmq_producer.xml生产者配置如下(其中配置了exchange的三种类型:fanout,direct,topic)
3、rabbitmq_consumer.xml消费者配置如下:(其中定义了三种exchange类型对应队列的消费者 ,)
二、编写测试代码(在此只进行Direct类型 交换机测试代码的表写,其他类型仿照此示例即可)
1、定义消息生产者(DirectProducer)
package com.pinghengxing.direct;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;/** * &#64;author ww*/public class DirectProducer {private static ApplicationContext ac;public static void sendMessage(String exchange,String routingKey,Object message){ac &#61; new ClassPathXmlApplicationContext("classpath:com/config/springContext.xml");RabbitTemplate rt &#61; ac.getBean(RabbitTemplate.class);for(int i&#61;0;i<10;i&#43;&#43;){rt.convertAndSend(exchange, routingKey, message&#43;""&#43;i);}}public static void main(String[] args) {DirectProducer.sendMessage("myDirectExchange","direct","路由模式");}}
2、定义消息消费者&#xff08;DirectReceiver1&#xff0c;DirectReceiver1 &#xff09;-多个消费者
消费者1
package com.pinghengxing.direct;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;
&#64;Component("directReceiver")
public class DirectReceiver implements ChannelAwareMessageListener{&#64;Overridepublic void onMessage(Message message, Channel channel) throws Exception {System.out.println("************************direct111********************************");System.out.println("路由模式direct111 接收信息&#xff1a;"&#43;new String(message.getBody()));System.out.println("********************************************************");//设置手工应答
// if(true){
// throw new Exception();
// }channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
消费者2
package com.pinghengxing.direct;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;
&#64;Component("directReceiver2")
public class DirectReceiver2 implements ChannelAwareMessageListener{&#64;Overridepublic void onMessage(Message message, Channel channel) throws Exception {System.out.println("************************direct222********************************");System.out.println("路由模式direct222 接收信息&#xff1a;"&#43;new String(message.getBody()));System.out.println("********************************************************");//设置手工应答
// if(true){
// throw new Exception();
// }channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
三、测试
1、进行测试&#xff0c;结果如下&#xff1a;&#xff08;两个消费者都可以从队列中取到数据&#xff0c;且数据不重复&#xff09;
四、confirm-callback监听&#xff08;用于监听exchange是否接收成功&#xff09;
1、在配置工厂连接的时候&#xff0c;设置publisher-confirms&#61;"true"
2、在定义rabbitmq模板时&#xff0c;指定confirm-callback的实现类
3、创建实现类ConfirmCallback&#xff0c;实现RabbitTemplate.ConfirmCallback接口
package com.pinghengxing.callback;import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** confirm-callback监听&#xff08;用于监听exchange是否接收成功&#xff09;* &#64;author ww**/
&#64;Component("confirmCallback")
public class ConfirmCallBack implements RabbitTemplate.ConfirmCallback{/*** CorrelationData 是在发送消息时传入回调方法的参数&#xff0c;可以用于区分消息对象。 CorrelationData对象中只有一个属性 String id。* 通过这个参数&#xff0c;我们可以区分当前是发送哪一条消息时的回调&#xff0c;并通过ack参数来进行失败重发功能* &#64;param correlationData 回调的相关数据.* &#64;param ack true for ack, false for nack* &#64;param cause 专门给NACK准备的一个可选的原因&#xff0c;其他情况为null。*/public void confirm(CorrelationData correlationData, boolean ack,String cause) {System.out.println("********************************************************");System.out.println("exChange确认" &#43; ack &#43; " " &#43; cause);System.out.println("********************************************************");}}
4、测试
五、returnCallback监听&#xff08;basicpublish推送消息到queue失败时回调&#xff09;
1、在配置工厂连接的时候&#xff0c;设置publisher-returns&#61;"true"
virtual-host&#61;"ww" channel-cache-size&#61;"25" cache-mode&#61;"CHANNEL"
publisher-confirms&#61;"true" publisher-returns&#61;"true" connection-timeout&#61;"200" />
2、在定义rabbitmq模板时&#xff0c;指定return-callback的实现类&#xff0c;并且设置mandatory&#61;"true"
3、创建实现类ReturnCallBack&#xff0c;实现RabbitTemplate.ReturnCallback接口
package com.pinghengxing.callback;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** 推送消息到queue失败时回调* &#64;author ww**/
&#64;Component("returnCallback")
public class ReturnCallBack implements RabbitTemplate.ReturnCallback {&#64;Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("********************************************************");System.out.println("失败确认:"&#43;message&#43;" | "&#43;replyCode&#43;" | "&#43;replyText&#43;" | "&#43;exchange&#43;" | "&#43;routingKey);System.out.println("********************************************************");}}
4、测试&#xff08;更改routing_key的值为direct123&#xff0c;由于找不到对应的队列&#xff0c;报以下错误&#xff09;
六、json转换&#xff08;可以将map等自动转换成json格式&#xff09;
1、pom.xml添加以下maven依赖
2、定义消息转换器&#xff0c;转成json格式
3、在定义rabbitmq模板时&#xff0c;指定转换器message-converter&#61;"jsonMessageConverter"
4、测试&#xff0c;创建map&#xff0c;进行生产&#xff0c;消费者接收到的信息如下&#xff1a;为json格式
友情链接&#xff1a;
完整的项目配置下载地址如下&#xff1a;可下载参考
https://files.cnblogs.com/files/pinghengxing/spring_rabbitmq_test.zip