热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Java消息中间件

视频学习记录第一章消息中间件概述1.消息中间件的好处解耦、异步、横向扩展、安全可靠、顺序保证2.什么是消息中间件发送和接收数据,利用高效可靠的异步消息传递机制集成分布式

视频学习记录

第一章 消息中间件概述

1. 消息中间件的好处

解耦、异步、横向扩展、安全可靠、顺序保证

2. 什么是消息中间件

发送和接收数据,利用高效可靠的异步消息传递机制集成分布式系统

3. 什么是JMS(规范)

Java消息服务(Java Message Service),是一个Java平台中面向消息中间件的API

4. 什么是AMQP(协议)

AMQP(advanced message queuing protocol),是一个提供统一消息服务的应用层标准协议。
此协议不受客户端和中间件的不同产品和不同开发语言的限制。

5. 几个常用消息中间对比

. ActiveMQ RabbitMQ Kafka
优点 遵循JMS规范,安装方便 继承Erlang天生的并发性,最初用于金融行业,稳定性和安全性有保障 依赖zk,可动态扩展节点,高性能、高吞吐量、无限扩容、消息可指定追溯
缺点 有可能会丢失消息。现在的重心在下一代产品apolle上,所以5.x的产品不怎么维护了 Erlang语言难度较大,不支持动态扩展 严格的顺序机制,不支持消息优先级,不支持标准的消息协议,不利于平台迁移
支持协议 AMQP,OpenWire,Stomp,XMPP AMQP
应用 适合中小企业,不适合好千个队列的应用 适合对稳定性要求高的企业级应用 应用在大数据日志处理或对实时性、可靠性(少量数据丢失)要求较低的场景应用

第二章 初始JMS

2.1 JSM相关概念

  1. 提供者: 实现JMS规范的消息中间件服务器
  2. 客户端:发送或接收消息的应用程序
  3. 生产者/发布者: 创建并发送消息的客户端
  4. 消费者/订阅者:接收并处理消息的客户端
  5. 消息:应用程序之间传递的数据内容
  6. 消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式

2.2 队列模式

1. 特性
客户端包括生产者和消费者
队列中的消息只能被一个消息费者消息
消费者可以随时消费队列中的消息

2. 队列模型示意图
这里写图片描述

2.3 主题模式

1. 特性
客户端包括发布者和订阅者
主题中的消息被所有订阅者消息
消费者不能消费订阅之前就发送到主题中的消息

2. 主题模型示意图
这里写图片描述

2.4 JSM编码接口

ConnectionFactory 用于创建连接到消息中间件的连接工厂
Connection 代表了应用程序和消息服务器之间的通信链路
Destination 指消息发布和接收的地点,包括队列或主题
Session 表示一个单线程的上下文,用于发送和接收消息
MessageProducer 由会话创建,用于发送消息到目标
MessageConsumer 由会话创建,用于接收发送到目标的消息
Message 是在消费者和生产者之间传送的对象, 消息头,一组消息属性,一个消息体
这里写图片描述

第三章 ActiveMQ的使用

3.1 activeMQ在Windows平台上的安装

1.下载ActiveMQ
去官方网站下载:http://activemq.apache.org/activemq-5152-release.html

2.运行ActiveMQ
解压缩apache-activemq-5.5.1-bin.zip到C盘,然后双击C:\apache-activemq-5.15.2\bin\win64\activemq.bat运行ActiveMQ程序。

启动ActiveMQ以后,登陆:http://localhost:8161/admin/,进入管理界面。
用户名与密码均为:admin

3.2 ActiveMQ的队列模式

生产者代码片

package com.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 生产者
* @author Peter
*
*/

public class Proceducer {

/**
*
*/

private final static String URL = "tcp://localhost:61616";
/**
*
*/

private final static String QUEUE_NAME = "queue-name";

public static void main(String[] args) throws JMSException {
// 1. 创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 创建Connection
Connection con = factory.createConnection();
// 3. 启动连接
con.start();
// 4. 创建会话
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 创建一个目标
Destination dest = session.createQueue(QUEUE_NAME);
// 6. 创建一个生产者
MessageProducer pro = (MessageProducer) session.createProducer(dest);
for(int i = 0; i<10;i++) {
// 7. 创建消息
TextMessage msg = session.createTextMessage("消息"+i);
// 8. 发布消息
pro.send(msg);
System.out.println(msg);
}
// 9. 关闭连接
con.close();
}
}

