热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink系列之Flink中Broadcast和Counter整理和实战

title:Flink系列六、FlinkBroadcast编程实战6.1理论Flink的批处理和Spark的批处理,都支持两个非常好的特性:广播变量





title: Flink系列




六、Flink Broadcast 编程实战

6.1 理论

Flink 的批处理 和 Spark 的批处理,都支持两个非常好的特性: 广播变量 + 累加器

广播变量允许编程人员在每台机器上保持1个只读的缓存变量,而不是传送变量的副本给tasks,广播变量创建后,它可以运行在集群中的任何function上,而不需要多次传递给集群节点。另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的

一句话解释,可以理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。如果不使用broadcast,则在每个节点中的每个task中都需要拷贝一份dataset数据集,比较浪费内存(也就是一个节点中可能会存在多份dataset数据)。


用法:

// 1:初始化数据
DataSet toBroadcast = env.fromElements(1, 2, 3)
// 2:广播数据
withBroadcastSet(toBroadcast, "broadcastSetName");
// 3:获取数据
Collection broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");

注意:

1:广播出去的变量存在于每个节点的内存中,所以这个数据集不能太大。因为广播出去的数据,会常驻内存,除非程序执行结束。
2:广播变量在初始化广播出去以后不支持修改,这样才能保证每个节点的数据都是一致的。

6.2 案例

package com.aa.flinkjava.broadcast;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
* @Author AA
* @Date 2022/2/24 19:37
* @Project bigdatapre
* @Package com.aa.flinkjava.broadcast
* Flink BroadCast 测试
* 在这里做一个join的连接实现
*/

public class FlinkBroadCastDemo {
public static void main(String[] args) throws Exception {
//1、获取运行环境
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
//2、造数据
ArrayList<Tuple2<String,Integer>> list &#61; new ArrayList<>();
list.add(new Tuple2<>("zhangsan",20));
list.add(new Tuple2<>("lisi",21));
list.add(new Tuple2<>("wangwu",22));
//3、读取造的数据
DataSource<Tuple2<String, Integer>> dataSource &#61; executionEnvironment.fromCollection(list);
dataSource.print("dataSource : ");
//4、帮tuple2转化为hashmap 。 map中的key是用户姓名&#xff0c;value是用户年龄
// DataSet>的数据类型可以直接修饰强制转换。
DataSet<HashMap<String, Integer>> toBroadcast &#61; dataSource.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
&#64;Override
public HashMap<String, Integer> map(Tuple2<String, Integer> tuple2) throws Exception {
HashMap<String, Integer> hashMap &#61; new HashMap<>();
hashMap.put(tuple2.f0,tuple2.f1);
return hashMap;
}
});
//5、再造一份 join 使用的数据
DataSource<String> data2 &#61; executionEnvironment.fromElements("zhangsan", "lisi", "wangwu");
data2.print("data2 : ");
//6、执行广播数据的一些操作
// 下面这个DataSet类型也是强制转换的的
DataSet<String> result &#61; data2.map(new RichMapFunction<String, String>() {
List<HashMap<String, Integer>> broadCastMap &#61; new ArrayList<HashMap<String, Integer>>();
HashMap<String, Integer> allMap &#61; new HashMap<String, Integer>();
&#64;Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.broadCastMap &#61; getRuntimeContext().getBroadcastVariable("bdMapName");
for (HashMap map : broadCastMap) {
allMap.putAll(map);
}
}
/**
* &#64;param s s是data2中间的一个一个的元素&#xff0c;其实就是"zhangsan", "lisi", "wangwu" 这些值
* 根据 name("zhangsan", "lisi", "wangwu") 去广播变量中匹配获取相应的年龄
* &#64;return
* &#64;throws Exception
*/

&#64;Override
public String map(String s) throws Exception {
Integer age &#61; allMap.get(s);
return s &#43; "," &#43; age; //输出拼接的结果
}
}).withBroadcastSet(toBroadcast, "bdMapName");
//7、打印输出
result.print();
}
}

七、Flink Counter 编程实战

7.1 理论

​ Accumulator 即累加器&#xff0c;与 Mapreduce Counter 的应用场景差不多&#xff0c;都能很好地观察 Task 在运行期间的数据变化。可以在 Flink job 任务中的算子函数中操作累加器&#xff0c;但是只能在任务执行结束之后才能获得累加器的最终结果。

​ Counter 是一个具体的累加器 (Accumulator) 实现&#xff1a;IntCounter, LongCounter 和 DoubleCounter

用法&#xff1a;

// 1、创建累加器
private IntCounter numlines &#61; new IntCounter();
// 2、注册累加器
getRuntimeContext().addAccumulator("num", this.numLines);
// 3、使用累加器
this.numlines.add(1);
// 4、获取累加器的结果
myJobExecutionResult.getAccumulatorResult("num")

7.2 案例

package com.aa.flinkjava.counter;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import java.util.ArrayList;
/**
* &#64;Author AA
* &#64;Date 2022/2/25 14:27
* &#64;Project bigdatapre
* &#64;Package com.aa.flinkjava.counter
* Flink 累加器 示例
* 统计输入数据源的流入数据的次数。
*/

