热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

从0到1Flink的成长之路(二十)Flink高级特性(二)之存储State数据结构

存储State数据结构前面说过:有状态计算其实就是需要考虑历史数据,而历史数据需要搞个地方存储起来。Flink为了方便不同分类的State的存储和管理,提供了如下API数据结构来存




存储 State 数据结构

前面说过:有状态计算其实就是需要考虑历史数据,而历史数据需要搞个地方存储起来。
Flink为了方便不同分类的State的存储和管理,提供了如下API/数据结构来存储State。
在这里插入图片描述
1)、Keyed State
Keyed State 通过 RuntimeContext 访问,这需要 Operator 是一个RichFunction。保存
Keyed state的数据结构:  ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态。它可以通过update方法更新状态值,通过value()方法获取状态值,如求按用户id统计用户交易总额
ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值,如统计按用户id统计用户经常登录的IP
ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值
MapState:即状态值为一个map,用户通过put或putAll方法添加元素需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中,相当于只是持有了这个状态的句柄。

2)、Operator State
Operator State 需要自己实现 CheckpointedFunction 或 ListCheckpointed 接口,保存Operator state的数据结构:ListState和BroadcastState
举例来说,Flink中的FlinkKafkaConsumer,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。
在这里插入图片描述
State 代码示例
在这里插入图片描述
1 Keyed State
下图以WordCount 的 sum 所使用的StreamGroupedReduce类为例,讲解了如何在代码中使用
在这里插入图片描述
官网代码示例
managed-keyed-state
需求:
使用KeyState中的ValueState获取数据中的最大值(实际中直接使用maxBy即可)
编码步骤


//-1.定义一个状态用来存放最大值 private transient ValueState maxValueState;
//-2.创建一个状态描述符对象 ValueStateDescriptor descriptor = new
ValueStateDescriptor(“maxValueState”, Long.class); //-3.根据状态描述符获取State
maxValueState = getRuntimeContext().getState(maxValueStateDescriptor);
//-4.使用State Long historyValue = maxValueState.value(); //判断当前值和历史值谁大
if (historyValue == null || currentValue > historyValue) //-5.更新状态
maxValueState.update(currentValue);


代码示例

package xx.xxxxxx.flink.state;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/** Flink State 中KeyedState,默认情况下框架自己维护,此外可以手动维护 */
public class StreamKeyedStateDemo {
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2. 数据源-source
DataStreamSource> tupleStream = env.fromElements(
Tuple3.of("上海", "普陀区", 488L), Tuple3.of("上海", "徐汇区", 212L),
Tuple3.of("北京", "西城区", 823L), Tuple3.of("北京", "海淀区", 234L),
Tuple3.of("上海", "杨浦区", 888L), Tuple3.of("上海", "浦东新区", 666L),
Tuple3.of("北京", "东城区", 323L), Tuple3.of("上海", "黄浦区", 111L)
);
// 3. 数据转换-transformation
// TODO:直接使用maxBy的状态,Flink自动维护了State
SingleOutputStreamOperator> maxOperator = tupleStream
.keyBy(0)
.maxBy(2);
/*
maxBy> (上海,普陀区,488)
maxBy> (上海,普陀区,488)
maxBy> (北京,西城区,823)
maxBy> (北京,西城区,823)
maxBy> (上海,杨浦区,888)
maxBy> (上海,杨浦区,888)
maxBy> (北京,西城区,823)
maxBy> (上海,杨浦区,888)
*/
maxOperator.printToErr("maxBy");
// TODO: 使用KeyedState中的ValueState来模拟maxBy的底层,手动维护状态
SingleOutputStreamOperator stateOperator = tupleStream
.keyBy(0)
.map(new RichMapFunction, String>() {
// a. 定义一个状态存储最大值
private transient ValueState maxValueState;
@Override
public void open(Configuration parameters) throws Exception {
// b. 创建状态描述符
ValueStateDescriptor stateDescriptor = new ValueStateDescriptor(
"maxValueState", Long.class
);
// c. 实例化状态对象
maxValueState = getRuntimeContext().getState(stateDescriptor);
}
@Override
public String map(Tuple3 value) throws Exception {
// d. 获取历史状态
Long historyValue = maxValueState.value();
// max 最大值比较
Long currentValue = value.f2;
if (null == historyValue || currentValue > historyValue) {
// e. 更新状态
maxValueState.update(currentValue);
return value.f0 + ", " + value.f1 + ", " + currentValue ; } else {
return value.f0 + ", " + value.f1 + ", " + historyValue ; } }
});
/*
state> 上海, 普陀区, 488
state> 上海, 徐汇区, 488
state> 北京, 西城区, 823
state> 北京, 海淀区, 823
state> 上海, 杨浦区, 888
state> 上海, 浦东新区, 888
state> 北京, 东城区, 823
state> 上海, 黄浦区, 888
*/
stateOperator.print("state");
// 5. 触发执行-execute
env.execute(StreamKeyedStateDemo.class.getSimpleName());
} }