执行上面代码后,在管理界面看到的结果是:
这里写图片描述

消费者代码片

/**
* 消费者
* @author Peter
*/

public class Consumer {
/**
* 中间件地址
*/

private final static String URL = "tcp://localhost:61616";
/**
* 中间件队列名,与生产者的一致
*/

private final static String QUEUE_NAME = "queue-name";

public static void main(String[] args) throws JMSException {
// 1. 创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 创建Connection
Connection con = factory.createConnection();
// 3. 启动连接
con.start();
// 4. 创建会话
Session session = con.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 5. 创建一个目标
Destination dest = session.createQueue(QUEUE_NAME);
// 6. 创建一个消费者
MessageConsumer cOnsumer= session.createConsumer(dest);
// 7. 创建一个监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
System.out.println("接收消息为:"+msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 先不关闭,不然还没接收到消息就关闭了
//con.close();
}
}

执行上面代码后,在管理界面的结果如下:
这里写图片描述

如果我再新建一个消费者,我们会发现,两个消费者在抢收消息,即一个消费者收到了消息,则另一个消费者就收不到该消息了。

3.3 ActiveMQ的主题模式(发布/订阅)

由于订阅者是收不到还未订阅主题之前的内容的,所以必须要先启动订阅者。

订阅者代码片:

/**
* 订阅者
* @author Peter
*
*/

public class Consumer {
private final static String URL = "tcp://localhost:61616";
private final static String TOPIC_NAME = "topic-name";

public static void main(String[] args) throws JMSException {
// 1. 创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 创建Connection
Connection con = factory.createConnection();
// 3. 启动连接
con.start();
// 4. 创建会话
Session session = con.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 5. 创建一个目标【与队列模式的区别就在这里,相当于订阅了该主题】
Destination dest = session.createTopic(TOPIC_NAME);
// 6. 创建一个消费者
MessageConsumer cOnsumer= session.createConsumer(dest);
// 7. 创建一个监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
System.out.println("接收消息为:"+msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 先不关闭,不然还没接收到消息就关闭了
//con.close();
}
}

发布者代码片

/**
* 发布者
* @author Peter
*
*/

public class Proceducer {

private final static String URL = "tcp://localhost:61616";
private final static String TOPIC_NAME = "topic-name";

public static void main(String[] args) throws JMSException {
// 1. 创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 创建Connection
Connection con = factory.createConnection();
// 3. 启动连接
con.start();
// 4. 创建会话
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 创建一个目标【与队列模式的区别就在这里,相当于发布一个主题】
Destination dest = session.createTopic(TOPIC_NAME);
// 6. 创建一个生产者
MessageProducer pro = (MessageProducer) session.createProducer(dest);
for(int i = 0; i<10;i++) {
// 7. 创建消息
TextMessage msg = session.createTextMessage("消息"+i);
// 8. 发布消息
pro.send(msg);
System.out.println(msg);
}
// 9. 关闭连接
con.close();
}
}

如果我们再新建一个订阅者,我们会发现两个订阅者收到的消息完全一样。

3.4 spring集成JMS连接ActiveMQ

我们下载的activeMQ压缩文件里解压后,能找到相关的jar包,但spring-jms这个可去maven仓库下载

3.4.1 几个相关类

1. ConnectionFactory 用于管理连接的连接工厂【也是连接池:管理JmsTemplate每次发送消息都会重新创建的连接、会话、productor】
实现类:
SingleConnectionFactory:每次都返回同一个连接
CachingConnectionFactory:继承了SingleConnectionFactory,并实现了缓存

2.JmsTemplate 用于发送和接收消息的模板类
由spring提供,它是线程安全类,可以在整个应用范围内应用

3.MessageListener 消息监听器
只需实现一个只接收Message参数的onMesssage方法

3.4.2 消息队列模式与spring集成

1. 发送消息的接口

public interface ProducerInter {
public void sendMessage(String message);
}

2. 发送消息实现类

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ProducerImpl implements ProducerInter {

@Autowired
JmsTemplate jms;
// 由于可能会有多个目标,所以一定要以注入bean的id区分
@Resource(name="destination")
Destination destination;

@Override
public void sendMessage(String message) {
jms.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session sessioin) throws JMSException {
TextMessage msg = sessioin.createTextMessage(message);
System.out.println("发送消息:"+msg.getText());
return msg;
}
});

}

}

