热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

启动activemq_「Java」SpringBootamp;ActiveMQ

一、消息队列消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构,

一、消息队列

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件。

目前在生产环境中使用较多的消息队列有ActiveMQ、RabbitMQ、Kafka、RocketMQ等。

A、特性

  • 异步性:将耗时的同步操作以消息的方式进行异步化处理,减少了同步等待的时间;
  • 松耦合:消息队列减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,而不用关心彼此的实现细节,只要定义好消息的格式就行;
  • 分布式:通过对消费者的横向扩展,降低了消息队列阻塞的风险,以及单个消费者产生单点故障的可能性;
  • 可靠性:消息队列一般会把接收到的消息存储到本地硬盘上,这样即使应用挂掉或者消息队列本身挂掉,消息也能够重新加载。

B、JMS规范

JMS即Java消息服务(Java Message Service)应用程序接口,是Java面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

JMS的消息机制有2种模型,一种是Point to Point,表现为队列的形式,发送的消息,只能被一个接收者取走;另一种是Topic,可以被多个订阅者订阅,类似于群发。

ActiveMQ就是JMS的一个实现。

二、ActiveMQ介绍

ActiveMQ是Apache软件基金下的一个开源软件,它遵循JMS 1.1规范,是消息驱动中间件软件。它为企业消息传递提供高可用、出色性能、可扩展、稳定和安全保障。ActiveMQ使用Apache许可协议,因此,任何人都可以使用和修改它而不必反馈任何改变。

ActiveMQ的目标是在尽可能多的平台和语言上提供一个标准的,消息驱动的应用集成。ActiveMQ实现JMS规范并在此之上提供大量额外的特性。ActiveMQ支持队列和订阅两种模式的消息发送。

Spring Boot提供了ActiveMQ组件spring-boot-starter-activemq,用来支持ActiveMQ在Spring Boot体系内使用。

A、ActiveMQ安装

1、下载安装启动

# 安装JDK并配置环境# 下载activemq
wget http://archive.apache.org/dist/activemq/5.12.2/apache-activemq-5.12.2-bin.tar.gz# 解压安装
cd /usr/local/apache-activemq-5.12.2/bin# 启动ActiveMQ
./activemq start# web控制台
http://192.168.240.131:8161
admin/admin

2、安全配置

安装完成ActiveMQ后,任何连接到ActiveMQ的程序都可以创建和消费队列,可以通过修改配置文件conf/activemq.xml来加入身份验证,在文件的borker标签中加入:


控制台账号密码,修改conf/jetty.xml,确保authenticate的值是true。


登陆管控台的帐号和密码在conf/jetty-realm.properties文件。

# Defines users that can access the web (console, demo, etc.)
# username: password [,rolename ...]
admin: admin, admin

重启生效。

A、相关依赖

org.springframework.bootspring-boot-starter-activemq

B、配置文件

在使用ActiveMQ时有两种使用方式,一种是使用独立安装的ActiveMQ,在生产环境推荐使用这种;另一种是使用基于内存ActiveMQ ,在调试阶段建议使用这种方式。

# 基于内存
ActiveMQspring.activemq.in-memory=true# 不适应连接池
spring.activemq.pool.enabled=false# 独立安装ActiveMQ
#spring.activemq.broker-url=tcp://10.255.242.168:61616
#spring.activemq.user=admin
#spring.activemq.password=admin

三、队列(Queue)

队列发送的消息,只能被一个消费者接收。

A、创建队列

@Configuration
public class ActiveMqConfig
{@Beanpublic Queue queue(){return new ActiveMQQueue("isisiwish.test.queue");}
}

使用定义了队列queue命名为isisiwish.test.queue。

B、消息生产者

@Slf4j
@Component
public class Producer
{@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Autowiredprivate Queue queue;public void sendQueue(String msg){log.info("send queue msg : {}", msg);this.jmsMessagingTemplate.convertAndSend(this.queue, msg);}
}

JmsMessagingTemplate是Spring提供发送消息的工具类,使用JmsMessagingTemplate和创建好的queue对消息进行发送。

