热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Java消息中间件

视频学习记录第一章消息中间件概述1.消息中间件的好处解耦、异步、横向扩展、安全可靠、顺序保证2.什么是消息中间件发送和接收数据,利用高效可靠的异步消息传递机制集成分布式

视频学习记录

第一章 消息中间件概述

1. 消息中间件的好处

解耦、异步、横向扩展、安全可靠、顺序保证

2. 什么是消息中间件

发送和接收数据,利用高效可靠的异步消息传递机制集成分布式系统

3. 什么是JMS(规范)

Java消息服务(Java Message Service),是一个Java平台中面向消息中间件的API

4. 什么是AMQP(协议)

AMQP(advanced message queuing protocol),是一个提供统一消息服务的应用层标准协议。
此协议不受客户端和中间件的不同产品和不同开发语言的限制。

5. 几个常用消息中间对比

. ActiveMQ RabbitMQ Kafka
优点 遵循JMS规范,安装方便 继承Erlang天生的并发性,最初用于金融行业,稳定性和安全性有保障 依赖zk,可动态扩展节点,高性能、高吞吐量、无限扩容、消息可指定追溯
缺点 有可能会丢失消息。现在的重心在下一代产品apolle上,所以5.x的产品不怎么维护了 Erlang语言难度较大,不支持动态扩展 严格的顺序机制,不支持消息优先级,不支持标准的消息协议,不利于平台迁移
支持协议 AMQP,OpenWire,Stomp,XMPP AMQP
应用 适合中小企业,不适合好千个队列的应用 适合对稳定性要求高的企业级应用 应用在大数据日志处理或对实时性、可靠性(少量数据丢失)要求较低的场景应用

第二章 初始JMS

2.1 JSM相关概念

  1. 提供者: 实现JMS规范的消息中间件服务器
  2. 客户端:发送或接收消息的应用程序
  3. 生产者/发布者: 创建并发送消息的客户端
  4. 消费者/订阅者:接收并处理消息的客户端
  5. 消息:应用程序之间传递的数据内容
  6. 消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式

2.2 队列模式

1. 特性
客户端包括生产者和消费者
队列中的消息只能被一个消息费者消息
消费者可以随时消费队列中的消息

2. 队列模型示意图
这里写图片描述

2.3 主题模式

1. 特性
客户端包括发布者和订阅者
主题中的消息被所有订阅者消息
消费者不能消费订阅之前就发送到主题中的消息

2. 主题模型示意图
这里写图片描述

2.4 JSM编码接口

ConnectionFactory 用于创建连接到消息中间件的连接工厂
Connection 代表了应用程序和消息服务器之间的通信链路
Destination 指消息发布和接收的地点,包括队列或主题
Session 表示一个单线程的上下文,用于发送和接收消息
MessageProducer 由会话创建,用于发送消息到目标
MessageConsumer 由会话创建,用于接收发送到目标的消息
Message 是在消费者和生产者之间传送的对象, 消息头,一组消息属性,一个消息体
这里写图片描述

第三章 ActiveMQ的使用

3.1 activeMQ在Windows平台上的安装

1.下载ActiveMQ
去官方网站下载:http://activemq.apache.org/activemq-5152-release.html

2.运行ActiveMQ
解压缩apache-activemq-5.5.1-bin.zip到C盘,然后双击C:\apache-activemq-5.15.2\bin\win64\activemq.bat运行ActiveMQ程序。

启动ActiveMQ以后,登陆:http://localhost:8161/admin/,进入管理界面。
用户名与密码均为:admin

3.2 ActiveMQ的队列模式

生产者代码片

package com.queue;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 生产者
* @author Peter
*
*/

public class Proceducer {

/**
*
*/

private final static String URL = "tcp://localhost:61616";
/**
*
*/

private final static String QUEUE_NAME = "queue-name";

public static void main(String[] args) throws JMSException {
// 1. 创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 创建Connection
Connection con = factory.createConnection();
// 3. 启动连接
con.start();
// 4. 创建会话
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 创建一个目标
Destination dest = session.createQueue(QUEUE_NAME);
// 6. 创建一个生产者
MessageProducer pro = (MessageProducer) session.createProducer(dest);
for(int i = 0; i<10;i++) {
// 7. 创建消息
TextMessage msg = session.createTextMessage("消息"+i);
// 8. 发布消息
pro.send(msg);
System.out.println(msg);
}
// 9. 关闭连接
con.close();
}
}

