热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink学习之路(八)——状态编程和容错机制

文章目录状态有状态的算子和应用程序算子状态(operatorstate)键控状态(keyedstate)状态一致性一致性级别

文章目录

  • 状态
    • 有状态的算子和应用程序
      • 算子状态(operator state)
      • 键控状态(keyed state)
    • 状态一致性
      • 一致性级别
      • 端到端(end-to-end)状态一致性
    • 检查点(checkpoint)
      • Flink 的检查点算法
      • Flink+Kafka 如何实现端到端的 exactly-once 语义
    • 选择一个状态后端(state backend)


状态

流式计算分为无状态和有状态两种情况。无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收温度读数,并在温度超过 90 度时发出警告。有状态的计算则会基于多个事件输出结果。以下是一些例子。

  • 所有类型的窗口。例如,计算过去一小时的平均温度,就是有状态的计算。
  • 所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差 20 度以上的温度读数,则发出警告,这是有状态的计算。
  • 流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算。

下图展示了无状态流处理和有状态流处理的主要区别。无状态流处理分别接收每条数据记录(图中的黑条),然后根据最新输入的数据生成输出数据(白条)。有状态流处理会维护状态(根据每条输入记录进行更新),并基于最新输入的记录和当前的状态值生成输出记录(灰条)。

在这里插入图片描述
上图中输入数据由黑条表示。无状态流处理每次只转换一条输入记录,并且仅根据最新的输入记录输出结果(白条)。有状态 流处理维护所有已处理记录的状态值,并根据每条新输入的记录更新状态,因此输出记录(灰条)反映的是综合考虑多个事件之后的结果。
尽管无状态的计算很重要,但是流处理对有状态的计算更感兴趣。事实上,正确地实现有状态的计算比实现无状态的计算难得多。旧的流处理系统并不支持有状态的计算,而新一代的流处理系统则将状态及其正确性视为重中之重。

有状态的算子和应用程序

Flink 内置的很多算子,数据源 source,数据存储 sink 都是有状态的,流中的数据都是 buffer records,会保存一定的元素或者元数据。例如: ProcessWindowFunction
会缓存输入流的数据,ProcessFunction 会保存设置的定时器信息等等。
在 Flink 中,状态始终与特定算子相关联。总的来说,有两种类型的状态:

  • 算子状态(operator state)
  • 键控状态(keyed state)

算子状态(operator state)

算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问

在这里插入图片描述Flink 为算子状态提供三种基本数据结构:

  • 列表状态(List state)
    将状态表示为一组数据的列表。
  • 联合列表状态(Union list state)
    也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。
  • 广播状态(Broadcast state)
    如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应
    用广播状态。

键控状态(keyed state)

键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于一个分布式的 key-value map 数据结构,只能用于 KeyedStream(keyBy 算子处理之后)。

在这里插入图片描述
Flink 的 Keyed State 支持以下数据类型:

  • ValueState保存单个的值,值的类型为 T。
    • get 操作: ValueState.value()
    • set 操作: ValueState.update(T value)
  • ListState保存一个列表,列表里的元素的数据类型为 T。基本操作如下:
    • ListState.add(T value)
    • ListState.addAll(List values)
    • ListState.get()返回 Iterable
    • ListState.update(List values)
  • MapState保存 Key-Value 对。
    • MapState.get(UK key)
    • MapState.put(UK key, UV value)
    • MapState.contains(UK key)
    • MapState.remove(UK key)
  • ReducingState
  • AggregatingState

State.clear()是清空操作。

我们可以利用 Keyed State,实现这样一个需求:检测传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警。

