热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

rabbitmq消息确认机制

rabbitmq的消息确认机制分两部分一部分是生产端,一部分是消费端以下都是经过本人亲测得出的结论环境版本:RabbitMQ3.6.3,Erlang19.0rabbit

rabbitmq的消息确认机制分两部分

一部分是生产端,一部分是消费端

以下都是经过本人亲测得出的结论

环境版本:

RabbitMQ 3.6.3, Erlang 19.0  rabbitmq-java-client-bin-3.6.3


生产端

有两种选择,transaction   和   confirm。

transaction机制

                      //transaction 机制  
channel.txSelect();

String msg = "msg test !!!";
for(int i=0;i<10000;i++){
msg = i+" : msg test !!!";
channel.basicPublish(EXCHAGE, QUEUE_NAME,null,msg.getBytes());
System.out.println("publish msg "+msg);
if (i>0&&i%100==0){
//批量提交
channel.txCommit();
}

} // 若出现异常 进行 channel.txRollback(),对相应批次的msg进行重发或记录
channel.txCommit();




confirm机制

// confirm  机制 异步 通过注册listener,实现异步ack,提高性能  
channel.confirmSelect();

// 可以考虑将要发送到MQ的msg记录到SortedSet(TreeSet)中
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("+++++++++++++handleAck deliveryTag: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
// sortedSet.headSet(deliveryTag + 1).clear();
} else {
// sortedSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("------------handleNack deliveryTag: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
// sortedSet.headSet(deliveryTag + 1) toDo 进行重新发送,或记录下来,后续再统一发送;
} else {
// sortedSet.first(); 进行重新发送,或记录下来,后续再统一发送;
}
}
});




                  // confirm  机制  同步 
channel.confirmSelect();

String msg = "msg test !!!";
for(int i=0;i<10000;i++){
msg = i+" : msg test !!!";
channel.basicPublish(EXCHAGE, QUEUE_NAME,null,msg.getBytes());
System.out.println("publish msg "+msg);
//此处的 if 是为了实现批量confirm 能比较好的提高性能
if (i>0&&i%100==0){
if(channel.waitForConfirms()){
System.out.println("success publish msg " + (i-100)+" to "+i);
}else{
System.out.println("failed publish msg " + (i-100)+" to "+i);
i-=100;//此处-100是为了重发,也可以先记录下,之后再进行重发
}
}
}

confirm  的性能要好于transaction



消费端

首先通过producer往mq中放置了10个msg


然后通过debug断点方式consumer进行消费,但是先不ack,如下

可以看到已经被消费掉了,但是状态还是unacked


后面若一直不执行basicAck,则状态就一直保持不变

再如果我将consumer关掉,则unacked的msg又回到了ready。

这就是consumer的确认机制,来保证把消息真正的处理完后,告诉MQ,让其删除。

若consumer没有ack,则MQ就不会删除,即使consumer死掉,MQ又会把消息发给其他consumer(如果有其他consumer的话)


如下我将consumer干掉,MQ中的消息变成了


我再启动consumer,并进行2个msg的ack后,结果如下图


下图是10msg的ack后



代码中的关键的两处如下,其中注释的两行

private static void revieve() {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection cOnnection= factory.newConnection();
final Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

QueueingConsumer cOnsumer= new QueueingConsumer(channel);
//消费消息,其中的false表示需要后面显示的调用basicAck,告诉MQ将msg删除
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String msg = new String(delivery.getBody());

System.out.println("recieve " + msg);
//显示发送ack,只有在前面进行启用显示发送ack机制后才奏效。 false代表是否multiple
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}
} catch (Exception e) {
e.printStackTrace();
}

}

上面代码basicAck中的第二个参数若是true,则表示小于等于这个tag的全部发ack,类似于批量ack。本以为这个功能能够提高ack的性能,类似于批量。

但结果不是很明显,我设的批量值是100,我的ack能达到1000/s 左右。不知是我批量值设置问题,还是本身性能就这样。





推荐阅读
  • Java 11相对于Java 8,OptaPlanner性能提升有多大?
    本文通过基准测试比较了Java 11和Java 8对OptaPlanner的性能提升。测试结果表明,在相同的硬件环境下,Java 11相对于Java 8在垃圾回收方面表现更好,从而提升了OptaPlanner的性能。 ... [详细]
  • 本文整理了Java面试中常见的问题及相关概念的解析,包括HashMap中为什么重写equals还要重写hashcode、map的分类和常见情况、final关键字的用法、Synchronized和lock的区别、volatile的介绍、Syncronized锁的作用、构造函数和构造函数重载的概念、方法覆盖和方法重载的区别、反射获取和设置对象私有字段的值的方法、通过反射创建对象的方式以及内部类的详解。 ... [详细]
  • 五、RabbitMQ Java Client基本使用详解
    JavaClient的5.x版本系列需要JDK8,用于编译和运行。在Android上,仅支持Android7.0或更高版本。4.x版本系列支持7.0之前 ... [详细]
  • 先看官方文档TheJavaTutorialshavebeenwrittenforJDK8.Examplesandpracticesdescribedinthispagedontta ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • 深入理解Java虚拟机的并发编程与性能优化
    本文主要介绍了Java内存模型与线程的相关概念,探讨了并发编程在服务端应用中的重要性。同时,介绍了Java语言和虚拟机提供的工具,帮助开发人员处理并发方面的问题,提高程序的并发能力和性能优化。文章指出,充分利用计算机处理器的能力和协调线程之间的并发操作是提高服务端程序性能的关键。 ... [详细]
  • 1Lock与ReadWriteLock1.1LockpublicinterfaceLock{voidlock();voidlockInterruptibl ... [详细]
  • Jmeter对RabbitMQ压力测试
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Jmeter对RabbitMQ压力测试相关的知识,希望对你有一定的参考价值。Jm ... [详细]
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文介绍了RPC框架Thrift的安装环境变量配置与第一个实例,讲解了RPC的概念以及如何解决跨语言、c++客户端、web服务端、远程调用等需求。Thrift开发方便上手快,性能和稳定性也不错,适合初学者学习和使用。 ... [详细]
  • HashMap的相关问题及其底层数据结构和操作流程
    本文介绍了关于HashMap的相关问题,包括其底层数据结构、JDK1.7和JDK1.8的差异、红黑树的使用、扩容和树化的条件、退化为链表的情况、索引的计算方法、hashcode和hash()方法的作用、数组容量的选择、Put方法的流程以及并发问题下的操作。文章还提到了扩容死链和数据错乱的问题,并探讨了key的设计要求。对于对Java面试中的HashMap问题感兴趣的读者,本文将为您提供一些有用的技术和经验。 ... [详细]
  • 本文由编程笔记#小编为大家整理,主要介绍了源码分析--ConcurrentHashMap与HashTable(JDK1.8)相关的知识,希望对你有一定的参考价值。  Concu ... [详细]
  • 查找给定字符串的所有不同回文子字符串原文:https://www ... [详细]
author-avatar
7-好
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有