2 Operator State
下图对WordCoun示例中的FromElementsFunction类进行详解并分享,如何在代码中使用operator state:
在这里插入图片描述
官网代码示例
managed-operator-state
需求:
使用ListState存储offset,模拟Kafka的offset维护
编码步骤:


//-1.声明一个OperatorState来记录offset private ListState offsetState =
null; private Long offset = 0L; //-2.创建状态描述器 ListStateDescriptor
descriptor = new ListStateDescriptor(“offsetState”, Long.class);
//-3.根据状态描述器获取State offsetState =
context.getOperatorStateStore().getListState(descriptor);
//-4.获取State中的值 Iterator iterator =
offsetState.get().iterator(); if (iterator.hasNext()) {//迭代器中有值
offset = iterator.next();//取出的值就是offset } offset += 1L;
ctx.collect(“subTaskId:” + getRuntimeContext().getIndexOfThisSubtask()


  • “,当前的offset为:” + offset); if (offset % 5 == 0) {//每隔5条消息,模拟一个异常 //-5.保存State到Checkpoint中
    offsetState.clear();//清理内存中存储的offset到Checkpoint中 //-6.将offset存入State中
    offsetState.add(offset);

代码示例

package xx.xxxxx.flink.state;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
* Flink State 中OperatorState,自定义数据源Kafka消费数据,保存消费偏移量数据并进行Checkpoint
*/
public class StateOperatorStateDemo {
/**
* 自定义数据源Source,模拟从Kafka消费数据(类似FlinkKafkaConsumer),并实现offset状态维护
*/
private static class FlinkKafkaSource extends RichParallelSourceFunction
implements CheckpointedFunction {
private volatile boolean isRunning = true ;
// TODO:a. 声明OperatorState存储offset
private ListState offsetState = null ;
private Long offset = 0L ; @Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// TODO: b. 创建状态描述
ListStateDescriptor descriptor = new ListStateDescriptor<>(
"offsetState", Long.class
);
// TODO: c. 实例化状态对象
offsetState = context.getOperatorStateStore().getListState(descriptor);
// 判断状态是否从Checkpoint或SnapShot恢复,是的话获取偏移量
if(context.isRestored()){
// TODO:d. 从State获取值
offset = offsetState.get().iterator().next();
} }
// 快照方法会在Checkpoint的时候执行,把当前的State存入到Checkpoint中
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// TODO: e. 将状态数据保存至Checkpoint中
offsetState.clear();
// TODO: f. 将list存储State中
offsetState.update(Arrays.asList(offset));
}@Override
public void run(SourceContext ctx) throws Exception {
while (isRunning){
// 获取索引,分区ID
int partitiOnId= getRuntimeContext().getIndexOfThisSubtask();
// 更新偏移量信息
offset = offset + 1L ;
// 输出操作
ctx.collect("partition: " + partitionId + ", offset: " + offset);
TimeUnit.SECONDS.sleep(1);
// 每隔5条消息,模拟一个异常
if(offset % 5 == 0){
throw new RuntimeException("程序出现异常,遇到Bug啦................") ; } } }@Override
public void cancel() {
isRunning = false ; } }
public static void main(String[] args) throws Exception {
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// TODO: 设置检查点Checkpoint相关属性,保存状态
env.enableCheckpointing(1000) ; // 每隔1s执行一次Checkpoint
env.setStateBackend(new FsStateBackend("file:///D:/ckpt/")) ; // 状态数据保存本地文件系统
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);// 当应用取消时,Checkpoint数据保存,不删除
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 固定延迟重启策略: 程序出现异常的时候,重启2次,每次延迟3秒钟重启,超过2次,程序退出
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));
// 2. 数据源-source
DataStreamSource kafkaDataStream = env.addSource(new FlinkKafkaSource());
// 4. 数据终端-sink
kafkaDataStream.printToErr();
// 5. 触发执行-execute
env.execute(StateOperatorStateDemo.class.getSimpleName());
} }