3. 配置文件(producer.xml)


<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"

default-autowire="byName" default-lazy-init="false">



<context:component-scan base-package="com.jms.spring">context:component-scan>


<bean id="targetConnectionFactoryId" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
bean>


<bean id="connectionFactoryId" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactoryId"/>
bean>


<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queuename"/>
bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactoryId"/>
bean>


<bean id="producerImpl" class="com.jms.spring.ProducerImpl">bean>
beans>

4. 测试发送
执行之后,进入管理界面可查看结果

public class TestProducer {
public static void main(String[] args) {
// 从classpath下加载配置文件
ApplicationContext applicatiOnContext= new ClassPathXmlApplicationContext("producer.xml");
ProducerImpl pro = (ProducerImpl) applicationContext.getBean("producerImpl");
pro.sendMessage("hello world");

}
}

5. 监听消息类

public class ConsumerMessageListener implements MessageListener{
// 监听消息
@Override
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
System.out.println("收到消息:"+msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}

}

6. 接收消息的配置


<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"

default-autowire="byName" default-lazy-init="false">



<context:component-scan base-package="com.jms.spring">context:component-scan>


<bean id="targetConnectionFactoryId" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
bean>


<bean id="connectionFactoryId" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactoryId"/>
bean>


<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queuename"/>
bean>




<bean id="consumerMessageListener" class="com.jms.spring.ConsumerMessageListener">bean>


<bean id="jmsContainerListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactoryId"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="consumerMessageListener"/>
bean>

beans>

7. 测试消费者

public class TestConsumer {
public static void main(String[] args) {
new ClassPathXmlApplicationContext("consumer.xml");
}
}

3.4.3 主题模式与spring的集成

只需要将配置文件中的目标对象org.apache.activemq.command.ActiveMQQueue改成org.apache.activemq.command.ActiveMQTopic即可。需要注意的是,在主题模式下,一定要先启动消费者。

第四章 ActiveMQ集群

4.1 集群方式

客户端集群:让多个消费者消费同一个队列
Broker clusters:多个Broker之间同步消息
Master Slave(主从):实现高可用

4.2 客户端配置

4.2.1. ActiveMQ失效转移(failover):

定义:允许当其中一台消息服务器宕机时,客户端在传输层上重新连接到其它消息服务器
语法:failover:(uri1,uri2,…,uriN)?transportOptions
transportOptions参数说明
randomize 默认为true,表示在uri列表中选择uri连接时,是否采用随机策略
initialReconnectDelay 默认为10,单位毫秒,表示第一尝试重连之间等待的时间
maxReconnectionDelay 默认30000,单位毫秒,最长重连的时间间隔

4.3 Broker Cluster集群配置

1. 原理:
这里写图片描述
2. NetworkConnector(网络连接器)
网络连接器主要用于配置ActiveMQ服务器与服务器之间的网络通讯方式,用于服务器透传消息
分为静态连接器和动态连接器

3. 静态连接器:适用连接地址不多的情况

<networkConnectors>
<networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)">
networkConnectors>

4. 动态连接器

<networkConnectors>
<networkConnector uri="multicast://default">
networkConnectors>
<transportConnectors>
<transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default">
transportConnectors>

4.4 Master/Slave集群配置

1. Master/Slave集群方案
Share nothing storage master/slave (5.8 以后的版本删除了)
Share storage master/slave 共享存储
Replicated LevelDB Store 基于可复制的LevelDB Store

2. 共享存储集群的原理
先启动A,A就因为排他锁独占资源成为Master,此时A有外部服务能力,而B没有
这里写图片描述
如果A挂了,则B获取资源成为Master,这时所有请求都会交给B
这里写图片描述

3. 基于复制的LevelDB Store的原理
因为是基于ZooKeeper的,所以至少需要3劝服务器。zk选举A作为Master后,A就具有了外部服务能力,而B、C没有。当A获取到外部资源存储后,会通过zk将资源同步到B和C。
这里写图片描述
如果A故障,则zk会重新选举一个节点作为Master

4.5 Broker clusters和Master Slave对比

. 高可用 负载均衡
Master/Slave
Broker Cluster

