热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink二阶段提交

Flink二阶段提交

Flink二阶段提交

  • 一、引申(什么是XA事务)
  • 二、Flink二阶段提交
    • 2.1 引入 EXACTLY_ONCE 语义
    • 2.2 Kafka幂等性和事务性
    • 2.3 Flink二阶段提交
一、引申(什么是XA事务)

XA(eXtended Architecture)是指由X/Open 组织提出的分布式交易处理的规范。XA 是一个分布式事务协议,由Tuxedo 提出,所以分布式事务也称为XA 事务。XA 协议主要定义了事务管理器TM(Transaction Manager,协调者)和资源管理器RM(Resource Manager,参与者)之间的接口。其中,资源管理器往往由数据库实现,如Oracle、DB2、MySQL,这些商业数据库都实现了XA 接口,而事务管理器作为全局的调度者,负责各个本地资源的提交和回滚。XA 事务是基于两阶段提交(Two-phaseCommit,2PC)协议实现的,可以保证数据的强一致性,许多分布式关系型数据管理系统都采用此协议来完成分布式。阶段一为准备阶段,即所有的参与者准备执行事务并锁住需要的资源。当参与者Ready时,向TM 汇报自己已经准备好。阶段二为提交阶段。当TM 确认所有参与者都Ready 后,向所有参与者发送COMMIT 命令。

XA 事务允许不同数据库的分布式事务,只要参与在全局事务中的每个结点都支持XA 事务。Oracle、MySQL 和SQL Server 都支持XA 事务。

XA 事务由一个或多个资源管理器(RM)、一个事务管理器(TM)和一个应用程序(ApplicationProgram)组成。

  • 资源管理器:提供访问事务资源的方法。通常一个数据库就是一个资源管理器。

  • 事务管理器:协调参与全局事务中的各个事务。需要和参与全局事务的所有资源管理器进行通信。

  • 应用程序:定义事务的边界。

XA 事务的缺点是性能不好,且无法满足高并发场景。一个数据库的事务和多个数据库间的XA 事务性能会相差很多。因此,要尽量避免XA 事务,如可以将数据写入本地,用高性能的消息系统分发数据,或使用数据库复制等技术。只有在其他办法都无法实现业务需求,且性能不是瓶颈时才使用XA。

二、Flink二阶段提交

2.1 引入 EXACTLY_ONCE 语义

EXACTLY_ONCE语义简称EOS,指的是每条输入消息只会影响最终结果一次,注意这里是影响一次,而非处理一次,Flink一直宣称自己支持EOS,实际上主要是对于Flink应用内部来说的,对于外部系统(端到端)则有比较强的限制

外部系统写入支持幂等性
外部系统支持以事务的方式写入
Flink在1.4.0版本引入了TwoPhaseCommitSinkFunction接口,并在Kafka Producer的connector中实现了它,支持了对外部Kafka Sink的EXACTLY_ONCE语义。

详见:End-to-End Exactly-Once Processing in Apache Flink

2.2 Kafka幂等性和事务性

在kafka 0.11版本中已经提出,kafka 将对事务和幂等性的支持,使得kafka 端到端exactly once语义成为可能。幂等性与事务性都是Kafka发展过程中非常重要的。

在正常情况下,produce向Broker投递消息,broker将消息追加写到对应的流(即某一个topic的某一partition)中,并向Producer返回ACK信号,表示确认收到。

1、幂等性的实现
kafka 为了实想幂等性,他在底层的设计架构中引入了Producer和SequenceNumber。
(1)、ProducerID:在每一个新的Producer初始化时,或被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
(2)、sequenceNumber:对于每个producerID,Producer发送数据的每个Topic和Partition都对饮一个从0开始递增的SequenceNumber值。
2、当引入幂等性后解决的问题。
同样的数据发送到kafka中会对数据增加Pid 和sequenceId

2、事务

在数据端对端数据保证中,另个一个我们特别关注的问题就是事务。即原子性操作。对应的结果是同时成功或者同时失败,kafka的事务注重的生产和消费的的原子性操作。典型的例子为。
一系列的Producer生产消息和消费消息提交Offsets的操作在一个事务中。