package com.dahuan.state;import com.dahuan.bean.SensorReading;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;public class State_KeyedApplicationCaseState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism( 1 );//TODO 基于事件时间env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime );//水印之间的间隔&#xff08;以毫秒为单位&#xff09;env.getConfig().setAutoWatermarkInterval( 100 );DataStream<String> inputStream &#61; env.socketTextStream( "localhost", 7777 );DataStream<SensorReading> dataStream &#61; inputStream.map( data -> {String[] split &#61; data.split( "," );return new SensorReading( split[0], new Long( split[1] ), new Double( split[2] ) );} );// TODO 定义一个FlapFunction,检测温度跳变,输出报警SingleOutputStreamOperator<Tuple3<String, Double, Double>> resultStream &#61; dataStream.keyBy( "id" ).flatMap( new TempChangeWarning( 10.0 ) );resultStream.print();env.execute("State_KeyedApplicationCaseState");}// public static class JDBCMysql extends RichSinkFunction{
// Connection conn &#61; null;
// PreparedStatement p
// }public static class TempChangeWarning extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>> {//私有属性,温度跳变预警private Double threshold;public TempChangeWarning(Double threshold){this.threshold &#61; threshold;}//定义一个状态private ValueState<Double> lastTempState;&#64;Overridepublic void open(Configuration parameters) throws Exception {//状态连接lastTempState &#61; getRuntimeContext().getState( new ValueStateDescriptor<Double>("lastTempState",Double.class) );}&#64;Overridepublic void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {//TODO 获取状态的值Double lastTemp &#61; lastTempState.value();//TODO 如果状态不为空&#xff0c;那么就判断两次温度差值if (lastTemp !&#61; null){//TODO 两个温度值比较差值double diff &#61; Math.abs( value.getTemperature() - lastTemp );//if (diff >&#61; threshold ){out.collect(new Tuple3<>(value.getId(),lastTemp,value.getTemperature()));}}//更新状态lastTempState.update( value.getTemperature() );}//TODO 清除状态&#64;Overridepublic void close() throws Exception {lastTempState.clear();}}
}

通过 RuntimeContext 注册 StateDescriptor。StateDescriptor 以状态 state 的名字和存储的数据类型为参数。
在 open()方法中创建 state 变量。注意复习之前的 RichFunction 相关知识。

状态一致性

当在分布式系统中引入状态时&#xff0c;自然也引入了一致性问题。一致性实际上是"正确性级别"的另一种说法&#xff0c;也就是说在成功处理故障并恢复之后得到的结果&#xff0c;与没有发生任何故障时得到的结果相比&#xff0c;前者到底有多正确&#xff1f;举例来说&#xff0c;假设要对最近一小时登录的用户计数。在系统经历故障之后&#xff0c;计数结果是多少&#xff1f;如果有偏差&#xff0c;是有漏掉的计数还是重复计数&#xff1f;

一致性级别

