本人通过学习rabbitmq,自己编写的demo,希望大家提出宝贵的建议
模板项目整体结构:
1、配置文件:application.yml
server:
port: 8089
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
#支持发布确认
publisher-confirms: true
#支持发布返回
publisher-returns: true
listener:
simple:
#采用手动应答
acknowledge-mode: manual
#当前监听容器数量
concurrency: 1
max-concurrency: 1
#是否支持重试
retry:
enabled: true
2、RabbitMQConfig.java 文件 核心
package com.rabbitmqtemplate.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import com.rabbitmqtemplate.callback.MsgSendConfirmCallBack;
import com.rabbitmqtemplate.callback.MsgSendReturnCallback;
import com.rabbitmqtemplate.constants.RabbitMqExchange;
import com.rabbitmqtemplate.constants.RabbitMqQueue;
import com.rabbitmqtemplate.constants.RabbitMqRoutingKey;
/**
* rabbitmq核心配置
* @author lishilei
*
*/
@Configuration
public class RabbitMQConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Value("${spring.rabbitmq.publisher-confirms}")
public Boolean publisherConfirms;
/**
* 创建工厂,自动创建的ConnectionFactory不能完成事件的回调
*
* @return
*/
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory cOnnectionFactory= new CachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(publisherConfirms);//手动应答模式
return connectionFactory;
}
/**
* RabbitTemplate 发送消息,必须是prototype类型,
* 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置
*
* @return
*/
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
/**若使用confirm-callback或return-callback,
* 必须要配置publisherConfirms或publisherReturns为true
* 每个rabbitTemplate只能有一个confirm-callback和return-callback
* */
template.setConfirmCallback(msgSendConfirmCallBack());
template.setReturnCallback(msgSendReturnCallback());
/**
* 使用return-callback时必须设置mandatory为true,
* 或者在配置中设置mandatory-expression的值为true,
* 可针对每次请求的消息去确定’mandatory’的boolean值,
* 只能在提供’return -callback’时使用,与mandatory互斥
* */
template.setMandatory(true); //保证消息的可靠投递
return template;
}
/**
* 消息确认机制
* Confirms给客户端一种轻量级的方式,能够跟踪哪些消息被broker处理,
* 哪些可能因为broker宕掉或者网络失败的情况而重新发布。
* 确认并且保证消息被送达,提供了两种方式:发布确认和事务。
* (两者不可同时使用)在channel为事务时,
* 不可引入确认模式;同样channel为确认模式下,不可使用事务。
*
*/
@Bean
public MsgSendConfirmCallBack msgSendConfirmCallBack(){
return new MsgSendConfirmCallBack();
}
/**
* 消息未发送到交换机调用
* @return
*/
@Bean
public MsgSendReturnCallback msgSendReturnCallback(){
return new MsgSendReturnCallback();
}
@Bean
public Queue defaultQueue() {
return new Queue(RabbitMqQueue.QUEUE, true); // 队列持久
}
/**
* 针对消费者配置 FanoutExchange: 将消息分发到所有的绑定队列,
* 无routingkey的概念 HeadersExchange
* :通过添加属性key-value匹配 DirectExchange:按照routingkey分发到指定队列
* TopicExchange:多关键字匹配
*/
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(RabbitMqExchange.EXCHANGE);
}
/**
* 通过绑定键将队列绑定到指定的交换机上
* @return
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(defaultQueue()).to(defaultExchange()).with(RabbitMqRoutingKey.ROUTINGKEY);
}
}
3、constants下文件
package com.rabbitmqtemplate.constants;
/**
* 交换机配置
* @author lishilei
*
*/
public interface RabbitMqExchange {
String EXCHANGE = "default-exchange";
}
package com.rabbitmqtemplate.constants;
/**
* 消息队列配置
* @author lishilei
*
*/
public interface RabbitMqQueue {
String QUEUE = "default-queue";
}
package com.rabbitmqtemplate.constants;
/**
* 路由配置
* @author lishilei
*
*/
public interface RabbitMqRoutingKey {
String ROUTINGKEY = "direct-routingkey";
}
4、callback
package com.rabbitmqtemplate.callback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import lombok.extern.slf4j.Slf4j;
/**
* 确认消息是否成功发送给交换机
* @author lishilei
*
*/
@Slf4j
public class MsgSendConfirmCallBack implements ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info(" 回调消息id:" + correlationData);
log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
} else {
log.debug("消息发送到exchange失败,原因: {}", cause);
throw new RuntimeException("send error " + cause);
}
}
}
package com.rabbitmqtemplate.callback;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import lombok.extern.slf4j.Slf4j;
/**
* 消息回调
*
* @author lishilei
*
*/
@Slf4j
public class MsgSendReturnCallback implements ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
String correlatiOnId= message.getMessageProperties().getCorrelationIdString();
log.info("消息: {} 发送失败,应答码: {} 原因:{} 交换机:{} 路由键:{}", correlationId, replyCode,
replyText, exchange, routingKey);
}
}
5、Receiver 消费者
package com.rabbitmqtemplate.consumer;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import com.rabbitmqtemplate.constants.RabbitMqQueue;
import com.rabbitmqtemplate.dto.UserDTO;
import lombok.extern.slf4j.Slf4j;
/**
* 消费者
* @author lishilei
*
*/
@Component
@Slf4j
public class Receiver {
@RabbitListener(queues = RabbitMqQueue.QUEUE)
public void process(Message message, Channel channel) throws Exception {
/* 写入当前对象的二进制流
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(this);
读出二进制流产生的新对象
ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
ObjectInputStream ois = new ObjectInputStream(bis);
return ois.readObject(); */
//获取传递的消息
ByteArrayInputStream bi = new ByteArrayInputStream(message.getBody());
ObjectInputStream oi = new ObjectInputStream(bi);
UserDTO obj = (UserDTO) oi.readObject();
log.info("消费端接收到消息 : " + obj);
// TODO 添加处理消息的业务逻辑
//手动应答,告诉rabbitmq业务执行完成,消息可以丢弃
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("消息处理成功");
}
}
6、Sender 生产者
package com.rabbitmqtemplate.sender;
import java.util.UUID;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rabbitmqtemplate.constants.RabbitMqExchange;
import com.rabbitmqtemplate.constants.RabbitMqRoutingKey;
/**
* 发送者
* @author lishilei
*
* @param
*/
@Component
public class Sender
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 消息发送
* @param e
*/
public void send(E e) {
String uuid = UUID.randomUUID().toString();
CorrelationData correlatiOnId= new CorrelationData(uuid);
rabbitTemplate.convertAndSend(RabbitMqExchange.EXCHANGE, RabbitMqRoutingKey.ROUTINGKEY, e,correlationId);
}
}
package com.rabbitmqtemplate.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.rabbitmqtemplate.dto.UserDTO;
import com.rabbitmqtemplate.sender.Sender;
7、测试controller
/**
* 测试controller
* @Controller 和 @ResponseBody 等价 @RestController
* @author lishilei
*
*/
@Controller
@RequestMapping("/rabbbit")
public class SendController{
@Autowired
private Sender
@RequestMapping("/direct")
@ResponseBody
public String direct(){
UserDTO userDTO = new UserDTO();
userDTO.setAge(24);
userDTO.setName("alibaba");
sender.send(userDTO);
return "消息处理成功";
}
}
8、消息体,可自己定义
package com.rabbitmqtemplate.dto;
import java.io.Serializable;
import lombok.Data;
/**
* 消息体
* @author lishilei
*
*/
@Data
public class UserDTO implements Serializable{
/**
*
*/
private static final long serialVersiOnUID= 3623461501945244407L;
private String name;
private Integer age;
}
9、SpringBoot 工程启动
package com.rabbitmqtemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitMQApp
{
public static void main( String[] args )
{
SpringApplication.run(RabbitMQApp.class, args);
}
}