热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

《Flink应用实战》(一)广播状态

目录一、基本概念1、什么是状态2、状态的分类3、什么情况下需要保存状态3、为什么要广播状态3、广播状态的应用场景二、广播状态的设计实践1、数据分流2、补全用户信息

目录

一、基本概念

1、什么是状态

2、状态的分类

3、什么情况下需要保存状态

3、为什么要广播状态

3、广播状态的应用场景

二、广播状态的设计实践

1、数据分流

2、补全用户信息

3、评论过滤

4、用户消费优惠券奖励机制

三、广播状态的开发应用

(1)数据分流开发实践参考Flink动态分流到kafka,hbase_阿飞不会飞丶的博客-CSDN博客_flink hbase phoenix

(2)补全用户信息开发实践(已测试通过)

(3)评论过滤开发实践(已测试通过)

(4)用户消费优惠券奖励机制开发实践(已测试通过)



一、基本概念


1、什么是状态

在Flink中,状态State是指一个具体的task(任务)/operator(算子)的状态。state数据默认是保存在java的堆内存中。

与checkpoint的区别在于checkpoint表示一个FlinkJob在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态。可以理解为checkpoint是把state数据持久化存储了。

2、状态的分类

在Flink中,状态始终与特定算子相关联。有两种类型的状态:算子状态(operator state)和键控状态(keyed state)

(1)算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。

(2)键控状态是根据输入数据流中定义的键(Key)来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个键对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的键。因此,具有相同键的所有数据都会访问相同的状态。

(3)键控状态类似于一个分布式的键值对映射数据结构,只能用于KeyedStream(keyBy算子处理之后)

3、什么情况下需要保存状态

流计算分为无状态有状态两种情况。无状态的计算观察每个独立事件,并根据最后一个事件输出结果,例如,流处理应用程序从传感器接收水库的水位数据,并在水位超过指定高度时发出警告。

有状态的计算则会基于多个事件输出结果,比如:

(1)所有类型的窗口计算。例如,计算过去一小时的平均水位,就是有状态的计算。

(2)所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差20cm以上的水位差读数,则发出警告,就是有状态的计算。

(3)流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是需要有状态的计算。

3、为什么要广播状态

状态只限定在同一任务的算子中,状态不能由相同或不同算子的另一个任务访问。如何其它的任务数据流需要使用另一任务数据流中的状态,则需要将数据流中的状态广播出去,其它任务数据流才能访问。

3、广播状态的应用场景

(1)将低吞吐量流中的数据以状态的方式广播给高吞吐量流使用。

(2)广播流通常作为主数据流的动态计算规则,主数据流需要借助这些规则 进行计算和分流。

(3)常见业务场景:

  1. 关键字、敏感信息过滤。设置过滤规则和关键字,对主数据流进行筛选。

  2. 数据分流。设置不同数据的分流条件和相应的配置信息,对主数据流进行自动分流存储。


二、广播状态的设计实践


1、数据分流

(1)需求分析

主数据流根据配置规则确定数据的存储方式,比如在数据仓库,维表数据量小,不常更新,多用于数据关联筛选查询,而考虑存储到HBase,Redis,MySQL等。而事实表,数据量大,高并发读写,考虑分类存到kafka中,进行进一步的处理,最终形成宽表。


(2)Flink数据流处理流程


  1. mysqlSource为mysql数据表,动态的配置来源表、操作类型、输出表、输出字段等。

  2. 配置数据流通过broadcast声明为广播数据流,需指定广播状态管理器,结构类似于这样:

  3. 通过connect 连接主数据流和广播流

  4. process算子对流中的每个元素调用BroadcastProcessFunction进行处理

  5. 自定义BroadcastProcessFunction方法中,需实现processBroadcastElement 处理广播过来的数据,并将数据写入状态中,processElement 从上下文中获取状态数据,并对主数据流进行处理,最后返回处理后的DataStream。


2、补全用户信息

(1)需求分析

  1. 用户点击事件数据流中,包含用户ID,事件时间,点击事件类型等,但缺少用户的姓名,年龄等基础信息。

  2. 用户注册后基础信息存储到MYSQL。

  3. 在数据流中需要实时补全点击事件中用户的基础信息。