例如产生的场景包括:

(1)、producer多次发送消息封装在一个原子性操作,即要求同时成功,或者同时失败。
(2)、在消费者&生产者的模式下,因为consumer在 commit offsets出现问题时,导致重复消费消息时,需要将这个模式下的Consumer和Commit offsets操作和Producer一系列生产消息的操作封装成一个原子性操作。
(3)、kafka的事务总体可以分为三方面的内容:

  • 1)、只有Producer生产消息,这种场景需要事务的介入;
  • 2)、消费消息和生产消息并存,比如Consumer&Producer模式,这种场景是一般Kafka项目中比较常见的模式,需要事务介入;
  • 3)、但是只有Consumer消费消息,这种操作在实际项目中意义不大,和手动Commit Offsets的结果一样,而且这种场景不是事务的引入目的。

事务提供的5种API方法:

org.apache.kafka.clients.producer.Producer<K,V>接口中:
// 1. 初始化事务,需要注意确保transation.id属性被分配
void initTransactions();
// 2. 开启事务
void beginTransaction() throws ProducerFencedException;
// 3. 为Consumer提供的在事务内Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 4. 提交事务
void commitTransaction() throws ProducerFencedException;
// 5. 放弃事务,类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;

详见:kafka的幂等性和事务性

2.3 Flink二阶段提交

场景描述

两阶段提交(two-phase commit, 2PC)是最基础的分布式一致性协议,应用广泛。

2PC介绍:

2PC 在分布式系统中,为了让每个节点能够感知其他所有节点的事务执行情况,需要我们引入一个中心节点来统一所有节点的执行逻辑和进度,这个中心节点叫做协调者(Coordinator),而其中向中心节点汇报或者被中心节点调度的其他节点叫做参与者(Participant)

2PC原理

①、请求阶段

  • 1、协调者向所有参与者发送准备请求与事务内容,询问是否可以准备事务提交,并等待参与者的响应。
  • 2、参与者执行事务中的包含操作,并记录undo日志(用于回滚)和redo日志(用于重放),但是不真正提交。
  • 3、参与者向协调者返回事务才做的执行结果,执行陈工返回yes,否则返回no.

②、提交阶段(分成成功失败两种情况)

情况1: 若所有的参与者都返回yes,说明事务可以提交。

  • 1、协调者向所有参与者发送commit请求。
  • 2、参与者收到commit 请求后,将事务真正的提交上去,并释放占用的事务资源,并向协调者返回ack。
  • 3、协调者收到所有参与者ack消息,事务成功完成。

情况2:若有参与者返回no或者超时未返回,说明事务终端,需要回滚。

  • 1、协调者向所有参与者发送rollback请求。
  • 2、参与者收到rollback请求后,根据undo日志回滚到事务执行前的状态,释放占用的事务资源,并向协调者返回ack。
  • 3、协调者收到所有参与者的ack消息,事务回滚完成。

在这里插入图片描述

2pc 的优缺点

2PC的优点在于原理非常简单,容易理解及实现。
缺点主要有3个,列举如下:
(1)协调者存在单点问题。如果协调者挂了,整个2PC逻辑就彻底不能运行。
(2)、执行过程是完全同步的。各参与者在等待其他参与者响应的过程中都处于阻塞状态,大并发下有性能问题。
(3)、仍然存在不一致风险。如果由于网络异常等意外导致只有部分参与者收到了commit请求,就会造成部分参与者提交了事务而其他参与者未提交的情况。
不过,现在人们在分布式一致性领域做了很多工作,以ZooKeeper为代表的分布式协调框架也数不胜数,2PC有了这些的加持,可靠性大大提升了,也就能够真正用在要求高的生产环境中了。

Flink基于2PC的实现

2PC 的最常见应用场景其实是关系型数据库,比如mysql InnoDB 存储引擎的XA事务系统。
Flink作为流式处理引擎,自然也提供了对exactly once语义的保证。flink的内部意图检查点机制和轻量级分布式快照算法ABS 保证exactly once .。二我们要实现端到端的精确一次的输出逻辑,则需要施加以下两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write)。

