中间件:介于两者之间的技术
消息中间件:消息中间件利用⾼效可靠的消息传递机制进行平台⽆关的数据交流,并基于数据通信来进行分布式系统的集成。
RocketMQ是阿⾥巴巴开源的一个消息中间件,是一个队列列模型的消息中间件,具有⾼性能、高可靠、高实时、分布式特点。目前已贡献给apache。
将⼀些可以进行异步化的操作通过发送消息来进行异步化,提⾼效率。
具体场景:用户为了使用某个应用,进⾏注册,系统需要发送注册邮件并验证短信。对这两个操作的处理⽅式有两种:串行及并行。
串行方式:新注册信息生成后,先发送注册邮件,再发送验证短信
在这种方式下,需要最终发送验证短信后再返回给客户端。
并行处理:新注册信息写入后,发短信和发邮件并行处理
在这种方式下,发短信和发邮件 需处理完成后再返回给客户端。
假设以上三个子系统处理的时间均为50ms,且不考虑网络延迟,则总的处理时间:
串行:50 + 50 + 50 = 150ms
并行:50 + 50 = 100ms
使用消息队列
并在写入消息队列后立即返回成功给客户端,则总的响应时间依赖于写入消息队列的时间,而写入消息队列的时间本身是可以很快的,基本可以忽略略不计,因此总的处理时间相⽐串行提高了2倍,相⽐并行提⾼了一倍。
在高并发场景下把请求存入消息队列,利用排队思想降低系统瞬间峰值。
具体场景:购物网站开展秒杀活动,一般由于瞬时访问量过大,服务器接收过大,会导致流量暴增,相关系统无法处理请求甚至崩溃。而加入消息队列后,系统可以从消息队列中取数据,相当于消息队列做了一次缓冲。
优点:
消息中间件不仅只有RocketMQ,还有很多其他的消息中间件。
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。
JMS: 全称是Java Message Service,即消息服务应用程序接口,是一个Java面向消息中间件平台的API,用于在两个应用程序之间,或分布式系统中发送消息,进⾏异步通信
RabbitMQ:AMQP协议的领导实现,⽀持多种场景。淘宝的MySQL集群内部有使⽤它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。
AMQP: 即Advanced Message Queuing Protocol,一个提供统⼀消息服务的应用层标准高级消息队列协议,是应用层协议的⼀个开放标准,为面向消息的中间件设计
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:⽐如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
Producer: 消息生产者,负责消息的产生,由业务系统负责产生
Consumer:消息消费者,负责消息消费,由后台业务系统负责异步消费
Topic:消息的逻辑管理单位
这三者是RocketMq中最基本的概念。Producer是消息的生产者。Consumer是消息的消费者。消息通过Topic进行传递。Topic存放的是消息的逻辑地址。
具体来说是Producer将消息发往具体的Topic。Consumer订阅Topic,主动拉取或被动接受消息,如果Consumer消费消息失败则默认会重试16次
连接: 单个broker和所有nameserver保持⻓长连接心跳:
心跳间隔:每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息。
心跳超时:nameserver每隔10秒钟(此时间无法更改),扫描所有还存活的broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接。
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。
RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式:
不管是同步刷盘还是异步刷盘,都是通过Broker配置文件⾥的flushDiskType参数设置的,这个参数被设置成SYNC_FLUSH、ASYNC_FLUSH中的一个
官网:http://rocketmq.apache.org/
下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
目的:为了防止出现内存不足的情况
修改NameServer和Broker配置:(windows/Linux)
(如果是Linux环境则对应的是 runbroker.sh 和runserver.sh),编辑runbroker.cmd文件,修改如下:
参数解释:
在相应对安装目录/bin目录rocketmq-all-4.4.0-bin-release\bin中进行启动:
start mqnamesrv.cmd
Linux则执⾏
sh ./mqnamesrv
出现以下日志表示启动成功
2. 启动RocketMQ服务,也就是broker
进入bin目录,执⾏命令:
Windows
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
如果要同时加载配置文件:
输入命令:
start mqbroker.cmd -n 127.0.0.1:9876 -c D:\rocketmq-all-4.4.0-bin-release\conf\broker.conf autoCreateTopicEnable=true
成功启动:
Linux
sh ./mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true
注意:autoCreateTopicEnable=true这个设置表示开启自动创建topic功能,真实生产环境不建议开启。
出现以下日志表示启动成功:
补充命令:
jps: 查看Java进程,是JDK给我们提供的一个⼯具命令
注意:有找不到或无法加载主类bug见以下链接
https://blog.csdn.net/gy99csdn/article/details/116083846
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0modelVersion><groupId>com.gygroupId><artifactId>rocketmqartifactId><version>1.0-SNAPSHOTversion><properties><java.version>1.8java.version>properties><dependencies><dependency><groupId>org.apache.rocketmqgroupId><artifactId>rocketmq-clientartifactId><version>4.4.0version>dependency>dependencies><build><plugins><plugin><groupId>org.apache.maven.pluginsgroupId><artifactId>maven-compiler-pluginartifactId><configuration><source>1.8source><target>1.8target>configuration>plugin>plugins>build>
project>
BasicProducer
public class BasicProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {// 1. 创建一个消息生产者对象DefaultMQProducer producer = new DefaultMQProducer("test_basic_producer");// 2. 设置producer连接那个nameserver的地址producer.setNamesrvAddr("127.0.0.1:9876");// 3. 启动消息生产者producer.start();//4. 准备好待发送的消息String topic = "test_t";Message message = new Message();//设置消息的Topicmessage.setTopic(topic);//向消息对象中,放入我们实际的数据message.setBody("hello,rocketMq".getBytes());//+ "",意为拼接成字符串message.putUserProperty("sendTime", System.currentTimeMillis() + "");// 5. 利用消息生产者,将消息发送出去SendResult send = producer.send(message);System.out.println(send);if (send.getSendStatus().equals(SendStatus.SEND_OK)) {System.out.println("消息发送成功,消息id为" + send.getMsgId());return;}System.out.println("消息发送失败" + send.getMsgId());}
}
启动生产者结果:
SendResult [sendStatus=SEND_OK, msgId=C0A82B334BD018B4AAC27AE2CBFE0000
, offsetMsgId=C0A8047300002A9F000000000000059A
, messageQueue=MessageQueue [topic=test_t, brokerName=broker-a, queueId=2], queueOffset=0]
消息发送成功,消息id为C0A82B334BD018B4AAC27AE2CBFE0000
BasicConsumer
public class BasicConsumer {public static void main(String[] args) throws MQClientException {//1.创建一个消息生产者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_basic_consumer");//2.设置消费者和nameserver建立连接的地址consumer.setNamesrvAddr("127.0.0.1:9876");//向Rocket订阅指定主题的消息consumer.subscribe("test_t", "*");//注册消息消费者的监听器(实现消息的消费逻辑)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);String sendTime = messageExt.getUserProperty("sendTime");try {String format = String.format("消息从发送到接收的时间延时是%dms", ((System.currentTimeMillis() - Long.parseLong(sendTime))));System.out.println(format);} catch (Exception e) {e.printStackTrace();}//获取消息的msgIdString msgId = messageExt.getMsgId();// 从获取到的消息对象中,取出我们消息中的数据byte[] body = messageExt.getBody();String s = new String(body);String formatStr = String.format("接收到消息,消息id为%s, 消息内容是%s", msgId, s);System.out.println(formatStr);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消息消费者consumer.start();}
}
首先启动消费者,等待生产者启动后,生产者传递消息给消费者,消费者接受到消息。
消息从发送到接收的时间延时是920ms接收到消息,消息id为C0A82B33483818B4AAC27AE498990000
, 消息内容是hello,rocketMq
分为不同的级别,见下图:
public class DelayConsumer {public static void main(String[] args) throws MQClientException {// 创建一个消息消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_basic_consumer");// 设置消费者和nameserver建立连接的地址consumer.setNamesrvAddr("127.0.0.1:9876");// 向Rocket订阅指定主题的消息consumer.subscribe("test_delay", "*");// 注册消息消费的监听器(实现消息的消费逻辑)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);String sendTime = messageExt.getUserProperty("sendTime");try {String format = String.format("消息从发送到接收的时间延时是%dms",(System.currentTimeMillis() - Long.parseLong(sendTime)));System.out.println(format);} catch (Exception e) {e.printStackTrace();}// 获取消息的msgIdString msgId = messageExt.getMsgId();// 从获取到的消息对象中,取出我们消息中的数据byte[] body = messageExt.getBody();String s = new String(body);String formatStr = String.format("接收到消息,消息id为%s, 消息内容是%s", msgId, s);System.out.println(formatStr);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消息消费者consumer.start();}
}
public class DelayProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {// 1. 创建一个消息生产者对象DefaultMQProducer producer = new DefaultMQProducer("test_basic_producer");// 2. 设置producer连接那个nameserver的地址producer.setNamesrvAddr("127.0.0.1:9876");// 3. 启动消息生产者producer.start();//4. 准备好待发送的消息String topic = "test_delay";Message message = new Message();// 设置消息的Topicmessage.setTopic(topic);// 向消息对象中,放入我们实际的数据message.setBody("hello, rocketMq".getBytes());//private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";message.setDelayTimeLevel(2);message.putUserProperty("sendTime", System.currentTimeMillis() + "");// 5. 利用消息生产者,将消息发送出去SendResult send = producer.send(message);System.out.println(send);if (send.getSendStatus().equals(SendStatus.SEND_OK)) {System.out.println("消息发送成功,消息id为" + send.getMsgId());return;}System.out.println("消息发送失败" + send.getMsgId());}
}
结果和上面一样,延迟发送而已!