作者:sisihg_676 | 来源:互联网 | 2023-08-13 10:34
文章目录
- KeyBy
- Reduce
- Split 和Select
- split
- Select
- 需求:将kafka中数据根据某属性分割开,分成两个流
- Connect和 CoMap
- Connect
- CoMap,CoFlatMap
- Union
常见的map.flatMap,filter类比spark
KeyBy
DataStream → KeyedStream:输入必须是Tuple类型(一般通过map转换),逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。
Reduce
KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
//求各个渠道的累计个数
val startUplogDstream: DataStream[StartUpLog] = dstream.map{ JSON.parseObject(_,classOf[StartUpLog])}
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
//reduce //sum
keyedStream.reduce{ (ch1,ch2)=>
(ch1._1,ch1._2+ch2._2)
} .print().setParallelism(1)
类比可知,spark的reduceByKey == keyBy+Reduce
flink保存累计值原理
flink是一种有状态的流计算框架
-
operator state : 主要是保存数据在流程中的处理状态,用于确保语义的exactly-once
-
keyed state : 主要保存数据在计算过程中的累计值
这两种状态都是通过checkpoint机制保存在StateBackend中,StateBackend可以选择保存在内存中(默认使用)或者保存在磁盘文件中。
Split 和Select
split
DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。
Select
SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。
需求:将kafka中数据根据某属性分割开,分成两个流
package kafka
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
object ConsumerApp {
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaConsumer = KafKaUtil.getConsumer("test")
//import org.apache.flink.api.scala._ 这里要加入隐式转换
val dstream = environment.addSource(kafkaConsumer)
// dstream.print()
val logStream = dstream.split {
log =>
var flags: List[String] = null
if ("Apple".equals(log)) {
flags = List(log)
} else {
flags = List("Android")
}
flags
}
val apple = logStream.select("Apple")
apple.print("apple:").setParallelism(1)
val android = logStream.select("Android")
android.print("Android:").setParallelism(1)
environment.execute()
}
}
Connect和 CoMap
Connect
DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
CoMap,CoFlatMap
这两个不是算子,只是类比 connect + map / flatMap
ConnectedStreams → DataStream:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。
val connStream = androidStream.connect(appleStream)
val allStream = connStream.map(
(log1: String) => log1 + "1",
(log2: String) => log2 + "2"
)
allStream.print("连接流:")
处理完后类型要一致,并且符合返回的泛型
Union
DataStream → DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。注意:如果你将一个DataStream跟它自己做union操作,在新的DataStream中,你将看到每一个元素都出现两次。
val unionStream = appleStream.union(androidStream)
unionStream.print("union:")
environment.execute()
Connect与 Union 区别
1 Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
2 Connect只能操作两个流,Union可以操作多个