在Spark Streaming中,要实现事务性写入完全靠用户自己,框架本身并没有提供任何实现。但是在Flink中提供了基于2PC的SinkFunction,名为TwoPhaseCommitSinkFunction,帮助我们做了一些基础的工作。

在这里插入图片描述

flink 官方推荐所有需要保证exactly once 的sink 逻辑都继承该抽象类。它具体定义如下四个抽象方法。需要我们去在子类中实现。

// 开始一个事务,返回事务信息的句柄
protected abstract TXN beginTransaction() throws Exception;
// 预提交(即提交请求)阶段的逻辑
protected abstract void preCommit(TXN transaction) throws Exception;

// 正式提交阶段的逻辑
protected abstract void commit(TXN transaction);
// 取消事务
protected abstract void abort(TXN transaction);


public class FlinkKafkaProducer<IN>
extends TwoPhaseCommitSinkFunction<
IN,
FlinkKafkaProducer.KafkaTransactionState,
FlinkKafkaProducer.KafkaTransactionContext>
{
/**
* Semantics that can be chosen.
*

  • {@link #EXACTLY_ONCE}
    *
  • {@link #AT_LEAST_ONCE}
    *
  • {@link #NONE}
    */
    public enum Semantic {
    /**
    * Semantic.EXACTLY_ONCE the Flink producer will write all messages in a Kafka transaction
    * that will be committed to Kafka on a checkpoint.
    *
    *

    In this mode {@link FlinkKafkaProducer} sets up a pool of {@link
    * FlinkKafkaInternalProducer}. Between each checkpoint a Kafka transaction is created,
    * which is committed on {@link
    FlinkKafkaProducer#notifyCheckpointComplete(long)}
    . If
    * checkpoint complete notifications are running late, {@link FlinkKafkaProducer} can run
    * out of {@link FlinkKafkaInternalProducer}s in the pool. In that case any subsequent
    * {@link FlinkKafkaProducer#snapshotState(FunctionSnapshotContext)} requests will fail and
    * {@link FlinkKafkaProducer} will keep using the {@link FlinkKafkaInternalProducer} from
    * the previous checkpoint. To decrease the chance of failing checkpoints there are four
    * options:
    *

  • decrease number of max concurrent checkpoints
    *
  • make checkpoints more reliable (so that they complete faster)
    *
  • increase the delay between checkpoints
    *
  • increase the size of {@link FlinkKafkaInternalProducer}s pool
    */
    EXACTLY_ONCE,
    /**
    * Semantic.AT_LEAST_ONCE the Flink producer will wait for all outstanding messages in the
    * Kafka buffers to be acknowledged by the Kafka producer on a checkpoint.
    */

    AT_LEAST_ONCE,
    /**
    * Semantic.NONE means that nothing will be guaranteed. Messages can be lost and/or
    * duplicated in case of failure.
    */

    NONE
    }

    下面以Flink与Kafka的集成来说明2PC的具体流程。注意这里的Kafka版本必须是0.11及以上,因为只有0.11+的版本才支持幂等producer以及事务性,从而2PC才有存在的意义。Kafka内部事务性的机制如下框图所示。

    flink 实现两阶段提交具体实现为:
    FlinkKafkaProducer.commit()方法实际上是代理了KafkaProducer.commitTransaction()方法,正式向Kafka提交事务。

    Flink版本:1.13.6

    @Override
    protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
    if (transaction.isTransactional()) {
    try {
    transaction.producer.commitTransaction();
    } finally {
    recycleTransactionalProducer(transaction.producer);
    }
    }
    }

    该方法的调用点位于 TwoPhaseCommitSinkFunction.notifyCheckpointComplete()方法中,顾名思义,当所有的检查点都成功后,会调用这个方法。

    @Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    // the following scenarios are possible here
    //
    // (1) there is exactly one transaction from the latest checkpoint that
    // was triggered and completed. That should be the common case.
    // Simply commit that transaction in that case.
    //
    // (2) there are multiple pending transactions because one previous
    // checkpoint was skipped. That is a rare case, but can happen
    // for example when:
    //
    // - the master cannot persist the metadata of the last
    // checkpoint (temporary outage in the storage system) but
    // could persist a successive checkpoint (the one notified here)
    //
    // - other tasks could not persist their status during
    // the previous checkpoint, but did not trigger a failure because they
    // could hold onto their state and could successfully persist it in
    // a successive checkpoint (the one notified here)
    //
    // In both cases, the prior checkpoint never reach a committed state, but
    // this checkpoint is always expected to subsume the prior one and cover all
    // changes since the last successful one. As a consequence, we need to commit
    // all pending transactions.
    //
    // (3) Multiple transactions are pending, but the checkpoint complete notification
    // relates not to the latest. That is possible, because notification messages
    // can be delayed (in an extreme case till arrive after a succeeding checkpoint
    // was triggered) and because there can be concurrent overlapping checkpoints
    // (a new one is started before the previous fully finished).
    //
    // ==> There should never be a case where we have no pending transaction here
    //
    Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =
    pendingCommitTransactions.entrySet().iterator();
    Throwable firstError = null;
    while (pendingTransactionIterator.hasNext()) {
    Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
    Long pendingTransactionCheckpointId = entry.getKey();
    TransactionHolder<TXN> pendingTransaction = entry.getValue();
    if (pendingTransactionCheckpointId > checkpointId) {
    continue;
    }
    LOG.info(
    "{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
    name(),
    checkpointId,
    pendingTransaction,
    pendingTransactionCheckpointId);
    logWarningIfTimeoutAlmostReached(pendingTransaction);
    try {
    commit(pendingTransaction.handle);
    } catch (Throwable t) {
    if (firstError == null) {
    firstError = t;
    }
    }
    LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
    pendingTransactionIterator.remove();
    }
    if (firstError != null) {
    throw new FlinkRuntimeException(
    "Committing one of transactions failed, logging first encountered failure",
    firstError);
    }
    }

    从代码中可以看出,该方法每次从赈灾等待提交的事务句柄中取出一个,检查他的检查点ID,并调用commit()方法提交,这个阶段流程图为:

    在这里插入图片描述

    可见,只有在所有的检查点都成功的这个前提下,写入才会成功。这符合前文描述2PC的流程。其中jobmanager为协调者,各个算子为参与者,并且中有sink一个参与者会执行提交。一旦有了检查点失败,notifyCheckpointComplete()方法不会执行,如果重试不成功,则最后会调用abort()方法回滚事务,如下:

    @Override
    protected void abort(FlinkKafkaProducer.KafkaTransactionState transaction) {
    if (transaction.isTransactional()) {
    transaction.producer.abortTransaction();
    recycleTransactionalProducer(transaction.producer);
    }
    }

    参考list:


    • End-to-End Exactly-Once Processing in Apache Flink
    • kafka的幂等性和事务性

  • 版权声明:本文为liuwei0376原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。
    原文链接:https://blog.csdn.net/liuwei0376/article/details/126259056
    推荐阅读
    • 在Linux系统中避免安装MySQL的简易指南
      在Linux系统中避免安装MySQL的简易指南 ... [详细]
    • 本文深入解析了通过JDBC实现ActiveMQ消息持久化的机制。JDBC能够将消息可靠地存储在多种关系型数据库中,如MySQL、SQL Server、Oracle和DB2等。采用JDBC持久化方式时,数据库会自动生成三个关键表:`activemq_msgs`、`activemq_lock`和`activemq_ACKS`,分别用于存储消息数据、锁定信息和确认状态。这种机制不仅提高了消息的可靠性,还增强了系统的可扩展性和容错能力。 ... [详细]
    • 服务器部署中的安全策略实践与优化
      服务器部署中的安全策略实践与优化 ... [详细]
    • Unity与MySQL连接过程中出现的新挑战及解决方案探析 ... [详细]
    • 在最近的项目中,我们广泛使用了Qt框架的网络库,过程中遇到了一些挑战和问题。本文旨在记录这些经验和解决方案,以便日后参考。鉴于我们的客户端GUI完全基于Qt开发,我们期望利用其强大的网络功能进行Fiddler网络数据包的捕获与分析,以提升开发效率和应用性能。 ... [详细]
    • 本文介绍了如何利用Shell脚本高效地部署MHA(MySQL High Availability)高可用集群。通过详细的脚本编写和配置示例,展示了自动化部署过程中的关键步骤和注意事项。该方法不仅简化了集群的部署流程,还提高了系统的稳定性和可用性。 ... [详细]
    • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
      在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
    • 基于试题数据的智能化管理平台采用Java语言进行面向对象编程,旨在构建一个高效的试题信息管理系统。该系统在JDK 6.0和MyEclipse 10.6环境下开发,通过优化试题数据管理和处理流程,提升系统的稳定性和用户体验。平台支持多用户操作,具备强大的数据处理能力和灵活的扩展性,适用于各类教育机构和考试组织。 ... [详细]
    • Web开发框架概览:Java与JavaScript技术及框架综述
      Web开发涉及服务器端和客户端的协同工作。在服务器端,Java是一种优秀的编程语言,适用于构建各种功能模块,如通过Servlet实现特定服务。客户端则主要依赖HTML进行内容展示,同时借助JavaScript增强交互性和动态效果。此外,现代Web开发还广泛使用各种框架和库,如Spring Boot、React和Vue.js,以提高开发效率和应用性能。 ... [详细]
    • 在当今的软件开发领域,分布式技术已成为程序员不可或缺的核心技能之一,尤其在面试中更是考察的重点。无论是小微企业还是大型企业,掌握分布式技术对于提升工作效率和解决实际问题都至关重要。本周的Java架构师实战训练营中,我们深入探讨了Kafka这一高效的分布式消息系统,它不仅支持发布订阅模式,还能在高并发场景下保持高性能和高可靠性。通过实际案例和代码演练,学员们对Kafka的应用有了更加深刻的理解。 ... [详细]
    • 通过利用代码自动生成技术,旨在减轻软件开发的复杂性,缩短项目周期,减少冗余代码的编写,从而显著提升开发效率。该方法不仅能够降低开发人员的工作强度,还能确保代码的一致性和质量。 ... [详细]
    • Amoeba 通过优化 MySQL 的读写分离功能显著提升了数据库性能。作为一款基于 MySQL 协议的代理工具,Amoeba 能够高效地处理应用程序的请求,并根据预设的规则将 SQL 请求智能地分配到不同的数据库实例,从而实现负载均衡和高可用性。该方案不仅提高了系统的并发处理能力,还有效减少了主数据库的负担,确保了数据的一致性和可靠性。 ... [详细]
    • 触发器的稳态数量分析及其应用价值
      本文对数据库中的SQL触发器进行了稳态数量的详细分析,探讨了其在实际应用中的重要价值。通过研究触发器在不同场景下的表现,揭示了其在数据完整性和业务逻辑自动化方面的关键作用。此外,还介绍了如何在Ubuntu 22.04环境下配置和使用触发器,以及在Tomcat和SQLite等平台上的具体实现方法。 ... [详细]
    • 在深入掌握Spring框架的事务管理之前,了解其背后的数据库事务基础至关重要。Spring的事务管理功能虽然强大且灵活,但其核心依赖于数据库自身的事务处理机制。因此,熟悉数据库事务的基本概念和特性是必不可少的。这包括事务的ACID属性、隔离级别以及常见的事务管理策略等。通过这些基础知识的学习,可以更好地理解和应用Spring中的事务管理配置。 ... [详细]
    • MySQL数据库安装图文教程
      本文详细介绍了MySQL数据库的安装步骤。首先,用户需要打开已下载的MySQL安装文件,例如 `mysql-5.5.40-win32.msi`,并双击运行。接下来,在安装向导中选择安装类型,通常推荐选择“典型”安装选项,以确保大多数常用功能都能被正确安装。此外,文章还提供了详细的图文说明,帮助用户顺利完成整个安装过程,确保数据库系统能够稳定运行。 ... [详细]
    author-avatar
    了不起的老狐_226
    这个家伙很懒,什么也没留下!
    PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
    Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有