在流处理中&#xff0c;一致性可以分为 3 个级别&#xff1a;

  • at-most-once: 这其实是没有正确性保障的委婉说法——故障发生之后&#xff0c;计数结果可能丢失。同样的还有 udp。
  • at-least-once: 这表示计数结果可能大于正确值&#xff0c;但绝不会小于正确值。也就是说&#xff0c;计数程序在发生故障后可能多算&#xff0c;但是绝不会少算。
  • exactly-once: 这指的是系统保证在发生故障后得到的计数结果与正确值一致。曾经&#xff0c;at-least-once 非常流行。第一代流处理器(如 Storm 和 Samza)刚问世时只保证 at-least-once&#xff0c;原因有二。
  • 保证 exactly-once 的系统实现起来更复杂。这在基础架构层(决定什么代表正确&#xff0c;以及 exactly-once 的范围是什么)和实现层都很有挑战性。
  • 流处理系统的早期用户愿意接受框架的局限性&#xff0c;并在应用层想办法弥补(例如使应用程序具有幂等性&#xff0c;或者用批量计算层再做一遍计算)。

最先保证 exactly-once 的系统(Storm Trident 和 Spark Streaming)在性能和表现力这两个方面付出了很大的代价。为了保证 exactly-once&#xff0c;这些系统无法单独地对每条记录运用应用逻辑&#xff0c;而是同时处理多条(一批)记录&#xff0c;保证对每一批的处理要么全部成功&#xff0c;要么全部失败。这就导致在得到结果前&#xff0c;必须等待一批记录处理结束。因此&#xff0c;用户经常不得不使用两个流处理框架(一个用来保证 exactly-once&#xff0c;另一个用来对每个元素做低延迟处理)&#xff0c;结果使基础设施更加复杂。曾经&#xff0c;用户不得不在保证exactly-once 与获得低延迟和效率之间权衡利弊。Flink 避免了这种权衡。

Flink 的一个重大价值在于&#xff0c;它既保证了 exactly-once&#xff0c;也具有低延迟和高吞吐的处理能力。从根本上说&#xff0c;Flink 通过使自身满足所有需求来避免权衡&#xff0c;它是业界的一次意义
重大的技术飞跃。尽管这在外行看来很神奇&#xff0c;但是一旦了解&#xff0c;就会恍然大悟。

端到端&#xff08;end-to-end&#xff09;状态一致性

目前我们看到的一致性保证都是由流处理器实现的&#xff0c;也就是说都是在 Flink 流处理器内部保证的&#xff1b;而在真实应用中&#xff0c;流处理应用除了流处理器以外还包含了数据源&#xff08;例如 Kafka&#xff09;和输出到持久化系统。
端到端的一致性保证&#xff0c;意味着结果的正确性贯穿了整个流处理应用的始终&#xff1b;每一个组件都保证了它自己的一致性&#xff0c;整个端到端的一致性级别取决于所有组件中一致性最弱的组件。具体可以划分如下&#xff1a;

内部保证 —— 依赖 checkpoint

  • source 端 —— 需要外部源可重设数据的读取位置
  • sink 端 —— 需要保证从故障恢复时&#xff0c;数据不会重复写入外部系统
    而对于 sink 端&#xff0c;又有两种具体的实现方式&#xff1a;幂等&#xff08;Idempotent&#xff09;写入和事务性&#xff08;Transactional&#xff09;写入。
  • 幂等写入
    所谓幂等操作&#xff0c;是说一个操作&#xff0c;可以重复执行很多次&#xff0c;但只导致一次结果更改&#xff0c;也就是说&#xff0c;后面再重复执行就不起作用了。
  • 事务写入
    需要构建事务来写入外部系统&#xff0c;构建的事务对应着 checkpoint&#xff0c;等到 checkpoint 真正完成的时候&#xff0c;才把所有对应的结果写入 sink 系统中。

对于事务性写入&#xff0c;具体又有两种实现方式&#xff1a;预写日志&#xff08;WAL&#xff09;和两阶段提交&#xff08;2PC&#xff09;。DataStream API 提供了 GenericWriteAheadSink 模板类TwoPhaseCommitSinkFunction 接口&#xff0c;可以方便地实现这两种方式的事务性写入。

|  |  ||--|--||  |  |

检查点&#xff08;checkpoint&#xff09;

Flink 具体如何保证 exactly-once 呢? 它使用一种被称为"检查点"&#xff08;checkpoint&#xff09;
的特性&#xff0c;在出现故障时将系统重置回正确状态。下面通过简单的类比来解释检查点的作用。

假设你和两位朋友正在数项链上有多少颗珠子&#xff0c;如下图所示。你捏住珠子&#xff0c;边数边拨&#xff0c;每拨过一颗珠子就给总数加一。你的朋友也这样数他们手中的珠子。当你分神忘记数到哪里时&#xff0c;怎么办呢? 如果项链上有很多珠子&#xff0c;你显然不想从头再数一遍&#xff0c;尤其是当三人的速度不一样却又试图合作的时候&#xff0c;更是如此(比如想记录前一分钟三人一共数了多少颗珠子&#xff0c;回想一下一分钟滚动窗口)。

在这里插入图片描述于是&#xff0c;你想了一个更好的办法: 在项链上每隔一段就松松地系上一根有色皮筋&#xff0c;将珠子分隔开; 当珠子被拨动的时候&#xff0c;皮筋也可以被拨动; 然后&#xff0c;你安排一个助手&#xff0c;让他在你和朋友拨到皮筋时记录总数。用这种方法&#xff0c;当有人数错时&#xff0c;就不必从头开始数。相反&#xff0c;你向其他人发出错误警示&#xff0c;然后你们都从上一根皮筋处开始重数&#xff0c;助手则会告诉每个人重数时的起始数值&#xff0c;例如在粉色皮筋处的数值是多少。

Flink 检查点的作用就类似于皮筋标记。数珠子这个类比的关键点是: 对于指定的皮筋而言&#xff0c;珠子的相对位置是确定的; 这让皮筋成为重新计数的参考点。总状态(珠子的总数)在每颗珠子被拨动之后更新一次&#xff0c;助手则会保存与每根皮筋对应的检查点状态&#xff0c;如当遇到粉色皮筋时一共数了多少珠子&#xff0c;当遇到橙色皮筋时又是多少。
当问题出现时&#xff0c;这种方法使得重新计数变得简单。

Flink 的检查点算法

Flink 检查点的核心作用是确保状态正确&#xff0c;即使遇到程序中断&#xff0c;也要正确。记住这一基本点之后&#xff0c;我们用一个例子来看检查点是如何运行的。Flink 为用户提供了用来定义状态的工具。例如&#xff0c;以下这个 Scala 程序按照输入记录的第一个字段(一个字符串)进行分组并维护第二个字段的计数状态

val stream: DataStream[(String, Int)] &#61; ...
val counts: DataStream[(String, Int)] &#61; stream.keyBy(record &#61;> record._1) .mapWithState( (in: (String, Int), state: Option[Int]) &#61;>state match {case Some(c) &#61;> ( (in._1, c &#43; in._2), Some(c &#43; in._2) )case None &#61;> ( (in._1, in._2), Some(in._2) )
})