4.6 高可用且负载均衡的集群方案

这里写图片描述

第五章 消息中间件如何传对象

利用Json


推荐阅读
  • CentOs 7.3中搭建RabbitMQ 3.6单机多实例服务的步骤与使用
    CentOs7.3中搭建RabbitMQ3.6单机多实例服务的步骤与使用-RabbitMQ简介RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户 ... [详细]
  • flowable工作流 流程变量_信也科技工作流平台的技术实践
    1背景随着公司业务发展及内部业务流程诉求的增长,目前信息化系统不能够很好满足期望,主要体现如下:目前OA流程引擎无法满足企业特定业务流程需求,且移动端体 ... [详细]
  • GetWindowLong函数
    今天在看一个代码里头写了GetWindowLong(hwnd,0),我当时就有点费解,靠,上网搜索函数原型说明,死活找不到第 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • Voicewo在线语音识别转换jQuery插件的特点和示例
    本文介绍了一款名为Voicewo的在线语音识别转换jQuery插件,该插件具有快速、架构、风格、扩展和兼容等特点,适合在互联网应用中使用。同时还提供了一个快速示例供开发人员参考。 ... [详细]
  • 本文介绍了OpenStack的逻辑概念以及其构成简介,包括了软件开源项目、基础设施资源管理平台、三大核心组件等内容。同时还介绍了Horizon(UI模块)等相关信息。 ... [详细]
  • 解决Sharepoint 2013运行状况分析出现的“一个或多个服务器未响应”问题的方法
    本文介绍了解决Sharepoint 2013运行状况分析中出现的“一个或多个服务器未响应”问题的方法。对于有高要求的客户来说,系统检测问题的存在是不可接受的。文章详细描述了解决该问题的步骤,包括删除服务器、处理分布式缓存留下的记录以及使用代码等方法。同时还提供了相关关键词和错误提示信息,以帮助读者更好地理解和解决该问题。 ... [详细]
  • Sleuth+zipkin链路追踪SpringCloud微服务的解决方案
    在庞大的微服务群中,随着业务扩展,微服务个数增多,系统调用链路复杂化。Sleuth+zipkin是解决SpringCloud微服务定位和追踪的方案。通过TraceId将不同服务调用的日志串联起来,实现请求链路跟踪。通过Feign调用和Request传递TraceId,将整个调用链路的服务日志归组合并,提供定位和追踪的功能。 ... [详细]
  • ElasticSerach初探第一篇认识ES+环境搭建+简单MySQL数据同步+SpringBoot整合ES
    一、认识ElasticSearch是一个基于Lucene的开源搜索引擎,通过简单的RESTfulAPI来隐藏Lucene的复杂性。全文搜索,分析系统&# ... [详细]
  • 本文介绍了C#中生成随机数的三种方法,并分析了其中存在的问题。首先介绍了使用Random类生成随机数的默认方法,但在高并发情况下可能会出现重复的情况。接着通过循环生成了一系列随机数,进一步突显了这个问题。文章指出,随机数生成在任何编程语言中都是必备的功能,但Random类生成的随机数并不可靠。最后,提出了需要寻找其他可靠的随机数生成方法的建议。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 本文介绍了在使用Python中的aiohttp模块模拟服务器时出现的连接失败问题,并提供了相应的解决方法。文章中详细说明了出错的代码以及相关的软件版本和环境信息,同时也提到了相关的警告信息和函数的替代方案。通过阅读本文,读者可以了解到如何解决Python连接服务器失败的问题,并对aiohttp模块有更深入的了解。 ... [详细]
  • 本文介绍了在Windows环境下如何配置php+apache环境,包括下载php7和apache2.4、安装vc2015运行时环境、启动php7和apache2.4等步骤。希望对需要搭建php7环境的读者有一定的参考价值。摘要长度为169字。 ... [详细]
  • Java和JavaScript是什么关系?java跟javaScript都是编程语言,只是java跟javaScript没有什么太大关系,一个是脚本语言(前端语言),一个是面向对象 ... [详细]
  • 近期看见一篇来自Intel的很有意思的分析文章,作者提到在他向45名与会的各公司程序员开发经理战略师提问“什么是实施并行编程的最大障碍”时,下面五个因素 ... [详细]
author-avatar
嗯哼
很清新
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有