热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

rocketmq怎么保证数据不会重复_搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务...

本文转载自http:linkedkeeper.com本文转载自http:linkedkeeper.com本文内容参考网络,侵删本系列文章将整理到我在GitHub上的《

本文转载自 http://linkedkeeper.com

本文转载自 http://linkedkeeper.com

本文内容参考网络,侵删

本系列文章将整理到我在GitHub上的《Java面试指南》仓库,更多精彩内容请到我的仓库里查看

https://github.com/h2pl/Java-Tutorial

喜欢的话麻烦点下Star哈

文章首发于我的个人博客:

http://www.how2playlife.com

该系列博文会告诉你什么是分布式系统,这对后端工程师来说是很重要的一门学问,我们会逐步了解常见的分布式技术、以及一些较为常见的分布式系统概念,同时也需要进一步了解zookeeper、分布式事务、分布式锁、负载均衡等技术,以便让你更完整地了解分布式技术的具体实战方法,为真正应用分布式技术做好准备。

如果对本系列文章有什么建议,或者是有什么疑问的话,也可以关注公众号【Java技术江湖】联系作者,欢迎你参与本系列博文的创作和修订。

初步认识RocketMQ的核心模块

9a38110fde5c11a221e874c6a89ea90a.png

rocketmq模块

rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息。

rocketmq-client:提供发送、接受消息的客户端API。

rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息。(有点NameNode的味道)

rocketmq-common:通用的一些类,方法,数据结构等

rocketmq-remoting:基于Netty4的client/server + fastjson序列化 + 自定义二进制协议

rocketmq-store:消息、索引存储等

rocketmq-filtersrv:消息过滤器Server,需要注意的是,要实现这种过滤,需要上传代码到MQ!【一般而言,我们利用Tag足以满足大部分的过滤需求,如果更灵活更复杂的过滤需求,可以考虑filtersrv组件】

rocketmq-tools:命令行工具

分布式消息队列RocketMQ--事务消息--解决分布式事务

说到分布式事务,就会谈到那个经典的”账号转账”问题:2个账号,分布处于2个不同的DB,或者说2个不同的子系统里面,A要扣钱,B要加钱,如何保证原子性?

一般的思路都是通过消息中间件来实现“最终一致性”:A系统扣钱,然后发条消息给中间件,B系统接收此消息,进行加钱。

但这里面有个问题:A是先update DB,后发送消息呢? 还是先发送消息,后update DB?

假设先update DB成功,发送消息网络失败,重发又失败,怎么办? 假设先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?

所以,这里下个结论: 只要发送消息和update DB这2个操作不是原子的,无论谁先谁后,都是有问题的。

那这个问题怎么解决呢?

错误的方案0

有人可能想到了,我可以把“发送消息”这个网络调用和update DB放在同1个事务里面,如果发送消息失败,update DB自动回滚。这样不就保证2个操作的原子性了吗?

这个方案看似正确,其实是错误的,原因有2:

(1)网络的2将军问题:发送消息失败,发送方并不知道是消息中间件真的没有收到消息呢?还是消息已经收到了,只是返回response的时候失败了?

如果是已经收到消息了,而发送端认为没有收到,执行update db的回滚操作。则会导致A账号的钱没有扣,B账号的钱却加了。

(2)把网络调用放在DB事务里面,可能会因为网络的延时,导致DB长事务。严重的,会block整个DB。这个风险很大。

基于以上分析,我们知道,这个方案其实是错误的!

方案1–业务方自己实现

假设消息中间件没有提供“事务消息”功能,比如你用的是Kafka。那如何解决这个问题呢?

解决方案如下: (1)Producer端准备1张消息表,把update DB和insert message这2个操作,放在一个DB事务里面。

(2)准备一个后台程序,源源不断的把消息表中的message传送给消息中间件。失败了,不断重试重传。允许消息重复,但消息不会丢,顺序也不会打乱。

(3)Consumer端准备一个判重表。处理过的消息,记在判重表里面。实现业务的幂等。但这里又涉及一个原子性问题:如果保证消息消费 + insert message到判重表这2个操作的原子性?

消费成功,但insert判重表失败,怎么办?关于这个,在Kafka的源码分析系列,第1篇, exactly once问题的时候,有过讨论。

通过上面3步,我们基本就解决了这里update db和发送网络消息这2个操作的原子性问题。

但这个方案的一个缺点就是:需要设计DB消息表,同时还需要一个后台任务,不断扫描本地消息。导致消息的处理和业务逻辑耦合额外增加业务方的负担。

方案2 – RocketMQ 事务消息

为了能解决该问题,同时又不和业务耦合,RocketMQ提出了“事务消息”的概念。

具体来说,就是把消息的发送分成了2个阶段:Prepare阶段和确认阶段。

具体来说,上面的2个步骤,被分解成3个步骤: (1) 发送Prepared消息 (2) update DB (3) 根据update DB结果成功或失败,Confirm或者取消Prepared消息。

可能有人会问了,前2步执行成功了,最后1步失败了怎么办?这里就涉及到了RocketMQ的关键点:RocketMQ会定期(默认是1分钟)扫描所有的Prepared消息,询问发送方,到底是要确认这条消息发出去?还是取消此条消息?

具体代码实现如下:

也就是定义了一个checkListener,RocketMQ会回调此Listener,从而实现上面所说的方案。