该程序有两个算子: keyBy 算子用来将记录按照第一个元素(一个字符串)进行分组&#xff0c;根据该 key 将数据进行重新分区&#xff0c;然后将记录再发送给下一个算子: 有状态的map 算子(mapWithState)。map 算子在接收到每个元素后&#xff0c;将输入记录的第二个字段的数据加到现有总数中&#xff0c;再将更新过的元素发射出去。下图表示程序的初始状态: 输入流中的 6 条记录被检查点分割线(checkpoint barrier)隔开&#xff0c;所有的 map 算子状态均为 0(计数还未开始)。所有 key 为 a 的记录将被顶层的 map 算子处理&#xff0c;所有 key 为 b的记录将被中间层的 map 算子处理&#xff0c;所有 key 为 c 的记录则将被底层的 map 算子处理。

在这里插入图片描述
上图是程序的初始状态。注意&#xff0c;a、b、c 三组的初始计数状态都是 0&#xff0c;即三个圆柱上的值。ckpt 表示检查点分割线&#xff08;checkpoint barriers&#xff09;。每条记录在处理顺序上严格地遵守在检查点之前或之后的规定&#xff0c;例如[“b”,2]在检查点之前被处理&#xff0c;[“a”,2]则在检查点之后被处理。

当该程序处理输入流中的 6 条记录时&#xff0c;涉及的操作遍布 3 个并行实例(节点、CPU内核等)。那么&#xff0c;检查点该如何保证 exactly-once 呢?

检查点分割线和普通数据记录类似。它们由算子处理&#xff0c;但并不参与计算&#xff0c;而是会触发与检查点相关的行为。当读取输入流的数据源(在本例中与 keyBy 算子内联)

遇到检查点屏障时&#xff0c;它将其在输入流中的位置保存到持久化存储中。如果输入流来自消息传输系统(Kafka)&#xff0c;这个位置就是偏移量。Flink 的存储机制是插件化的&#xff0c;持久化存储可以是分布式文件系统&#xff0c;如 HDFS。下图展示了这个过程。

在这里插入图片描述
当 Flink 数据源(在本例中与 keyBy 算子内联)遇到检查点分界线&#xff08;barrier&#xff09;时&#xff0c;它会将其在输入流中的位置保存到持久化存储中。这让 Flink 可以根据该位置重启。

检查点像普通数据记录一样在算子之间流动。当 map 算子处理完前 3 条数据并收到检查点分界线时&#xff0c;它们会将状态以异步的方式写入持久化存储&#xff0c;如下图所示。

