SpringBoot应用可以完成自动配置及依赖注入——可以通过Spring直接提供与MQ的连接对象
创建SpringBoot应用,添加依赖
server:
port: 9001
spring:
application:
name: producer
rabbitmq:
host: 47.96.11.185
port: 5672
virtual-host: host1
username: ytao
password: admin123
发送消息
@Service public class TestService { @Resource private AmqpTemplate amqpTemplate; public void sendMsg(String msg){ //1. 发送消息到队列 amqpTemplate.convertAndSend("queue1",msg); //2. 发送消息到交换机(订阅交换机) amqpTemplate.convertAndSend("ex1","",msg); //3. 发送消息到交换机(路由交换机) amqpTemplate.convertAndSend("ex2","a",msg); } }
创建项目添加依赖
配置yml
接收消息
@Service //@RabbitListener(queues = {"queue1","queue2"}) @RabbitListener(queues = "queue1") public class ReceiveMsgService { @RabbitHandler public void receiveMsg(String msg){ System.out.println("接收MSG:"+msg); } //@RabbitHandler //public void receiveMsg(byte[] bs){ // //} }
RabbitMQ是消息队列,发送和接收的都是字符串/字节数组类型的消息
要求:
传递的对象实现序列化接口
传递的对象的包名、类名、属性名必须一致
消息提供者
@Service public class MQService { @Resource private AmqpTemplate amqpTemplate; public void sendGoodsToMq(Goods goods){ //消息队列可以发送 字符串、字节数组、序列化对象 amqpTemplate.convertAndSend("","queue1",goods); } }
消息消费者
@Component @RabbitListener(queues = "queue1") public class ReceiveService { @RabbitHandler public void receiveMsg(Goods goods){ System.out.println("Goods---"+goods); } }
要求:
传递的对象实现序列化接口
传递的对象的包名、类名、属性名必须一致
消息提供者
@Service public class MQService { @Resource private AmqpTemplate amqpTemplate; public void sendGoodsToMq(Goods goods){ //消息队列可以发送 字符串、字节数组、序列化对象 byte[] bytes = SerializationUtils.serialize(goods); amqpTemplate.convertAndSend("","queue1",bytes); } }
消息消费者
@Component @RabbitListener(queues = "queue1") public class ReceiveService { @RabbitHandler public void receiveMsg(byte[] bs){ Goods goods = (Goods) SerializationUtils.deserialize(bs); System.out.println("byte[]---"+goods); } }
要求:对象的属性名一直
消息提供者
@Service public class MQService { @Resource private AmqpTemplate amqpTemplate; public void sendGoodsToMq(Goods goods) throws JsonProcessingException { //消息队列可以发送 字符串、字节数组、序列化对象 ObjectMapper objectMapper = new ObjectMapper(); String msg = objectMapper.writeValueAsString(goods); amqpTemplate.convertAndSend("","queue1",msg); } }
消息消费者
@Component @RabbitListener(queues = "queue1") public class ReceiveService { @RabbitHandler public void receiveMsg(String msg) throws JsonProcessingException { ObjectMapper objectMapper = new ObjectMapper(); Goods goods = objectMapper.readValue(msg,Goods.class); System.out.println("String---"+msg); } }
我们使用消息队列,消息队列和交换机可以通过管理系统完成创建,也可以在应用程序中通过Java代码来完成创建
使用Java代码新建队列
//1.定义队列 (使用Java代码在MQ中新建一个队列) //参数1:定义的队列名称 //参数2:队列中的数据是否持久化(如果选择了持久化) //参数3: 是否排外(当前队列是否为当前连接私有) //参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)) //参数5:设置当前队列的参数 channel.queueDeclare("queue7",false,false,false,null);
//定义一个“订阅交换机” channel.exchangeDeclare("ex3", BuiltinExchangeType.FANOUT); //定义一个“路由交换机” channel.exchangeDeclare("ex4", BuiltinExchangeType.DIRECT);
//绑定队列 //参数1:队列名称 //参数2:目标交换机 //参数3:如果绑定订阅交换机参数为"",如果绑定路由交换机则表示设置队列的key channel.queueBind("queue7","ex4","k1"); channel.queueBind("queue8","ex4","k2");
@Configuration public class RabbitMQConfiguration { //声明队列 @Bean public Queue queue9(){ Queue queue9 = new Queue("queue9"); //设置队列属性 return queue9; } @Bean public Queue queue10(){ Queue queue10 = new Queue("queue10"); //设置队列属性 return queue10; } //声明订阅模式交换机 @Bean public FanoutExchange ex5(){ return new FanoutExchange("ex5"); } //声明路由模式交换机 @Bean public DirectExchange ex6(){ return new DirectExchange("ex6"); } //绑定队列 @Bean public Binding bindingQueue9(Queue queue9, DirectExchange ex6){ return BindingBuilder.bind(queue9).to(ex6).with("k1"); } @Bean public Binding bindingQueue10(Queue queue10, DirectExchange ex6){ return BindingBuilder.bind(queue10).to(ex6).with("k2"); } }
当在消息发送过程中添加了事务,处理效率降低几十倍甚至上百倍
channel.txSelect(); //开启事务 try{ channel.basicPublish("ex4", "k1", null, msg.getBytes()); System.out.println("发送:" + msg); channel.txCommit(); //提交事务 }catch (Exception e){ channel.txRollback(); //事务回滚 }
消息确认机制:确认消息提供者是否成功发送消息到交换机
return机制:确认消息是否成功的从交换机分发到队列
普通confirm方式
//1.发送消息之前开启消息确认 channel.confirmSelect(); channel.basicPublish("ex1", "a", null, msg.getBytes()); //2.接收消息确认 boolean b = channel.waitForConfirms(); System.out.println("发送:" +(b?"成功":"失败"));
//1.发送消息之前开启消息确认 channel.confirmSelect(); //2.批量发送消息 for (int i=0 ; i<10 ; i++){ channel.basicPublish("ex1", "a", null, msg.getBytes()); } //3.接收批量消息确认:发送的所有消息中,如果有一条是失败的,则所有消息发送直接失败,抛出IO异常 boolean b = channel.waitForConfirms();
//发送消息之前开启消息确认 channel.confirmSelect(); //批量发送消息 for (int i=0 ; i<10 ; i++){ channel.basicPublish("ex1", "a", null, msg.getBytes()); } //假如发送消息需要10s,waitForConfirms会进入阻塞状态 //boolean b = channel.waitForConfirms(); //使用监听器异步confirm channel.addConfirmListener(new ConfirmListener() { //参数1: long l 返回消息的表示 //参数2: boolean b 是否为批量confirm public void handleAck(long l, boolean b) throws IOException { System.out.println("~~~~~消息成功发送到交换机"); } public void handleNack(long l, boolean b) throws IOException { System.out.println("~~~~~消息发送到交换机失败"); } });
添加return监听器
发送消息是指定第三个参数为true
由于监听器监听是异步处理,所以在消息发送之后不能关闭channel
String msg = "Hello HuangDaoJun!"; Connection connection = ConnectionUtil.getConnection(); //相当于JDBC操作的数据库连接 Channel channel = connection.createChannel(); //相当于JDBC操作的statement //return机制:监控交换机是否将消息分发到队列 channel.addReturnListener(new ReturnListener() { public void handleReturn(int i, String s, String s1, String s2,AMQP.BasicProperties basicProperties,byte[] bytes) throws IOException { //如果交换机分发消息到队列失败,则会执行此方法(用来处理交换机分发消息到队列失败的情况) System.out.println("*****"+i); //标识 System.out.println("*****"+s); // System.out.println("*****"+s1); //交换机名 System.out.println("*****"+s2); //交换机对应的队列的key System.out.println("*****"+new String(bytes)); //发送的消息 } }); //发送消息 //channel.basicPublish("ex2", "c", null, msg.getBytes()); channel.basicPublish("ex2", "c", true, null, msg.getBytes());
spring:
rabbitmq:
publisher-confirm-type: simple ## 开启消息确认模式
publisher-returns: true ##使用return监听机制
4.3.2 创建confirm和return监听
@Component public class MsgConfirmAndReturn implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback { Logger logger = LoggerFactory.getLogger(MsgConfirmAndReturn.class); @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean b, String s) { //此方法用于监听消息确认结果(消息是否发送到交换机) if(b){ logger.info("-------消息成功发送到交换机"); }else{ logger.warn("-------消息发送到交换机失败"); } } @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { //此方法用于return监听(当交换机分发消息到队列失败时执行) logger.warn("~~~~~~~交换机分发消息到队列失败"); } }
延迟队列——消息进入到队列之后,延迟指定的时间才能被消费者消费
AMQP协议和RabbitMQ队列本身是不支持延迟队列功能的,但是可以通过TTL(Time To Live)特性模拟延迟队列的功能
TTL就是消息的存活时间。RabbitMQ可以分别对队列和消息设置存活时间
在创建队列的时候可以设置队列的存活时间,当消息进入到队列并且在存活时间内没有消费者消费,则此消息就会从当前队列被移除;
创建消息队列没有设置TTL,但是消息设置了TTL,那么当消息的存活时间结束,也会被移除;
当TTL结束之后,我们可以指定将当前队列的消息转存到其他指定的队列
5.2.2 创建交换机和队列
2.创建消息队列
3.创建死信队列
4.队列绑定