热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

(10)flink中的算子流的分割与join

文章目录KeyByReduce


文章目录




      • KeyBy
      • Reduce

        • flink保存累计值原理
      • Split 和Select

        • split
        • Select

          • 需求:将kafka中数据根据某属性分割开,分成两个流
      • Connect和 CoMap

        • Connect
        • CoMap,CoFlatMap
        • Union

          • Connect与 Union 区别



常见的map.flatMap,filter类比spark

KeyBy

DataStream → KeyedStream:输入必须是Tuple类型(一般通过map转换),逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。

Reduce

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

//求各个渠道的累计个数
val startUplogDstream: DataStream[StartUpLog] = dstream.map{ JSON.parseObject(_,classOf[StartUpLog])}
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
//reduce //sum
keyedStream.reduce{ (ch1,ch2)=>
(ch1._1,ch1._2+ch2._2)
} .print().setParallelism(1)

类比可知,spark的reduceByKey == keyBy+Reduce

flink保存累计值原理

flink是一种有状态的流计算框架

  1. operator state : 主要是保存数据在流程中的处理状态,用于确保语义的exactly-once

  2. keyed state : 主要保存数据在计算过程中的累计值

这两种状态都是通过checkpoint机制保存在StateBackend中,StateBackend可以选择保存在内存中(默认使用)或者保存在磁盘文件中。

Split 和Select


split


DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。

Select


SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。

需求:将kafka中数据根据某属性分割开,分成两个流

package kafka
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
object ConsumerApp {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaConsumer = KafKaUtil.getConsumer("test")
//import org.apache.flink.api.scala._ 这里要加入隐式转换
val dstream = environment.addSource(kafkaConsumer)
// dstream.print()
val logStream = dstream.split {
log =>
var flags: List[String] = null
if ("Apple".equals(log)) {
flags = List(log)
} else {
flags = List("Android")
}
flags
}
val apple = logStream.select("Apple")
apple.print("apple:").setParallelism(1)
val android = logStream.select("Android")
android.print("Android:").setParallelism(1)
environment.execute()
}
}

Connect和 CoMap


Connect


DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

CoMap,CoFlatMap

这两个不是算子,只是类比 connect + map / flatMap

ConnectedStreams → DataStream:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。

val connStream = androidStream.connect(appleStream)
val allStream = connStream.map(
(log1: String) => log1 + "1",
(log2: String) => log2 + "2"
)
allStream.print("连接流:")

处理完后类型要一致,并且符合返回的泛型

Union


DataStream → DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。注意:如果你将一个DataStream跟它自己做union操作,在新的DataStream中,你将看到每一个元素都出现两次。

val unionStream = appleStream.union(androidStream)
unionStream.print("union:")
environment.execute()

Connect与 Union 区别

1 Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
2 Connect只能操作两个流,Union可以操作多个


推荐阅读
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 你知道Kafka和Redis的各自优缺点吗?一文带你优化选择,不走弯路 ... [详细]
  • 我知道那里有很多类似的问题,但我还没有找到任何与我的场景完全匹配的问题,所以请不要对重复标志太满意。我正在使用Spark3.0.1在AzureDatabrick ... [详细]
  • 马蜂窝数据总监分享:从数仓到数据中台,大数据演进技术选型最优解
    大家好,今天分享的议题主要包括几大内容:带大家回顾一下大数据在国内的发展,从传统数仓到当前数据中台的演进过程;我个人认为数 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文介绍了数据库的存储结构及其重要性,强调了关系数据库范例中将逻辑存储与物理存储分开的必要性。通过逻辑结构和物理结构的分离,可以实现对物理存储的重新组织和数据库的迁移,而应用程序不会察觉到任何更改。文章还展示了Oracle数据库的逻辑结构和物理结构,并介绍了表空间的概念和作用。 ... [详细]
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • 开发笔记:Spark Java API 之 CountVectorizer
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了SparkJavaAPI之CountVectorizer相关的知识,希望对你有一定的参考价值。 ... [详细]
  • ConsumerConfiguration在kafka0.9使用JavaConsumer替代了老版本的scalaConsumer。新版的配置如下:bootstrap. ... [详细]
  • druid接入kafka indexing service整个流程
    先介绍下我们的druid集群配置Overload1台Coordinator1台Middlemanager3台Broker3台Historical一共12台,其中cold6台,hot ... [详细]
  • SparkRDD宽窄依赖及Stage划分
    1.术语解释:Master(Standalone):资源管理的主节点(进程)ClusterManager:在集群上获取资源的外部服务(例如standalone,Mesos,Yarn ... [详细]
  • Spark面试题汇总大全
    1RDD简介RDD是Spark最基本也是最根本的数据抽象,它具备像MapReduce等数据流模型的容错性,并且允许开发人员在大型集群上执行基于内存的计 ... [详细]
author-avatar
sisihg_676
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有