TableAPI窗口时间语义,要配合窗口操作才能发挥作用在TableAPI和SQL中主要有两种窗口GroupWindows(分组窗口)根据时间或行计数间隔,将行聚合到有限的组(Gr
Table API 窗口
时间语义,要配合窗口操作才能发挥作用
在Table API和SQL中 主要有两种窗口
Group Windows(分组窗口)
根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数
Over Windows (开窗函数)
针对每个输入行,计算相邻行范围内的聚合
Group Windows
group Windows 是使用window(w:GroupWindow)子句定义的,并且必须有as子句 制定一个别名
为了按窗口对表进行分组,窗口的别名 必须在group by子句中,像常规的分组字段一样引用
val table = input.window([w:GroupWindow] as 'w)
.group by ('w,'a)
.select('a,'b.sum)
//滚动窗口,要用Tumble类来定义
//tumbling event-time window
.window(Tumble over 10.minutes on 'rowtime as 'w)
//tumble processing-time window
.window(Tumble over 10.minutes on 'proctime as 'w)
//滚动计数窗口 这里指定时间 是为了按照时间排序
.window(Tumble over 10.rows on 'proctime as 'w)
//滚动窗口,要用Slide类来定义
//Sliding event-time window
.window(Tumble over 10.minutes every 5.minutes on 'rowtime as 'w)
//Sliding processing-time window
.window(Tumble over 10.minutes every 5.minutes on 'proctime as 'w)
//滑动计数窗口 这里指定时间 是为了按照时间排序
.window(Tumble over 10.rows every 5.rows on 'proctime as 'w)
//会话窗口
//会话窗口用Session类来定义
//session Event-time window
.window(Session withGap 10.minutes on 'rowtime as 'w)
//Session Processing-time Window
.window(Session withGap 10.minutes on 'proctime as 'w)
Over Windows
Over window 聚合是标准SQL中已有的(over子句),可以在查询的SELECT子句中定义
Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合
Over windows 使用window 子句定义,并在select 通过别名来引用
val table = input
.window([w:OverWindow] as ‘w)
.select(‘a,’b.sum over ‘w ,’c.min over ‘w)
Table API 提供了over 类来配置Over窗口的属性
//无界Over Window
//可以在事件时间或处理时间,以及指定为时间间隔,或计数的范围内,定义Over windows
//无界的事件时间over window
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
//无界的处理时间over window
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
//无界的时间时间 计数 over window
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
//无界的处理时间over window
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
//有界Over Windows
//有界的Over window 是用间隔的大小指定的
//有界的事件时间over window
.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
//有界的处理时间over window
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
//有界的时间时间 计数 over window
.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
//有界的处理时间over window
.window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)
SQL中的Group Windows
GroupWindows 定义在SQL查询的Group By子句中
TUMBLE(time_attr,interval)
定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口长度
HOP(timer_attr,interval,interval)
定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是窗口长度
SESSION(time_attr,interval)
定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔
SQL中的Over Windows
用Over做窗口聚合时,所有聚合必须在同一窗口上定义,也就是说必须是相通的分区、排序和范围
目前仅支持在当前行范围之前的窗口
ORDER BY必须在单一的时间属性上制定
select count(amount) over (
partition by user
order by proctime
rows between 2 preceding and current row)
from orders
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//读取数据创建DataStream
val inputStream: DataStream[String] = env.readTextFile("/Users/oumanabuka/Desktop/cem_flink_test/src/main/resources/word.txt")
val dataStream: DataStream[SensorReading] = inputStream
.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)){
override def extractTimestamp(element:SensorReading):LOng= element.timestamp*1000L
})
//创建表执行环境
val tableEnv: StreamTableEnvirOnment= StreamTableEnvironment.create(env)
//基于数据流,转换成一张表,然后进行操作
val dataTable: Table = tableEnv.fromDataStream(dataStream,'id,'timestamp.rowtime as 'ts ,'temperature)
//因为table没有在sql注册表 可以采用字符串+dataTable的形式 进行类似注册
val groupResultTable = dataTable
.window(Tumble over 10.seconds on 'ts as 'tw )
.groupBy('id,'tw)
.select('id,'id.count,'tw.end)
tableEnv.registerTable("sensor",dataTable)
val groupResultSqlTable :Table = tableEnv.sqlQuery(
"""
|select
| id ,
| count(id) ,
| tumble_end(ts , interval '10' second)
|from
|sensor
|group by
| id,
| tumble(ts,interval '10' second)
|""".stripMargin)
//转换成流打印输出
groupResultTable.toRetractStream[(String,Long,Timestamp)].print("group result")
val overResultTable :Table = dataTable
.window(Over partitionBy 'id orderBy 'ts preceding 2.rows as 'w)
.select( 'id,'ts,'id.count over 'w,'temperature.avg over 'w)
val overResultSqlTable :Table = tableEnv.sqlQuery(
"""
|select
| id ,
| ts ,
| count(id) over w,
| avg(temperature) over w
| from
| sensor
| window w as(
| partition by id
| order by ts
| rows between 2 preceding and current row
| )
|""".stripMargin)
overResultTable.toAppendStream[Row].print("over result")
env.execute("window sql test")
函数(Functions)
flink Table api 和sql 提供了一组用户数据转换的内置函数
用户自定义函数UDF
用户自定义函数(user-defined functions ,udf)是一个重要的特性,他们显著地扩展了查询的表达能力
在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用
函数通过调用registerFunctions()方法在TableEnvironment中注册。当用户定义的函数被注册时,它被插入到TableEnvironment的函数目录中,这样Table API或SQL解析器就可以识别并正确地解释它
常见的用户自定义函数如下:
标量函数(Scalar Functions)
用户自定义函数的一种 用户定义标量函数,可以讲0、1或多个标量值,映射到新的标量值
为了定义标量函数,必须在prg.apache.flink.table.functions中扩展基类
Scalar Function并实现(一个或多个)求值(ecal)方法
标量函数的行为由求值方法决定,求值方法必须公开声明并命名为eval
//自定义标量函数
class HashCode(factory :Int)extends ScalarFunction{
//必须要实现一个eval方法,它的参数是当前传入的字段 输出是一个Int类型的hash值 注意eval方法的参数 是传入字段 与类的行参无关
def eval(str:String):Int={
str.hashCode*factory
}
}
使用自定义标量函数
val hashCode = new HashCode(10)
val resultTable = dataTable
.select('id ,'ts ,hashCode('id) )
//sql实现
tableEnv.registerFunction("hashcode",hashCode)
val resultSqlTable = tableEnv.sqlQuery(
"""
|select
|id ,
|ts ,
|hashcode(id)
|from
|sensor
|""".stripMargin)
表函数(Table Functions)
用户定义的表函数,也可以将0、1或多个标量值作为输入函数,与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值 有点像 行转列
为了定义一个表函数,必须扩展org.apache.flink.table.functions中的基类TableFunction 并实现(一个或多个)求值方法
表函数的行为由其求值方法决定,求值方法必须是public的并命名为eval
自定义表函数
//自定义表函数table function 对一个string 输出用某个分割符切分之后的(word wordlength) 这里的[]里指定的是输出的类型
class Split(sparator:String) extends TableFunction[(String,Int)]{
def eval(str:String):Unit={
str.split(sparator).foreach(
word=>collect((word,word.length))
)
}
}
使用自定义表函数(这里因为自定义表函数输出了一个类似于表的结构 所以要用later的方法调
//创建udf实例
val split = new Split("_")
//调用Table api TableFunction使用的时候,需用调用joinLateral方法 或者leftOuterJoinLateral 类似于之前sql 中的 lateral view 那种交叉连接
val tableFunctiOnreusltTable= dataTable
.joinLateral(split('id) as ('word,'length))
.select('id,'ts,'word,'length)
//SQL实现
tableEnv.registerFunction("split",split)
val tableFunctiOnresultSqlTable= tableEnv.sqlQuery(
"""
|select
|id ,
|ts ,
|word ,
|length
|from
|sensor ,
|lateral table(split(id)) as splitid(word,length)
|""".stripMargin)
tableFunctionreusltTable.toAppendStream[Row].print("table function result")
聚合函数(Aggregate Functions)
用户自定义聚合函数可以把表中的数据,聚合成一个标量值
用户定义的聚合函数,是通过继承AggregateFunction抽象类实现的
工作原理:
1.需要一个累加器,用来保存聚合中间结果的数据结构
可以通过createAccumulator()方法创建空累加器
随后对每个行调用函数的accumulate()方法来更新累加器
处理完所有行后,将调用函数的getValue()方法来计算并返回最终结果
定义聚合函数
class MyAgg() extends AggregateFunction[Int,Double]{
override def get Value(accumulator :(String,Double)):Int ={
}
override def createAccumulator():Double={
}
def accumulate(accumulator:,Double,field:String):Unit{
}
}
表聚合函数(Table Aggregate Functions)
用户定义的表聚合函数(User-Defined Table Aggregate Functions UDTAGGs),可以把一个表中数据,聚合为具有多行多列的结果表
用户定义表聚合函数,是通过继承TableAggregateFunction抽象类来事项的
跟聚合函数类似 无非是getvalue 换成了emitvalue:发出数据