推荐阅读
  • 在多线程并发环境中,普通变量的操作往往是线程不安全的。本文通过一个简单的例子,展示了如何使用 AtomicInteger 类及其核心的 CAS 无锁算法来保证线程安全。 ... [详细]
  • 2020年9月15日,Oracle正式发布了最新的JDK 15版本。本次更新带来了许多新特性,包括隐藏类、EdDSA签名算法、模式匹配、记录类、封闭类和文本块等。 ... [详细]
  • Ihavetwomethodsofgeneratingmdistinctrandomnumbersintherange[0..n-1]我有两种方法在范围[0.n-1]中生 ... [详细]
  • 文章目录Golang定时器Timer和Tickertime.Timertime.NewTimer()实例time.AfterFunctime.Tickertime.NewTicke ... [详细]
  • MySQL Decimal 类型的最大值解析及其在数据处理中的应用艺术
    在关系型数据库中,表的设计与SQL语句的编写对性能的影响至关重要,甚至可占到90%以上。本文将重点探讨MySQL中Decimal类型的最大值及其在数据处理中的应用技巧,通过实例分析和优化建议,帮助读者深入理解并掌握这一重要知识点。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 本文深入解析了Java 8并发编程中的`AtomicInteger`类,详细探讨了其源码实现和应用场景。`AtomicInteger`通过硬件级别的原子操作,确保了整型变量在多线程环境下的安全性和高效性,避免了传统加锁方式带来的性能开销。文章不仅剖析了`AtomicInteger`的内部机制,还结合实际案例展示了其在并发编程中的优势和使用技巧。 ... [详细]
  • 传统上,Java 的 String 类一直使用 char 数组来存储字符数据。然而,在 Java 9 及更高版本中,String 类的内部实现改为使用 byte 数组。本文将探讨这一变化的原因及其带来的好处。 ... [详细]
  • 本文介绍了如何在Python中使用插值方法将不同分辨率的数据统一到相同的分辨率。 ... [详细]
  • 本文详细介绍了 com.apollographql.apollo.api.internal.Optional 类中的 orNull() 方法,并提供了多个实际代码示例,帮助开发者更好地理解和使用该方法。 ... [详细]
  • JUC(三):深入解析AQS
    本文详细介绍了Java并发工具包中的核心类AQS(AbstractQueuedSynchronizer),包括其基本概念、数据结构、源码分析及核心方法的实现。 ... [详细]
  • 利用ZFS和Gluster实现分布式存储系统的高效迁移与应用
    本文探讨了在Ubuntu 18.04系统中利用ZFS和Gluster文件系统实现分布式存储系统的高效迁移与应用。通过详细的技术分析和实践案例,展示了这两种文件系统在数据迁移、高可用性和性能优化方面的优势,为分布式存储系统的部署和管理提供了宝贵的参考。 ... [详细]
  • 提升Android开发效率:Clean Code的最佳实践与应用
    在Android开发中,提高代码质量和开发效率是至关重要的。本文介绍了如何通过Clean Code的最佳实践来优化Android应用的开发流程。以SQLite数据库操作为例,详细探讨了如何编写高效、可维护的SQL查询语句,并将其结果封装为Java对象。通过遵循这些最佳实践,开发者可以显著提升代码的可读性和可维护性,从而加快开发速度并减少错误。 ... [详细]
  • Kafka 是由 Apache 软件基金会开发的高性能分布式消息系统,支持高吞吐量的发布和订阅功能,主要使用 Scala 和 Java 编写。本文将深入解析 Kafka 的安装与配置过程,为程序员提供详尽的操作指南,涵盖从环境准备到集群搭建的每一个关键步骤。 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
author-avatar
清新的淡淡茶绿
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有