热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

(10)flink中的算子流的分割与join

文章目录KeyByReduce


文章目录




      • KeyBy
      • Reduce

        • flink保存累计值原理
      • Split 和Select

        • split
        • Select

          • 需求:将kafka中数据根据某属性分割开,分成两个流
      • Connect和 CoMap

        • Connect
        • CoMap,CoFlatMap
        • Union

          • Connect与 Union 区别



常见的map.flatMap,filter类比spark

KeyBy

DataStream → KeyedStream:输入必须是Tuple类型(一般通过map转换),逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。

Reduce

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

//求各个渠道的累计个数
val startUplogDstream: DataStream[StartUpLog] = dstream.map{ JSON.parseObject(_,classOf[StartUpLog])}
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
//reduce //sum
keyedStream.reduce{ (ch1,ch2)=>
(ch1._1,ch1._2+ch2._2)
} .print().setParallelism(1)

类比可知,spark的reduceByKey == keyBy+Reduce

flink保存累计值原理

flink是一种有状态的流计算框架

  1. operator state : 主要是保存数据在流程中的处理状态,用于确保语义的exactly-once

  2. keyed state : 主要保存数据在计算过程中的累计值

这两种状态都是通过checkpoint机制保存在StateBackend中,StateBackend可以选择保存在内存中(默认使用)或者保存在磁盘文件中。

Split 和Select


split


DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。

Select


SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。

需求:将kafka中数据根据某属性分割开,分成两个流

package kafka
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
object ConsumerApp {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaConsumer = KafKaUtil.getConsumer("test")
//import org.apache.flink.api.scala._ 这里要加入隐式转换
val dstream = environment.addSource(kafkaConsumer)
// dstream.print()
val logStream = dstream.split {
log =>
var flags: List[String] = null
if ("Apple".equals(log)) {
flags = List(log)
} else {
flags = List("Android")
}
flags
}
val apple = logStream.select("Apple")
apple.print("apple:").setParallelism(1)
val android = logStream.select("Android")
android.print("Android:").setParallelism(1)
environment.execute()
}
}

Connect和 CoMap


Connect


DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

CoMap,CoFlatMap

这两个不是算子,只是类比 connect + map / flatMap

ConnectedStreams → DataStream:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。

val connStream = androidStream.connect(appleStream)
val allStream = connStream.map(
(log1: String) => log1 + "1",
(log2: String) => log2 + "2"
)
allStream.print("连接流:")

处理完后类型要一致,并且符合返回的泛型

Union


DataStream → DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。注意:如果你将一个DataStream跟它自己做union操作,在新的DataStream中,你将看到每一个元素都出现两次。

val unionStream = appleStream.union(androidStream)
unionStream.print("union:")
environment.execute()

Connect与 Union 区别

1 Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
2 Connect只能操作两个流,Union可以操作多个


推荐阅读
  • 本文介绍了如何将包含复杂对象的字典保存到文件,并从文件中读取这些字典。 ... [详细]
  • MySQL初级篇——字符串、日期时间、流程控制函数的相关应用
    文章目录:1.字符串函数2.日期时间函数2.1获取日期时间2.2日期与时间戳的转换2.3获取年月日、时分秒、星期数、天数等函数2.4时间和秒钟的转换2. ... [详细]
  • C#实现文件的压缩与解压
    2019独角兽企业重金招聘Python工程师标准一、准备工作1、下载ICSharpCode.SharpZipLib.dll文件2、项目中引用这个dll二、文件压缩与解压共用类 ... [详细]
  • 机器学习算法:SVM(支持向量机)
    SVM算法(SupportVectorMachine,支持向量机)的核心思想有2点:1、如果数据线性可分,那么基于最大间隔的方式来确定超平面,以确保全局最优, ... [详细]
  • 本文节选自《NLTK基础教程——用NLTK和Python库构建机器学习应用》一书的第1章第1.2节,作者Nitin Hardeniya。本文将带领读者快速了解Python的基础知识,为后续的机器学习应用打下坚实的基础。 ... [详细]
  • 零拷贝技术是提高I/O性能的重要手段,常用于Java NIO、Netty、Kafka等框架中。本文将详细解析零拷贝技术的原理及其应用。 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 属性类 `Properties` 是 `Hashtable` 类的子类,用于存储键值对形式的数据。该类在 Java 中广泛应用于配置文件的读取与写入,支持字符串类型的键和值。通过 `Properties` 类,开发者可以方便地进行配置信息的管理,确保应用程序的灵活性和可维护性。此外,`Properties` 类还提供了加载和保存属性文件的方法,使其在实际开发中具有较高的实用价值。 ... [详细]
  • Python 序列图分割与可视化编程入门教程
    本文介绍了如何使用 Python 进行序列图的快速分割与可视化。通过一个实际案例,详细展示了从需求分析到代码实现的全过程。具体包括如何读取序列图数据、应用分割算法以及利用可视化库生成直观的图表,帮助非编程背景的用户也能轻松上手。 ... [详细]
  • 在Android应用开发中,实现与MySQL数据库的连接是一项重要的技术任务。本文详细介绍了Android连接MySQL数据库的操作流程和技术要点。首先,Android平台提供了SQLiteOpenHelper类作为数据库辅助工具,用于创建或打开数据库。开发者可以通过继承并扩展该类,实现对数据库的初始化和版本管理。此外,文章还探讨了使用第三方库如Retrofit或Volley进行网络请求,以及如何通过JSON格式交换数据,确保与MySQL服务器的高效通信。 ... [详细]
  • 本文介绍了如何使用Flume从Linux文件系统收集日志并存储到HDFS,然后通过MapReduce清洗数据,使用Hive进行数据分析,并最终通过Sqoop将结果导出到MySQL数据库。 ... [详细]
  • 更新vuex的数据为什么用mutation?
    更新vuex的数据为什么用mutation?,Go语言社区,Golang程序员人脉社 ... [详细]
  • 如果应用程序经常播放密集、急促而又短暂的音效(如游戏音效)那么使用MediaPlayer显得有些不太适合了。因为MediaPlayer存在如下缺点:1)延时时间较长,且资源占用率高 ... [详细]
  • Spring Boot 中配置全局文件上传路径并实现文件上传功能
    本文介绍如何在 Spring Boot 项目中配置全局文件上传路径,并通过读取配置项实现文件上传功能。通过这种方式,可以更好地管理和维护文件路径。 ... [详细]
author-avatar
sisihg_676
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有