C、消息消费者

@Slf4j
@Component
public class ConsumerA
{@JmsListener(destination = "isisiwish.test.queue")public void receiveQueue(String text){log.info("ConsumerA queue msg : {}", text);}
}@Slf4j
@Component
public class ConsumerB
{@JmsListener(destination = "isisiwish.test.queue")public void receiveQueue(String text){log.info("ConsumerB queue msg : {}", text);}
}

使用注解@JmsListener(destination = "isisiwish.test.queue"),表示此方法监控了名为isisiwish.test.queue的队列。当队列isisiwish.test.queue中有消息发送时会触发此方法的执行,text为消息内容。

D、测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class MqActivemqQueueApplicationTests
{&#64;Autowiredprivate Producer producer;&#64;Testpublic void sendSimpleQueueMessage() throws InterruptedException{this.producer.sendQueue("Test queue message");}&#64;Testpublic void send100QueueMessage() throws InterruptedException{for (int i &#61; 0; i <100; i&#43;&#43;){this.producer.sendQueue("Test queue message " &#43; i);}Thread.sleep(1000L);}
}

当有多个消费者监听一个队列时&#xff0c;消费者会自动均衡负载的接收消息&#xff0c;并且每个消息只能有一个消费者所接收。

PS&#xff1a;控制台输出javax.jms.JMSException: peer (vm://localhost#1) stopped.报错信息可以忽略。

四、广播&#xff08;Topic&#xff09;

广播发送的消息&#xff0c;可以被多个消费者接收。

A、创建Topic

&#64;Configuration
public class ActiveMqConfig
{&#64;Beanpublic Topic topic(){return new ActiveMQTopic("isisiwish.test.topic");}
}

B、消息生产者

&#64;Slf4j
&#64;Component
public class ConsumerA
{&#64;JmsListener(destination &#61; "isisiwish.test.topic")public void receiveTopic(String text){log.info("ConsumerA topic msg : {}", text);}
}&#64;Slf4j
&#64;Component
public class ConsumerB
{&#64;JmsListener(destination &#61; "isisiwish.test.topic")public void receiveTopic(String text){log.info("ConsumerB topic msg : {}", text);}
}

C、消息消费者

&#64;Slf4j
&#64;Component
public class Producer
{&#64;Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;&#64;Autowiredprivate Topic topic;public void sendTopic(String msg){log.info("send queue msg : {}", msg);this.jmsMessagingTemplate.convertAndSend(this.topic, msg);}
}

D、测试

&#64;RunWith(SpringRunner.class)
&#64;SpringBootTest
public class MqActivemqTopicApplicationTests
{&#64;Autowiredprivate Producer producer;&#64;Testpublic void sendSimpleTopicMessage() throws InterruptedException{this.producer.sendTopic("Test Topic message");Thread.sleep(1000L);}
}

广播&#xff08;Topic&#xff09;是一个发送者多个消费者的模式&#xff0c;两个消费者都收到了发送的消息。

五、同时支持队列&#xff08;Queue&#xff09;和广播&#xff08;Topic&#xff09;

Spring Boot集成ActiveMQ的项目默认只支持队列或者广播中的一种&#xff0c;通过配置项 spring.jms.pub-sub-domain的值来控制&#xff0c;true为广播模式&#xff0c;false为队列模式&#xff0c;默认情况下支持队列模式。

如果需要在同一项目中既支持队列模式也支持广播模式&#xff0c;可以通过DefaultJmsListenerContainerFactory创建自定义的JmsListenerContainerFactory实例&#xff0c;之后在&#64;JmsListener注解中通过containerFactory属性引用它。

分别创建两个自定义的JmsListenerContainerFactory实例&#xff0c;通过pubSubDomain来控制是支持队列模式还是广播模式。

