视频学习记录
解耦、异步、横向扩展、安全可靠、顺序保证
发送和接收数据,利用高效可靠的异步消息传递机制集成分布式系统
Java消息服务(Java Message Service),是一个Java平台中面向消息中间件的API
AMQP(advanced message queuing protocol),是一个提供统一消息服务的应用层标准协议。
此协议不受客户端和中间件的不同产品和不同开发语言的限制。
. | ActiveMQ | RabbitMQ | Kafka |
---|---|---|---|
优点 | 遵循JMS规范,安装方便 | 继承Erlang天生的并发性,最初用于金融行业,稳定性和安全性有保障 | 依赖zk,可动态扩展节点,高性能、高吞吐量、无限扩容、消息可指定追溯 |
缺点 | 有可能会丢失消息。现在的重心在下一代产品apolle上,所以5.x的产品不怎么维护了 | Erlang语言难度较大,不支持动态扩展 | 严格的顺序机制,不支持消息优先级,不支持标准的消息协议,不利于平台迁移 |
支持协议 | AMQP,OpenWire,Stomp,XMPP | AMQP | |
应用 | 适合中小企业,不适合好千个队列的应用 | 适合对稳定性要求高的企业级应用 | 应用在大数据日志处理或对实时性、可靠性(少量数据丢失)要求较低的场景应用 |
1. 特性:
客户端包括生产者和消费者
队列中的消息只能被一个消息费者消息
消费者可以随时消费队列中的消息
2. 队列模型示意图
1. 特性:
客户端包括发布者和订阅者
主题中的消息被所有订阅者消息
消费者不能消费订阅之前就发送到主题中的消息
2. 主题模型示意图
ConnectionFactory 用于创建连接到消息中间件的连接工厂
Connection 代表了应用程序和消息服务器之间的通信链路
Destination 指消息发布和接收的地点,包括队列或主题
Session 表示一个单线程的上下文,用于发送和接收消息
MessageProducer 由会话创建,用于发送消息到目标
MessageConsumer 由会话创建,用于接收发送到目标的消息
Message 是在消费者和生产者之间传送的对象, 消息头,一组消息属性,一个消息体
1.下载ActiveMQ
去官方网站下载:http://activemq.apache.org/activemq-5152-release.html
2.运行ActiveMQ
解压缩apache-activemq-5.5.1-bin.zip到C盘,然后双击C:\apache-activemq-5.15.2\bin\win64\activemq.bat运行ActiveMQ程序。
启动ActiveMQ以后,登陆:http://localhost:8161/admin/,进入管理界面。
用户名与密码均为:admin
生产者代码片:
package com.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 生产者
* @author Peter
*
*/
public class Proceducer {
/**
*
*/
private final static String URL = "tcp://localhost:61616";
/**
*
*/
private final static String QUEUE_NAME = "queue-name";
public static void main(String[] args) throws JMSException {
// 1. 创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 创建Connection
Connection con = factory.createConnection();
// 3. 启动连接
con.start();
// 4. 创建会话
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 创建一个目标
Destination dest = session.createQueue(QUEUE_NAME);
// 6. 创建一个生产者
MessageProducer pro = (MessageProducer) session.createProducer(dest);
for(int i = 0; i<10;i++) {
// 7. 创建消息
TextMessage msg = session.createTextMessage("消息"+i);
// 8. 发布消息
pro.send(msg);
System.out.println(msg);
}
// 9. 关闭连接
con.close();
}
}
执行上面代码后,在管理界面看到的结果是:
消费者代码片:
/**
* 消费者
* @author Peter
*/
public class Consumer {
/**
* 中间件地址
*/
private final static String URL = "tcp://localhost:61616";
/**
* 中间件队列名,与生产者的一致
*/
private final static String QUEUE_NAME = "queue-name";
public static void main(String[] args) throws JMSException {
// 1. 创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 创建Connection
Connection con = factory.createConnection();
// 3. 启动连接
con.start();
// 4. 创建会话
Session session = con.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 5. 创建一个目标
Destination dest = session.createQueue(QUEUE_NAME);
// 6. 创建一个消费者
MessageConsumer cOnsumer= session.createConsumer(dest);
// 7. 创建一个监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
System.out.println("接收消息为:"+msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 先不关闭,不然还没接收到消息就关闭了
//con.close();
}
}
执行上面代码后,在管理界面的结果如下:
如果我再新建一个消费者,我们会发现,两个消费者在抢收消息,即一个消费者收到了消息,则另一个消费者就收不到该消息了。
由于订阅者是收不到还未订阅主题之前的内容的,所以必须要先启动订阅者。
订阅者代码片:
/**
* 订阅者
* @author Peter
*
*/
public class Consumer {
private final static String URL = "tcp://localhost:61616";
private final static String TOPIC_NAME = "topic-name";
public static void main(String[] args) throws JMSException {
// 1. 创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 创建Connection
Connection con = factory.createConnection();
// 3. 启动连接
con.start();
// 4. 创建会话
Session session = con.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 5. 创建一个目标【与队列模式的区别就在这里,相当于订阅了该主题】
Destination dest = session.createTopic(TOPIC_NAME);
// 6. 创建一个消费者
MessageConsumer cOnsumer= session.createConsumer(dest);
// 7. 创建一个监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
System.out.println("接收消息为:"+msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 先不关闭,不然还没接收到消息就关闭了
//con.close();
}
}
发布者代码片:
/**
* 发布者
* @author Peter
*
*/
public class Proceducer {
private final static String URL = "tcp://localhost:61616";
private final static String TOPIC_NAME = "topic-name";
public static void main(String[] args) throws JMSException {
// 1. 创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 创建Connection
Connection con = factory.createConnection();
// 3. 启动连接
con.start();
// 4. 创建会话
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 创建一个目标【与队列模式的区别就在这里,相当于发布一个主题】
Destination dest = session.createTopic(TOPIC_NAME);
// 6. 创建一个生产者
MessageProducer pro = (MessageProducer) session.createProducer(dest);
for(int i = 0; i<10;i++) {
// 7. 创建消息
TextMessage msg = session.createTextMessage("消息"+i);
// 8. 发布消息
pro.send(msg);
System.out.println(msg);
}
// 9. 关闭连接
con.close();
}
}
如果我们再新建一个订阅者,我们会发现两个订阅者收到的消息完全一样。
我们下载的activeMQ压缩文件里解压后,能找到相关的jar包,但spring-jms这个可去maven仓库下载
1. ConnectionFactory 用于管理连接的连接工厂【也是连接池:管理JmsTemplate每次发送消息都会重新创建的连接、会话、productor】
实现类:
SingleConnectionFactory:每次都返回同一个连接
CachingConnectionFactory:继承了SingleConnectionFactory,并实现了缓存
2.JmsTemplate 用于发送和接收消息的模板类
由spring提供,它是线程安全类,可以在整个应用范围内应用
3.MessageListener 消息监听器
只需实现一个只接收Message参数的onMesssage方法
1. 发送消息的接口
public interface ProducerInter {
public void sendMessage(String message);
}
2. 发送消息实现类
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class ProducerImpl implements ProducerInter {
@Autowired
JmsTemplate jms;
// 由于可能会有多个目标,所以一定要以注入bean的id区分
@Resource(name="destination")
Destination destination;
@Override
public void sendMessage(String message) {
jms.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session sessioin) throws JMSException {
TextMessage msg = sessioin.createTextMessage(message);
System.out.println("发送消息:"+msg.getText());
return msg;
}
});
}
}
3. 配置文件(producer.xml)
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"
default-autowire="byName" default-lazy-init="false">
<context:component-scan base-package="com.jms.spring">context:component-scan>
<bean id="targetConnectionFactoryId" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
bean>
<bean id="connectionFactoryId" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactoryId"/>
bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queuename"/>
bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactoryId"/>
bean>
<bean id="producerImpl" class="com.jms.spring.ProducerImpl">bean>
beans>
4. 测试发送
执行之后,进入管理界面可查看结果
public class TestProducer {
public static void main(String[] args) {
// 从classpath下加载配置文件
ApplicationContext applicatiOnContext= new ClassPathXmlApplicationContext("producer.xml");
ProducerImpl pro = (ProducerImpl) applicationContext.getBean("producerImpl");
pro.sendMessage("hello world");
}
}
5. 监听消息类
public class ConsumerMessageListener implements MessageListener{
// 监听消息
@Override
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
System.out.println("收到消息:"+msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
6. 接收消息的配置
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"
default-autowire="byName" default-lazy-init="false">
<context:component-scan base-package="com.jms.spring">context:component-scan>
<bean id="targetConnectionFactoryId" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
bean>
<bean id="connectionFactoryId" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactoryId"/>
bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queuename"/>
bean>
<bean id="consumerMessageListener" class="com.jms.spring.ConsumerMessageListener">bean>
<bean id="jmsContainerListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactoryId"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="consumerMessageListener"/>
bean>
beans>
7. 测试消费者
public class TestConsumer {
public static void main(String[] args) {
new ClassPathXmlApplicationContext("consumer.xml");
}
}
只需要将配置文件中的目标对象org.apache.activemq.command.ActiveMQQueue改成org.apache.activemq.command.ActiveMQTopic即可。需要注意的是,在主题模式下,一定要先启动消费者。
客户端集群:让多个消费者消费同一个队列
Broker clusters:多个Broker之间同步消息
Master Slave(主从):实现高可用
定义:允许当其中一台消息服务器宕机时,客户端在传输层上重新连接到其它消息服务器
语法:failover:(uri1,uri2,…,uriN)?transportOptions
transportOptions参数说明
randomize 默认为true,表示在uri列表中选择uri连接时,是否采用随机策略
initialReconnectDelay 默认为10,单位毫秒,表示第一尝试重连之间等待的时间
maxReconnectionDelay 默认30000,单位毫秒,最长重连的时间间隔
1. 原理:
2. NetworkConnector(网络连接器)
网络连接器主要用于配置ActiveMQ服务器与服务器之间的网络通讯方式,用于服务器透传消息
分为静态连接器和动态连接器
3. 静态连接器:适用连接地址不多的情况
<networkConnectors>
<networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)">
networkConnectors>
4. 动态连接器
<networkConnectors>
<networkConnector uri="multicast://default">
networkConnectors>
<transportConnectors>
<transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default">
transportConnectors>
1. Master/Slave集群方案
Share nothing storage master/slave (5.8 以后的版本删除了)
Share storage master/slave 共享存储
Replicated LevelDB Store 基于可复制的LevelDB Store
2. 共享存储集群的原理
先启动A,A就因为排他锁独占资源成为Master,此时A有外部服务能力,而B没有
如果A挂了,则B获取资源成为Master,这时所有请求都会交给B
3. 基于复制的LevelDB Store的原理
因为是基于ZooKeeper的,所以至少需要3劝服务器。zk选举A作为Master后,A就具有了外部服务能力,而B、C没有。当A获取到外部资源存储后,会通过zk将资源同步到B和C。
如果A故障,则zk会重新选举一个节点作为Master
. | 高可用 | 负载均衡 |
---|---|---|
Master/Slave | 是 | 否 |
Broker Cluster | 否 | 是 |
利用Json