热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flinkcheckpoint源码分析二

概述:上一篇文文章总体接收了flinkcheckpoint的源码分析的总体概念和流程。并结合代码介绍了checkpoint的发起和任务执行过程详细参考:

概述:上一篇文文章总体接收了flinkcheckpoint的源码分析的总体概念和流程。并结合代码介绍了checkpoint的发起和任务执行过程
详细参考:https://blog.csdn.net/weixin_40809627/article/details/108537480

本篇文章将接着上篇文章,继续介绍 flink checkPoint 的检查点快照、本地状态存储、checkpoint的确认、和状态恢复等过程。


一、存储检查点状态快照

在task 触发了chckpoint 之后,对于Task而言,最重要的就是将当前 Task 中所有算子的状态快照(state snapshot)储存到外部存储系统的。外部系统可能是一个分布式文件系统,也可能是JobManager内存中。

在 StreamTask.performCheckpoint 方法中,开始进行 checkpoint 操作,这里主要分为三部分:1)checkpoint的准备操作,这里通常不进行太多操作;2)发送 CheckpointBarrier;3)存储检查点快照:

class StreamTask {private boolean performCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetrics checkpointMetrics,boolean advanceToEndOfTime) throws Exception {final long checkpointId = checkpointMetaData.getCheckpointId();final boolean result;synchronized (lock) {if (isRunning) {if (checkpointOptions.getCheckpointType().isSynchronous()) {syncSavepointLatch.setCheckpointId(checkpointId);if (advanceToEndOfTime) {advanceToEndOfEventTime();}}// All of the following steps happen as an atomic step from the perspective of barriers and// records/watermarks/timers/callbacks.// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream// checkpoint alignments// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.// The pre-barrier work should be nothing or minimal in the common case.operatorChain.prepareSnapshotPreBarrier(checkpointId);// Step (2): Send the checkpoint barrier downstreamoperatorChain.broadcastCheckpointBarrier(checkpointId,checkpointMetaData.getTimestamp(),checkpointOptions);// Step (3): Take the state snapshot. This should be largely asynchronous, to not// impact progress of the streaming topologycheckpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);result = true;}else {// we cannot perform our checkpoint - let the downstream operators know that they// should not wait for any input from this operator// we cannot broadcast the cancellation markers on the 'operator chain', because it may not// yet be createdfinal CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());Exception exception = null;for (RecordWriter>> recordWriter : recordWriters) {try {recordWriter.broadcastEvent(message);} catch (Exception e) {exception = ExceptionUtils.firstOrSuppressed(new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),exception);}}if (exception != null) {throw exception;}result = false;}}if (isRunning && syncSavepointLatch.isSet()) {//保存 savepoint,等待 checkpoint 确认完成final boolean checkpointWasAcked =syncSavepointLatch.blockUntilCheckpointIsAcknowledged();if (checkpointWasAcked) {finishTask();}}return result;}
}

在介绍如何存储检查点快照之前,先了解下相关checkpoint 存储相关的一些类,简单地来说,CheckpointStorage是对状态存储系统地抽象,它有两个不同的实现,分别是MemoryBackendCheckpointStorage 和 FsCheckpointStorage。MemoryBackendCheckpointStorage 会将所有算子的检查点状态存储在 JobManager 的内存中,通常不适合在生产环境中使用;而 FsCheckpointStorage 则会把所有算子的检查点状态持久化存储在文件系统中。 CheckpointStorageLocation 是对检查点状态存储位置的一个抽象。它呢能过提供获取检查点输出流的方法,通过输出流将状态和元数据写入到存储系统中。输出流关闭时,可以获得状态句柄StateHandle),后面可以使用句柄重新读取写入的状态。

下面时执行状态快照主要逻辑
在这里插入图片描述
每个算个的快照被抽象为OperatorSnapshotFutures,包含了operator state 和 keyed state 的快照结果:
检查点快照的过程被封装为CheckpointingOperation,由于每一个StreamTask 可能包含多个算子,因而内部使用一个ap 维护 OperatorID -> OperatorSnapshotFutures 的关系。CheckpointingOperation 中,快照操作分为两个阶段,第一个阶段同步执行的,第二个阶段异步执行的。
在这里插入图片描述

class StreamTask {private static final class CheckpointingOperation {//OperatorID -> OperatorSnapshotFuturesprivate final Map operatorSnapshotsInProgress;//执行检查点快照public void executeCheckpointing() throws Exception {startSyncPartNano = System.nanoTime();try {//1. 同步执行的部分for (StreamOperator op : allOperators) {checkpointStreamOperator(op);}//2. 异步执行的部分// checkpoint 可以配置成同步执行,也可以配置成异步执行的// 如果是同步执行的,在这里实际上所有的 runnable future 都是已经完成的状态AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(owner,operatorSnapshotsInProgress,checkpointMetaData,checkpointMetrics,startAsyncPartNano);owner.cancelables.registerCloseable(asyncCheckpointRunnable);owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);} catch (Exception ex) {........}}@SuppressWarnings("deprecation")private void checkpointStreamOperator(StreamOperator op) throws Exception {if (null != op) {// 调用 StreamOperator.snapshotState 方法进行快照// 返回的结果是 runnable future,可能是已经执行完了,也可能没有执行完OperatorSnapshotFutures snapshotInProgress = op.snapshotState(checkpointMetaData.getCheckpointId(),checkpointMetaData.getTimestamp(),checkpointOptions,storageLocation);operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);}}}
}

在同步执行阶段,会依次调用每一个算子的StreamOperator.snapshotState,返回结果是一个 runnable future。根据 checkpoint 配置成同步模式和异步模式的区别,这个 future 可能处于完成状态,也可能处于未完成状态: 具体参考代码snapshotState
在这里插入图片描述

现在我们已经看到 checkpoint 操作是如何同用户自定义函数建立关联的了,接下来我们来看看由 Flink 托管的状态是如何写入存储系统的,即:
在这里插入图片描述

operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions); //写入 operator state
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions); //写入 keyed state

