热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

ActiveMQ消息签收机制代码实例详解

这篇文章主要介绍了ActiveMQ消息签收机制代码实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

这篇文章主要介绍了ActiveMQ消息签收机制代码实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

消费者客户端成功接收一条消息的标志是:这条消息被签收。

消费者客户端成功接收一条消息一般包括三个阶段:

1、消费者接收消息,也即从MessageConsumer的receive方法返回

2、消费者处理消息

3、消息被签收

其中,第三阶段的签收可以有ActiveMQ发起,也可以由消费者客户端发起,取决于Session是否开启事务以及签收模式的设置。

在带事务的Session中,消费者客户端事务提交之时,消息自动完成签收。

在不带事务的Session中,消息何时以及如何被签收取决于Session的签收模式设置

非事务Session可以设置如下几种签收模式:

1.Session.AUTO_ACKNOWLEDGE

当消息从MessageConsumer的receive方法返回或者从MessageListener接口的onMessage方法返回时,会话自动确认消息签收

2.Session.CLIENT_ACKNOWLEDGE

需要消费者客户端主动调用acknowledge方法签收消息,这种模式实在Session层面进行签收的,签收一个已经消费的消息会自动的签收这个Session已消费的所有消息:

例如一个消费者在一个Session中消费了5条消息,然后确认第3条消息,所有这5条消息都会被签收

3.Session.DUPS_OK_ACKNOWLEDGE

这种方式允许JMS不必急于确认收到的消息,允许在收到多个消息之后一次完成确认,与Auto_AcKnowledge相比,这种确认方式在某些情况下可能更有效,因为没有确认,当系统崩溃或者网络出现故障的时候,消息可以被重新传递.

这种方式会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息才可使用。(如果ActiveMQ再次传送同一消息,那么消息头中的JMSRedelivered将被设置为true)

带事务session的案例

  生产者

    必须在生产完数据之后手动提交session

package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Producter {
 public static void main(String[] args) throws JMSException {
  // ConnectionFactory :连接工厂,JMS 用它创建连接
  ConnectionFactory cOnnectionFactory= new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
  // JMS 客户端到JMS Provider 的连接
  Connection cOnnection= connectionFactory.createConnection();
  //启动连接
  connection.start();
  // Session: 一个发送或接收消息的线程 false:代表不带事务的session AUTO_ACKNOWLEDGE:代表自动签收
  Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
  // Destination :消息的目的地;消息发送给谁.
  // 获取session注意参数值my-queue是Query的名字
  Queue queue = session.createQueue("my-queue");
  // MessageProducer:创建消息生产者
  MessageProducer producer = session.createProducer(queue);
  // 设置不持久化 PERSISTENT:代表持久化 NON_PERSISTENT:代表不持久化
  producer.setDeliveryMode(DeliveryMode.PERSISTENT);
  // 发送消息
  for (int i = 1; i <= 5; i++) {
   sendMsg(session, producer, i);
  }
  System.out.println("发送成功!");
  session.commit();
  session.close();
  connection.close();
 }
 /**
  * 在指定的会话上,通过指定的消息生产者发出一条消息
  *
  * @param session
  *   消息会话
  * @param producer
  *   消息生产者
  */
 public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
  // 创建一条文本消息
  TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
  // 通过消息生产者发出消息
  producer.send(message);
 }
}

  消费者

    消费完数据之后必须手动提交session

package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsReceiver {
 public static void main(String[] args) throws JMSException {
  // ConnectionFactory :连接工厂,JMS 用它创建连接
  ConnectionFactory cOnnectionFactory= new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
  // JMS 客户端到JMS Provider 的连接
  Connection cOnnection= connectionFactory.createConnection();
  connection.start();
  // Session: 一个发送或接收消息的线程 true:表单开启事务 AUTO_ACKNOWLEDGE:代表自动签收
  Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
  // Destination :消息的目的地;消息发送给谁.
  // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
  Queue queue = session.createQueue("my-queue");
  // 消费者,消息接收者
  MessageConsumer cOnsumer= session.createConsumer(queue);
  while (true) {
   //receive():获取消息
   TextMessage message = (TextMessage) consumer.receive();
   if (null != message) {
    System.out.println("收到消息:" + message.getText());
    session.commit();
   } else {
    break;
   }
  }
  //回收资源
  session.close();
  connection.close();
 }
}

不带事务session的案例

  1.自动签收

  2.手动签收

    生产者

package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Producter {
 public static void main(String[] args) throws JMSException {
  // ConnectionFactory :连接工厂,JMS 用它创建连接
  ConnectionFactory cOnnectionFactory= new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
  // JMS 客户端到JMS Provider 的连接
  Connection cOnnection= connectionFactory.createConnection();
  //启动连接
  connection.start();
  // Session: 一个发送或接收消息的线程 false:代表不带事务的session AUTO_ACKNOWLEDGE:代表自动签收
  /* Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);*/
  Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
  // Destination :消息的目的地;消息发送给谁.
  // 获取session注意参数值my-queue是Query的名字
  Queue queue = session.createQueue("my-queue");
  // MessageProducer:创建消息生产者
  MessageProducer producer = session.createProducer(queue);
  // 设置不持久化 PERSISTENT:代表持久化 NON_PERSISTENT:代表不持久化
  producer.setDeliveryMode(DeliveryMode.PERSISTENT);
  // 发送消息
  for (int i = 1; i <= 5; i++) {
   sendMsg(session, producer, i);
  }
  System.out.println("发送成功!");
  session.close();
  connection.close();
 }
 /**
  * 在指定的会话上,通过指定的消息生产者发出一条消息
  *
  * @param session
  *   消息会话
  * @param producer
  *   消息生产者
  */
 public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
  // 创建一条文本消息
  TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
  // 通过消息生产者发出消息
  producer.send(message);
     message.acknowledge();  //手动提交
  } 
}

    消费者