执行上面代码后,在管理界面看到的结果是:
这里写图片描述

消费者代码片

/**
* 消费者
* @author Peter
*/

public class Consumer {
/**
* 中间件地址
*/

private final static String URL = "tcp://localhost:61616";
/**
* 中间件队列名,与生产者的一致
*/

private final static String QUEUE_NAME = "queue-name";

public static void main(String[] args) throws JMSException {
// 1. 创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 创建Connection
Connection con = factory.createConnection();
// 3. 启动连接
con.start();
// 4. 创建会话
Session session = con.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 5. 创建一个目标
Destination dest = session.createQueue(QUEUE_NAME);
// 6. 创建一个消费者
MessageConsumer cOnsumer= session.createConsumer(dest);
// 7. 创建一个监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
System.out.println("接收消息为:"+msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 先不关闭,不然还没接收到消息就关闭了
//con.close();
}
}

执行上面代码后,在管理界面的结果如下:
这里写图片描述

如果我再新建一个消费者,我们会发现,两个消费者在抢收消息,即一个消费者收到了消息,则另一个消费者就收不到该消息了。

3.3 ActiveMQ的主题模式(发布/订阅)

由于订阅者是收不到还未订阅主题之前的内容的,所以必须要先启动订阅者。

订阅者代码片:

/**
* 订阅者
* @author Peter
*
*/

public class Consumer {
private final static String URL = "tcp://localhost:61616";
private final static String TOPIC_NAME = "topic-name";

public static void main(String[] args) throws JMSException {
// 1. 创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 创建Connection
Connection con = factory.createConnection();
// 3. 启动连接
con.start();
// 4. 创建会话
Session session = con.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 5. 创建一个目标【与队列模式的区别就在这里,相当于订阅了该主题】
Destination dest = session.createTopic(TOPIC_NAME);
// 6. 创建一个消费者
MessageConsumer cOnsumer= session.createConsumer(dest);
// 7. 创建一个监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
System.out.println("接收消息为:"+msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 先不关闭,不然还没接收到消息就关闭了
//con.close();
}
}

发布者代码片

/**
* 发布者
* @author Peter
*
*/

public class Proceducer {

private final static String URL = "tcp://localhost:61616";
private final static String TOPIC_NAME = "topic-name";

public static void main(String[] args) throws JMSException {
// 1. 创建ConnectionFactory
ConnectionFactory factory = new ActiveMQConnectionFactory(URL);
// 2. 创建Connection
Connection con = factory.createConnection();
// 3. 启动连接
con.start();
// 4. 创建会话
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 创建一个目标【与队列模式的区别就在这里,相当于发布一个主题】
Destination dest = session.createTopic(TOPIC_NAME);
// 6. 创建一个生产者
MessageProducer pro = (MessageProducer) session.createProducer(dest);
for(int i = 0; i<10;i++) {
// 7. 创建消息
TextMessage msg = session.createTextMessage("消息"+i);
// 8. 发布消息
pro.send(msg);
System.out.println(msg);
}
// 9. 关闭连接
con.close();
}
}

如果我们再新建一个订阅者,我们会发现两个订阅者收到的消息完全一样。

3.4 spring集成JMS连接ActiveMQ

我们下载的activeMQ压缩文件里解压后,能找到相关的jar包,但spring-jms这个可去maven仓库下载

3.4.1 几个相关类

1. ConnectionFactory 用于管理连接的连接工厂【也是连接池:管理JmsTemplate每次发送消息都会重新创建的连接、会话、productor】
实现类:
SingleConnectionFactory:每次都返回同一个连接
CachingConnectionFactory:继承了SingleConnectionFactory,并实现了缓存

2.JmsTemplate 用于发送和接收消息的模板类
由spring提供,它是线程安全类,可以在整个应用范围内应用

3.MessageListener 消息监听器
只需实现一个只接收Message参数的onMesssage方法

3.4.2 消息队列模式与spring集成

1. 发送消息的接口

public interface ProducerInter {
public void sendMessage(String message);
}

2. 发送消息实现类

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ProducerImpl implements ProducerInter {

@Autowired
JmsTemplate jms;
// 由于可能会有多个目标,所以一定要以注入bean的id区分
@Resource(name="destination")
Destination destination;

@Override
public void sendMessage(String message) {
jms.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session sessioin) throws JMSException {
TextMessage msg = sessioin.createTextMessage(message);
System.out.println("发送消息:"+msg.getText());
return msg;
}
});

}

}

3. 配置文件(producer.xml)


