热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

1flink理论批处理与流处理+简单示例

【README】1.本文包含了批处理与流处理的代码示例;批处理:把数据攒在一起(或攒一段时间或攒一定内存大小)ÿ

【README】

1.本文包含了 批处理与流处理的代码示例;


  • 批处理:把数据 攒在一起(或攒一段时间或攒一定内存大小),然后再处理,这叫批处理;
  • 流处理:数据每来一个就处理一个;

2.特点:


数据处理方式特点
批处理1.高延时;
流处理1.低延时;

3.引入flink的maven依赖:

org.apache.flinkflink-java1.14.4org.apache.flinkflink-streaming-java_2.121.14.4org.apache.flinkflink-clients_2.121.14.4



【1】flink批处理离线数据(数据有限)


【1.1】代码

1)数据源,我们保存在本地文本文件中,命名为  hello.txt

hello world
hello flink
how are you
thank you
hello zhangsan
hello lisi

2)批处理代码:

/*** @Description 批处理,word count程序(离线数据)* @author xiao tang* @version 1.0.0* @createTime 2022年04月09日*/
public class WordCount {public static void main(String[] args) throws Exception {// 创建执行环境ExecutionEnvironment env &#61; ExecutionEnvironment.getExecutionEnvironment();// 从文件中读取数据String inputPath &#61; "D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\hello.txt";DataSource dataSource &#61; env.readTextFile(inputPath);// 对数据集处理&#xff0c;按照空格分词展开&#xff0c;转为 (word,1) 二元组统计DataSet> resultSet &#61; dataSource.flatMap(new MyFlatMapper()).groupBy(0) // 按照第1个位置的word分组.sum(1); // 将第2个位置上的数据求和resultSet.print();}public static class MyFlatMapper implements FlatMapFunction> {&#64;Overridepublic void flatMap(String value, Collector> collector) throws Exception {// 按照空格分词String[] words &#61; value.split(" ");// 遍历所有word&#xff0c;包装成word 输出Arrays.stream(words).forEach(x->{collector.collect(new Tuple2<>(x, 1));});}}
}

批处理打印结果&#xff1a;


(you,2)
(flink,1)
(world,1)
(hello,4)
(lisi,1)
(zhangsan,1)
(are,1)
(thank,1)
(how,1)


批处理的结果是最终结果&#xff1b;




【2】flink流处理离线数据&#xff08;数据有限&#xff09;

/*** &#64;Description 流数据&#xff08;无限数据&#xff09;* &#64;author xiao tang* &#64;version 1.0.0* &#64;createTime 2022年04月09日*/
public class StreamWordCount {public static void main(String[] args) throws Exception {// 流处理执行环境StreamExecutionEnvironment streamEnv &#61; StreamExecutionEnvironment.getExecutionEnvironment();streamEnv.setParallelism(2); // 设置并行度// 从文件中读取数据String inputPath &#61; "D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\hello.txt";DataStream dataStream &#61; streamEnv.readTextFile(inputPath);// 定义流操作DataStream> resultStream &#61; dataStream.flatMap(new WordCount.MyFlatMapper()).keyBy(0).sum(1);// 打印结果resultStream.print();// 执行任务&#xff08;流终止操作&#xff09;streamEnv.execute();}
}

打印结果&#xff1a;


2> (world,1)
1> (thank,1)
2> (flink,1)
1> (hello,1)
2> (how,1)
2> (you,1)
1> (hello,2)
2> (you,2)
1> (hello,3)
2> (zhangsan,1)
1> (hello,4)
2> (lisi,1)
1> (are,1)


流处理的结果是一个动态变化的有状态的结果&#xff1b;

有状态的意思说白了就是&#xff1a;后面的处理结果依赖前面的处理结果&#xff0c;如对hello计数为3&#xff0c;它是在前面hello计数为2的基础上做的处理&#xff1b;




【3】flink流处理在线数据&#xff08;数据无限&#xff09;

我们引入了 netcat&#xff08;nc&#xff09;&#xff0c;底层使用socket模拟向某端口写入数据&#xff1b;

然后 flink监控该端口的数据&#xff0c;并做处理&#xff1b;


【3.1】 flink处理类

处理类监听了 nc所在机器的的端口&#xff0c;即 192.168.163.201:7777&#xff1b;

/*** &#64;Description socket文本流词计数* &#64;author xiao tang* &#64;version 1.0.0* &#64;createTime 2022年04月09日*/
public class SocketTextStreamWordCount {public static void main(String[] args) throws Exception {// 流处理执行环境StreamExecutionEnvironment streamEnv &#61; StreamExecutionEnvironment.getExecutionEnvironment();streamEnv.setParallelism(2); // 设置并行度// 从 flinkjava parametertool 获取参数&#xff08;或有&#xff09;
// ParameterTool parameterTool &#61; ParameterTool.fromArgs(args);
// String host &#61; parameterTool.get("host");
// int port &#61; parameterTool.getInt("port");// 从socket文本流读取数据DataStream inputDataStream &#61; streamEnv.socketTextStream("192.168.163.201", 7777);// 定义流操作DataStream> resultStream &#61; inputDataStream.flatMap(new WordCount.MyFlatMapper()).keyBy(0).sum(1);// 打印结果resultStream.print();// 执行任务&#xff08;流终止操作&#xff09;streamEnv.execute();}
}

演示效果&#xff1a;

 


推荐阅读
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • Linux重启网络命令实例及关机和重启示例教程
    本文介绍了Linux系统中重启网络命令的实例,以及使用不同方式关机和重启系统的示例教程。包括使用图形界面和控制台访问系统的方法,以及使用shutdown命令进行系统关机和重启的句法和用法。 ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 本文介绍了南邮ctf-web的writeup,包括签到题和md5 collision。在CTF比赛和渗透测试中,可以通过查看源代码、代码注释、页面隐藏元素、超链接和HTTP响应头部来寻找flag或提示信息。利用PHP弱类型,可以发现md5('QNKCDZO')='0e830400451993494058024219903391'和md5('240610708')='0e462097431906509019562988736854'。 ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • 本文主要解析了Open judge C16H问题中涉及到的Magical Balls的快速幂和逆元算法,并给出了问题的解析和解决方法。详细介绍了问题的背景和规则,并给出了相应的算法解析和实现步骤。通过本文的解析,读者可以更好地理解和解决Open judge C16H问题中的Magical Balls部分。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • springmvc学习笔记(十):控制器业务方法中通过注解实现封装Javabean接收表单提交的数据
    本文介绍了在springmvc学习笔记系列的第十篇中,控制器的业务方法中如何通过注解实现封装Javabean来接收表单提交的数据。同时还讨论了当有多个注册表单且字段完全相同时,如何将其交给同一个控制器处理。 ... [详细]
  • FeatureRequestIsyourfeaturerequestrelatedtoaproblem?Please ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
author-avatar
书友49457861
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有