// 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 构造事务消息的生产者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 设置事务决断处理类
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 构造MSG,省略构造参数
Message msg = new Message(......);
// 发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();

然后执行本地事务,具体代码如下

public TransactionSendResult sendMessageInTransaction(.....) {// 逻辑代码,非实际代码// 1.发送消息sendResult = this.send(msg);// sendResult.getSendStatus() == SEND_OK// 2.如果消息发送成功,处理与消息关联的本地事务单元LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);// 3.结束事务this.endTransaction(sendResult, localTransactionState, localException);
}

a617282cb2b7f4462c38feca36ec4498.png

上面所说的消息中间件上注册的listener,超时以后会回调producer的接口以确定事务执行情况

7ca86b82a329387dd9707ba795433418.png

总结:对比方案2和方案1,RocketMQ最大的改变,其实就是把“扫描消息表”这个事情,不让业务方做,而是消息中间件帮着做了。

至于消息表,其实还是没有省掉。因为消息中间件要询问发送方,事物是否执行成功,还是需要一个“变相的本地消息表”,记录事物执行状态。

人工介入

可能有人又要说了,无论方案1,还是方案2,发送端把消息成功放入了队列,但消费端消费失败怎么办?

消费失败了,重试,还一直失败怎么办?是不是要自动回滚整个流程?

答案是人工介入。从工程实践角度讲,这种整个流程自动回滚的代价是非常巨大的,不但实现复杂,还会引入新的问题。比如自动回滚失败,又怎么处理?

对应这种极低概率的case,采取人工处理,会比实现一个高复杂的自动化回滚系统,更加可靠,也更加简单。



推荐阅读
  • 2021最新总结网易/腾讯/CVTE/字节面经分享(附答案解析)
    本文分享作者在2021年面试网易、腾讯、CVTE和字节等大型互联网企业的经历和问题,包括稳定性设计、数据库优化、分布式锁的设计等内容。同时提供了大厂最新面试真题笔记,并附带答案解析。 ... [详细]
  • 本文总结了初学者在使用dubbo设计架构过程中遇到的问题,并提供了相应的解决方法。问题包括传输字节流限制、分布式事务、序列化、多点部署、zk端口冲突、服务失败请求3次机制以及启动时检查。通过解决这些问题,初学者能够更好地理解和应用dubbo设计架构。 ... [详细]
  • 本文目录一览:1、数据库有哪几种2、数据库软件 ... [详细]
  • 超赞!GitHub上百万下载量Java面试手册!颠覆你的认知
    金三面试不顺心,马上银四面试在即,自己复盘总觉得Java知识点很凌乱?没有合适的方法学习!今天分享这份GitHub上百万下载量Ja ... [详细]
  • Spring Cloud笔记Spring Cloud Stream消息驱动(十五)
    1.消息驱动概述1.SpringCloudStream是什么SpringCloudStream是一个构建消息驱动微服务的框架。应用程序通过Inpust和Outputs与Spri ... [详细]
  • 阿里p8用端午节3天假期整理出了Spring Cloud知识点大全,太全了
     前言:今天一觉起来,发现我们开工的日期又延迟了,虽然已经在家办公一个多礼拜了,但是由于家里的环境还是不能有很高的效率。于是干脆就对SpringCloud的一些知识点做了一些整理。 ... [详细]
  • k8s(十一)helm安装premetheus
    1prometheus基础知识1.1prometheus简介Prometheus是一套开源的系统监控报警框架。和Kubernetes类似,它也发源于Google的Borg体系,其原 ... [详细]
  • 关于我们EMQ是一家全球领先的开源物联网基础设施软件供应商,服务新产业周期的IoT&5G、边缘计算与云计算市场,交付全球领先的开源物联网消息服务器和流处理数据 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • Jmeter对RabbitMQ压力测试
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Jmeter对RabbitMQ压力测试相关的知识,希望对你有一定的参考价值。Jm ... [详细]
  • 五、RabbitMQ Java Client基本使用详解
    JavaClient的5.x版本系列需要JDK8,用于编译和运行。在Android上,仅支持Android7.0或更高版本。4.x版本系列支持7.0之前 ... [详细]
  • 我正在使用sql-serverkafka-connect和debezium监视sqlserver数据库,但是当我发布并运行我的wo ... [详细]
  • 局限性_Scrum框架的局限性
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了Scrum框架的局限性相关的知识,希望对你有一定的参考价值。在很多工作坊的讨论中, ... [详细]
  • web.py开发web 第八章 Formalchemy 服务端验证方法
    本文介绍了在web.py开发中使用Formalchemy进行服务端表单数据验证的方法。以User表单为例,详细说明了对各字段的验证要求,包括必填、长度限制、唯一性等。同时介绍了如何自定义验证方法来实现验证唯一性和两个密码是否相等的功能。该文提供了相关代码示例。 ... [详细]
  • 2018深入java目标计划及学习内容
    本文介绍了作者在2018年的深入java目标计划,包括学习计划和工作中要用到的内容。作者计划学习的内容包括kafka、zookeeper、hbase、hdoop、spark、elasticsearch、solr、spring cloud、mysql、mybatis等。其中,作者对jvm的学习有一定了解,并计划通读《jvm》一书。此外,作者还提到了《HotSpot实战》和《高性能MySQL》等书籍。 ... [详细]
author-avatar
大可
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有