<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"

default-autowire="byName" default-lazy-init="false">



<context:component-scan base-package="com.jms.spring">context:component-scan>


<bean id="targetConnectionFactoryId" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
bean>


<bean id="connectionFactoryId" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactoryId"/>
bean>


<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queuename"/>
bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactoryId"/>
bean>


<bean id="producerImpl" class="com.jms.spring.ProducerImpl">bean>
beans>

4. 测试发送
执行之后,进入管理界面可查看结果

public class TestProducer {
public static void main(String[] args) {
// 从classpath下加载配置文件
ApplicationContext applicatiOnContext= new ClassPathXmlApplicationContext("producer.xml");
ProducerImpl pro = (ProducerImpl) applicationContext.getBean("producerImpl");
pro.sendMessage("hello world");

}
}

5. 监听消息类

public class ConsumerMessageListener implements MessageListener{
// 监听消息
@Override
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
System.out.println("收到消息:"+msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}

}

6. 接收消息的配置


<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"

default-autowire="byName" default-lazy-init="false">



<context:component-scan base-package="com.jms.spring">context:component-scan>


<bean id="targetConnectionFactoryId" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
bean>


<bean id="connectionFactoryId" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactoryId"/>
bean>


<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queuename"/>
bean>




<bean id="consumerMessageListener" class="com.jms.spring.ConsumerMessageListener">bean>


<bean id="jmsContainerListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactoryId"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="consumerMessageListener"/>
bean>

beans>

7. 测试消费者

public class TestConsumer {
public static void main(String[] args) {
new ClassPathXmlApplicationContext("consumer.xml");
}
}

3.4.3 主题模式与spring的集成

只需要将配置文件中的目标对象org.apache.activemq.command.ActiveMQQueue改成org.apache.activemq.command.ActiveMQTopic即可。需要注意的是,在主题模式下,一定要先启动消费者。

第四章 ActiveMQ集群

4.1 集群方式

客户端集群:让多个消费者消费同一个队列
Broker clusters:多个Broker之间同步消息
Master Slave(主从):实现高可用

4.2 客户端配置

4.2.1. ActiveMQ失效转移(failover):

定义:允许当其中一台消息服务器宕机时,客户端在传输层上重新连接到其它消息服务器
语法:failover:(uri1,uri2,…,uriN)?transportOptions
transportOptions参数说明
randomize 默认为true,表示在uri列表中选择uri连接时,是否采用随机策略
initialReconnectDelay 默认为10,单位毫秒,表示第一尝试重连之间等待的时间
maxReconnectionDelay 默认30000,单位毫秒,最长重连的时间间隔

4.3 Broker Cluster集群配置

1. 原理:
这里写图片描述
2. NetworkConnector(网络连接器)
网络连接器主要用于配置ActiveMQ服务器与服务器之间的网络通讯方式,用于服务器透传消息
分为静态连接器和动态连接器

3. 静态连接器:适用连接地址不多的情况

<networkConnectors>
<networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)">
networkConnectors>

4. 动态连接器

<networkConnectors>
<networkConnector uri="multicast://default">
networkConnectors>
<transportConnectors>
<transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default">
transportConnectors>

4.4 Master/Slave集群配置

1. Master/Slave集群方案
Share nothing storage master/slave (5.8 以后的版本删除了)
Share storage master/slave 共享存储
Replicated LevelDB Store 基于可复制的LevelDB Store

2. 共享存储集群的原理
先启动A,A就因为排他锁独占资源成为Master,此时A有外部服务能力,而B没有
这里写图片描述
如果A挂了,则B获取资源成为Master,这时所有请求都会交给B
这里写图片描述

3. 基于复制的LevelDB Store的原理
因为是基于ZooKeeper的,所以至少需要3劝服务器。zk选举A作为Master后,A就具有了外部服务能力,而B、C没有。当A获取到外部资源存储后,会通过zk将资源同步到B和C。
这里写图片描述
如果A故障,则zk会重新选举一个节点作为Master

4.5 Broker clusters和Master Slave对比

. 高可用 负载均衡
Master/Slave
Broker Cluster

4.6 高可用且负载均衡的集群方案

这里写图片描述

第五章 消息中间件如何传对象

利用Json


