热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flinkcheckpoint源码分析二

概述:上一篇文文章总体接收了flinkcheckpoint的源码分析的总体概念和流程。并结合代码介绍了checkpoint的发起和任务执行过程详细参考:

概述:上一篇文文章总体接收了flinkcheckpoint的源码分析的总体概念和流程。并结合代码介绍了checkpoint的发起和任务执行过程
详细参考:https://blog.csdn.net/weixin_40809627/article/details/108537480

本篇文章将接着上篇文章,继续介绍 flink checkPoint 的检查点快照、本地状态存储、checkpoint的确认、和状态恢复等过程。


一、存储检查点状态快照

在task 触发了chckpoint 之后,对于Task而言,最重要的就是将当前 Task 中所有算子的状态快照(state snapshot)储存到外部存储系统的。外部系统可能是一个分布式文件系统,也可能是JobManager内存中。

在 StreamTask.performCheckpoint 方法中,开始进行 checkpoint 操作,这里主要分为三部分:1)checkpoint的准备操作,这里通常不进行太多操作;2)发送 CheckpointBarrier;3)存储检查点快照:

class StreamTask {private boolean performCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetrics checkpointMetrics,boolean advanceToEndOfTime) throws Exception {final long checkpointId = checkpointMetaData.getCheckpointId();final boolean result;synchronized (lock) {if (isRunning) {if (checkpointOptions.getCheckpointType().isSynchronous()) {syncSavepointLatch.setCheckpointId(checkpointId);if (advanceToEndOfTime) {advanceToEndOfEventTime();}}// All of the following steps happen as an atomic step from the perspective of barriers and// records/watermarks/timers/callbacks.// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream// checkpoint alignments// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.// The pre-barrier work should be nothing or minimal in the common case.operatorChain.prepareSnapshotPreBarrier(checkpointId);// Step (2): Send the checkpoint barrier downstreamoperatorChain.broadcastCheckpointBarrier(checkpointId,checkpointMetaData.getTimestamp(),checkpointOptions);// Step (3): Take the state snapshot. This should be largely asynchronous, to not// impact progress of the streaming topologycheckpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);result = true;}else {// we cannot perform our checkpoint - let the downstream operators know that they// should not wait for any input from this operator// we cannot broadcast the cancellation markers on the 'operator chain', because it may not// yet be createdfinal CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());Exception exception = null;for (RecordWriter>> recordWriter : recordWriters) {try {recordWriter.broadcastEvent(message);} catch (Exception e) {exception = ExceptionUtils.firstOrSuppressed(new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),exception);}}if (exception != null) {throw exception;}result = false;}}if (isRunning && syncSavepointLatch.isSet()) {//保存 savepoint,等待 checkpoint 确认完成final boolean checkpointWasAcked =syncSavepointLatch.blockUntilCheckpointIsAcknowledged();if (checkpointWasAcked) {finishTask();}}return result;}
}

在介绍如何存储检查点快照之前,先了解下相关checkpoint 存储相关的一些类,简单地来说,CheckpointStorage是对状态存储系统地抽象,它有两个不同的实现,分别是MemoryBackendCheckpointStorage 和 FsCheckpointStorage。MemoryBackendCheckpointStorage 会将所有算子的检查点状态存储在 JobManager 的内存中,通常不适合在生产环境中使用;而 FsCheckpointStorage 则会把所有算子的检查点状态持久化存储在文件系统中。 CheckpointStorageLocation 是对检查点状态存储位置的一个抽象。它呢能过提供获取检查点输出流的方法,通过输出流将状态和元数据写入到存储系统中。输出流关闭时,可以获得状态句柄StateHandle),后面可以使用句柄重新读取写入的状态。

下面时执行状态快照主要逻辑
在这里插入图片描述
每个算个的快照被抽象为OperatorSnapshotFutures,包含了operator state 和 keyed state 的快照结果:
检查点快照的过程被封装为CheckpointingOperation,由于每一个StreamTask 可能包含多个算子,因而内部使用一个ap 维护 OperatorID -> OperatorSnapshotFutures 的关系。CheckpointingOperation 中,快照操作分为两个阶段,第一个阶段同步执行的,第二个阶段异步执行的。
在这里插入图片描述

class StreamTask {private static final class CheckpointingOperation {//OperatorID -> OperatorSnapshotFuturesprivate final Map operatorSnapshotsInProgress;//执行检查点快照public void executeCheckpointing() throws Exception {startSyncPartNano = System.nanoTime();try {//1. 同步执行的部分for (StreamOperator op : allOperators) {checkpointStreamOperator(op);}//2. 异步执行的部分// checkpoint 可以配置成同步执行,也可以配置成异步执行的// 如果是同步执行的,在这里实际上所有的 runnable future 都是已经完成的状态AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(owner,operatorSnapshotsInProgress,checkpointMetaData,checkpointMetrics,startAsyncPartNano);owner.cancelables.registerCloseable(asyncCheckpointRunnable);owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);} catch (Exception ex) {........}}@SuppressWarnings("deprecation")private void checkpointStreamOperator(StreamOperator op) throws Exception {if (null != op) {// 调用 StreamOperator.snapshotState 方法进行快照// 返回的结果是 runnable future,可能是已经执行完了,也可能没有执行完OperatorSnapshotFutures snapshotInProgress = op.snapshotState(checkpointMetaData.getCheckpointId(),checkpointMetaData.getTimestamp(),checkpointOptions,storageLocation);operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);}}}
}

在同步执行阶段,会依次调用每一个算子的StreamOperator.snapshotState,返回结果是一个 runnable future。根据 checkpoint 配置成同步模式和异步模式的区别,这个 future 可能处于完成状态,也可能处于未完成状态: 具体参考代码snapshotState
在这里插入图片描述

现在我们已经看到 checkpoint 操作是如何同用户自定义函数建立关联的了,接下来我们来看看由 Flink 托管的状态是如何写入存储系统的,即:
在这里插入图片描述

operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions); //写入 operator state
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions); //写入 keyed state

