title: Flink Series
6. Flink Broadcast Programming in Practice
6.1 Theory
Both Flink's batch processing and Spark's batch processing support two very useful features: broadcast variables and accumulators.
A broadcast variable lets the programmer keep one read-only cached copy of a variable on each machine, instead of shipping a copy of the variable with every task. Once created, a broadcast variable can be used by any function running in the cluster without being transferred to the cluster nodes repeatedly. Also keep in mind that a broadcast variable should not be modified; that is what guarantees every node sees the same value.
In one sentence: it is a public, shared variable. We can broadcast a dataset so that every task on every node can read it, while only one copy of the data exists per node. Without broadcast, every task on every node would need its own copy of the dataset, which wastes memory (i.e. a single node may end up holding multiple copies of the same dataset).
Usage:
// 1. Prepare the data to broadcast
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
// 2. Attach it to an operator as a broadcast set
withBroadcastSet(toBroadcast, "broadcastSetName");
// 3. Read it inside a rich function, typically in open()
Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
Notes:
1. The broadcast variable lives in memory on every node, so the dataset must not be too large: the broadcast data stays resident in memory until the program finishes.
2. A broadcast variable does not support modification once it has been broadcast; this is what keeps the data consistent across all nodes.
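Independent of Flink, the sharing idea can be sketched in plain Java: several tasks on the same "node" all read one immutable copy of the dataset instead of each holding a private copy. (This is only an illustrative sketch; the task count is made up and nothing here uses the Flink API.)

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BroadcastSketch {
    // One read-only copy per "node", shared by every task running on it.
    static final List<Integer> BROADCAST = Collections.unmodifiableList(Arrays.asList(1, 2, 3));

    // Each task reads the single shared copy instead of holding its own.
    static int taskSum() {
        int sum = 0;
        for (int v : BROADCAST) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        for (int task = 0; task < 4; task++) { // 4 hypothetical tasks on one node
            System.out.println("task " + task + " read sum = " + taskSum());
        }
    }
}
```

Because the list is wrapped with `unmodifiableList`, any task that tried to modify it would fail, which mirrors the read-only rule above.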
6.2 Example
package com.aa.flinkjava.broadcast;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class FlinkBroadCastDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

        // Build the dataset that will be broadcast: (name, age) pairs.
        ArrayList<Tuple2<String, Integer>> list = new ArrayList<>();
        list.add(new Tuple2<>("zhangsan", 20));
        list.add(new Tuple2<>("lisi", 21));
        list.add(new Tuple2<>("wangwu", 22));
        DataSource<Tuple2<String, Integer>> dataSource = executionEnvironment.fromCollection(list);
        dataSource.print("dataSource : ");

        // Wrap each tuple in a HashMap so the broadcast side is a DataSet<HashMap>.
        DataSet<HashMap<String, Integer>> toBroadcast = dataSource.map(
                new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> tuple2) throws Exception {
                HashMap<String, Integer> hashMap = new HashMap<>();
                hashMap.put(tuple2.f0, tuple2.f1);
                return hashMap;
            }
        });

        // The main dataset to enrich: just the names.
        DataSource<String> data2 = executionEnvironment.fromElements("zhangsan", "lisi", "wangwu");
        data2.print("data2 : ");

        DataSet<String> result = data2.map(new RichMapFunction<String, String>() {
            List<HashMap<String, Integer>> broadCastMap = new ArrayList<>();
            HashMap<String, Integer> allMap = new HashMap<>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Fetch the broadcast variable once per task, then merge the
                // partial maps into a single lookup map.
                this.broadCastMap = getRuntimeContext().getBroadcastVariable("bdMapName");
                for (HashMap<String, Integer> map : broadCastMap) {
                    allMap.putAll(map);
                }
            }

            @Override
            public String map(String s) throws Exception {
                // Enrich each name with the age found in the broadcast lookup map.
                Integer age = allMap.get(s);
                return s + "," + age;
            }
        }).withBroadcastSet(toBroadcast, "bdMapName"); // register the broadcast set under "bdMapName"

        result.print();
    }
}
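The heart of the example — `open()` merging the broadcast partial maps into `allMap`, and `map()` using it as a lookup table — can be exercised without a cluster. The sketch below reproduces that merge-and-enrich logic in plain Java (the class and method names are made up; no Flink API is involved):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class BroadcastMergeSketch {
    // Mirrors open(): flatten the broadcast List<HashMap> into one lookup map.
    static HashMap<String, Integer> mergeAll(List<HashMap<String, Integer>> parts) {
        HashMap<String, Integer> all = new HashMap<>();
        for (HashMap<String, Integer> part : parts) {
            all.putAll(part);
        }
        return all;
    }

    // Mirrors map(): enrich a name with the age found in the lookup map.
    static String enrich(String name, HashMap<String, Integer> all) {
        return name + "," + all.get(name);
    }

    public static void main(String[] args) {
        List<HashMap<String, Integer>> parts = new ArrayList<>();
        HashMap<String, Integer> p1 = new HashMap<>();
        p1.put("zhangsan", 20);
        HashMap<String, Integer> p2 = new HashMap<>();
        p2.put("lisi", 21);
        parts.add(p1);
        parts.add(p2);

        HashMap<String, Integer> all = mergeAll(parts);
        System.out.println(enrich("zhangsan", all)); // zhangsan,20
        System.out.println(enrich("lisi", all));     // lisi,21
    }
}
```

Note that a name missing from the broadcast side yields `name,null` in both the sketch and the original job, since `HashMap.get` returns null for absent keys.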
7. Flink Counter Programming in Practice
7.1 Theory
An Accumulator works much like a MapReduce Counter: both are good tools for observing how data changes while tasks run. You can update an accumulator inside the operator functions of a Flink job, but the final result only becomes available after the job has finished.
Counter is a concrete Accumulator implementation; the built-in variants are IntCounter, LongCounter and DoubleCounter.
Usage:
// 1. Create the accumulator
private IntCounter numLines = new IntCounter();
// 2. Register the accumulator
getRuntimeContext().addAccumulator("num", this.numLines);
// 3. Update the accumulator
this.numLines.add(1);
// 4. Fetch the result after the job has run
myJobExecutionResult.getAccumulatorResult("num");
7.2 Example
package com.aa.flinkjava.counter;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.configuration.Configuration;

public class FlinkCounterDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(3);

        DataSource<String> dataSource = executionEnvironment.fromElements(
                "a", "b", "c", "d", "e", "f", "g", "h", "i", "j");

        MapOperator<String, String> result = dataSource.map(new RichMapFunction<String, String>() {
            private IntCounter numLines = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Register the accumulator once per task under the name "num".
                this.getRuntimeContext().addAccumulator("num", this.numLines);
            }

            @Override
            public String map(String s) throws Exception {
                // Count every element this task processes.
                this.numLines.add(1);
                return s;
            }
        });

        result.writeAsText("D:\\flinkcount3");

        // Accumulator results become available only after execute() returns.
        JobExecutionResult jobExecutionResult = executionEnvironment.execute();
        Integer num = jobExecutionResult.getAccumulatorResult("num");
        System.out.println("Accumulator result: " + num);
    }
}
Declaration:
The code and statements in this article were written based on my own understanding, and the images that appear are screenshots from my own practice or images related to the corresponding technology. If you have any objection, please contact me for removal. Thanks. Please credit the source when reposting. Thanks.
By luoyepiaoxue2014
Bilibili: https://space.bilibili.com/1523287361
Weibo: http://weibo.com/luoyepiaoxue2014