应用场景1----异步处理:
写进消息队列只用5ms 然后异步处理。
2.应用解耦
3.流量削峰
队列是定长的,没进队列的话就返回秒杀失败。
消息代理:就是消息中间件的服务器。消息发到服务器,服务器把消息发到指定的目的地。
通信机制如下图:对于点对点的只要有人拿到了其他人就不能拿到了。
4就是出队和入队5就是消息发布者发布到一个主题里面,bcd监听,只要一到达就可以收到。
JMS:AMQP是规范,不同
spring使用的话只要导入starter就可以了,然后就是自动配置类。
AMQP是跨平台的有的比较多的。
13------------------------------------------------------------------------------------------------------------------------
这个是由erlang实现的。
消息生产者将消息发送到服务器--消息代理,发给服务器里面的交换机。接收消息,路由到消息队列。队列和交换器的关联关系是绑定,就是路由关系表。
信道:解决登陆复用的。就是一条tcp连接的多个信道。
消费者和消息队列用tcp复用进行连接。
14-------------------------------------------------------------------------------------------------------------------------
消息队列和交换器是多对多的关系。
交换器的四种类型。
direct:直连,路由件和绑定的一模一样的时候才发送。
fanout:就是交换器消息抵达fanout交换器的时候,就是绑定的队列每人发一份的,就是广播模式。JMS里面发布订阅的参考实现。
topic:允许对路由件进行模糊匹配有选择的发给队列。类似于正则表达式。
headers:这个用的比较少的。
总结:
15--------------------------------------------------------------------------------------------------------------------------------------
讲解了如何用docker下载镜像。
搭建RabbitMQ:
安装带management的是带管理界面的。
docker pull rabbitmq:3-management
启动运行镜像:
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 1f53405491bcd是后台运行 -p是暴露端口 两个端口 5672是容器的 15672web端口
访问下:http://192.168.244.130:15672/
用户名和密码是guest
03:44
中间绿色的是四个队列。此外层是绑定规则。
新建exchange
添加交换器 Durable是是否持久化就是下次再来是不是数据还在。
新建两个交换机。
目前有三个交换器,继续添加消息队列。
继续添加:
-----------------------------------------------------------------------------
交换器绑定队列:
点进去交换器:
按照以上得方法给其他得交换机也绑定队列。
topic的
解释下上面第一个的意思 只要是队列的名字是atguigu.XXXXX我的exchange都绑定上。
绑定主要绑定的是exchange和queue
三步:建交换机-队列-绑定队列
------------------------------------------------------------
给交换器发送消息看哪个队列能收到消息。
测试下消息,点击交换器:
在队列atguigu里面查看:
fanout的路由发送。
可以看到全部发出了。
-----------------------------------------------
最后测试topic,根据匹配规则发送给队列。
都发了。
应答之后删除掉。选择这个。
就是不同的类型的exchange绑定queue设置名称,按照名称和规则绑定。
16--------------------------------------------------------------------------------------------------------------------------
创建过程整合测试rabbitmq
选择web和rabbitmq
看到这个。
分析下自动配置的原理:
我们可以看下这个自动配置类,都是在自动配置的配置文件里面的。
点进去这个配置文件:
根据配置文件我们开始配置。
spring.rabbitmq.host=192.168.244.130
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
#spring.rabbitmq.virtual-host=
基本的配置看自动配置配置的。
---------------------------------------------------------------------------------------------------------------------------------------------------------------------
如何发送消息和接受消息呢?我们还看自动配置
redisTemplate这个类的作用是给rabbitMq发送和接受消息的。
这个是系统管理组件,声明交换器和队列管理什么的。
注入消息组件。
---------------------------------
测试几个消息:
/*** 1、单播(点对点)*/@Testpublic void contextLoads() {//Message需要自己构造一个;定义消息体内容和消息头,最后一哥参数查看Message类//rabbitTemplate.send(exchage,routeKey,message);//object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq;//rabbitTemplate.convertAndSend(exchage,routeKey,object);Map
}
发送测试。
这个会有乱码是因为序列化是默认的java的序列化的方式。
接受消息,肯定在队列里面接受消息:
//接受数据,如何将数据自动的转为json发送出去@Testpublic void receive(){//参数是队列的名字Object o = rabbitTemplate.receiveAndConvert("atguigu.news");System.out.println(o.getClass());System.out.println(o);}
----------------------------------------------------------------------------------------------------
注意一点收到消息队列就没了因为就出队了,不是广播的只保证有一个收到。
如何将数据自动转为json发送出去:
原始的结果:
点进去simpleMessageConverter():
我们可以换一个MessageConverter。
看这个自动配置不为null的话就用我们自己定义的。
package com.atguigu.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MyAMQPConfig {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}
在次测试:
现在存取对象:
package com.atguigu.bean;public class Book {private String bookName;private String author;public Book() {}public Book(String bookName, String author) {this.bookName = bookName;this.author = author;}public String getBookName() {return bookName;}public void setBookName(String bookName) {this.bookName = bookName;}public String getAuthor() {return author;}public void setAuthor(String author) {this.author = author;}@Overridepublic String toString() {return "Book{" +"bookName='" + bookName + '\'' +", author='" + author + '\'' +'}';}
}
广播:
/*** 广播*/@Testpublic void sendMsg(){Map
17--------------------------------------------------------------------------------------------------------------------------
实际开发要需要监听场景:
类似库存系统和订单系统。
场景:两个系统订单系统和库存系统。下单的信息放在消息队列里面,库存系统实时的监听订单系统的消息。
一旦有新的订单进来库存系统由相应的操作。
package com.atguigu.service;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class BookService {//监听是监听的下消息队列 直接将消息反序列化为book对象 相当于强转@RabbitListener(queues = "atguigu.news")public void receive(String book){System.out.println("收到消息:"+book);}@RabbitListener(queues = "atguigu")public void receive02(Message message){System.out.println(message.getBody());System.out.println(message.getMessageProperties());}
}
注意这个使用之前要加上这个,开启基于注解的RabbitMQ:
切记。
清空队列了。
测试如下方法:
1.主程序开启基于注解的RabbltMQ。
2.注解监控消息,一旦在队列加入了值,直接监控器就会取出来。
18-----------------------------------------------------------------------------------------------------------------------------
管理系统:创建消息队列什么的。这个的目的是在程序中创建消息队列 交换器 绑定规则 和消息队列
测试类里面测试,注入:
@AutowiredAmqpAdmin amqpAdmin;
在自动配置类RabbitAutoConfiguration
里面找到这个组件:
搜索下Exchange,看下继承关系。
创建交换器:
@Testpublic void createExchange(){//创建exchangeamqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));System.out.println("创建完成");amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));//创建绑定规则//参数:目的地就是绑定到哪个队列,绑定的类型是队列还是交换器,绑定的交换器给哪个交换器绑定,路由件,其他是nullamqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null));}
运行:http://192.168.244.130:15672/
创建队列:
@Testpublic void createExchange(){//amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));//System.out.println("创建完成");//创建队列amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));//创建绑定规则//amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null));//amqpAdmin.de}
创建绑定队则:
@Testpublic void createExchange(){//amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));//System.out.println("创建完成");//amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));//创建绑定规则//参数:目的地就是绑定到哪个队列,绑定的类型是队列还是交换器,
绑定的交换器给哪个交换器绑定,路由件,其他是nullamqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null));//amqpAdmin.delete......}
19-----------------------------------------------------------------------------------------------------------------------------
github地址:https://github.com/FandyWw/spring-boot-02-amqp