&#64;Configuration
&#64;EnableJms
public class ActiveMqConfig
{&#64;Bean("queueListenerFactory")public JmsListenerContainerFactory queueListenerFactory(ConnectionFactory connectionFactory){DefaultJmsListenerContainerFactory factory &#61; new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPubSubDomain(false);return factory;}&#64;Bean("topicListenerFactory")public JmsListenerContainerFactory topicListenerFactory(ConnectionFactory connectionFactory){DefaultJmsListenerContainerFactory factory &#61; new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPubSubDomain(true);return factory;}&#64;Beanpublic Queue queue(){return new ActiveMQQueue("isisiwish.test.queue");}&#64;Beanpublic Topic topic(){return new ActiveMQTopic("isisiwish.test.topic");}
}

在消费者接收的方法中&#xff0c;指明使用containerFactory接收消息。

&#64;Slf4j
&#64;Component
public class ConsumerA
{&#64;JmsListener(destination &#61; "isisiwish.test.queue", containerFactory &#61; "queueListenerFactory")public void receiveQueue(String text){log.info("ConsumerA queue msg : {}", text);}&#64;JmsListener(destination &#61; "isisiwish.test.topic", containerFactory &#61; "topicListenerFactory")public void receiveTopic(String text){log.info("ConsumerA topic msg : {}", text);}
}

常用配置。

# 基于内存的ActiveMQ
#spring.activemq.in-memory&#61;true
#spring.activemq.pool.enabled&#61;false# 独立安装的ActiveMQ
spring.activemq.broker-url&#61;tcp://10.255.242.168:61616
spring.activemq.user&#61;admin
spring.activemq.password&#61;admin# 结束之前等待的时间
#spring.activemq.close-timeout&#61;15s# 等待消息发送响应的时间&#xff0c;设置为0永远等待
spring.activemq.send-timeout&#61;0# 默认情况下ActiveMQ提供的是queue模式&#xff0c;若要使用topic模式需要配置下面配置
#spring.jms.pub-sub-domain&#61;true#账号
# spring.activemq.user&#61;admin# 密码
# spring.activemq.password&#61;admin# 是否信任所有包
#spring.activemq.packages.trust-all&#61;# 要信任的特定包&#xff08;逗号分隔&#xff09;
#spring.activemq.packages.trusted&#61;# 当连接请求和满时是否阻塞&#xff0c;设置false会抛出JMSException异常
#spring.activemq.pool.block-if-full&#61;true# 如果池满&#xff0c;则在抛出异常前阻塞时间
#spring.activemq.pool.block-if-full-timeout&#61;-1ms# 是否在启动时创建连接&#xff0c;可以在启动时用于热加载
#spring.activemq.pool.create-connection-on-startup&#61;true# 是否用Pooledconnectionfactory代替普通的ConnectionFactory
#spring.activemq.pool.enabled&#61;false# 连接过期超时
#spring.activemq.pool.expiry-timeout&#61;0ms# 连接空闲超时
#spring.activemq.pool.idle-timeout&#61;30s# 连接池最大连接数
#spring.activemq.pool.max-connections&#61;1# 每个连接的有效会话的最大数目。
#spring.activemq.pool.maximum-active-session-per-connection&#61;500# 当有JMSException时尝试重新连接
#spring.activemq.pool.reconnect-on-exception&#61;true# 空闲连接清除线程之间运行的时间&#xff0c;当为负数时&#xff0c;没有空闲连接驱逐线程运行
#spring.activemq.pool.time-between-expiration-check&#61;-1ms# 是否只使用一个MessageProducer
#spring.activemq.pool.use-anonymous-producers&#61;true

ab49cc5513a91cd1c2cf566ef85e81a0.png

六、总结

消息中间件广泛应用在大型互联网架构中&#xff0c;利用消息中间件队列和广播各自的特性可以支持很多业务&#xff0c;比如群发发送短信、给单个用户发送邮件等。

ActiveMQ是一款非常流行的消息中间件&#xff0c;它的特点是部署简单、使用方便&#xff0c;比较适合中小型团队。Spring Boot提供了集成ActiveMQ对应的组件&#xff0c;在Spring Boot中使用ActiveMQ只需要添加相关注解即可。