推荐阅读
  • 零拷贝技术是提高I/O性能的重要手段,常用于Java NIO、Netty、Kafka等框架中。本文将详细解析零拷贝技术的原理及其应用。 ... [详细]
  • 用阿里云的免费 SSL 证书让网站从 HTTP 换成 HTTPS
    HTTP协议是不加密传输数据的,也就是用户跟你的网站之间传递数据有可能在途中被截获,破解传递的真实内容,所以使用不加密的HTTP的网站是不 ... [详细]
  • 开机自启动的几种方式
    0x01快速自启动目录快速启动目录自启动方式源于Windows中的一个目录,这个目录一般叫启动或者Startup。位于该目录下的PE文件会在开机后进行自启动 ... [详细]
  • 秒建一个后台管理系统?用这5个开源免费的Java项目就够了
    秒建一个后台管理系统?用这5个开源免费的Java项目就够了 ... [详细]
  • 在iOS开发中,基于HTTPS协议的安全网络请求实现至关重要。HTTPS(全称:HyperText Transfer Protocol over Secure Socket Layer)是一种旨在提供安全通信的HTTP扩展,通过SSL/TLS加密技术确保数据传输的安全性和隐私性。本文将详细介绍如何在iOS应用中实现安全的HTTPS网络请求,包括证书验证、SSL握手过程以及常见安全问题的解决方法。 ... [详细]
  • 本文以 www.域名.com 为例,详细介绍如何为每个注册用户提供独立的二级域名,如 abc.域名.com。实现这一功能的核心步骤包括:首先,确保域名支持泛解析,即将 A 记录设置为 *.域名.com,以便将所有二级域名请求指向同一服务器。接着,在服务器端使用 ASP.NET 2.0 进行配置,通过解析 HTTP 请求中的主机头信息,动态识别并处理不同的二级域名,从而实现个性化内容展示。此外,还需在数据库中维护用户与二级域名的对应关系,确保每个用户的二级域名都能正确映射到其专属内容。 ... [详细]
  • 掌握PHP框架开发与应用的核心知识点:构建高效PHP框架所需的技术与能力综述
    掌握PHP框架开发与应用的核心知识点对于构建高效PHP框架至关重要。本文综述了开发PHP框架所需的关键技术和能力,包括但不限于对PHP语言的深入理解、设计模式的应用、数据库操作、安全性措施以及性能优化等方面。对于初学者而言,熟悉主流框架如Laravel、Symfony等的实际应用场景,有助于更好地理解和掌握自定义框架开发的精髓。 ... [详细]
  • 基于iSCSI的SQL Server 2012群集测试(一)SQL群集安装
    一、测试需求介绍与准备公司计划服务器迁移过程计划同时上线SQLServer2012,引入SQLServer2012群集提高高可用性,需要对SQLServ ... [详细]
  • 本文详细介绍了如何解决DNS服务器配置转发无法解析的问题,包括编辑主配置文件和重启域名服务的具体步骤。 ... [详细]
  • 单片微机原理P3:80C51外部拓展系统
      外部拓展其实是个相对来说很好玩的章节,可以真正开始用单片机写程序了,比较重要的是外部存储器拓展,81C55拓展,矩阵键盘,动态显示,DAC和ADC。0.IO接口电路概念与存 ... [详细]
  • DVWA学习笔记系列:深入理解CSRF攻击机制
    DVWA学习笔记系列:深入理解CSRF攻击机制 ... [详细]
  • 该大学网站采用PHP和MySQL技术,在校内可免费访问某些外部收费资料数据库。为了方便学生校外访问,建议通过学校账号登录实现免费访问。具体方案可包括利用学校服务器作为代理,结合身份验证机制,确保合法用户在校外也能享受免费资源。 ... [详细]
  • 在探讨Hibernate框架的高级特性时,缓存机制和懒加载策略是提升数据操作效率的关键要素。缓存策略能够显著减少数据库访问次数,从而提高应用性能,特别是在处理频繁访问的数据时。Hibernate提供了多层次的缓存支持,包括一级缓存和二级缓存,以满足不同场景下的需求。懒加载策略则通过按需加载关联对象,进一步优化了资源利用和响应时间。本文将深入分析这些机制的实现原理及其最佳实践。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 本文推荐了六款高效的Java Web应用开发工具,并详细介绍了它们的实用功能。其中,分布式敏捷开发系统架构“zheng”项目,基于Spring、Spring MVC和MyBatis技术栈,提供了完整的分布式敏捷开发解决方案,支持快速构建高性能的企业级应用。此外,该工具还集成了多种中间件和服务,进一步提升了开发效率和系统的可维护性。 ... [详细]
author-avatar
嗯哼
很清新
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有