首先来看operator state。DefaultOperatorStateBackend 将实际的工作交给DefaultOperatorStateBackendSnapshotStrategy 完成。首先会对当前注册的所有的operator state(包含 list state 和 broadcast state)做深度拷贝,然后将实际的写入操作封装在一个异步的 FutureTask 中,这个 FutureTask 的主要任务包括: 1)、打开输出流 2)、写入状态元数据信息 3)、写入状态 4)、关闭输出流,获得状态句柄。如果不启动异步checkpoint模式,那么这个FutureTask 在同步阶段就会立刻执行。
在这里插入图片描述
keyed state 写入的基本流程和此相似,但由于keyed state 在存储时有多种实现,包括基于堆内存和RocksDB 的不同实现,此外基于 RocksDB 的实现还包括支持增量 checkpoint,因而相比于 operator state 要更复杂一些。另外,Flink 自 1.5.0 版本还引入了一个本地状态存储的优化,支持在 TaskManager 的本地保存一份 keyed state,试图优化状态恢复的速度和网络开销。
至此,我们介绍快照操作的第一个阶段,即同步执行的阶段。异步执行阶段被封装为AsyncCheckpointRunnable ,主要的操作包括 1)、执行同步阶段创建FutureTask 2)完成后向 CheckpointCoordinator 发送 Ack 响应。