在这里插入图片描述
位于检查点之前的所有记录([“b”,2]、[“b”,3]和[“c”,1])被 map 算子处理之后的情况。此时&#xff0c;持久化存储已经备份了检查点分界线在输入流中的位置(备份操作发生在barrier 被输入算子处理的时候)。map 算子接着开始处理检查点分界线&#xff0c;并触发将状态异步备份到稳定存储中这个动作。 当 map 算子的状态备份和检查点分界线的位置备份被确认之后&#xff0c;

该检查点操作就可以被标记为完成&#xff0c;如下图所示。我们在无须停止或者阻断计算的条件下&#xff0c;在一个逻辑时间点(对应检查点屏障在输入流中的位置)为计算状态拍了快照。通过确保备份的状态和位置指向同一个逻辑时间点&#xff0c;后文将解释如何基于备份恢复计算&#xff0c;从而保证 exactly-once。值得注意的是&#xff0c;当没有出现故障时&#xff0c;Flink 检查点的开销极小&#xff0c;
检查点操作的速度由持久化存储的可用带宽决定。回顾数珠子的例子: 除了因为数错而需要用到皮筋之外&#xff0c;皮筋会被很快地拨过。

在这里插入图片描述
检查点操作完成&#xff0c;状态和位置均已备份到稳定存储中。输入流中的所有数据记录都已处理完成。值得注意的是&#xff0c;备份的状态值与实际的状态值是不同的。备份反映的是检查点的状态。

如果检查点操作失败&#xff0c;Flink 可以丢弃该检查点并继续正常执行&#xff0c;因为之后的某一个检查点可能会成功。虽然恢复时间可能更长&#xff0c;但是对于状态的保证依旧很有力。只有在一系列连续的检查点操作失败之后&#xff0c;Flink 才会抛出错误&#xff0c;因为这通常预示着发生了严重且持久的错误。
现在来看看下图所示的情况: 检查点操作已经完成&#xff0c;但故障紧随其后。

在这里插入图片描述在这种情况下&#xff0c;Flink 会重新拓扑(可能会获取新的执行资源)&#xff0c;将输入流倒回到上一个检查点&#xff0c;然后恢复状态值并从该处开始继续计算。在本例中&#xff0c;[“a”,2]、[“a”,2]和[“c”,2]这几条记录将被重播。

下图展示了这一重新处理过程。从上一个检查点开始重新计算&#xff0c;可以保证在剩下的记录被处理之后&#xff0c;得到的 map 算子的状态值与没有发生故障时的状态值一致。

在这里插入图片描述
Flink 将输入流倒回到上一个检查点屏障的位置&#xff0c;同时恢复 map 算子的状态值。
然后&#xff0c;Flink 从此处开始重新处理。这样做保证了在记录被处理之后&#xff0c;map 算子的状态值与没有发生故障时的一致。

Flink 检查点算法的正式名称是异步分界线快照(asynchronous barrier snapshotting)。该算法大致基于 Chandy-Lamport 分布式快照算法。

检查点是 Flink 最有价值的创新之一&#xff0c;因为它使 Flink 可以保证 exactly-once&#xff0c;并且不需要牺牲性能。

Flink&#43;Kafka 如何实现端到端的 exactly-once 语义

我们知道&#xff0c;端到端的状态一致性的实现&#xff0c;需要每一个组件都实现&#xff0c;对于 Flink &#43; Kafka 的数据管道系统&#xff08;Kafka 进、Kafka 出&#xff09;而言&#xff0c;各组件怎样保证 exactly-once语义呢&#xff1f;

  • 内部 —— 利用 checkpoint 机制&#xff0c;把状态存盘&#xff0c;发生故障的时候可以恢复&#xff0c;保证内部的状态一致性
  • source —— kafka consumer 作为 source&#xff0c;可以将偏移量保存下来&#xff0c;如果后续任务出现了故障&#xff0c;恢复的时候可以由连接器重置偏移量&#xff0c;重新消费数据&#xff0c;保证一致性
  • sink —— kafka producer 作为 sink&#xff0c;采用两阶段提交 sink&#xff0c;需要实现一个TwoPhaseCommitSinkFunction