推荐阅读
  • 深入解析 Apache Shiro 安全框架架构
    本文详细介绍了 Apache Shiro,一个强大且灵活的开源安全框架。Shiro 专注于简化身份验证、授权、会话管理和加密等复杂的安全操作,使开发者能够更轻松地保护应用程序。其核心目标是提供易于使用和理解的API,同时确保高度的安全性和灵活性。 ... [详细]
  • 本文详细介绍了如何在Linux系统上安装和配置Smokeping,以实现对网络链路质量的实时监控。通过详细的步骤和必要的依赖包安装,确保用户能够顺利完成部署并优化其网络性能监控。 ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • 使用Python在SAE上开发新浪微博应用的初步探索
    最近重新审视了新浪云平台(SAE)提供的服务,发现其已支持Python开发。本文将详细介绍如何利用Django框架构建一个简单的新浪微博应用,并分享开发过程中的关键步骤。 ... [详细]
  • 深入探讨CPU虚拟化与KVM内存管理
    本文详细介绍了现代服务器架构中的CPU虚拟化技术,包括SMP、NUMA和MPP三种多处理器结构,并深入探讨了KVM的内存虚拟化机制。通过对比不同架构的特点和应用场景,帮助读者理解如何选择最适合的架构以优化性能。 ... [详细]
  • 优化ListView性能
    本文深入探讨了如何通过多种技术手段优化ListView的性能,包括视图复用、ViewHolder模式、分批加载数据、图片优化及内存管理等。这些方法能够显著提升应用的响应速度和用户体验。 ... [详细]
  • 1:有如下一段程序:packagea.b.c;publicclassTest{privatestaticinti0;publicintgetNext(){return ... [详细]
  • PHP 5.2.5 安装与配置指南
    本文详细介绍了 PHP 5.2.5 的安装和配置步骤,帮助开发者解决常见的环境配置问题,特别是上传图片时遇到的错误。通过本教程,您可以顺利搭建并优化 PHP 运行环境。 ... [详细]
  • 使用 Azure Service Principal 和 Microsoft Graph API 获取 AAD 用户列表
    本文介绍了一段通用代码示例,该代码不仅能够操作 Azure Active Directory (AAD),还可以通过 Azure Service Principal 的授权访问和管理 Azure 订阅资源。Azure 的架构可以分为两个层级:AAD 和 Subscription。 ... [详细]
  • 如何配置Unturned服务器及其消息设置
    本文详细介绍了Unturned服务器的配置方法和消息设置技巧,帮助用户了解并优化服务器管理。同时,提供了关于云服务资源操作记录、远程登录设置以及文件传输的相关补充信息。 ... [详细]
  • UNP 第9章:主机名与地址转换
    本章探讨了用于在主机名和数值地址之间进行转换的函数,如gethostbyname和gethostbyaddr。此外,还介绍了getservbyname和getservbyport函数,用于在服务器名和端口号之间进行转换。 ... [详细]
  • ServiceStack与Swagger的无缝集成指南
    本文详细介绍了如何在ServiceStack项目中集成Swagger,以实现API文档的自动生成和在线测试。通过本指南,您将了解从配置到部署的完整流程,并掌握如何优化API接口的开发和维护。 ... [详细]
  • 2023年京东Android面试真题解析与经验分享
    本文由一位拥有6年Android开发经验的工程师撰写,详细解析了京东面试中常见的技术问题。涵盖引用传递、Handler机制、ListView优化、多线程控制及ANR处理等核心知识点。 ... [详细]
  • 本文详细介绍了Linux系统中init进程的作用及其启动过程,解释了运行级别的概念,并提供了调整服务启动顺序的具体步骤和实例。通过了解这些内容,用户可以更好地管理系统的启动流程和服务配置。 ... [详细]
  • 微软Exchange服务器遭遇2022年版“千年虫”漏洞
    微软Exchange服务器在新年伊始遭遇了一个类似于‘千年虫’的日期处理漏洞,导致邮件传输受阻。该问题主要影响配置了FIP-FS恶意软件引擎的Exchange 2016和2019版本。 ... [详细]
author-avatar
白开水
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有