class StreamTask {protected static final class AsyncCheckpointRunnable implements Runnable, Closeable {@Overridepublic void run() {FileSystemSafetyNet.initializeSafetyNetForThread();try {TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =new TaskStateSnapshot(operatorSnapshotsInProgress.size());TaskStateSnapshot localTaskOperatorSubtaskStates =new TaskStateSnapshot(operatorSnapshotsInProgress.size());// 完成每一个 operator 的状态写入// 如果是同步 checkpoint,那么在此之前状态已经写入完成// 如果是异步 checkpoint,那么在这里才会写入状态for (Map.Entry entry : operatorSnapshotsInProgress.entrySet()) {OperatorID operatorID = entry.getKey();OperatorSnapshotFutures snapshotInProgress = entry.getValue();// finalize the async part of all by executing all snapshot runnablesOperatorSnapshotFinalizer finalizedSnapshots =new OperatorSnapshotFinalizer(snapshotInProgress);jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID,finalizedSnapshots.getJobManagerOwnedState());localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID,finalizedSnapshots.getTaskLocalState());}final long asyncEndNanos = System.nanoTime();final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {//报告 snapshot 完成reportCompletedSnapshotStates(jobManagerTaskOperatorSubtaskStates,localTaskOperatorSubtaskStates,asyncDurationMillis);} else {LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",owner.getName(),checkpointMetaData.getCheckpointId());}} catch (Exception e) {handleExecutionException(e);} finally {owner.cancelables.unregisterCloseable(this);FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();}}}private void reportCompletedSnapshotStates(TaskStateSnapshot acknowledgedTaskStateSnapshot,TaskStateSnapshot localTaskStateSnapshot,long asyncDurationMillis) {TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();boolean hasLocalState = localTaskStateSnapshot.hasState();// we signal stateless tasks by reporting null, so that there are no attempts to assign empty state// to stateless tasks on restore. This enables simple job modifications that only concern// stateless without the need to assign them uids to match their (always empty) states.taskStateManager.reportTaskStateSnapshots(checkpointMetaData,checkpointMetrics,hasAckState ? acknowledgedTaskStateSnapshot : null,hasLocalState ? localTaskStateSnapshot : null);}
}public class TaskStateManagerImpl implements TaskStateManager {@Overridepublic void reportTaskStateSnapshots(@Nonnull CheckpointMetaData checkpointMetaData,@Nonnull CheckpointMetrics checkpointMetrics,@Nullable TaskStateSnapshot acknowledgedState,@Nullable TaskStateSnapshot localState) {long checkpointId = checkpointMetaData.getCheckpointId();localStateStore.storeLocalState(checkpointId, localState);//发送 ACK 响应给 CheckpointCoordinatorcheckpointResponder.acknowledgeCheckpoint(jobId,executionAttemptID,checkpointId,checkpointMetrics,acknowledgedState);}
}

二、本地状态存储

所谓本地状态存储,即在存储检查点快照时,在Task 所在的TaskManager 本地文件系统中存储一份副本,这样在进行状态恢复时可以优先从本地状态进行恢复,从而减少网络数据传输的开销。本地状态存储仅针对 keyed state,我们以较为简单的 HeapKeyedStateBackend 为例,看看本地状态存储时如何实现的

class HeapSnapshotStrategyextends AbstractSnapshotStrategy implements SnapshotStrategySynchronicityBehavior {@Nonnull@Overridepublic RunnableFuture> snapshot(long checkpointId,long timestamp,@Nonnull CheckpointStreamFactory primaryStreamFactory,@Nonnull CheckpointOptions checkpointOptions) throws IOException {......//创建 CheckpointStreamWithResultProviderfinal SupplierWithException checkpointStreamSupplier =localRecoveryConfig.isLocalRecoveryEnabled() ?() -> CheckpointStreamWithResultProvider.createDuplicatingStream(checkpointId,CheckpointedStateScope.EXCLUSIVE,primaryStreamFactory,localRecoveryConfig.getLocalStateDirectoryProvider()) :() -> CheckpointStreamWithResultProvider.createSimpleStream(CheckpointedStateScope.EXCLUSIVE,primaryStreamFactory);........}
}

其中关键的一点在于,根据是否启用本地状态恢复创建不同的

CheckpointStreamWithResultProvider。
public interface CheckpointStreamWithResultProvider extends Closeable {@Nonnullstatic CheckpointStreamWithResultProvider createSimpleStream(@Nonnull CheckpointedStateScope checkpointedStateScope,@Nonnull CheckpointStreamFactory primaryStreamFactory) throws IOException {CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);}@Nonnullstatic CheckpointStreamWithResultProvider createDuplicatingStream(@Nonnegative long checkpointId,@Nonnull CheckpointedStateScope checkpointedStateScope,@Nonnull CheckpointStreamFactory primaryStreamFactory,@Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider) throws IOException {CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);try {File outFile = new File(secondaryStreamDirProvider.subtaskSpecificCheckpointDirectory(checkpointId),String.valueOf(UUID.randomUUID()));Path outPath = new Path(outFile.toURI());CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =new FileBasedStateOutputStream(outPath.getFileSystem(), outPath);//有两个输出流,primary 和 secondary,secondary 对应本地存储return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(primaryOut, secondaryOut);} catch (IOException secondaryEx) {LOG.warn("Exception when opening secondary/local checkpoint output stream. " +"Continue only with the primary stream.", secondaryEx);}return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);}
}

所以在启用本地状态存储的情况下,会创建两个输出流,其中primaryOut 对应外部存储,而secondaryOut 对应本地存储。状态会输出两份。本地状态句柄会存储在 TaskLocalStateStore 中。


三、Checkpoint 的确认

todo 结合源码分析:https://blog.jrwang.me/2019/flink-source-code-checkpoint/


推荐阅读
  • 本文介绍了Android中的assets目录和raw目录的共同点和区别,包括获取资源的方法、目录结构的限制以及列出资源的能力。同时,还解释了raw目录中资源文件生成的ID,并说明了这些目录的使用方法。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • 本文讨论了如何使用GStreamer来删除H264格式视频文件中的中间部分,而不需要进行重编码。作者提出了使用gst_element_seek(...)函数来实现这个目标的思路,并提到遇到了一个解决不了的BUG。文章还列举了8个解决方案,希望能够得到更好的思路。 ... [详细]
  • 本文总结了初学者在使用dubbo设计架构过程中遇到的问题,并提供了相应的解决方法。问题包括传输字节流限制、分布式事务、序列化、多点部署、zk端口冲突、服务失败请求3次机制以及启动时检查。通过解决这些问题,初学者能够更好地理解和应用dubbo设计架构。 ... [详细]
  • 本文讨论了在使用Git进行版本控制时,如何提供类似CVS中自动增加版本号的功能。作者介绍了Git中的其他版本表示方式,如git describe命令,并提供了使用这些表示方式来确定文件更新情况的示例。此外,文章还介绍了启用$Id:$功能的方法,并讨论了一些开发者在使用Git时的需求和使用场景。 ... [详细]
  • ejava,刘聪dejava
    本文目录一览:1、什么是Java?2、java ... [详细]
  • 流数据流和IO流的使用及应用
    本文介绍了流数据流和IO流的基本概念和用法,包括输入流、输出流、字节流、字符流、缓冲区等。同时还介绍了异常处理和常用的流类,如FileReader、FileWriter、FileInputStream、FileOutputStream、OutputStreamWriter、InputStreamReader、BufferedReader、BufferedWriter等。此外,还介绍了系统流和标准流的使用。 ... [详细]
  • 抽空写了一个ICON图标的转换程序
    抽空写了一个ICON图标的转换程序,支持png\jpe\bmp格式到ico的转换。具体的程序就在下面,如果看的人多,过两天再把思路写一下。 ... [详细]
  • {moduleinfo:{card_count:[{count_phone:1,count:1}],search_count:[{count_phone:4 ... [详细]
  • 本文介绍了在Cpp中将字符串形式的数值转换为int或float等数值类型的方法,主要使用了strtol、strtod和strtoul函数。这些函数可以将以null结尾的字符串转换为long int、double或unsigned long类型的数值,且支持任意进制的字符串转换。相比之下,atoi函数只能转换十进制数值且没有错误返回。 ... [详细]
  • 本文介绍了关于Java异常的八大常见问题,包括异常管理的最佳做法、在try块中定义的变量不能用于catch或finally的原因以及为什么Double.parseDouble(null)和Integer.parseInt(null)会抛出不同的异常。同时指出这些问题是由于不同的开发人员开发所导致的,不值得过多思考。 ... [详细]
  • Question该提问来源于开源项目:react-native-device-info/react-native-device-info ... [详细]
author-avatar
DaybreakCP
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有