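Overview: The previous article covered the overall concepts and flow of the Flink checkpoint source code, and used the code to walk through how a checkpoint is triggered and how tasks execute it.
For details, see: https://blog.csdn.net/weixin_40809627/article/details/108537480
This article picks up where that one left off and covers the checkpoint snapshot, local state storage, checkpoint acknowledgement, and state restoration in Flink.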
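Once a task has triggered a checkpoint, the most important job for the Task is to store the state snapshots of all operators in that Task in an external storage system. That system may be a distributed file system, or it may be the JobManager's memory.
The checkpoint work starts in StreamTask.performCheckpoint and has three parts: 1) preparing the checkpoint, which usually involves little work; 2) sending the CheckpointBarrier; 3) storing the checkpoint snapshot: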
class StreamTask {
    private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics,
            boolean advanceToEndOfTime) throws Exception {
        final long checkpointId = checkpointMetaData.getCheckpointId();
        final boolean result;
        synchronized (lock) {
            if (isRunning) {
                if (checkpointOptions.getCheckpointType().isSynchronous()) {
                    syncSavepointLatch.setCheckpointId(checkpointId);
                    if (advanceToEndOfTime) {
                        advanceToEndOfEventTime();
                    }
                }
                // All of the following steps happen as an atomic step from the perspective of barriers and
                // records/watermarks/timers/callbacks.
                // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                // checkpoint alignments

                // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
                // The pre-barrier work should be nothing or minimal in the common case.
                operatorChain.prepareSnapshotPreBarrier(checkpointId);

                // Step (2): Send the checkpoint barrier downstream
                operatorChain.broadcastCheckpointBarrier(
                    checkpointId,
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions);

                // Step (3): Take the state snapshot. This should be largely asynchronous, to not
                // impact progress of the streaming topology
                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
                result = true;
            }
            else {
                // we cannot perform our checkpoint - let the downstream operators know that they
                // should not wait for any input from this operator

                // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
                // yet be created
                final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                Exception exception = null;
                for (RecordWriter
}
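The ordering of the three steps can be illustrated with a small toy model. This is only a sketch, not Flink code: PerformCheckpointSketch, its log list, and the string markers are hypothetical names, and returning false in the not-running branch is an assumption, since the listing above is cut off before that point.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the step ordering; none of these types are Flink classes.
class PerformCheckpointSketch {
    final List<String> log = new ArrayList<>();
    boolean running = true;

    boolean performCheckpoint(long checkpointId) {
        synchronized (this) {
            if (!running) {
                // Not running: tell downstream tasks not to wait for this checkpoint.
                log.add("cancel-marker:" + checkpointId);
                return false; // assumed result, the original listing is truncated here
            }
            log.add("prepare:" + checkpointId);  // step 1: minimal pre-barrier work
            log.add("barrier:" + checkpointId);  // step 2: barrier goes downstream early
            log.add("snapshot:" + checkpointId); // step 3: state snapshot, mostly async in Flink
            return true;
        }
    }

    public static void main(String[] args) {
        PerformCheckpointSketch task = new PerformCheckpointSketch();
        System.out.println(task.performCheckpoint(1L) + " " + task.log);
    }
}
```

The barrier is recorded before the snapshot on purpose: the comments in the original method state that the barrier should be emitted as early as possible so that downstream alignment is not delayed.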
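Before looking at how the checkpoint snapshot is stored, it helps to know a few of the classes involved in checkpoint storage. Put simply, CheckpointStorage is an abstraction over the state storage system. It has two implementations, MemoryBackendCheckpointStorage and FsCheckpointStorage. MemoryBackendCheckpointStorage keeps the checkpoint state of all operators in the JobManager's memory and is usually not suitable for production; FsCheckpointStorage persists the checkpoint state of all operators to a file system.
CheckpointStorageLocation is an abstraction over the location where checkpoint state is stored. It provides methods for obtaining checkpoint output streams, through which state and metadata are written to the storage system. When an output stream is closed, it yields a state handle (StateHandle), which can later be used to read back the state that was written.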
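The "write through a stream, get a handle back on close" idea can be sketched as follows. This is a hedged illustration under stated assumptions: ToyStateHandle, ToyCheckpointOutputStream, and CheckpointStorageSketch are hypothetical stand-ins that keep bytes in memory, and they do not reproduce the signatures of Flink's own classes. InputStream.readAllBytes requires Java 9 or later.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a state handle: something that can reopen the written state.
interface ToyStateHandle {
    InputStream openInputStream() throws IOException;
}

// Hypothetical in-memory checkpoint stream.
class ToyCheckpointOutputStream extends ByteArrayOutputStream {
    // Closing the stream hands back a handle to the bytes written so far.
    ToyStateHandle closeAndGetHandle() throws IOException {
        close();
        final byte[] written = toByteArray();
        return () -> new ByteArrayInputStream(written);
    }
}

class CheckpointStorageSketch {
    // Writes the state, closes the stream, then reads the state back via the handle.
    static String writeThenReadBack(String state) {
        try {
            ToyCheckpointOutputStream out = new ToyCheckpointOutputStream();
            out.write(state.getBytes(StandardCharsets.UTF_8));
            ToyStateHandle handle = out.closeAndGetHandle();
            try (InputStream in = handle.openInputStream()) {
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeThenReadBack("count=42"));
    }
}
```

A file-system variant would differ only in where the bytes go and in what the handle remembers (a path instead of a byte array), which is the point of hiding both behind the same abstraction.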
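The main logic for taking the state snapshot is as follows.
The snapshot of each operator is abstracted as OperatorSnapshotFutures, which holds the snapshot results for both operator state and keyed state:
The snapshot process itself is wrapped in CheckpointingOperation. Because a StreamTask may contain several operators, it keeps a Map internally that maintains the OperatorID -> OperatorSnapshotFutures relationship. Within CheckpointingOperation the snapshot runs in two phases: the first phase executes synchronously and the second executes asynchronously.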
class StreamTask {
    private static final class CheckpointingOperation {
        // OperatorID -> OperatorSnapshotFutures
        private final Map
}
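In the synchronous phase, StreamOperator.snapshotState is called on each operator in turn and returns a runnable future. Depending on whether checkpointing is configured in synchronous or asynchronous mode, this future may already be completed or may still be pending. See the code of snapshotState for the details.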
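The difference between a completed and a pending future can be sketched with java.util.concurrent.FutureTask. This is a minimal model under stated assumptions: TwoPhaseSketch, the String operator IDs, and the "handle-of-..." results are hypothetical, and the real OperatorSnapshotFutures holds several futures per operator rather than one.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

class TwoPhaseSketch {
    // Synchronous phase for one operator: build the write task.
    // In synchronous mode it is run right away, in asynchronous mode it is returned unrun.
    static RunnableFuture<String> snapshotState(String operatorId, boolean asyncMode) {
        FutureTask<String> write = new FutureTask<>(() -> "handle-of-" + operatorId);
        if (!asyncMode) {
            write.run();
        }
        return write;
    }

    // Asynchronous phase: run whatever is still pending, then collect the results.
    static Map<String, String> finish(Map<String, RunnableFuture<String>> inProgress) {
        Map<String, String> handles = new LinkedHashMap<>();
        for (Map.Entry<String, RunnableFuture<String>> entry : inProgress.entrySet()) {
            RunnableFuture<String> future = entry.getValue();
            if (!future.isDone()) {
                future.run();
            }
            try {
                handles.put(entry.getKey(), future.get());
            } catch (Exception e) {
                throw new IllegalStateException("snapshot failed for " + entry.getKey(), e);
            }
        }
        return handles;
    }

    public static void main(String[] args) {
        Map<String, RunnableFuture<String>> inProgress = new LinkedHashMap<>();
        inProgress.put("op-1", snapshotState("op-1", false));
        inProgress.put("op-2", snapshotState("op-2", true));
        System.out.println(finish(inProgress));
    }
}
```

Returning the same future type in both modes means the asynchronous phase does not need to know how the checkpoint was configured; it only needs to make sure every future has run before it reads the result.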
We have now seen how the checkpoint operation is tied to user-defined functions. Next, let us look at how state managed by Flink is written to the storage system, that is:
operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions); // write operator state
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions); // write keyed state
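Start with operator state. DefaultOperatorStateBackend delegates the actual work to DefaultOperatorStateBackendSnapshotStrategy. It first makes a deep copy of every operator state currently registered (both list state and broadcast state), then wraps the actual write in an asynchronous FutureTask. The main steps of this FutureTask are: 1) open the output stream; 2) write the state metadata; 3) write the state; 4) close the output stream and obtain the state handle. If asynchronous checkpointing is not enabled, the FutureTask is executed immediately during the synchronous phase.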
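The "deep copy first, write later" pattern described above can be sketched like this. It is an illustration under stated assumptions, not the actual strategy class: OperatorStateSnapshotSketch, its map of integer lists, the byte layout, and the runAndGet helper are all hypothetical, and a plain byte array stands in for the state handle.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

class OperatorStateSnapshotSketch {
    // Live state that keeps changing while records are processed.
    final Map<String, List<Integer>> registeredStates = new TreeMap<>();

    FutureTask<byte[]> snapshot() {
        // Synchronous part: deep copy, so later updates cannot leak into this snapshot.
        final Map<String, List<Integer>> copy = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : registeredStates.entrySet()) {
            copy.put(e.getKey(), new ArrayList<>(e.getValue()));
        }
        // Deferred part: the write itself, executed whenever the task is run.
        Callable<byte[]> write = () -> {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) { // 1) open the stream
                out.writeInt(copy.size());                             // 2) metadata: count and names
                for (String name : copy.keySet()) {
                    out.writeUTF(name);
                }
                for (List<Integer> values : copy.values()) {           // 3) the state itself
                    out.writeInt(values.size());
                    for (int value : values) {
                        out.writeInt(value);
                    }
                }
            }                                                          // 4) close the stream
            return bytes.toByteArray(); // stands in for the state handle
        };
        return new FutureTask<>(write);
    }

    // Helper for callers that do not want to deal with checked exceptions.
    static byte[] runAndGet(FutureTask<byte[]> task) {
        task.run();
        try {
            return task.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        OperatorStateSnapshotSketch backend = new OperatorStateSnapshotSketch();
        backend.registeredStates.put("offsets", new ArrayList<>(List.of(1, 2)));
        FutureTask<byte[]> task = backend.snapshot();
        backend.registeredStates.get("offsets").add(3); // happens after the copy was taken
        System.out.println(runAndGet(task).length + " bytes written");
    }
}
```

Because the copy is taken before the task is created, the element added afterwards is not part of the written bytes, even though the write runs later.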
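Writing keyed state follows much the same flow. However, keyed state has several storage implementations, including heap-based and RocksDB-based ones, and the RocksDB-based implementation additionally supports incremental checkpoints, so it is somewhat more complex than operator state. In addition, since version 1.5.0 Flink has included a local state storage optimization that keeps a copy of keyed state locally on the TaskManager, with the aim of improving state recovery speed and reducing network overhead.
That covers the first phase of the snapshot operation, the synchronous phase. The asynchronous phase is wrapped in AsyncCheckpointRunnable. Its main steps are: 1) run the FutureTasks created in the synchronous phase; 2) once they complete, send an Ack response to the CheckpointCoordinator.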
class StreamTask {
    protected static final class AsyncCheckpointRunnable implements Runnable, Closeable {
        @Override
        public void run() {
            FileSystemSafetyNet.initializeSafetyNetForThread();
            try {
                TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
                    new TaskStateSnapshot(operatorSnapshotsInProgress.size());
                TaskStateSnapshot localTaskOperatorSubtaskStates =
                    new TaskStateSnapshot(operatorSnapshotsInProgress.size());
                // Finish writing the state of every operator.
                // For a synchronous checkpoint, the state has already been written before this point.
                // For an asynchronous checkpoint, the state is only written here.
                for (Map.Entry
}
public class TaskStateManagerImpl implements TaskStateManager {
    @Override
    public void reportTaskStateSnapshots(
            @Nonnull CheckpointMetaData checkpointMetaData,
            @Nonnull CheckpointMetrics checkpointMetrics,
            @Nullable TaskStateSnapshot acknowledgedState,
            @Nullable TaskStateSnapshot localState) {
        long checkpointId = checkpointMetaData.getCheckpointId();
        localStateStore.storeLocalState(checkpointId, localState);
        // Send the ACK response to the CheckpointCoordinator
        checkpointResponder.acknowledgeCheckpoint(
            jobId,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            acknowledgedState);
    }
}
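Local state storage means that, when a checkpoint snapshot is stored, an extra copy is kept in the local file system of the TaskManager that hosts the Task. During state recovery the local state can then be tried first, which reduces the cost of transferring data over the network. Local state storage applies only to keyed state. We take the relatively simple HeapKeyedStateBackend as an example to see how local state storage is implemented.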
class HeapSnapshotStrategy
}
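How a locally stored copy and the acknowledgement fit together can be sketched with a toy model. Everything here is hypothetical: LocalStateSketch, its String-valued states, and stateForRestore are illustrative names, the null check on the local state is an assumption of this sketch, and the real TaskStateManagerImpl and TaskLocalStateStore are considerably more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only; not the behavior of Flink's TaskLocalStateStore.
class LocalStateSketch {
    final Map<Long, String> localStateStore = new HashMap<>();
    final List<String> acksSent = new ArrayList<>();

    // Keep the local copy under the checkpoint id, then acknowledge with the remote state.
    void reportTaskStateSnapshots(long checkpointId, String acknowledgedState, String localState) {
        if (localState != null) { // in this sketch, null means "no local copy was written"
            localStateStore.put(checkpointId, localState);
        }
        acksSent.add(checkpointId + "->" + acknowledgedState);
    }

    // On restore, prefer the local copy and fall back to the remote state.
    String stateForRestore(long checkpointId, String remoteState) {
        String local = localStateStore.get(checkpointId);
        return local != null ? local : remoteState;
    }

    public static void main(String[] args) {
        LocalStateSketch manager = new LocalStateSketch();
        manager.reportTaskStateSnapshots(5L, "remote-5", "local-5");
        System.out.println(manager.stateForRestore(5L, "remote-5"));
    }
}
```

The acknowledgement carries only the remote state; the local copy never leaves the TaskManager, which is why it can only speed up recovery when the task is restarted on the same machine.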
The key point in HeapSnapshotStrategy is that a different CheckpointStreamWithResultProvider is created depending on whether local state recovery is enabled.
public interface CheckpointStreamWithResultProvider extends Closeable {
    @Nonnull
    static CheckpointStreamWithResultProvider createSimpleStream(
            @Nonnull CheckpointedStateScope checkpointedStateScope,
            @Nonnull CheckpointStreamFactory primaryStreamFactory) throws IOException {
        CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
            primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
        return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
    }

    @Nonnull
    static CheckpointStreamWithResultProvider createDuplicatingStream(
            @Nonnegative long checkpointId,
            @Nonnull CheckpointedStateScope checkpointedStateScope,
            @Nonnull CheckpointStreamFactory primaryStreamFactory,
            @Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider) throws IOException {
        CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
            primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
        try {
            File outFile = new File(
                secondaryStreamDirProvider.subtaskSpecificCheckpointDirectory(checkpointId),
                String.valueOf(UUID.randomUUID()));
            Path outPath = new Path(outFile.toURI());
            CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =
                new FileBasedStateOutputStream(outPath.getFileSystem(), outPath);
            // There are two output streams, primary and secondary; secondary corresponds to local storage
            return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(primaryOut, secondaryOut);
        } catch (IOException secondaryEx) {
            LOG.warn("Exception when opening secondary/local checkpoint output stream. " +
                "Continue only with the primary stream.", secondaryEx);
        }
        return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
    }
}
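So when local state storage is enabled, two output streams are created: primaryOut corresponds to the external storage and secondaryOut corresponds to the local storage. The state is written out twice. The local state handle is stored in the TaskLocalStateStore.
TODO: analyze this together with the source code: https://blog.jrwang.me/2019/flink-source-code-checkpoint/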
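The choice between one stream and two, including the fallback when the local stream cannot be opened, can be sketched as follows. This is a simplified model under stated assumptions: DuplicatingStreamSketch, ToyProvider, and the Callable used to open the secondary stream are hypothetical, and in-memory buffers stand in for the external and local storage.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.Callable;

class DuplicatingStreamSketch {
    // Holds the primary stream and, optionally, a secondary (local) stream.
    static final class ToyProvider {
        final ByteArrayOutputStream primary;
        final ByteArrayOutputStream secondary; // null means "primary only"

        ToyProvider(ByteArrayOutputStream primary, ByteArrayOutputStream secondary) {
            this.primary = primary;
            this.secondary = secondary;
        }

        // Every write goes to the primary stream and, if present, to the secondary one.
        void write(byte[] data) {
            primary.write(data, 0, data.length);
            if (secondary != null) {
                secondary.write(data, 0, data.length);
            }
        }
    }

    // Mirrors the choice between the simple and the duplicating stream.
    static ToyProvider create(boolean localRecoveryEnabled,
                              Callable<ByteArrayOutputStream> openSecondary) {
        ByteArrayOutputStream primary = new ByteArrayOutputStream();
        if (localRecoveryEnabled) {
            try {
                return new ToyProvider(primary, openSecondary.call());
            } catch (Exception e) {
                // Opening the local stream failed: continue with the primary stream only.
            }
        }
        return new ToyProvider(primary, null);
    }

    public static void main(String[] args) {
        ToyProvider both = create(true, ByteArrayOutputStream::new);
        both.write(new byte[] {1, 2, 3});
        ToyProvider fallback = create(true, () -> { throw new IOException("local disk unavailable"); });
        fallback.write(new byte[] {9});
        System.out.println(both.secondary.size() + " " + (fallback.secondary == null));
    }
}
```

Treating the local copy as best effort matches the warning logged in createDuplicatingStream: a failure to open the secondary stream does not fail the checkpoint, it only removes the local recovery shortcut for that checkpoint.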