首先来看operator state。DefaultOperatorStateBackend 将实际的工作交给DefaultOperatorStateBackendSnapshotStrategy 完成。首先会对当前注册的所有的operator state(包含 list state 和 broadcast state)做深度拷贝,然后将实际的写入操作封装在一个异步的 FutureTask 中,这个 FutureTask 的主要任务包括: 1)、打开输出流 2)、写入状态元数据信息 3)、写入状态 4)、关闭输出流,获得状态句柄。如果不启动异步checkpoint模式,那么这个FutureTask 在同步阶段就会立刻执行。
在这里插入图片描述
keyed state 写入的基本流程和此相似,但由于keyed state 在存储时有多种实现,包括基于堆内存和RocksDB 的不同实现,此外基于 RocksDB 的实现还包括支持增量 checkpoint,因而相比于 operator state 要更复杂一些。另外,Flink 自 1.5.0 版本还引入了一个本地状态存储的优化,支持在 TaskManager 的本地保存一份 keyed state,试图优化状态恢复的速度和网络开销。
至此,我们介绍快照操作的第一个阶段,即同步执行的阶段。异步执行阶段被封装为AsyncCheckpointRunnable ,主要的操作包括 1)、执行同步阶段创建FutureTask 2)完成后向 CheckpointCoordinator 发送 Ack 响应。