(2)数据结构

  1. 用于广播流的mapState状态管理器

    MapStateDescriptor>>

    点击事件流无key值分组,故mapState状态也常用无key或自定义默认key值设计,这里使用void空值类占位符作为key。

    value值自定义map结构:map("userid" => ("姓名","年龄") ),通过useridkey值匹配对应的用户基础信息。

    1)processBroadcastElement方法中向mapState中写入数据

    broadcastState.put(null,value); //value 规则输入流

    2)processElement方法中获取mapState数据

    String userId = value.f0; // value 点击事件输入流
    //找到对应userid的数据
    Map> map = broadcastState.get(null);
    Tuple2 userInfo = map.get(userId);

  2. 最终输出流

    SingleOutputStreamOperator>

    返回一个包含事件数据和用户数据的Tuple结构


3、评论过滤

(1)需求分析

  1. 实时过滤评论中的敏感信息。

  2. 过滤评论规则支持动态的添加和删除。

(2)数据结构

  1. 用于广播流的mapState状态管理器

    MapStateDescriptor[String, List[String]]

    评论数据流无key分组,mapState的key值可选择自定义一固定值。

    因规则可以多个,则支持动态添加和删除,采用List结构存储多个关键字。

    1)processBroadcastElement方法中向mapState中写入数据

    val ruleList: List[String] = broadcastRule.get("keyword")
    val ruleList2 = ruleList :+ in2._1; //in2 规则输入流
    broadcastRule.put("keyword", ruleList2)

    2)processElement方法中获取mapState数据

    val rule: ReadOnlyBroadcastState[String, List[String]] = readOnlyContext.getBroadcastState(ruleMS)
    val ruleList: List[String] = rule.get("keyword")

  2. 最终输出过滤后评论流

    val result: String = ruleList.fold(in1)((result, elem) => result.replace(elem, "**")) //in1是评论输入流
    out.collect(result)


4、用户消费优惠券奖励机制

(1)需求分析

  1. 用户消费的不同类别会有对应的奖励机制,比如消费满200减20。

  2. 用户消费在没达到奖励条件之前,消费金额累加,达到条件后,发放优惠券并清除消费累计,重新累计。

  3. 同一个类别,可能存在多种奖励机制。

  4. 如果没有达到最低奖励条件,给出鼓励提示。

  5. 支持动态添加和删除奖励条件。

(2)数据结构

  1. 数据流分组

    由于涉及用户不同类别下消费金额的累计,对消费主数据流进行分组

    .keyBy("userID", "categoryName")

    数据流被分组后,类似于这样:(每一个key数据流对应一个mapState状态管理器)

  2. 用于广播流的mapState状态管理器

    MapStateDescriptor[String, List[CouponInfo]]

    规则流每个类别,可能存在多个奖励条件,存储格式:map("categoryName" => List[CouponInfo])

    广播流处理采用带keyed的广播处理函数:KeyedBroadcastProcessFunction

    1)processBroadcastElement方法中向mapState中写入数据

    val couponInfoList: List[CouponInfo] = broadcastState.get(value._2.categoryName)
    val couponInfoList2: List[CouponInfo] = couponInfoList :+ value._2broadcastState.put(value._2.categoryName, couponInfoList2) //value 规则输入流

    2)processElement方法中获取mapState数据

    val couponInfoList: List[CouponInfo] = broadcastState.get(value.categoryName)
    //先降序排列优惠券的金额
    val couponInfoList2: List[CouponInfo] = couponInfoList.sortBy(v => v.orderMoney).reverse
    //获取状态中的金额
    var count = 0
    val totalMoney: Double = allmoney.get()
    Breaks.breakable(for (couponInfo <- couponInfoList2) {if (totalMoney >&#61; couponInfo.orderMoney) {out.collect("在" &#43; value.categoryName &#43; "分类下&#xff0c;订单总金额已经达到了" &#43; couponInfo.orderMoney &#43; "&#xff0c;发放了一个价值" &#43; couponInfo.couponMoney &#43; "的优惠券。请到优惠券频道查看并使用")allmoney.clear()Breaks.break()}count &#43;&#61; 1}
    )

  3. 用于累计用户的金额状态管理器ReducingState

    方法说明
    ReducingState这种状态通过用户传入的reduceFunction&#xff0c;每次调用add方法添加值的时候&#xff0c;会调用reduceFunction&#xff0c;最后合并到一个单一的状态值

    var allmoney: ReducingState[Double] &#61; _override def open(parameters: Configuration): Unit &#61; {var reduceFunction: ReduceFunction[Double] &#61; new ReduceFunction[Double] {override def reduce(value1: Double, value2: Double): Double &#61; value1 &#43; value2}val reducingStateDescriptor: ReducingStateDescriptor[Double] &#61; new ReducingStateDescriptor[Double]("rsd", reduceFunction, createTypeInformation[Double])allmoney &#61; getRuntimeContext.getReducingState(reducingStateDescriptor)
    }//2.累计金额
    allmoney.add(value.money)


三、广播状态的开发应用


&#xff08;1&#xff09;数据分流开发实践参考Flink动态分流到kafka&#xff0c;hbase_阿飞不会飞丶的博客-CSDN博客_flink hbase phoenix


&#xff08;2&#xff09;补全用户信息开发实践&#xff08;已测试通过&#xff09;


import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;/*** Desc* 需求:* 使用Flink的BroadcastState来完成* 事件流和配置流(需要广播为State)的关联,并实现配置的动态更新!*/
public class UserInfo {public static void main(String[] args) throws Exception {//1,获取环境StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment();//2&#xff0c;读取数据源//数据源1的格式&#xff1a;//{"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}//{"userID": "user_2", "eventTime": "2019-08-17 12:19:48", "eventType": "click", "productID": 1}DataStreamSource> eventDS &#61; env.addSource(new MySource());//数据源2的格式&#xff1a;在mysql中/**DROP TABLE IF EXISTS &#96;user_info&#96;;CREATE TABLE &#96;user_info&#96; (&#96;userID&#96; varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,&#96;userName&#96; varchar(10) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,&#96;userAge&#96; int(11) NULL DEFAULT NULL,PRIMARY KEY (&#96;userID&#96;) USING BTREE) ENGINE &#61; MyISAM CHARACTER SET &#61; utf8 COLLATE &#61; utf8_general_ci ROW_FORMAT &#61; Dynamic;-- ------------------------------ Records of user_info-- ----------------------------INSERT INTO &#96;user_info&#96; VALUES (&#39;user_1&#39;, &#39;张三&#39;, 10);INSERT INTO &#96;user_info&#96; VALUES (&#39;user_2&#39;, &#39;李四&#39;, 20);INSERT INTO &#96;user_info&#96; VALUES (&#39;user_3&#39;, &#39;王五&#39;, 30);INSERT INTO &#96;user_info&#96; VALUES (&#39;user_4&#39;, &#39;赵六&#39;, 40);SET FOREIGN_KEY_CHECKS &#61; 1;*/DataStreamSource>> configDS &#61; env.addSource(new MySQLSource());//3&#xff0c;数据转换//3.1 广播配置表流MapStateDescriptor>> mapStateDescriptor &#61; new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));BroadcastStream>> broadcastDS &#61; configDS.broadcast(mapStateDescriptor);//3.2 连接广播流BroadcastConnectedStream, Map>> connectDS &#61; eventDS.connect(broadcastDS);//3.3 补全主流用户信息SingleOutputStreamOperator> result &#61; connectDS.process(new MyProcessFunction(mapStateDescriptor));//4,执行result.print();env.execute();}private static class MySource implements SourceFunction> {private boolean isRunning &#61; true;&#64;Overridepublic void run(SourceContext> ctx) throws Exception {Random random &#61; new Random();SimpleDateFormat df &#61; new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");while(isRunning){int id &#61; random.nextInt(4) &#43; 1;String userId &#61; "user_" &#43; id;String eventTime &#61; df.format(new Date());String eventType &#61; "type_" &#43; random.nextInt(3);int productId &#61; random.nextInt(4);ctx.collect(Tuple4.of(userId,eventTime,eventType,productId));Thread.sleep(500);}}&#64;Overridepublic void cancel() {isRunning &#61; false;}}private static class MySQLSource extends RichSourceFunction>> {private Connection conn &#61; null;private PreparedStatement ps &#61; null;private boolean flag &#61; true;private ResultSet rs &#61; null;&#64;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);//连接数据库Class.forName("com.mysql.cj.jdbc.Driver");conn &#61; DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/test?useUnicode&#61;true&characterEncoding&#61;utf8&serverTimezone&#61;UTC", "root", "123456");String sql &#61; "select &#96;userID&#96;, &#96;userName&#96;, &#96;userAge&#96; from &#96;user_info&#96;";ps &#61; conn.prepareStatement(sql);}&#64;Overridepublic void run(SourceContext>> ctx) throws Exception {while (flag) {Map> map &#61; new HashMap<>();rs &#61; ps.executeQuery();while (rs.next()) {String userID &#61; rs.getString("userID");String userName &#61; rs.getString("userName");int userAge &#61; rs.getInt("userAge");map.put(userID, Tuple2.of(userName, userAge));}ctx.collect(map);Thread.sleep(5000);//每隔5s更新一下用户的配置信息!}}&#64;Overridepublic void cancel() {flag &#61; false;}&#64;Overridepublic void close() throws Exception {if(conn !&#61; null) conn.close();if(ps !&#61; null) ps.close();if(rs !&#61; null) rs.close();}}private static class MyProcessFunction extends BroadcastProcessFunction,Map>, Tuple6> {MapStateDescriptor>> mapStateDescriptor;public MyProcessFunction() {}public MyProcessFunction( MapStateDescriptor>> mapStateDescriptor) {this.mapStateDescriptor &#61; mapStateDescriptor;}&#64;Overridepublic void processElement(Tuple4 value, ReadOnlyContext ctx, Collector> out) throws Exception {String userId &#61; value.f0;ReadOnlyBroadcastState>> broadcastState &#61; ctx.getBroadcastState(mapStateDescriptor);if(broadcastState !&#61; null){Map> map &#61; broadcastState.get(null);if(map !&#61; null) {Tuple2 userInfo &#61; map.get(userId);String userName &#61; userInfo.f0;Integer userAge &#61; userInfo.f1;out.collect(Tuple6.of(userId, value.f1, value.f2, value.f3, userName, userAge));}}}&#64;Overridepublic void processBroadcastElement(Map> value, Context ctx, Collector> out) throws Exception {BroadcastState>> broadcastState &#61; ctx.getBroadcastState(mapStateDescriptor);broadcastState.clear();broadcastState.put(null,value);}}
}

&#xff08;3&#xff09;评论过滤开发实践&#xff08;已测试通过&#xff09;


import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala.{BroadcastConnectedStream, DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector/*** 评论数据过滤*/
object PingLun {def main(args: Array[String]): Unit &#61; {//1、获取环境val env &#61; StreamExecutionEnvironment.getExecutionEnvironment//2、读取数据//2.1 评论主流val plDS: DataStream[String] &#61; env.socketTextStream("127.0.0.1", 9999)//2.2 规则流val ruleDS &#61; env.socketTextStream("127.0.0.1", 9998)//3、数据转换//3.1.1 将规则流中输入进来的数据进行映射成一个元祖&#xff0c;第一个是word关键字&#xff0c;第二个是标记操作类型&#xff0c;是增加还是删除规则val ruleTupleDS: DataStream[(String, String)] &#61; ruleDS.map(_.split(" ")).map(v &#61;> (v(0), v(1)))//3.1 广播规则流val ruleMS: MapStateDescriptor[String, List[String]] &#61; new MapStateDescriptor[String, List[String]]("rule", createTypeInformation[String], createTypeInformation[List[String]])val broadcastDS: BroadcastStream[(String, String)] &#61; ruleTupleDS.broadcast(ruleMS)//3.2 连接数据流val connectDS: BroadcastConnectedStream[String, (String, String)] &#61; plDS.connect(broadcastDS)//3.3 数据流处理val result: DataStream[String] &#61; connectDS.process(new MyProcessFunction(ruleMS))//4、执行result.print()env.execute("PingLun")}class MyProcessFunction(ruleMS: MapStateDescriptor[String, List[String]]) extends BroadcastProcessFunction[String, (String, String), String] {override def processElement(in1: String, readOnlyContext: BroadcastProcessFunction[String, (String, String), String]#ReadOnlyContext, out: Collector[String]): Unit &#61; {val rule: ReadOnlyBroadcastState[String, List[String]] &#61; readOnlyContext.getBroadcastState(ruleMS)if (rule.contains("keyword")) {val ruleList: List[String] &#61; rule.get("keyword")val result: String &#61; ruleList.fold(in1)((result, elem) &#61;> result.replace(elem, "**"))out.collect(result)} else {out.collect(in1)}}override def processBroadcastElement(in2: (String, String), context: BroadcastProcessFunction[String, (String, String), String]#Context, out: Collector[String]): Unit &#61; {val broadcastRule: BroadcastState[String, List[String]] &#61; context.getBroadcastState(ruleMS)if (broadcastRule.contains("keyword")) {val ruleList: List[String] &#61; broadcastRule.get("keyword")if ("&#43;".equals(in2._2)) {val ruleList2 &#61; ruleList :&#43; in2._1broadcastRule.put("keyword", ruleList2)} else if ("-".equals(in2._2)) {val ruleList2: List[String] &#61; ruleList.filter(!_.equals(in2._1))broadcastRule.put("keyword", ruleList2)}} else {if ("&#43;".equals(in2._2)) {broadcastRule.put("keyword", List(in2._1))}}}}}

&#xff08;4&#xff09;用户消费优惠券奖励机制开发实践&#xff08;已测试通过&#xff09;

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState, ReducingState, ReducingStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collectorimport scala.util.control.Breaks/**** 1、不同类别会有对应的奖励机制&#xff0c;需要把这个奖励机制广播给用户消费对应的流* 2、用户的消费应该是一个高吞吐量流* 3、通过用户消费流连接奖励机制流&#xff0c;然后通过process处理* 4、用户消费流应该根据用户标记以以及类别分组&#61;&#61;&#61;》流是KeyedStream* 5、ProcessFunction应该选中KeyedBroadcastProcessFunction* 6、在KeyedBroadcastProcessFunction中完成奖励机制以及用户消费统计、分析、处理**//*** 订单详情类** &#64;param userID 用户编号* &#64;param categoryName 商品类别* &#64;param money 订单金额* user1 pants 500* user1 pants 600* user2 pants 500* user2 pants 100*/
case class OrderItem(userID: String, categoryName: String, money: Double)/*** 优惠券类** &#64;param categoryName 商品类别* &#64;param orderMoney 订单金额* &#64;param couponMoney 优惠券金额* pants 1000 200 &#43;* pants 900 100 &#43;*/
case class CouponInfo(categoryName: String, orderMoney: Double, couponMoney: Double)object CouponAward {def main(args: Array[String]): Unit &#61; {//1&#xff0c;获取环境val env &#61; StreamExecutionEnvironment.getExecutionEnvironment//2&#xff0c;获取数据源//2.1 用户购买数据val userDS: DataStream[String] &#61; env.socketTextStream("127.0.0.1", 9999)//2.1.1 转成实例对象&#xff0c;并分组val userKeyedStream: KeyedStream[OrderItem, Tuple] &#61; userDS.map(_.split(" ")).map(array &#61;> new OrderItem(array(0), array(1), array(2).toDouble)).keyBy("userID", "categoryName")//2.2 规则流val couponDS &#61; env.socketTextStream("127.0.0.1", 9998)//2.2.1 转成实例对象//支持动态增加或删除优惠券规则val couponInfoDS: DataStream[(String, CouponInfo)] &#61; couponDS.map(_.split(" ")).map(array &#61;> (array(3), new CouponInfo(array(0), array(1).toDouble, array(2).toDouble)))//3&#xff0c;数据转换//3.1 声明规则流广播val mapStateDescriptor: MapStateDescriptor[String, List[CouponInfo]] &#61; new MapStateDescriptor[String, List[CouponInfo]]("msz", createTypeInformation[String], createTypeInformation[List[CouponInfo]])val couponInfoBCS: BroadcastStream[(String, CouponInfo)] &#61; couponInfoDS.broadcast(mapStateDescriptor)//3.2 连接广播流val connectStream: BroadcastConnectedStream[OrderItem, (String, CouponInfo)] &#61; userKeyedStream.connect(couponInfoBCS)//3.3 执行proess数据处理val result: DataStream[String] &#61; connectStream.process(new MykeyedBroadcastFunction(mapStateDescriptor))//4&#xff0c;执行result.print()env.execute("CouponAward")}class MykeyedBroadcastFunction(mapStateDescriptor: MapStateDescriptor[String, List[CouponInfo]]) extends KeyedBroadcastProcessFunction[Tuple, OrderItem, (String, CouponInfo), String] {//需要一状态来存储对应用户的总金额&#xff0c;金额作累加&#xff0c;达到领优惠券之后输出并金额清除var allmoney: ReducingState[Double] &#61; _override def open(parameters: Configuration): Unit &#61; {var reduceFunction: ReduceFunction[Double] &#61; new ReduceFunction[Double] {override def reduce(value1: Double, value2: Double): Double &#61; value1 &#43; value2}val reducingStateDescriptor: ReducingStateDescriptor[Double] &#61; new ReducingStateDescriptor[Double]("rsd", reduceFunction, createTypeInformation[Double])allmoney &#61; getRuntimeContext.getReducingState(reducingStateDescriptor)}override def processElement(value: OrderItem, readOnlyContext: KeyedBroadcastProcessFunction[Tuple, OrderItem, (String, CouponInfo), String]#ReadOnlyContext, out: Collector[String]): Unit &#61; {//1.从上下文拿到广播状态val broadcastState: ReadOnlyBroadcastState[String, List[CouponInfo]] &#61; readOnlyContext.getBroadcastState(mapStateDescriptor)//2.累计金额allmoney.add(value.money)//3.判断该商品类别有没优惠券&#xff0c;有的话就发放优惠券并清空累计金额&#xff0c;没有就告诉他别灰心if (broadcastState.contains(value.categoryName)) {val couponInfoList: List[CouponInfo] &#61; broadcastState.get(value.categoryName)//先降序排列优惠券的金额val couponInfoList2: List[CouponInfo] &#61; couponInfoList.sortBy(v &#61;> v.orderMoney).reverse//获取状态中的金额var count &#61; 0val totalMoney: Double &#61; allmoney.get()Breaks.breakable(for (couponInfo <- couponInfoList2) {if (totalMoney >&#61; couponInfo.orderMoney) {out.collect("在" &#43; value.categoryName &#43; "分类下&#xff0c;订单总金额已经达到了" &#43; couponInfo.orderMoney &#43; "&#xff0c;发放了一个价值" &#43; couponInfo.couponMoney &#43; "的优惠券。请到优惠券频道查看并使用")allmoney.clear()Breaks.break()}count &#43;&#61; 1})//没有达到任务的优惠if(count &#61;&#61; couponInfoList2.size){//取要求最小的一个&#xff0c;提示val couponInfo: CouponInfo &#61; couponInfoList2(couponInfoList2.size - 1)out.collect("在"&#43;value.categoryName&#43;"分类下&#xff0c;还需要再消费"&#43;(couponInfo.orderMoney - totalMoney)&#43;"就可以获取到价值"&#43;couponInfo.couponMoney&#43;"的优惠券")}}else{//否则就说明这个类别还没有优惠券out.collect("在"&#43;value.categoryName&#43;"分类下&#xff0c;还没有设置优惠券信息&#xff0c;不过没有关系&#xff0c;现在的消费金额也会参与到后续的优惠券发放统计内")}}override def processBroadcastElement(value: (String, CouponInfo), ctx: KeyedBroadcastProcessFunction[Tuple, OrderItem, (String, CouponInfo), String]#Context, out: Collector[String]): Unit &#61; {val broadcastState: BroadcastState[String, List[CouponInfo]] &#61; ctx.getBroadcastState(mapStateDescriptor)//判断状态管理器中是否已存在相应的规则if (broadcastState.contains(value._2.categoryName)) {//根据操作规则是增加还是删除作处理val couponInfoList: List[CouponInfo] &#61; broadcastState.get(value._2.categoryName)if ("&#43;".equals(value._1)) {val couponInfoList2: List[CouponInfo] &#61; couponInfoList :&#43; value._2broadcastState.put(value._2.categoryName, couponInfoList2)} else if ("-".equals(value._1)) {val couponInfoList2: List[CouponInfo] &#61; couponInfoList.filter(v &#61;> v.orderMoney !&#61; value._2.orderMoney)broadcastState.put(value._2.categoryName, couponInfoList2)}} else {if ("&#43;".equals(value._1)) {broadcastState.put(value._2.categoryName, List(value._2))}}}}}

参考资料&#xff1a;

《Flink基础编程》-- 林子雨

Flink动态分流到kafka&#xff0c;hbase_阿飞不会飞丶的博客-CSDN博客_flink hbase phoenix

Flink&#xff08;51&#xff09;&#xff1a;Flink高级特性之广播状态&#xff08;BroadcastState&#xff09;_电光闪烁的博客-CSDN博客_flink 广播状态

Flink(八) 广播状态模式 Broadcast State Pattern_小雨光的博客-CSDN博客 &#xff08;原博代码有BUG&#xff0c;已本地测式通过&#xff09;


推荐阅读
  • 本文介绍如何使用 Android 的 Canvas 和 View 组件创建一个简单的绘图板应用程序,支持触摸绘画和保存图片功能。 ... [详细]
  • 本文将详细探讨 Java 中提供的不可变集合(如 `Collections.unmodifiableXXX`)和同步集合(如 `Collections.synchronizedXXX`)的实现原理及使用方法,帮助开发者更好地理解和应用这些工具。 ... [详细]
  • 本文探讨了如何在Java中使用JAXB解组两个具有相同名称但不同结构的对象。我们将介绍一个抽象类Bar及其具体实现,并展示如何正确地解析XML文档以获取正确的对象实例。 ... [详细]
  • 2017-2018年度《网络编程与安全》第五次实验报告
    本报告详细记录了2017-2018学年《网络编程与安全》课程第五次实验的具体内容、实验过程、遇到的问题及解决方案。 ... [详细]
  • Google排名优化-面向Google(Search Engine Friendly)的URL设计 ... [详细]
  • 在Java应用程序开发过程中,FTP协议被广泛用于文件的上传和下载操作。本文通过Jakarta Commons Net库中的FTPClient类,详细介绍如何实现文件的上传和下载功能。 ... [详细]
  • 本文探讨了Web开发与游戏开发之间的主要区别,旨在帮助开发者更好地理解两种开发领域的特性和需求。文章基于作者的实际经验和网络资料整理而成。 ... [详细]
  • Java 架构:深入理解 JDK 动态代理机制
    代理模式是 Java 中常用的设计模式之一,其核心在于代理类与委托类共享相同的接口。代理类主要用于为委托类提供预处理、过滤、转发及后处理等功能,以增强或改变原有功能的行为。 ... [详细]
  • 本文探讨了如何利用Java PDFBox库填写PDF表单,并将其正确地附加到PDDocument中,同时确保表单字段的唯一性。 ... [详细]
  • MapReduce原理是怎么剖析的
    这期内容当中小编将会给大家带来有关MapReduce原理是怎么剖析的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。1 ... [详细]
  • Java 实现二维极点算法
    本文介绍了一种使用 Java 编程语言实现的二维极点算法。该算法用于从一组二维坐标中筛选出极点,适用于需要处理几何图形和空间数据的应用场景。文章不仅详细解释了算法的工作原理,还提供了完整的代码示例。 ... [详细]
  • 深入理解Vue.js:从入门到精通
    本文详细介绍了Vue.js的基础知识、安装方法、核心概念及实战案例,帮助开发者全面掌握这一流行的前端框架。 ... [详细]
  • Spring Cloud学习指南:深入理解微服务架构
    本文介绍了微服务架构的基本概念及其在Spring Cloud中的实现。讨论了微服务架构的主要优势,如简化开发和维护、快速启动、灵活的技术栈选择以及按需扩展的能力。同时,也探讨了微服务架构面临的挑战,包括较高的运维要求、分布式系统的复杂性、接口调整的成本等问题。最后,文章提出了实施微服务时应遵循的设计原则。 ... [详细]
  • 前言无论是对于刚入行工作还是已经工作几年的java开发者来说,面试求职始终是你需要直面的一件事情。首先梳理自己的知识体系,针对性准备,会有事半功倍的效果。我们往往会把重点放在技术上 ... [详细]
  • 性能测试工具的选择与应用
    本文探讨了性能测试工具的重要性及其在软件测试中的作用,重点介绍了选择合适性能测试工具的考量因素,并对几种常用的性能测试工具进行了对比分析。 ... [详细]
author-avatar
7777-丿M
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有