热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink:状态编程

Flink中的状态在流处理中,数据是连续不断到来的。每个任务进行计算处理时,可以基于当前数据直接转换得到输出结果,也可以依赖一些其他数据。这些由一个任务维护,并且用来计算输出结果的

Flink 中的状态

在流处理中,数据是连续不断到来的。每个任务进行计算处理时,可以基于当前数据直接转换得到输出结果,也可以依赖一些其他数据。这些由一个任务维护,并且用来计算输出结果的所有数据,就叫作这个任务的状态。


有状态算子

算子任务可以分为无状态和有状态。

无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果,如 map、filter、flatMap。

有状态的算子任务,除当前数据之外,还需要一些其他数据(状态)来得到计算结果,如聚合算子和窗口算子。

有状态算子处理过程



  1. 算子任务接到上游发来的数据。

  2. 获取当前状态。

  3. 根据业务逻辑进行计算,更新状态。

  4. 得到计算结果,输出发送到下游任务。


状态管理

Flink 将状态直接保存在内存中来保证性能,并通过分布式扩展来提高吞吐量。可以认为是子任务实例上的一个本地变量,能够被任务的业务逻辑访问和修改。



  • 状态的访问权限。同一个分区(也就是slot)上执行的任务实例可能会包含多个 Key 的数据,它们同时访问和更改本地变量,就会导致计算结果错误。

  • 容错性。状态只保存在内存中是不够的,需要将它持久化,这样发生故障后可以恢复状态。

  • 分布式应用的横向扩展性。数据量增大时,应该对计算资源扩容,调大并行度。


状态的分类



  • 托管状态:由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现。

    • 算子状态:状态作用范围限定为当前的算子任务实例。

    • 按键分区状态:状态是根据输入流中定义的键来维护和访问的。



  • 原始状态:自定义的,Flink 不会对状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当做最原始的字节数组来存储。


按键分区状态

支持的结构类型



  • 值状态:状态中就保存一个值。

public interface ValueState extends State {
// 获取当前状态
T value() throws IOException;
// 对状态进行更新
void update(T var1) throws IOException;
}

为了让运行时上下文清楚到底是哪个状态,需要一个状态描述器 ValueStateDescriptor 来提供状态的基本信息。



  • 列表状态:将需要保存的数据,以列表的形式组织起来。

public interface ListState extends MergingState> {
// 传入一个列表values,直接对状态进行覆盖
void update(List var1) throws Exception;
// 向列表中添加多个元素,以列表values形式传入。
void addAll(List var1) throws Exception;
}

列表状态的状态描述器是 ListStateDescriptor



  • 映射状态:把一些键值对作为状态整体保存起来。

public interface MapState extends State {
// 传入一个Key,查询对应的Value值
UV get(UK var1) throws Exception;
// 传入一个键值对,更新Key对应的Value值
void put(UK var1, UV var2) throws Exception;
// 将传入的映射map中所有键值对全部添加到映射状态中
void putAll(Map var1) throws Exception;
// 将指定Key的键值对删除
void remove(UK var1) throws Exception;
// 判断是否存在指定Key
boolean contains(UK var1) throws Exception;
// 获取映射状态中所有的键值对
Iterable> entries() throws Exception;
// 获取映射状态中所有的键
Iterable keys() throws Exception;
// 获取映射状态中所有的值
Iterable values() throws Exception;
// 获取映射状态的迭代器
Iterator> iterator() throws Exception;
// 判断映射状态是否为空
boolean isEmpty() throws Exception;
}


  • 归约状态:将归约聚合之后的值作为状态保存下来。

// 这个接口保存的是一个聚合值,调用add方法时是将新数据和之前的状态进行归约,并且用结果来更新状态。
public interface ReducingState extends MergingState {
}

// 第二个参数就是定义了归约聚合逻辑的ReduceFunction,另外两个参数则是状态的名称和类型。
public ReducingStateDescriptor(String name, ReduceFunction reduceFunction, Class typeClass) {}


  • 聚合状态:是一个值,用来保存添加进来的所有数据的聚合结果。

// 聚合逻辑是由在描述器中传入一个更加一般化的聚合函数
public AggregatingStateDescriptor(String name, AggregateFunction aggFunction, Class stateType) {}

代码示例

SingleOutputStreamOperator stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
stream.keyBy(event -> event.user).flatMap(new MyFlatMap()).print();

// 用于Keyed State测试
public static class MyFlatMap extends RichFlatMapFunction{
// 定义状态
ValueState myValueState;
ListState myListState;
MapState myMapState;
ReducingState myReducingState;
AggregatingState myAggregatingState;
@Override
public void open(Configuration parameters) throws Exception {
myValueState = getRuntimeContext().getState(new ValueStateDescriptor("valueState", Event.class));
myListState = getRuntimeContext().getListState(new ListStateDescriptor("listState", Event.class));
myMapState = getRuntimeContext().getMapState(new MapStateDescriptor("mapState", String.class, Long.class));
myReducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor("reducingState",
new ReduceFunction() {
@Override
public Event reduce(Event event, Event t1) throws Exception {
return new Event(event.user, event.url, t1.timestamp);
}
},
Event.class));
myAggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor("aggregatingState",
new AggregateFunction() {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(Event event, Long aLong) {
return aLong + 1;
}
@Override
public String getResult(Long aLong) {
return "count:" + aLong;
}
@Override
public Long merge(Long aLong, Long acc1) {
return aLong + acc1;
}
},
Long.class));
}
@Override
public void flatMap(Event event, Collector collector) throws Exception {
// 访问和更新状态
myValueState.update(event);
myListState.add(event);
myMapState.put(event.user, myMapState.get(event.user) == null ? 1: myMapState.get(event.user) + 1);
System.out.println(event.user + myMapState.get(event.user));
myReducingState.add(event);
System.out.println(event.user + myReducingState.get());
myAggregatingState.add(event);
System.out.println(myAggregatingState.get());
}
}

状态生存时间

在实际应用中,很多状态会随着时间的推移而增长,如果不加以限制,最终会导致存储空间的耗尽。

可以在代码中调用 clear() 方法来清除状态,但是有时候逻辑要求不能这么做,这时就需要配置一个状态生存时间,当状态存在时间超过这个值就将它清除。

状态创建的时候,设置失效时间 = 当前时间 + TTL,之后如果有对状态的访问或修改,可以再对失效时间进行更新。配置状态的 TTL 时,需要创建一个 StateTtlConfig 配置对象,然后调用状态描述器的 enableTimeToLive() 方法启动 TTL 功能。

ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor<>("valueState", Event.class);
// 配置状态TTL
StateTtlConfig ttlCOnfig= StateTtlConfig.newBuilder(Time.hours(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
.build();
valueStateDescriptor.enableTimeToLive(ttlConfig);
myValueState = getRuntimeContext().getState(valueStateDescriptor);

算子状态

特点

算子状态就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 Key 无关,所以不同 Key 的数据只要被分发到同一个并行子任务,就会访问到同一个算子状态。

当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。


类型



  • 列表状态:将状态表示为一组数据的列表。与 Keyed State 中的列表状态的区别是,在算子状态的上下文中,不会按键分别处理状态,所以每一个并行子任务上只会保留一个“列表”。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。

  • 联合列表状态:将状态表示为一组数据的列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。

  • 广播状态:有时我们希望算子并行子任务都保特同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样。


代码示例

因为不存在 Key,所有数据发往哪个分区是不能预测的。所以当发生故障重启之后,我们不能保证某个数据跟之前一样,进入同一个并行子任务、访问同一个状态。所以需要自行设计状态的快照保存和恢复逻辑。



  • CheckpointedFunction 接口

public interface CheckpointedFunction {
// 保存状态快照到检查点,调用这个方法
void snapshotState(FunctionSnapshotContext var1) throws Exception;
// 初始化状态时调用这个方法,也会在恢复状态时调用
void initializeState(FunctionInitializationContext var1) throws Exception;
}

SingleOutputStreamOperator stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
// 批量缓存输出
stream.addSink(new BufferSink(10));

public static class BufferSink implements SinkFunction, CheckpointedFunction {
// 定义当前类的属性,批量
private final int threshold;
public BufferSink(int threshold) {
this.threshold = threshold;
}
private List bufferedElements;
// 定义一个算子状态
private ListState checkPointedState;
@Override
public void invoke(Event value, Context context) throws Exception {
// 缓存到列表
bufferedElements.add(value);
// 判断是否到达阈值,如果达到,就批量写入
if (bufferedElements.size() == threshold) {
// 用打印到控制台模拟写入外部系统
for (Event event: bufferedElements) {
System.out.println(event);
}
System.out.println("========输出完毕========");
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
// 清空状态
checkPointedState.clear();
// 对状态进行持久化,复制缓存的列表到列表状态
for (Event event: bufferedElements) {
checkPointedState.add(event);
}
}
@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
// 定义算子状态
ListStateDescriptor stateDescriptor = new ListStateDescriptor<>("buffered", Event.class);
checkPointedState = functionInitializationContext.getOperatorStateStore().getListState(stateDescriptor);
// 如果从故障恢复,需要将ListState中的所有元素复制到列表中
if (functionInitializationContext.isRestored()) {
for (Event event: checkPointedState.get()) {
bufferedElements.add(event);
}
}
}
}

广播状态

特点

将状态广播出去,所有并行子任务的状态都是相同的,并行度调整时只要直接复制就可以了。

由于配置或者规则数据是全局有效的,我们需要把它广播给所有的并行子任务。而子任务需要把它作为一个算子状态保存起来,以保证故障恢复后处理结果是一致的。这时的状态,就 是一个典型的广播状态。

在代码上,可以直接调用 DataStream 的 broadcast() 方法,传入一个“映射状态描述器”说明状态的名称和类型,就可以得到一个“广播流”,进而将要处理的数据流与这条广播流进行连接,就会得到“广播连接流”。注意广播状态只能用在广播连接流中。


代码示例

电商应用中,往往需要判断用户先后发生的行为的“组合模式”,比如“登录-下单”,检测出这些连续的行为进行统计,就可以了解平台的运用状况以及用户的行为习惯。

// 用户的行为数据流
DataStreamSource actiOnStream= env.fromElements(
new Action("Alice", "login"),
new Action("Alice", "pay"),
new Action("Bob", "login"),
new Action("Bob", "order")
);
// 行为模式流,基于它构建广播流
DataStreamSource

patternStream = env.fromElements(
new Pattern("login", "pay"),
new Pattern("login", "order")
);
// 定义广播状态描述器
MapStateDescriptor descriptor = new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class));
BroadcastStream

broadcastStream = patternStream.broadcast(descriptor);
// 连接两条流进行处理
SingleOutputStreamOperator> matches = actionStream.keyBy(data -> data.userId)
.connect(broadcastStream)
.process(new PatternDetector());
matches.print();

// 实现自定义的KeyedBroadcastProcessFunction
public static class PatternDetector extends KeyedBroadcastProcessFunction> {
// 定义一个keyedState,保存上一次用户行为
ValueState preActionState;
@Override
public void open(Configuration parameters) throws Exception {
preActiOnState= getRuntimeContext().getState(new ValueStateDescriptor("last-Action", String.class));
}
@Override
public void processElement(Action action, KeyedBroadcastProcessFunction>.ReadOnlyContext readOnlyContext, Collector> collector) throws Exception {
// 从广播状态中获取匹配模式
ReadOnlyBroadcastState patternState = readOnlyContext.getBroadcastState(new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class)));
Pattern pattern = patternState.get(null);
// 拿到上一次行为
String preAction = preActionState.value();
// 判断是否匹配
if (preAction != null && pattern != null) {
if (pattern.action1.equals(preAction) && pattern.action2.equals(action.action)) {
collector.collect(new Tuple2<>(readOnlyContext.getCurrentKey(), pattern));
}
}
// 更新状态
preActionState.update(action.action);
}
@Override
public void processBroadcastElement(Pattern pattern, KeyedBroadcastProcessFunction>.Context context, Collector> collector) throws Exception {
// 从上下文中获取广播状态,并用当前数据更新状态
BroadcastState patternState = context.getBroadcastState(new MapStateDescriptor<>("pattern", Types.VOID, Types.POJO(Pattern.class)));
patternState.put(null, pattern);
}
}

// 定义用户行为事件和模式的POJO类
public static class Action {
public String userId;
public String action;
public Action() {
}
public Action(String userId, String action) {
this.userId = userId;
this.action = action;
}
@Override
public String toString() {
return "Action{" +
"userId='" + userId + '\'' +
", action='" + action + '\'' +
'}';
}
}
public static class Pattern {
public String action1;
public String action2;
public Pattern() {
}
public Pattern(String action1, String action2) {
this.action1 = action1;
this.action2 = action2;
}
@Override
public String toString() {
return "Pattern{" +
"action1='" + action1 + '\'' +
", action2='" + action2 + '\'' +
'}';
}
}

状态持久化和状态后端

Flink 对状态进行持久化的方式,就是将当前所有分布式状态迸行“快照”保存,写入一个检查点或者保存点,保存到外部存储系统中。具体的存储介质,一般是分布式文件系统。


检查点

有状态流应用中的检查点,其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态。

默认情况下,检查点是被禁用的,需要在代码中手动开启。直接调用执行环境的 enableCheckpointin() 方法就可以开启检查点。


状态后端

检查点的保存离不开 JobManager 和 TaskManager,以及外部存储系统的协调。在应用进行检查点保存时,首先会由 JobManager 向所有 TaskManager 发出触发检查点的命令,TaskManger 收到之后,将当前任务的所有状态进行快照保存,持久化到远程的存储介质中,完成之后向 JobManager 返回确认信息。这个过程是分布式的, JobManger 收到所有 TaskManager 的返回信息后,就会确认当前检查点成功保存。

状态后端

在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端。

状态后端主要负责两件事:一是本地的状态管理,二是将检查点写入远程的持久化存储。



  • 哈希表状态后端:哈希表状态后端在内部会直接把状态当作对象,保存在 TaskManager 的 JVM 堆上。将本地状态全部放入内存中,这样可以获得最快的读写速度,使计算性能达到最佳。但是代价是内存的占用。它适用于具有大状态、长窗口、大键值状态的作业,对所有高可用性设置也是有效的。

  • 内嵌 RocksDB 状态后端:RocksDB 是一种内嵌的键值存储介质,可以把数据持久化到本地硬盘。数据被存储为序列化的字节数组,读写操作需要序列化/反序列化,因此状态的访问性能要差一些。



推荐阅读
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • 本文详细介绍了PHP中与URL处理相关的三个函数:http_build_query、parse_str和查询字符串的解析。通过示例和语法说明,讲解了这些函数的使用方法和作用,帮助读者更好地理解和应用。 ... [详细]
  • 如何用UE4制作2D游戏文档——计算篇
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了如何用UE4制作2D游戏文档——计算篇相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • 动态规划算法的基本步骤及最长递增子序列问题详解
    本文详细介绍了动态规划算法的基本步骤,包括划分阶段、选择状态、决策和状态转移方程,并以最长递增子序列问题为例进行了详细解析。动态规划算法的有效性依赖于问题本身所具有的最优子结构性质和子问题重叠性质。通过将子问题的解保存在一个表中,在以后尽可能多地利用这些子问题的解,从而提高算法的效率。 ... [详细]
  • Android JSON基础,音视频开发进阶指南目录
    Array里面的对象数据是有序的,json字符串最外层是方括号的,方括号:[]解析jsonArray代码try{json字符串最外层是 ... [详细]
  • 猜字母游戏
    猜字母游戏猜字母游戏——设计数据结构猜字母游戏——设计程序结构猜字母游戏——实现字母生成方法猜字母游戏——实现字母检测方法猜字母游戏——实现主方法1猜字母游戏——设计数据结构1.1 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
author-avatar
mobiledu2502885385
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有