内部的 checkpoint 机制我们已经有了了解&#xff0c;那 source 和 sink 具体又是怎样运行的呢&#xff1f;接下来我们逐步做一个分析。

我们知道 Flink 由 JobManager 协调各个 TaskManager 进行 checkpoint 存储&#xff0c;
checkpoint 保存在 StateBackend 中&#xff0c;默认 StateBackend 是内存级的&#xff0c;也可以改为文件级的进行持久化保存

在这里插入图片描述
当 checkpoint 启动时&#xff0c;JobManager 会将检查点分界线&#xff08;barrier&#xff09;注入数据流&#xff1b;barrier 会在算子间传递下去。

在这里插入图片描述
每个算子会对当前的状态做个快照&#xff0c;保存到状态后端。对于 source 任务而言&#xff0c;
就会把当前的 offset 作为状态保存起来。下次从 checkpoint 恢复时&#xff0c;source 任务可以重新提交偏移量&#xff0c;从上次保存的位置开始重新消费数据。

在这里插入图片描述
每个内部的 transform 任务遇到 barrier 时&#xff0c;都会把状态存到 checkpoint 里。

sink 任务首先把数据写入外部 kafka&#xff0c;这些数据都属于预提交的事务&#xff08;还不能被消费&#xff09;&#xff1b;当遇到 barrier 时&#xff0c;把状态保存到状态后端&#xff0c;并开启新的预提交事务。

在这里插入图片描述
当所有算子任务的快照完成&#xff0c;也就是这次的 checkpoint 完成时&#xff0c;JobManager 会向所有任务发通知&#xff0c;确认这次 checkpoint 完成。

当 sink 任务收到确认通知&#xff0c;就会正式提交之前的事务&#xff0c;kafka 中未确认的数据就改为“已确认”&#xff0c;数据就真正可以被消费了。

在这里插入图片描述
所以我们看到&#xff0c;执行过程实际上是一个两段式提交&#xff0c;每个算子执行完成&#xff0c;会进行“预提交”&#xff0c;直到执行完 sink 操作&#xff0c;会发起“确认提交”&#xff0c;如果执行失败&#xff0c;预提交会放弃掉。

具体的两阶段提交步骤总结如下&#xff1a;

  • 第一条数据来了之后&#xff0c;开启一个 kafka 的事务&#xff08;transaction&#xff09;&#xff0c;正常写入kafka 分区日志但标记为未提交&#xff0c;这就是“预提交”
  • jobmanager 触发 checkpoint 操作&#xff0c;barrier 从 source 开始向下传递&#xff0c;遇到barrier 的算子将状态存入状态后端&#xff0c;并通知 jobmanager
  • sink 连接器收到 barrier&#xff0c;保存当前状态&#xff0c;存入 checkpoint&#xff0c;通知jobmanager&#xff0c;并开启下一阶段的事务&#xff0c;用于提交下个检查点的数据
  • jobmanager 收到所有任务的通知&#xff0c;发出确认信息&#xff0c;表示 checkpoint 完成
  • sink 任务收到 jobmanager 的确认信息&#xff0c;正式提交这段时间的数据
  • 外部 kafka 关闭事务&#xff0c;提交的数据可以正常消费了。

所以我们也可以看到&#xff0c;如果宕机需要通过 StateBackend 进行恢复&#xff0c;只能恢复所有确认提交的操作。

选择一个状态后端(state backend)


  • MemoryStateBackend
    内存级的状态后端&#xff0c;会将键控状态作为内存中的对象进行管理&#xff0c;将它们存储在 TaskManager 的 JVM 堆上&#xff1b;而将 checkpoint 存储在 JobManager 的内存中。
  • FsStateBackend
    将 checkpoint 存到远程的持久化文件系统&#xff08;FileSystem&#xff09;上。而对于本地状态&#xff0c;跟 MemoryStateBackend 一样&#xff0c;也会存在 TaskManager 的 JVM 堆上。
  • RocksDBStateBackend
    将所有状态序列化后&#xff0c;存入本地的 RocksDB 中存储。
    注意&#xff1a;RocksDB 的支持并不直接包含在 flink 中&#xff0c;需要引入依赖&#xff1a;

<dependency><groupId>org.apache.flinkgroupId><artifactId>flink-statebackend-rocksdb_2.12artifactId><version>1.10.1version>
dependency>

设置状态后端为 FsStateBackend&#xff0c;并配置检查点和重启策略&#xff1a;

package com.dahuan.state;import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class State_Checkpoint {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment();//1.检查点配置//状态检查点之间的时间间隔&#xff08;以毫秒为单位&#xff09;env.enableCheckpointing( 300L );//TODO 高级选项//检查点模式定义了在出现故障时系统可以保证的一致性。env.getCheckpointConfig().setCheckpointingMode( CheckpointingMode.EXACTLY_ONCE );//设置检查点在被丢弃之前可能花费的最长时间。env.getCheckpointConfig().setCheckpointTimeout( 60000L );//并发检查点尝试的最大次数。env.getCheckpointConfig().setMaxConcurrentCheckpoints( 2 );//触发下一个检查点之前的最小暂停。env.getCheckpointConfig().setMinPauseBetweenCheckpoints( 100L );//设置是否有较新的保存点时作业恢复是否应回退到检查点。env.getCheckpointConfig().setPreferCheckpointForRecovery( true );//设置可容忍的检查点失败数&#xff0c;默认值为0&#xff0c;表示我们不容忍任何检查点失败。env.getCheckpointConfig().setTolerableCheckpointFailureNumber( 0 );//2.重启策略//重新启动要配置的策略配置//固定延迟重启/*** 生成FixedDelayRestartStrategyConfiguration。* &#64;param restartAttempts FixedDelayRestartStrategy的重启尝试次数* &#64;param delayBetweenAttempts FixedDelayRestartStrategy的重启尝试之间的延迟* &#64;return FixedDelayRestartStrategy*///TODO 每隔10秒中重启尝试3次env.setRestartStrategy( RestartStrategies.fixedDelayRestart( 3, 10000L ) );//失败率重启/*** 生成FailureRateRestartStrategyConfiguration。* &#64;param failureRate作业失败之前&#xff0c;给定间隔{&#64;code failureInterval}中的最大重新启动次数* &#64;param failureInterval失败的时间间隔* &#64;param delayInterval重新启动尝试之间的延迟*///TODO 10分钟之内尝试三次&#xff0c;每次时间间隔为1分钟env.setRestartStrategy( RestartStrategies.failureRateRestart( 3, Time.minutes( 10 ), Time.minutes( 1 ) ) );env.execute( "State_Checkpoint" );}
}


推荐阅读
  • 本文探讨了如何利用Java代码获取当前本地操作系统中正在运行的进程列表及其详细信息。通过引入必要的包和类,开发者可以轻松地实现这一功能,为系统监控和管理提供有力支持。示例代码展示了具体实现方法,适用于需要了解系统进程状态的开发人员。 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • 本文介绍了如何利用 Delphi 中的 IdTCPServer 和 IdTCPClient 控件实现高效的文件传输。这些控件在默认情况下采用阻塞模式,并且服务器端已经集成了多线程处理,能够支持任意大小的文件传输,无需担心数据包大小的限制。与传统的 ClientSocket 相比,Indy 控件提供了更为简洁和可靠的解决方案,特别适用于开发高性能的网络文件传输应用程序。 ... [详细]
  • 本文介绍了如何利用ObjectMapper实现JSON与JavaBean之间的高效转换。ObjectMapper是Jackson库的核心组件,能够便捷地将Java对象序列化为JSON格式,并支持从JSON、XML以及文件等多种数据源反序列化为Java对象。此外,还探讨了在实际应用中如何优化转换性能,以提升系统整体效率。 ... [详细]
  • 在Android应用开发中,实现与MySQL数据库的连接是一项重要的技术任务。本文详细介绍了Android连接MySQL数据库的操作流程和技术要点。首先,Android平台提供了SQLiteOpenHelper类作为数据库辅助工具,用于创建或打开数据库。开发者可以通过继承并扩展该类,实现对数据库的初始化和版本管理。此外,文章还探讨了使用第三方库如Retrofit或Volley进行网络请求,以及如何通过JSON格式交换数据,确保与MySQL服务器的高效通信。 ... [详细]
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • Flowable 流程图路径与节点展示:已执行节点高亮红色标记,增强可视化效果
    在Flowable流程图中,通常仅显示当前节点,而路径则需自行获取。特别是在多次驳回的情况下,节点可能会出现混乱。本文重点探讨了如何准确地展示流程图效果,包括已结束的流程和正在执行的流程。具体实现方法包括生成带有高亮红色标记的图片,以增强可视化效果,确保用户能够清晰地了解每个节点的状态。 ... [详细]
  • 在《ChartData类详解》一文中,我们将深入探讨 MPAndroidChart 中的 ChartData 类。本文将详细介绍如何设置图表颜色(Setting Colors)以及如何格式化数据值(Formatting Data Values),通过 ValueFormatter 的使用来提升图表的可读性和美观度。此外,我们还将介绍一些高级配置选项,帮助开发者更好地定制和优化图表展示效果。 ... [详细]
  • 在Android平台中,播放音频的采样率通常固定为44.1kHz,而录音的采样率则固定为8kHz。为了确保音频设备的正常工作,底层驱动必须预先设定这些固定的采样率。当上层应用提供的采样率与这些预设值不匹配时,需要通过重采样(resample)技术来调整采样率,以保证音频数据的正确处理和传输。本文将详细探讨FFMpeg在音频处理中的基础理论及重采样技术的应用。 ... [详细]
  • POJ 2482 星空中的星星:利用线段树与扫描线算法解决
    在《POJ 2482 星空中的星星》问题中,通过运用线段树和扫描线算法,可以高效地解决星星在窗口内的计数问题。该方法不仅能够快速处理大规模数据,还能确保时间复杂度的最优性,适用于各种复杂的星空模拟场景。 ... [详细]
  • 在处理 XML 数据时,如果需要解析 `` 标签的内容,可以采用 Pull 解析方法。Pull 解析是一种高效的 XML 解析方式,适用于流式数据处理。具体实现中,可以通过 Java 的 `XmlPullParser` 或其他类似的库来逐步读取和解析 XML 文档中的 `` 元素。这样不仅能够提高解析效率,还能减少内存占用。本文将详细介绍如何使用 Pull 解析方法来提取 `` 标签的内容,并提供一个示例代码,帮助开发者快速解决问题。 ... [详细]
  • 使用Maven JAR插件将单个或多个文件及其依赖项合并为一个可引用的JAR包
    本文介绍了如何利用Maven中的maven-assembly-plugin插件将单个或多个Java文件及其依赖项打包成一个可引用的JAR文件。首先,需要创建一个新的Maven项目,并将待打包的Java文件复制到该项目中。通过配置maven-assembly-plugin,可以实现将所有文件及其依赖项合并为一个独立的JAR包,方便在其他项目中引用和使用。此外,该方法还支持自定义装配描述符,以满足不同场景下的需求。 ... [详细]
  • 本文详细探讨了使用纯JavaScript开发经典贪吃蛇游戏的技术细节和实现方法。通过具体的代码示例,深入解析了游戏逻辑、动画效果及用户交互的实现过程,为开发者提供了宝贵的参考和实践经验。 ... [详细]
  • Java能否直接通过HTTP将字节流绕过HEAP写入SD卡? ... [详细]
  • 提升Android开发效率:Clean Code的最佳实践与应用
    在Android开发中,提高代码质量和开发效率是至关重要的。本文介绍了如何通过Clean Code的最佳实践来优化Android应用的开发流程。以SQLite数据库操作为例,详细探讨了如何编写高效、可维护的SQL查询语句,并将其结果封装为Java对象。通过遵循这些最佳实践,开发者可以显著提升代码的可读性和可维护性,从而加快开发速度并减少错误。 ... [详细]
author-avatar
榴莲味蛋筒
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有