package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import sun.plugin2.os.windows.SECURITY_ATTRIBUTES;

import javax.jms.*;

public class JmsReceiver {
 public static void main(String[] args) throws JMSException {
  // ConnectionFactory :连接工厂,JMS 用它创建连接
  ConnectionFactory cOnnectionFactory= new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
  // JMS 客户端到JMS Provider 的连接
  Connection cOnnection= connectionFactory.createConnection();
  connection.start();
  // Session: 一个发送或接收消息的线程 true:表单开启事务 AUTO_ACKNOWLEDGE:代表自动签收
  /*Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);*/
  Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
  // Destination :消息的目的地;消息发送给谁.
  // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
  Queue queue = session.createQueue("my-queue");
  // 消费者,消息接收者
  MessageConsumer cOnsumer= session.createConsumer(queue);
  while (true) {
   //receive():获取消息
   TextMessage message = (TextMessage) consumer.receive();
   if (null != message) {
    System.out.println("收到消息:" + message.getText());
    message.acknowledge();  //手动提交
   } else {
    break;
   }
  }
  //回收资源
  session.close();
  connection.close();
 }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


推荐阅读
  • 本文探讨了如何优化和正确配置Kafka Streams应用程序以确保准确的状态存储查询。通过调整配置参数和代码逻辑,可以有效解决数据不一致的问题。 ... [详细]
  • 本文详细分析了Hive在启动过程中遇到的权限拒绝错误,并提供了多种解决方案,包括调整文件权限、用户组设置以及环境变量配置等。 ... [详细]
  • PHP 5.5.0rc1 发布:深入解析 Zend OPcache
    2013年5月9日,PHP官方发布了PHP 5.5.0rc1和PHP 5.4.15正式版,这两个版本均支持64位环境。本文将详细介绍Zend OPcache的功能及其在Windows环境下的配置与测试。 ... [详细]
  • 本文详细介绍了IBM DB2数据库在大型应用系统中的应用,强调其卓越的可扩展性和多环境支持能力。文章深入分析了DB2在数据利用性、完整性、安全性和恢复性方面的优势,并提供了优化建议以提升其在不同规模应用程序中的表现。 ... [详细]
  • 优化联通光猫DNS服务器设置
    本文详细介绍了如何为联通光猫配置DNS服务器地址,以提高网络解析效率和访问体验。通过智能线路解析功能,域名解析可以根据访问者的IP来源和类型进行差异化处理,从而实现更优的网络性能。 ... [详细]
  • Windows服务与数据库交互问题解析
    本文探讨了在Windows 10(64位)环境下开发的Windows服务,旨在定期向本地MS SQL Server (v.11)插入记录。尽管服务已成功安装并运行,但记录并未正确插入。我们将详细分析可能的原因及解决方案。 ... [详细]
  • 1:有如下一段程序:packagea.b.c;publicclassTest{privatestaticinti0;publicintgetNext(){return ... [详细]
  • 深入理解Cookie与Session会话管理
    本文详细介绍了如何通过HTTP响应和请求处理浏览器的Cookie信息,以及如何创建、设置和管理Cookie。同时探讨了会话跟踪技术中的Session机制,解释其原理及应用场景。 ... [详细]
  • 如何配置Unturned服务器及其消息设置
    本文详细介绍了Unturned服务器的配置方法和消息设置技巧,帮助用户了解并优化服务器管理。同时,提供了关于云服务资源操作记录、远程登录设置以及文件传输的相关补充信息。 ... [详细]
  • 网络运维工程师负责确保企业IT基础设施的稳定运行,保障业务连续性和数据安全。他们需要具备多种技能,包括搭建和维护网络环境、监控系统性能、处理突发事件等。本文将探讨网络运维工程师的职业前景及其平均薪酬水平。 ... [详细]
  • QUIC协议:快速UDP互联网连接
    QUIC(Quick UDP Internet Connections)是谷歌开发的一种旨在提高网络性能和安全性的传输层协议。它基于UDP,并结合了TLS级别的安全性,提供了更高效、更可靠的互联网通信方式。 ... [详细]
  • 深入理解OAuth认证机制
    本文介绍了OAuth认证协议的核心概念及其工作原理。OAuth是一种开放标准,旨在为第三方应用提供安全的用户资源访问授权,同时确保用户的账户信息(如用户名和密码)不会暴露给第三方。 ... [详细]
  • Valve 发布 Steam Deck 的新版 Windows 驱动程序
    Valve 最新发布了针对 Steam Deck 掌机的 Windows 驱动程序,旨在提升其在 Windows 环境下的兼容性、安全性和性能表现。 ... [详细]
  • 本文介绍了如何使用PHP代码实现微信平台的媒体素材上传功能,详细解释了API接口的使用方法和注意事项,确保文件路径正确以避免常见的错误。 ... [详细]
  • 使用Vultr云服务器和Namesilo域名搭建个人网站
    本文详细介绍了如何通过Vultr云服务器和Namesilo域名搭建一个功能齐全的个人网站,包括购买、配置服务器以及绑定域名的具体步骤。文章还提供了详细的命令行操作指南,帮助读者顺利完成建站过程。 ... [详细]
author-avatar
广佛笑嘻嘻_229
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有