1.聚合算子简介
常见的聚合算子 sum,max,min等
聚合算子可以在 KeyedStream 上进行滚动聚合（即累计操作），而且同一个 KeyedStream 上只能调用一次聚合算子
sum 示例:
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/** Example job: rolling `sum` aggregation over a stream keyed by stock id. */
object aggregationTest {

  /** Record type of the data source: a single price quote for one stock. */
  case class StockPrice(stockId: String, timeStamp: Long, price: Double)

  /**
   * Builds a small in-memory stream of quotes, keys it by stock id, and prints
   * the running sum of the price field for each key.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Create the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Generate the data source from an in-memory collection.
    val stockList = List(
      StockPrice("stock_1", 66666, 1),
      StockPrice("stock_1", 8888, 2),
      StockPrice("stock_2", 77777, 1),
      StockPrice("stock_2", 999, 3),
      StockPrice("stock_3", 3333, 1)
    )
    val ds = env.fromCollection(stockList)

    // Transformation: partition by stock id. A key-selector function is used
    // instead of the string-based keyBy("stockId"), which is deprecated and
    // not checked by the compiler.
    val keyedStream = ds.keyBy(_.stockId)

    // Field position 2 (zero-based) of StockPrice is `price`.
    val sumedStream = keyedStream.sum(2)

    sumedStream.print()
    env.execute()
  }
}
输出结果:
max示例:
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/** Record type of the data source: a single price quote for one stock. */
case class StockPrice(stockId: String, timeStamp: Long, price: Double)

/** Example job: rolling `max` aggregation over a stream keyed by stock id. */
object maxTest {

  /**
   * Builds a small in-memory stream of quotes, keys it by stock id, and prints
   * the running maximum of the price field for each key.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Create the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Generate the data source from an in-memory collection.
    val stockList = List(
      StockPrice("stock_1", 66666, 1),
      StockPrice("stock_1", 8888, 2),
      StockPrice("stock_2", 77777, 1),
      StockPrice("stock_2", 999, 3),
      StockPrice("stock_3", 3333, 1)
    )
    val ds = env.fromCollection(stockList)

    // Transformation: partition by stock id. A key-selector function is used
    // instead of the string-based keyBy("stockId"), which is deprecated and
    // not checked by the compiler.
    val keyedStream = ds.keyBy(_.stockId)

    // Field position 2 (zero-based) of StockPrice is `price`.
    // NOTE(review): per the Flink documentation, `max` only tracks the maximum
    // of the aggregated field; use `maxBy` if the whole element holding the
    // maximum is needed.
    val maxedStream = keyedStream.max(2)

    maxedStream.print()
    env.execute()
  }
}
输出结果: