热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

从0到1Flink的成长之路(二十)Flink高级特性(二)之存储State数据结构

存储State数据结构前面说过:有状态计算其实就是需要考虑历史数据,而历史数据需要搞个地方存储起来。Flink为了方便不同分类的State的存储和管理,提供了如下API数据结构来存




存储 State 数据结构

前面说过:有状态计算其实就是需要考虑历史数据,而历史数据需要搞个地方存储起来。
Flink为了方便不同分类的State的存储和管理,提供了如下API/数据结构来存储State。
在这里插入图片描述
1)、Keyed State
Keyed State 通过 RuntimeContext 访问,这需要 Operator 是一个RichFunction。保存
Keyed state的数据结构:  ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态。它可以通过update方法更新状态值,通过value()方法获取状态值,如求按用户id统计用户交易总额
ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值,如统计按用户id统计用户经常登录的IP
ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值
MapState:即状态值为一个map,用户通过put或putAll方法添加元素需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中,相当于只是持有了这个状态的句柄。

2)、Operator State
Operator State 需要自己实现 CheckpointedFunction 或 ListCheckpointed 接口,保存Operator state的数据结构:ListState和BroadcastState
举例来说,Flink中的FlinkKafkaConsumer,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。
在这里插入图片描述
State 代码示例
在这里插入图片描述
1 Keyed State
下图以WordCount 的 sum 所使用的StreamGroupedReduce类为例,讲解了如何在代码中使用
在这里插入图片描述
官网代码示例
managed-keyed-state
需求:
使用KeyState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可)
编码步骤


//-1.定义一个状态用来存放最大值 private transient ValueState maxValueState;
//-2.创建一个状态描述符对象 ValueStateDescriptor descriptor = new
ValueStateDescriptor(“maxValueState”, Long.class); //-3.根据状态描述符获取State
maxValueState = getRuntimeContext().getState(maxValueStateDescriptor);
//-4.使用State Long historyValue = maxValueState.value(); //判断当前值和历史值谁大
if (historyValue == null || currentValue > historyValue) //-5.更新状态
maxValueState.update(currentValue);


代码示例

package xx.xxxxxx.flink.state;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/** Flink State 中KeyedState,默认情况下框架自己维护,此外可以手动维护 */
public class StreamKeyedStateDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 数据源-source
DataStreamSource> tupleStream = env.fromElements(
Tuple3.of("上海", "普陀区", 488L), Tuple3.of("上海", "徐汇区", 212L),
Tuple3.of("北京", "西城区", 823L), Tuple3.of("北京", "海淀区", 234L),
Tuple3.of("上海", "杨浦区", 888L), Tuple3.of("上海", "浦东新区", 666L),
Tuple3.of("北京", "东城区", 323L), Tuple3.of("上海", "黄浦区", 111L)
);
// 3. 数据转换-transformation
// TODO:直接使用maxBy的状态,Flink自动维护了State
SingleOutputStreamOperator> maxOperator = tupleStream
.keyBy(0)
.maxBy(2);
/*
maxBy> (上海,普陀区,488)
maxBy> (上海,普陀区,488)
maxBy> (北京,西城区,823)
maxBy> (北京,西城区,823)
maxBy> (上海,杨浦区,888)
maxBy> (上海,杨浦区,888)
maxBy> (北京,西城区,823)
maxBy> (上海,杨浦区,888)
*/
maxOperator.printToErr("maxBy");
// TODO: 使用KeyedState中的ValueState来模拟maxBy的底层,手动维护状态
SingleOutputStreamOperator stateOperator = tupleStream
.keyBy(0)
.map(new RichMapFunction, String>() {
// a. 定义一个状态存储最大值
private transient ValueState maxValueState;
@Override
public void open(Configuration parameters) throws Exception {
// b. 创建状态描述符
ValueStateDescriptor stateDescriptor = new ValueStateDescriptor(
"maxValueState", Long.class
);
// c. 实例化状态对象
maxValueState = getRuntimeContext().getState(stateDescriptor);
}
@Override
public String map(Tuple3 value) throws Exception {
// d. 获取历史状态
Long historyValue = maxValueState.value();
// max 最大值比较
Long currentValue = value.f2;
if (null == historyValue || currentValue > historyValue) {
// e. 更新状态
maxValueState.update(currentValue);
return value.f0 + ", " + value.f1 + ", " + currentValue ; } else {
return value.f0 + ", " + value.f1 + ", " + historyValue ; } }
});
/*
state> 上海, 普陀区, 488
state> 上海, 徐汇区, 488
state> 北京, 西城区, 823
state> 北京, 海淀区, 823
state> 上海, 杨浦区, 888
state> 上海, 浦东新区, 888
state> 北京, 东城区, 823
state> 上海, 黄浦区, 888
*/
stateOperator.print("state");
// 5. 触发执行-execute
env.execute(StreamKeyedStateDemo.class.getSimpleName());
} }

2 Operator State
下图对WordCoun示例中的FromElementsFunction类进行详解并分享,如何在代码中使用operator state:
在这里插入图片描述
官网代码示例
managed-operator-state
需求:
使用ListState存储offset,模拟Kafka的offset维护
编码步骤:


//-1.声明一个OperatorState来记录offset private ListState offsetState =
null; private Long offset = 0L; //-2.创建状态描述器 ListStateDescriptor
descriptor = new ListStateDescriptor(“offsetState”, Long.class);
//-3.根据状态描述器获取State offsetState =
context.getOperatorStateStore().getListState(descriptor);
//-4.获取State中的值 Iterator iterator =
offsetState.get().iterator(); if (iterator.hasNext()) {//迭代器中有值
offset = iterator.next();//取出的值就是offset } offset += 1L;
ctx.collect(“subTaskId:” + getRuntimeContext().getIndexOfThisSubtask()


  • “,当前的offset为:” + offset); if (offset % 5 == 0) {//每隔5条消息,模拟一个异常 //-5.保存State到Checkpoint中
    offsetState.clear();//清理内存中存储的offset到Checkpoint中 //-6.将offset存入State中
    offsetState.add(offset);

代码示例

package xx.xxxxx.flink.state;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
* Flink State 中OperatorState,自定义数据源Kafka消费数据,保存消费偏移量数据并进行Checkpoint
*/
public class StateOperatorStateDemo {
/**
* 自定义数据源Source,模拟从Kafka消费数据(类似FlinkKafkaConsumer),并实现offset状态维护
*/
private static class FlinkKafkaSource extends RichParallelSourceFunction
implements CheckpointedFunction {
private volatile boolean isRunning = true ;
// TODO:a. 声明OperatorState存储offset
private ListState offsetState = null ;
private Long offset = 0L ; @Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// TODO: b. 创建状态描述
ListStateDescriptor descriptor = new ListStateDescriptor<>(
"offsetState", Long.class
);
// TODO: c. 实例化状态对象
offsetState = context.getOperatorStateStore().getListState(descriptor);
// 判断状态是否从Checkpoint或SnapShot恢复,是的话获取偏移量
if(context.isRestored()){
// TODO:d. 从State获取值
offset = offsetState.get().iterator().next();
} }
// 快照方法会在Checkpoint的时候执行,把当前的State存入到Checkpoint中
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// TODO: e. 将状态数据保存至Checkpoint中
offsetState.clear();
// TODO: f. 将list存储State中
offsetState.update(Arrays.asList(offset));
}@Override
public void run(SourceContext ctx) throws Exception {
while (isRunning){
// 获取索引,分区ID
int partitiOnId= getRuntimeContext().getIndexOfThisSubtask();
// 更新偏移量信息
offset = offset + 1L ;
// 输出操作
ctx.collect("partition: " + partitionId + ", offset: " + offset);
TimeUnit.SECONDS.sleep(1);
// 每隔5条消息,模拟一个异常
if(offset % 5 == 0){
throw new RuntimeException("程序出现异常,遇到Bug啦................") ; } } }@Override
public void cancel() {
isRunning = false ; } }
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO: 设置检查点Checkpoint相关属性,保存状态
env.enableCheckpointing(1000) ; // 每隔1s执行一次Checkpoint
env.setStateBackend(new FsStateBackend("file:///D:/ckpt/")) ; // 状态数据保存本地文件系统
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);// 当应用取消时,Checkpoint数据保存,不删除
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 固定延迟重启策略: 程序出现异常的时候,重启2次,每次延迟3秒钟重启,超过2次,程序退出
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));
// 2. 数据源-source
DataStreamSource kafkaDataStream = env.addSource(new FlinkKafkaSource());
// 4. 数据终端-sink
kafkaDataStream.printToErr();
// 5. 触发执行-execute
env.execute(StateOperatorStateDemo.class.getSimpleName());
} }


推荐阅读
  • sklearn数据集库中的常用数据集类型介绍
    本文介绍了sklearn数据集库中常用的数据集类型,包括玩具数据集和样本生成器。其中详细介绍了波士顿房价数据集,包含了波士顿506处房屋的13种不同特征以及房屋价格,适用于回归任务。 ... [详细]
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • 本文讨论了Kotlin中扩展函数的一些惯用用法以及其合理性。作者认为在某些情况下,定义扩展函数没有意义,但官方的编码约定支持这种方式。文章还介绍了在类之外定义扩展函数的具体用法,并讨论了避免使用扩展函数的边缘情况。作者提出了对于扩展函数的合理性的质疑,并给出了自己的反驳。最后,文章强调了在编写Kotlin代码时可以自由地使用扩展函数的重要性。 ... [详细]
  • 本文讨论了如何使用IF函数从基于有限输入列表的有限输出列表中获取输出,并提出了是否有更快/更有效的执行代码的方法。作者希望了解是否有办法缩短代码,并从自我开发的角度来看是否有更好的方法。提供的代码可以按原样工作,但作者想知道是否有更好的方法来执行这样的任务。 ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • Android自定义控件绘图篇之Paint函数大汇总
    本文介绍了Android自定义控件绘图篇中的Paint函数大汇总,包括重置画笔、设置颜色、设置透明度、设置样式、设置宽度、设置抗锯齿等功能。通过学习这些函数,可以更好地掌握Paint的用法。 ... [详细]
  • Java如何导入和导出Excel文件的方法和步骤详解
    本文详细介绍了在SpringBoot中使用Java导入和导出Excel文件的方法和步骤,包括添加操作Excel的依赖、自定义注解等。文章还提供了示例代码,并将代码上传至GitHub供访问。 ... [详细]
  • Annotation的大材小用
    为什么80%的码农都做不了架构师?最近在开发一些通用的excel数据导入的功能,由于涉及到导入的模块很多,所以开发了一个比较通用的e ... [详细]
  • 基于分布式锁的防止重复请求解决方案
    一、前言关于重复请求,指的是我们服务端接收到很短的时间内的多个相同内容的重复请求。而这样的重复请求如果是幂等的(每次请求的结果都相同,如查 ... [详细]
  • go channel 缓冲区最大限制_Golang学习笔记之并发.协程(Goroutine)、信道(Channel)
    原文作者:学生黄哲来源:简书Go是并发语言,而不是并行语言。一、并发和并行的区别•并发(concurrency)是指一次处理大量事情的能力 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 3.223.28周学习总结中的贪心作业收获及困惑
    本文是对3.223.28周学习总结中的贪心作业进行总结,作者在解题过程中参考了他人的代码,但前提是要先理解题目并有解题思路。作者分享了自己在贪心作业中的收获,同时提到了一道让他困惑的题目,即input details部分引发的疑惑。 ... [详细]
  • 摘要: 在测试数据中,生成中文姓名是一个常见的需求。本文介绍了使用C#编写的随机生成中文姓名的方法,并分享了相关代码。作者欢迎读者提出意见和建议。 ... [详细]
  • CEPH LIO iSCSI Gateway及其使用参考文档
    本文介绍了CEPH LIO iSCSI Gateway以及使用该网关的参考文档,包括Ceph Block Device、CEPH ISCSI GATEWAY、USING AN ISCSI GATEWAY等。同时提供了多个参考链接,详细介绍了CEPH LIO iSCSI Gateway的配置和使用方法。 ... [详细]
  • 面向对象之3:封装的总结及实现方法
    本文总结了面向对象中封装的概念和好处,以及在Java中如何实现封装。封装是将过程和数据用一个外壳隐藏起来,只能通过提供的接口进行访问。适当的封装可以提高程序的理解性和维护性,增强程序的安全性。在Java中,封装可以通过将属性私有化并使用权限修饰符来实现,同时可以通过方法来访问属性并加入限制条件。 ... [详细]
author-avatar
清新的淡淡茶绿
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有