class StreamTask {protected static final class AsyncCheckpointRunnable implements Runnable, Closeable {@Overridepublic void run() {FileSystemSafetyNet.initializeSafetyNetForThread();try {TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =new TaskStateSnapshot(operatorSnapshotsInProgress.size());TaskStateSnapshot localTaskOperatorSubtaskStates =new TaskStateSnapshot(operatorSnapshotsInProgress.size());// 完成每一个 operator 的状态写入// 如果是同步 checkpoint,那么在此之前状态已经写入完成// 如果是异步 checkpoint,那么在这里才会写入状态for (Map.Entry entry : operatorSnapshotsInProgress.entrySet()) {OperatorID operatorID = entry.getKey();OperatorSnapshotFutures snapshotInProgress = entry.getValue();// finalize the async part of all by executing all snapshot runnablesOperatorSnapshotFinalizer finalizedSnapshots =new OperatorSnapshotFinalizer(snapshotInProgress);jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID,finalizedSnapshots.getJobManagerOwnedState());localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID,finalizedSnapshots.getTaskLocalState());}final long asyncEndNanos = System.nanoTime();final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {//报告 snapshot 完成reportCompletedSnapshotStates(jobManagerTaskOperatorSubtaskStates,localTaskOperatorSubtaskStates,asyncDurationMillis);} else {LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",owner.getName(),checkpointMetaData.getCheckpointId());}} catch (Exception e) {handleExecutionException(e);} finally {owner.cancelables.unregisterCloseable(this);FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();}}}private void reportCompletedSnapshotStates(TaskStateSnapshot acknowledgedTaskStateSnapshot,TaskStateSnapshot localTaskStateSnapshot,long asyncDurationMillis) {TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();boolean hasLocalState = localTaskStateSnapshot.hasState();// we signal stateless tasks by reporting null, so that there are no attempts to assign empty state// to stateless tasks on restore. This enables simple job modifications that only concern// stateless without the need to assign them uids to match their (always empty) states.taskStateManager.reportTaskStateSnapshots(checkpointMetaData,checkpointMetrics,hasAckState ? acknowledgedTaskStateSnapshot : null,hasLocalState ? localTaskStateSnapshot : null);}
}public class TaskStateManagerImpl implements TaskStateManager {@Overridepublic void reportTaskStateSnapshots(@Nonnull CheckpointMetaData checkpointMetaData,@Nonnull CheckpointMetrics checkpointMetrics,@Nullable TaskStateSnapshot acknowledgedState,@Nullable TaskStateSnapshot localState) {long checkpointId = checkpointMetaData.getCheckpointId();localStateStore.storeLocalState(checkpointId, localState);//发送 ACK 响应给 CheckpointCoordinatorcheckpointResponder.acknowledgeCheckpoint(jobId,executionAttemptID,checkpointId,checkpointMetrics,acknowledgedState);}
}

二、本地状态存储

所谓本地状态存储,即在存储检查点快照时,在Task 所在的TaskManager 本地文件系统中存储一份副本,这样在进行状态恢复时可以优先从本地状态进行恢复,从而减少网络数据传输的开销。本地状态存储仅针对 keyed state,我们以较为简单的 HeapKeyedStateBackend 为例,看看本地状态存储时如何实现的

class HeapSnapshotStrategyextends AbstractSnapshotStrategy implements SnapshotStrategySynchronicityBehavior {@Nonnull@Overridepublic RunnableFuture> snapshot(long checkpointId,long timestamp,@Nonnull CheckpointStreamFactory primaryStreamFactory,@Nonnull CheckpointOptions checkpointOptions) throws IOException {......//创建 CheckpointStreamWithResultProviderfinal SupplierWithException checkpointStreamSupplier =localRecoveryConfig.isLocalRecoveryEnabled() ?() -> CheckpointStreamWithResultProvider.createDuplicatingStream(checkpointId,CheckpointedStateScope.EXCLUSIVE,primaryStreamFactory,localRecoveryConfig.getLocalStateDirectoryProvider()) :() -> CheckpointStreamWithResultProvider.createSimpleStream(CheckpointedStateScope.EXCLUSIVE,primaryStreamFactory);........}
}

其中关键的一点在于,根据是否启用本地状态恢复创建不同的

CheckpointStreamWithResultProvider。
public interface CheckpointStreamWithResultProvider extends Closeable {@Nonnullstatic CheckpointStreamWithResultProvider createSimpleStream(@Nonnull CheckpointedStateScope checkpointedStateScope,@Nonnull CheckpointStreamFactory primaryStreamFactory) throws IOException {CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);}@Nonnullstatic CheckpointStreamWithResultProvider createDuplicatingStream(@Nonnegative long checkpointId,@Nonnull CheckpointedStateScope checkpointedStateScope,@Nonnull CheckpointStreamFactory primaryStreamFactory,@Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider) throws IOException {CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);try {File outFile = new File(secondaryStreamDirProvider.subtaskSpecificCheckpointDirectory(checkpointId),String.valueOf(UUID.randomUUID()));Path outPath = new Path(outFile.toURI());CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =new FileBasedStateOutputStream(outPath.getFileSystem(), outPath);//有两个输出流,primary 和 secondary,secondary 对应本地存储return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(primaryOut, secondaryOut);} catch (IOException secondaryEx) {LOG.warn("Exception when opening secondary/local checkpoint output stream. " +"Continue only with the primary stream.", secondaryEx);}return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);}
}

所以在启用本地状态存储的情况下,会创建两个输出流,其中primaryOut 对应外部存储,而secondaryOut 对应本地存储。状态会输出两份。本地状态句柄会存储在 TaskLocalStateStore 中。


三、Checkpoint 的确认

todo 结合源码分析:https://blog.jrwang.me/2019/flink-source-code-checkpoint/


推荐阅读
  • POJ 2482 星空中的星星:利用线段树与扫描线算法解决
    在《POJ 2482 星空中的星星》问题中,通过运用线段树和扫描线算法,可以高效地解决星星在窗口内的计数问题。该方法不仅能够快速处理大规模数据,还能确保时间复杂度的最优性,适用于各种复杂的星空模拟场景。 ... [详细]
  • 在处理 XML 数据时,如果需要解析 `` 标签的内容,可以采用 Pull 解析方法。Pull 解析是一种高效的 XML 解析方式,适用于流式数据处理。具体实现中,可以通过 Java 的 `XmlPullParser` 或其他类似的库来逐步读取和解析 XML 文档中的 `` 元素。这样不仅能够提高解析效率,还能减少内存占用。本文将详细介绍如何使用 Pull 解析方法来提取 `` 标签的内容,并提供一个示例代码,帮助开发者快速解决问题。 ... [详细]
  • JUC(三):深入解析AQS
    本文详细介绍了Java并发工具包中的核心类AQS(AbstractQueuedSynchronizer),包括其基本概念、数据结构、源码分析及核心方法的实现。 ... [详细]
  • IOS Run loop详解
    为什么80%的码农都做不了架构师?转自http:blog.csdn.netztp800201articledetails9240913感谢作者分享Objecti ... [详细]
  • 本地存储组件实现对IE低版本浏览器的兼容性支持 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • 深入剖析Java中SimpleDateFormat在多线程环境下的潜在风险与解决方案
    深入剖析Java中SimpleDateFormat在多线程环境下的潜在风险与解决方案 ... [详细]
  • 在Linux系统中,网络配置是至关重要的任务之一。本文详细解析了Firewalld和Netfilter机制,并探讨了iptables的应用。通过使用`ip addr show`命令来查看网卡IP地址(需要安装`iproute`包),当网卡未分配IP地址或处于关闭状态时,可以通过`ip link set`命令进行配置和激活。此外,文章还介绍了如何利用Firewalld和iptables实现网络流量控制和安全策略管理,为系统管理员提供了实用的操作指南。 ... [详细]
  • 在Android应用开发中,实现与MySQL数据库的连接是一项重要的技术任务。本文详细介绍了Android连接MySQL数据库的操作流程和技术要点。首先,Android平台提供了SQLiteOpenHelper类作为数据库辅助工具,用于创建或打开数据库。开发者可以通过继承并扩展该类,实现对数据库的初始化和版本管理。此外,文章还探讨了使用第三方库如Retrofit或Volley进行网络请求,以及如何通过JSON格式交换数据,确保与MySQL服务器的高效通信。 ... [详细]
  • 属性类 `Properties` 是 `Hashtable` 类的子类,用于存储键值对形式的数据。该类在 Java 中广泛应用于配置文件的读取与写入,支持字符串类型的键和值。通过 `Properties` 类,开发者可以方便地进行配置信息的管理,确保应用程序的灵活性和可维护性。此外,`Properties` 类还提供了加载和保存属性文件的方法,使其在实际开发中具有较高的实用价值。 ... [详细]
  • 在Java编程中,初始化List集合有多种高效的方法。本文介绍了六种常见的技术,包括使用常规方式、Arrays.asList、Collections.addAll、Java 8的Stream API、双重大括号初始化以及使用List.of。每种方法都有其特定的应用场景和优缺点,开发者可以根据实际需求选择最合适的方式。例如,常规方式通过直接创建ArrayList对象并逐个添加元素,适用于需要动态修改列表的情况;而List.of则提供了一种简洁的不可变列表初始化方式,适合于固定数据集的场景。 ... [详细]
  • 如何将TS文件转换为M3U8直播流:HLS与M3U8格式详解
    在视频传输领域,MP4虽然常见,但在直播场景中直接使用MP4格式存在诸多问题。例如,MP4文件的头部信息(如ftyp、moov)较大,导致初始加载时间较长,影响用户体验。相比之下,HLS(HTTP Live Streaming)协议及其M3U8格式更具优势。HLS通过将视频切分成多个小片段,并生成一个M3U8播放列表文件,实现低延迟和高稳定性。本文详细介绍了如何将TS文件转换为M3U8直播流,包括技术原理和具体操作步骤,帮助读者更好地理解和应用这一技术。 ... [详细]
  • Flowable 流程图路径与节点展示:已执行节点高亮红色标记,增强可视化效果
    在Flowable流程图中,通常仅显示当前节点,而路径则需自行获取。特别是在多次驳回的情况下,节点可能会出现混乱。本文重点探讨了如何准确地展示流程图效果,包括已结束的流程和正在执行的流程。具体实现方法包括生成带有高亮红色标记的图片,以增强可视化效果,确保用户能够清晰地了解每个节点的状态。 ... [详细]
  • 本文详细解析了 Android 系统启动过程中的核心文件 `init.c`,探讨了其在系统初始化阶段的关键作用。通过对 `init.c` 的源代码进行深入分析,揭示了其如何管理进程、解析配置文件以及执行系统启动脚本。此外,文章还介绍了 `init` 进程的生命周期及其与内核的交互方式,为开发者提供了深入了解 Android 启动机制的宝贵资料。 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
author-avatar
DaybreakCP
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有