热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flink(10)flinksql窗口函数及其他常见函数

TableAPI窗口时间语义,要配合窗口操作才能发挥作用在TableAPI和SQL中主要有两种窗口GroupWindows(分组窗口)根据时间或行计数间隔,将行聚合到有限的组(Gr

Table API 窗口

时间语义,要配合窗口操作才能发挥作用

在Table API和SQL中 主要有两种窗口

Group Windows(分组窗口)

根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数

Over Windows (开窗函数)

针对每个输入行,计算相邻行范围内的聚合

 

Group Windows

group Windows 是使用window(w:GroupWindow)子句定义的,并且必须有as子句 制定一个别名

为了按窗口对表进行分组,窗口的别名 必须在group by子句中,像常规的分组字段一样引用

val table = input.window([w:GroupWindow] as 'w)
.group by ('w,'a)
.select('a,'b.sum)
//滚动窗口,要用Tumble类来定义
//tumbling event-time window
.window(Tumble over 10.minutes on 'rowtime as 'w)
//tumble processing-time window
.window(Tumble over 10.minutes on 'proctime as 'w)
//滚动计数窗口 这里指定时间 是为了按照时间排序
.window(Tumble over 10.rows on 'proctime as 'w)
//滚动窗口,要用Slide类来定义
//Sliding event-time window
.window(Tumble over 10.minutes every 5.minutes on 'rowtime as 'w)
//Sliding processing-time window
.window(Tumble over 10.minutes every 5.minutes on 'proctime as 'w)
//滑动计数窗口 这里指定时间 是为了按照时间排序
.window(Tumble over 10.rows every 5.rows on 'proctime as 'w)
//会话窗口
//会话窗口用Session类来定义
//session Event-time window
.window(Session withGap 10.minutes on 'rowtime as 'w)
//Session Processing-time Window
.window(Session withGap 10.minutes on 'proctime as 'w)

Over Windows

Over window 聚合是标准SQL中已有的(over子句),可以在查询的SELECT子句中定义

Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合

Over windows 使用window 子句定义,并在select 通过别名来引用

val table = input

     .window([w:OverWindow] as ‘w)

     .select(‘a,’b.sum over ‘w ,’c.min over ‘w)

Table API 提供了over 类来配置Over窗口的属性

//无界Over Window
//可以在事件时间或处理时间,以及指定为时间间隔,或计数的范围内,定义Over windows
//无界的事件时间over window
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
//无界的处理时间over window
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
//无界的时间时间 计数 over window
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
//无界的处理时间over window
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
//有界Over Windows
//有界的Over window 是用间隔的大小指定的
//有界的事件时间over window
.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
//有界的处理时间over window
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
//有界的时间时间 计数 over window
.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
//有界的处理时间over window
.window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)

SQL中的Group Windows

GroupWindows 定义在SQL查询的Group By子句中

TUMBLE(time_attr,interval)

定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口长度

HOP(timer_attr,interval,interval)

定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是窗口长度

SESSION(time_attr,interval)

定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔

SQL中的Over Windows

用Over做窗口聚合时,所有聚合必须在同一窗口上定义,也就是说必须是相通的分区、排序和范围

目前仅支持在当前行范围之前的窗口

ORDER BY必须在单一的时间属性上制定

select count(amount) over (

partition by user 

order by proctime

rows between 2 preceding and current row)

from orders

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//读取数据创建DataStream
val inputStream: DataStream[String] = env.readTextFile("/Users/oumanabuka/Desktop/cem_flink_test/src/main/resources/word.txt")
val dataStream: DataStream[SensorReading] = inputStream
.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)){
override def extractTimestamp(element:SensorReading):LOng= element.timestamp*1000L
})
//创建表执行环境
val tableEnv: StreamTableEnvirOnment= StreamTableEnvironment.create(env)
//基于数据流,转换成一张表,然后进行操作
val dataTable: Table = tableEnv.fromDataStream(dataStream,'id,'timestamp.rowtime as 'ts ,'temperature)
//因为table没有在sql注册表 可以采用字符串+dataTable的形式 进行类似注册
val groupResultTable = dataTable
.window(Tumble over 10.seconds on 'ts as 'tw )
.groupBy('id,'tw)
.select('id,'id.count,'tw.end)
tableEnv.registerTable("sensor",dataTable)
val groupResultSqlTable :Table = tableEnv.sqlQuery(
"""
|select
| id ,
| count(id) ,
| tumble_end(ts , interval '10' second)
|from
|sensor
|group by
| id,
| tumble(ts,interval '10' second)
|""".stripMargin)
//转换成流打印输出
groupResultTable.toRetractStream[(String,Long,Timestamp)].print("group result")
val overResultTable :Table = dataTable
.window(Over partitionBy 'id orderBy 'ts preceding 2.rows as 'w)
.select( 'id,'ts,'id.count over 'w,'temperature.avg over 'w)
val overResultSqlTable :Table = tableEnv.sqlQuery(
"""
|select
| id ,
| ts ,
| count(id) over w,
| avg(temperature) over w
| from
| sensor
| window w as(
| partition by id
| order by ts
| rows between 2 preceding and current row
| )
|""".stripMargin)
overResultTable.toAppendStream[Row].print("over result")
env.execute("window sql test")

 

函数(Functions)

flink Table api 和sql 提供了一组用户数据转换的内置函数

《flink(10) flink sql 窗口函数 及其他常见函数》

《flink(10) flink sql 窗口函数 及其他常见函数》

用户自定义函数UDF

用户自定义函数(user-defined functions ,udf)是一个重要的特性,他们显著地扩展了查询的表达能力

在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用

函数通过调用registerFunctions()方法在TableEnvironment中注册。当用户定义的函数被注册时,它被插入到TableEnvironment的函数目录中,这样Table API或SQL解析器就可以识别并正确地解释它

常见的用户自定义函数如下:

标量函数(Scalar Functions)

用户自定义函数的一种 用户定义标量函数,可以讲0、1或多个标量值,映射到新的标量值

为了定义标量函数,必须在prg.apache.flink.table.functions中扩展基类

Scalar Function并实现(一个或多个)求值(ecal)方法

标量函数的行为由求值方法决定,求值方法必须公开声明并命名为eval

//自定义标量函数
class HashCode(factory :Int)extends ScalarFunction{
//必须要实现一个eval方法,它的参数是当前传入的字段 输出是一个Int类型的hash值 注意eval方法的参数 是传入字段 与类的行参无关
def eval(str:String):Int={
str.hashCode*factory
}
}

使用自定义标量函数

val hashCode = new HashCode(10)
val resultTable = dataTable
.select('id ,'ts ,hashCode('id) )
//sql实现
tableEnv.registerFunction("hashcode",hashCode)
val resultSqlTable = tableEnv.sqlQuery(
"""
|select
|id ,
|ts ,
|hashcode(id)
|from
|sensor
|""".stripMargin)

表函数(Table Functions)

用户定义的表函数,也可以将0、1或多个标量值作为输入函数,与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值 有点像 行转列

为了定义一个表函数,必须扩展org.apache.flink.table.functions中的基类TableFunction 并实现(一个或多个)求值方法

表函数的行为由其求值方法决定,求值方法必须是public的并命名为eval

自定义表函数

//自定义表函数table function 对一个string 输出用某个分割符切分之后的(word wordlength) 这里的[]里指定的是输出的类型
class Split(sparator:String) extends TableFunction[(String,Int)]{
def eval(str:String):Unit={
str.split(sparator).foreach(
word=>collect((word,word.length))
)
}
}

使用自定义表函数(这里因为自定义表函数输出了一个类似于表的结构 所以要用later的方法调

//创建udf实例
val split = new Split("_")
//调用Table api TableFunction使用的时候,需用调用joinLateral方法 或者leftOuterJoinLateral 类似于之前sql 中的 lateral view 那种交叉连接
val tableFunctiOnreusltTable= dataTable
.joinLateral(split('id) as ('word,'length))
.select('id,'ts,'word,'length)
//SQL实现
tableEnv.registerFunction("split",split)
val tableFunctiOnresultSqlTable= tableEnv.sqlQuery(
"""
|select
|id ,
|ts ,
|word ,
|length
|from
|sensor ,
|lateral table(split(id)) as splitid(word,length)
|""".stripMargin)
tableFunctionreusltTable.toAppendStream[Row].print("table function result")

聚合函数(Aggregate Functions)

用户自定义聚合函数可以把表中的数据,聚合成一个标量值

用户定义的聚合函数,是通过继承AggregateFunction抽象类实现的

工作原理:

1.需要一个累加器,用来保存聚合中间结果的数据结构

可以通过createAccumulator()方法创建空累加器

随后对每个行调用函数的accumulate()方法来更新累加器  

处理完所有行后,将调用函数的getValue()方法来计算并返回最终结果

定义聚合函数

class MyAgg() extends AggregateFunction[Int,Double]{
override def get Value(accumulator :(String,Double)):Int ={
}
override def createAccumulator():Double={
}
def accumulate(accumulator:,Double,field:String):Unit{

}
}

表聚合函数(Table Aggregate Functions)

用户定义的表聚合函数(User-Defined Table Aggregate Functions UDTAGGs),可以把一个表中数据,聚合为具有多行多列的结果表

用户定义表聚合函数,是通过继承TableAggregateFunction抽象类来事项的

 跟聚合函数类似 无非是getvalue 换成了emitvalue:发出数据


推荐阅读
  • MySQL Decimal 类型的最大值解析及其在数据处理中的应用艺术
    在关系型数据库中,表的设计与SQL语句的编写对性能的影响至关重要,甚至可占到90%以上。本文将重点探讨MySQL中Decimal类型的最大值及其在数据处理中的应用技巧,通过实例分析和优化建议,帮助读者深入理解并掌握这一重要知识点。 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • IOS Run loop详解
    为什么80%的码农都做不了架构师?转自http:blog.csdn.netztp800201articledetails9240913感谢作者分享Objecti ... [详细]
  • Ihavetwomethodsofgeneratingmdistinctrandomnumbersintherange[0..n-1]我有两种方法在范围[0.n-1]中生 ... [详细]
  • oracle c3p0 dword 60,web_day10 dbcp c3p0 dbutils
    createdatabasemydbcharactersetutf8;alertdatabasemydbcharactersetutf8;1.自定义连接池为了不去经常创建连接和释放 ... [详细]
  • importpymysql#一、直接连接mysql数据库'''coonpymysql.connect(host'192.168.*.*',u ... [详细]
  • [转]doc,ppt,xls文件格式转PDF格式http:blog.csdn.netlee353086articledetails7920355确实好用。需要注意的是#import ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • php更新数据库字段的函数是,php更新数据库字段的函数是 ... [详细]
  • 检查在所有可能的“?”替换中,给定的二进制字符串中是否出现子字符串“10”带 1 或 0 ... [详细]
  • 开机自启动的几种方式
    0x01快速自启动目录快速启动目录自启动方式源于Windows中的一个目录,这个目录一般叫启动或者Startup。位于该目录下的PE文件会在开机后进行自启动 ... [详细]
  • 本文详细介绍了MySQL数据库的基础语法与核心操作,涵盖从基础概念到具体应用的多个方面。首先,文章从基础知识入手,逐步深入到创建和修改数据表的操作。接着,详细讲解了如何进行数据的插入、更新与删除。在查询部分,不仅介绍了DISTINCT和LIMIT的使用方法,还探讨了排序、过滤和通配符的应用。此外,文章还涵盖了计算字段以及多种函数的使用,包括文本处理、日期和时间处理及数值处理等。通过这些内容,读者可以全面掌握MySQL数据库的核心操作技巧。 ... [详细]
  • PTArchiver工作原理详解与应用分析
    PTArchiver工作原理及其应用分析本文详细解析了PTArchiver的工作机制,探讨了其在数据归档和管理中的应用。PTArchiver通过高效的压缩算法和灵活的存储策略,实现了对大规模数据的高效管理和长期保存。文章还介绍了其在企业级数据备份、历史数据迁移等场景中的实际应用案例,为用户提供了实用的操作建议和技术支持。 ... [详细]
  • 本文详细介绍了如何使用Python中的smtplib库来发送带有附件的邮件,并提供了完整的代码示例。作者:多测师_王sir,时间:2020年5月20日 17:24,微信:15367499889,公司:上海多测师信息有限公司。 ... [详细]
author-avatar
倩女墨镜aaaaa_259
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有