热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flink(10)flinksql窗口函数及其他常见函数

TableAPI窗口时间语义,要配合窗口操作才能发挥作用在TableAPI和SQL中主要有两种窗口GroupWindows(分组窗口)根据时间或行计数间隔,将行聚合到有限的组(Gr

Table API 窗口

时间语义,要配合窗口操作才能发挥作用

在Table API和SQL中 主要有两种窗口

Group Windows(分组窗口)

根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数

Over Windows (开窗函数)

针对每个输入行,计算相邻行范围内的聚合

 

Group Windows

group Windows 是使用window(w:GroupWindow)子句定义的,并且必须有as子句 制定一个别名

为了按窗口对表进行分组,窗口的别名 必须在group by子句中,像常规的分组字段一样引用

val table = input.window([w:GroupWindow] as 'w)
.group by ('w,'a)
.select('a,'b.sum)
//滚动窗口,要用Tumble类来定义
//tumbling event-time window
.window(Tumble over 10.minutes on 'rowtime as 'w)
//tumble processing-time window
.window(Tumble over 10.minutes on 'proctime as 'w)
//滚动计数窗口 这里指定时间 是为了按照时间排序
.window(Tumble over 10.rows on 'proctime as 'w)
//滚动窗口,要用Slide类来定义
//Sliding event-time window
.window(Tumble over 10.minutes every 5.minutes on 'rowtime as 'w)
//Sliding processing-time window
.window(Tumble over 10.minutes every 5.minutes on 'proctime as 'w)
//滑动计数窗口 这里指定时间 是为了按照时间排序
.window(Tumble over 10.rows every 5.rows on 'proctime as 'w)
//会话窗口
//会话窗口用Session类来定义
//session Event-time window
.window(Session withGap 10.minutes on 'rowtime as 'w)
//Session Processing-time Window
.window(Session withGap 10.minutes on 'proctime as 'w)

Over Windows

Over window 聚合是标准SQL中已有的(over子句),可以在查询的SELECT子句中定义

Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合

Over windows 使用window 子句定义,并在select 通过别名来引用

val table = input

     .window([w:OverWindow] as ‘w)

     .select(‘a,’b.sum over ‘w ,’c.min over ‘w)

Table API 提供了over 类来配置Over窗口的属性

//无界Over Window
//可以在事件时间或处理时间,以及指定为时间间隔,或计数的范围内,定义Over windows
//无界的事件时间over window
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
//无界的处理时间over window
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
//无界的时间时间 计数 over window
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
//无界的处理时间over window
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
//有界Over Windows
//有界的Over window 是用间隔的大小指定的
//有界的事件时间over window
.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
//有界的处理时间over window
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
//有界的时间时间 计数 over window
.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
//有界的处理时间over window
.window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)

SQL中的Group Windows

GroupWindows 定义在SQL查询的Group By子句中

TUMBLE(time_attr,interval)

定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口长度

HOP(timer_attr,interval,interval)

定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是窗口长度

SESSION(time_attr,interval)

定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔

SQL中的Over Windows

用Over做窗口聚合时,所有聚合必须在同一窗口上定义,也就是说必须是相通的分区、排序和范围

目前仅支持在当前行范围之前的窗口

ORDER BY必须在单一的时间属性上制定

select count(amount) over (

partition by user 

order by proctime

rows between 2 preceding and current row)

from orders

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//读取数据创建DataStream
val inputStream: DataStream[String] = env.readTextFile("/Users/oumanabuka/Desktop/cem_flink_test/src/main/resources/word.txt")
val dataStream: DataStream[SensorReading] = inputStream
.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)){
override def extractTimestamp(element:SensorReading):LOng= element.timestamp*1000L
})
//创建表执行环境
val tableEnv: StreamTableEnvirOnment= StreamTableEnvironment.create(env)
//基于数据流,转换成一张表,然后进行操作
val dataTable: Table = tableEnv.fromDataStream(dataStream,'id,'timestamp.rowtime as 'ts ,'temperature)
//因为table没有在sql注册表 可以采用字符串+dataTable的形式 进行类似注册
val groupResultTable = dataTable
.window(Tumble over 10.seconds on 'ts as 'tw )
.groupBy('id,'tw)
.select('id,'id.count,'tw.end)
tableEnv.registerTable("sensor",dataTable)
val groupResultSqlTable :Table = tableEnv.sqlQuery(
"""
|select
| id ,
| count(id) ,
| tumble_end(ts , interval '10' second)
|from
|sensor
|group by
| id,
| tumble(ts,interval '10' second)
|""".stripMargin)
//转换成流打印输出
groupResultTable.toRetractStream[(String,Long,Timestamp)].print("group result")
val overResultTable :Table = dataTable
.window(Over partitionBy 'id orderBy 'ts preceding 2.rows as 'w)
.select( 'id,'ts,'id.count over 'w,'temperature.avg over 'w)
val overResultSqlTable :Table = tableEnv.sqlQuery(
"""
|select
| id ,
| ts ,
| count(id) over w,
| avg(temperature) over w
| from
| sensor
| window w as(
| partition by id
| order by ts
| rows between 2 preceding and current row
| )
|""".stripMargin)
overResultTable.toAppendStream[Row].print("over result")
env.execute("window sql test")

 

函数(Functions)

flink Table api 和sql 提供了一组用户数据转换的内置函数

《flink(10) flink sql 窗口函数 及其他常见函数》

《flink(10) flink sql 窗口函数 及其他常见函数》

用户自定义函数UDF

用户自定义函数(user-defined functions ,udf)是一个重要的特性,他们显著地扩展了查询的表达能力

在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用

函数通过调用registerFunctions()方法在TableEnvironment中注册。当用户定义的函数被注册时,它被插入到TableEnvironment的函数目录中,这样Table API或SQL解析器就可以识别并正确地解释它

常见的用户自定义函数如下:

标量函数(Scalar Functions)

用户自定义函数的一种 用户定义标量函数,可以讲0、1或多个标量值,映射到新的标量值

为了定义标量函数,必须在prg.apache.flink.table.functions中扩展基类

Scalar Function并实现(一个或多个)求值(ecal)方法

标量函数的行为由求值方法决定,求值方法必须公开声明并命名为eval

//自定义标量函数
class HashCode(factory :Int)extends ScalarFunction{
//必须要实现一个eval方法,它的参数是当前传入的字段 输出是一个Int类型的hash值 注意eval方法的参数 是传入字段 与类的行参无关
def eval(str:String):Int={
str.hashCode*factory
}
}

使用自定义标量函数

val hashCode = new HashCode(10)
val resultTable = dataTable
.select('id ,'ts ,hashCode('id) )
//sql实现
tableEnv.registerFunction("hashcode",hashCode)
val resultSqlTable = tableEnv.sqlQuery(
"""
|select
|id ,
|ts ,
|hashcode(id)
|from
|sensor
|""".stripMargin)

表函数(Table Functions)

用户定义的表函数,也可以将0、1或多个标量值作为输入函数,与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值 有点像 行转列

为了定义一个表函数,必须扩展org.apache.flink.table.functions中的基类TableFunction 并实现(一个或多个)求值方法

表函数的行为由其求值方法决定,求值方法必须是public的并命名为eval

自定义表函数

//自定义表函数table function 对一个string 输出用某个分割符切分之后的(word wordlength) 这里的[]里指定的是输出的类型
class Split(sparator:String) extends TableFunction[(String,Int)]{
def eval(str:String):Unit={
str.split(sparator).foreach(
word=>collect((word,word.length))
)
}
}

使用自定义表函数(这里因为自定义表函数输出了一个类似于表的结构 所以要用later的方法调

//创建udf实例
val split = new Split("_")
//调用Table api TableFunction使用的时候,需用调用joinLateral方法 或者leftOuterJoinLateral 类似于之前sql 中的 lateral view 那种交叉连接
val tableFunctiOnreusltTable= dataTable
.joinLateral(split('id) as ('word,'length))
.select('id,'ts,'word,'length)
//SQL实现
tableEnv.registerFunction("split",split)
val tableFunctiOnresultSqlTable= tableEnv.sqlQuery(
"""
|select
|id ,
|ts ,
|word ,
|length
|from
|sensor ,
|lateral table(split(id)) as splitid(word,length)
|""".stripMargin)
tableFunctionreusltTable.toAppendStream[Row].print("table function result")

聚合函数(Aggregate Functions)

用户自定义聚合函数可以把表中的数据,聚合成一个标量值

用户定义的聚合函数,是通过继承AggregateFunction抽象类实现的

工作原理:

1.需要一个累加器,用来保存聚合中间结果的数据结构

可以通过createAccumulator()方法创建空累加器

随后对每个行调用函数的accumulate()方法来更新累加器  

处理完所有行后,将调用函数的getValue()方法来计算并返回最终结果

定义聚合函数

class MyAgg() extends AggregateFunction[Int,Double]{
override def get Value(accumulator :(String,Double)):Int ={
}
override def createAccumulator():Double={
}
def accumulate(accumulator:,Double,field:String):Unit{

}
}

表聚合函数(Table Aggregate Functions)

用户定义的表聚合函数(User-Defined Table Aggregate Functions UDTAGGs),可以把一个表中数据,聚合为具有多行多列的结果表

用户定义表聚合函数,是通过继承TableAggregateFunction抽象类来事项的

 跟聚合函数类似 无非是getvalue 换成了emitvalue:发出数据


推荐阅读
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 本文介绍了南邮ctf-web的writeup,包括签到题和md5 collision。在CTF比赛和渗透测试中,可以通过查看源代码、代码注释、页面隐藏元素、超链接和HTTP响应头部来寻找flag或提示信息。利用PHP弱类型,可以发现md5('QNKCDZO')='0e830400451993494058024219903391'和md5('240610708')='0e462097431906509019562988736854'。 ... [详细]
  • C++字符字符串处理及字符集编码方案
    本文介绍了C++中字符字符串处理的问题,并详细解释了字符集编码方案,包括UNICODE、Windows apps采用的UTF-16编码、ASCII、SBCS和DBCS编码方案。同时说明了ANSI C标准和Windows中的字符/字符串数据类型实现。文章还提到了在编译时需要定义UNICODE宏以支持unicode编码,否则将使用windows code page编译。最后,给出了相关的头文件和数据类型定义。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
  • 自动轮播,反转播放的ViewPagerAdapter的使用方法和效果展示
    本文介绍了如何使用自动轮播、反转播放的ViewPagerAdapter,并展示了其效果。该ViewPagerAdapter支持无限循环、触摸暂停、切换缩放等功能。同时提供了使用GIF.gif的示例和github地址。通过LoopFragmentPagerAdapter类的getActualCount、getActualItem和getActualPagerTitle方法可以实现自定义的循环效果和标题展示。 ... [详细]
  • CF:3D City Model(小思维)问题解析和代码实现
    本文通过解析CF:3D City Model问题,介绍了问题的背景和要求,并给出了相应的代码实现。该问题涉及到在一个矩形的网格上建造城市的情景,每个网格单元可以作为建筑的基础,建筑由多个立方体叠加而成。文章详细讲解了问题的解决思路,并给出了相应的代码实现供读者参考。 ... [详细]
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • 基于dlib的人脸68特征点提取(眨眼张嘴检测)python版本
    文章目录引言开发环境和库流程设计张嘴和闭眼的检测引言(1)利用Dlib官方训练好的模型“shape_predictor_68_face_landmarks.dat”进行68个点标定 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 学习Java异常处理之throws之抛出并捕获异常(9)
    任务描述本关任务:在main方法之外创建任意一个方法接收给定的两个字符串,把第二个字符串的长度减1生成一个整数值,输出第一个字符串长度是 ... [详细]
  • 解决文件名过长下载失败问题的jQuery方案
    本文介绍了使用jQuery解决文件名过长导致下载失败的问题。原方案中存在文件名部分丢失的问题,通过动态生成隐藏域表单并提交的方式来解决。详细的解决方案和代码示例在文章中给出。 ... [详细]
  • 本文介绍了在MFC下利用C++和MFC的特性动态创建窗口的方法,包括继承现有的MFC类并加以改造、插入工具栏和状态栏对象的声明等。同时还提到了窗口销毁的处理方法。本文详细介绍了实现方法并给出了相关注意事项。 ... [详细]
author-avatar
倩女墨镜aaaaa_259
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有