public class FlinkCounterDemo {
public static void main(String[] args) throws Exception {
//1、获取运行环境
ExecutionEnvironment executionEnvironment &#61; ExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setParallelism(3);
//2、读取造的数据
DataSource<String> dataSource &#61; executionEnvironment.fromElements("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
//3、定义一点逻辑&#xff0c;给累加器放进去
MapOperator<String, String> result &#61; dataSource.map(new RichMapFunction<String, String>() {
//3-1 创建累加器对象
private IntCounter numlines &#61; new IntCounter();
&#64;Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//3-2 需要注册累加器
/*
在逻辑上来说&#xff0c;相当于在这个 application应用内部定义了一个变量 num 用来做统计。
但是&#xff0c;物理上&#xff0c;其实这个 num 变量是由分散在所有 Task 内部的 numlines 组成的。
一个 num 包含了很多个 numlines。其实最终拿到的结果&#xff0c;就是把所有 Task 中的 numlines 加起来&#xff0c;就是 num 的值。
*/

this.getRuntimeContext().addAccumulator("num", this.numlines);
}
&#64;Override
public String map(String s) throws Exception {
//另外注意&#xff0c;可能有小伙伴觉得可以在这里定义普通变量统计也行&#xff0c;
// 注意&#xff1a;若并行度为1&#xff0c;使用普通的累加求和也可以&#xff0c;但是设置多个并行度&#xff0c;则普通的累加求和结果就不准啦。
//每运行一次就 向累加器中 添加1
this.numlines.add(1);
return s; //这里没有做什么逻辑&#xff0c;就是给来的数据原样输出了。但是上面统计了累加次数了。
}
});
//4、给结果输出出去
result.writeAsText("D:\\flinkcount3");
//5、执行
JobExecutionResult jobExecutionResult &#61; executionEnvironment.execute();
//6、看看累加器的结果
Integer num &#61; jobExecutionResult.getAccumulatorResult("num");
System.out.println("累加器的输出的结果是&#xff1a; " &#43; num);
}
}




声明&#xff1a;
        文章中代码及相关语句为自己根据相应理解编写&#xff0c;文章中出现的相关图片为自己实践中的截图和相关技术对应的图片&#xff0c;若有相关异议&#xff0c;请联系删除。感谢。转载请注明出处&#xff0c;感谢。



By luoyepiaoxue2014

B站&#xff1a; https://space.bilibili.com/1523287361 点击打开链接
微博地址&#xff1a; http://weibo.com/luoyepiaoxue2014 点击打开链接







推荐阅读
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 本文讨论了在Spring 3.1中,数据源未能自动连接到@Configuration类的错误原因,并提供了解决方法。作者发现了错误的原因,并在代码中手动定义了PersistenceAnnotationBeanPostProcessor。作者删除了该定义后,问题得到解决。此外,作者还指出了默认的PersistenceAnnotationBeanPostProcessor的注册方式,并提供了自定义该bean定义的方法。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 标题: ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • 本文讨论了在openwrt-17.01版本中,mt7628设备上初始化启动时eth0的mac地址总是随机生成的问题。每次随机生成的eth0的mac地址都会写到/sys/class/net/eth0/address目录下,而openwrt-17.01原版的SDK会根据随机生成的eth0的mac地址再生成eth0.1、eth0.2等,生成后的mac地址会保存在/etc/config/network下。 ... [详细]
  • Spring学习(4):Spring管理对象之间的关联关系
    本文是关于Spring学习的第四篇文章,讲述了Spring框架中管理对象之间的关联关系。文章介绍了MessageService类和MessagePrinter类的实现,并解释了它们之间的关联关系。通过学习本文,读者可以了解Spring框架中对象之间的关联关系的概念和实现方式。 ... [详细]
  • (三)多表代码生成的实现方法
    本文介绍了一种实现多表代码生成的方法,使用了java代码和org.jeecg框架中的相关类和接口。通过设置主表配置,可以生成父子表的数据模型。 ... [详细]
  • Java如何导入和导出Excel文件的方法和步骤详解
    本文详细介绍了在SpringBoot中使用Java导入和导出Excel文件的方法和步骤,包括添加操作Excel的依赖、自定义注解等。文章还提供了示例代码,并将代码上传至GitHub供访问。 ... [详细]
  • Apache Shiro 身份验证绕过漏洞 (CVE202011989) 详细解析及防范措施
    本文详细解析了Apache Shiro 身份验证绕过漏洞 (CVE202011989) 的原理和影响,并提供了相应的防范措施。Apache Shiro 是一个强大且易用的Java安全框架,常用于执行身份验证、授权、密码和会话管理。在Apache Shiro 1.5.3之前的版本中,与Spring控制器一起使用时,存在特制请求可能导致身份验证绕过的漏洞。本文还介绍了该漏洞的具体细节,并给出了防范该漏洞的建议措施。 ... [详细]
  • Android开发实现的计时器功能示例
    本文分享了Android开发实现的计时器功能示例,包括效果图、布局和按钮的使用。通过使用Chronometer控件,可以实现计时器功能。该示例适用于Android平台,供开发者参考。 ... [详细]
  • Go GUIlxn/walk 学习3.菜单栏和工具栏的具体实现
    本文介绍了使用Go语言的GUI库lxn/walk实现菜单栏和工具栏的具体方法,包括消息窗口的产生、文件放置动作响应和提示框的应用。部分代码来自上一篇博客和lxn/walk官方示例。文章提供了学习GUI开发的实际案例和代码示例。 ... [详细]
  • 海马s5近光灯能否直接更换为H7?
    本文主要介绍了海马s5车型的近光灯是否可以直接更换为H7灯泡,并提供了完整的教程下载地址。此外,还详细讲解了DSP功能函数中的数据拷贝、数据填充和浮点数转换为定点数的相关内容。 ... [详细]
  • 本文讨论了在shiro java配置中加入Shiro listener后启动失败的问题。作者引入了一系列jar包,并在web.xml中配置了相关内容,但启动后却无法正常运行。文章提供了具体引入的jar包和web.xml的配置内容,并指出可能的错误原因。该问题可能与jar包版本不兼容、web.xml配置错误等有关。 ... [详细]
author-avatar
中国人TM
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有