热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink基础(二十):TableAPI和FlinkSQL(五)函数

FlinkTable和SQL内置了很多SQL中支持的函数;如果有无法满足的需要,则可以实现用户自定义的函数(UDF)来解决。系统内置函数FlinkTableAPI和SQL为用户提供

Flink Table 和 SQL内置了很多SQL中支持的函数;如果有无法满足的需要,则可以实现用户自定义的函数(UDF)来解决。


系统内置函数

Flink Table API 和 SQL为用户提供了一组用于数据转换的内置函数。SQL中支持的很多函数,Table API和SQL都已经做了实现,其它还在快速开发扩展中。

以下是一些典型函数的举例,全部的内置函数,可以参考官网介绍。



  • 比较函数

SQL:

value1 = value2

value1 > value2

Table API:

ANY1 === ANY2

ANY1 > ANY2



  • 逻辑函数

SQL:

boolean1 OR boolean2

boolean IS FALSE

NOT boolean

Table API:

BOOLEAN1 || BOOLEAN2

BOOLEAN.isFalse

!BOOLEAN



  • 算术函数

SQL:

numeric1 + numeric2

POWER(numeric1, numeric2)

Table API:

NUMERIC1 + NUMERIC2

NUMERIC1.power(NUMERIC2)



  • 字符串函数

SQL:

string1 || string2

UPPER(string)

CHAR_LENGTH(string)

Table API:

STRING1 + STRING2

STRING.upperCase()

STRING.charLength()



  • 时间函数

SQL:

DATE string

TIMESTAMP string

CURRENT_TIME

INTERVAL string range

Table API:

STRING.toDate

STRING.toTimestamp

currentTime()

NUMERIC.days

NUMERIC.minutes



  • 聚合函数

SQL:

COUNT(*)

SUM([ ALL | DISTINCT ] expression)

RANK()

ROW_NUMBER()

Table API:

FIELD.count

FIELD.sum0


UDF

用户定义函数(User-defined Functions,UDF)是一个重要的特性,因为它们显著地扩展了查询(Query)的表达能力。一些系统内置函数无法解决的需求,我们可以用UDF来自定义实现。


注册用户自定义函数UDF

在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用。不需要专门为Scala 的Table API注册函数。

函数通过调用registerFunction()方法在TableEnvironment中注册。当用户定义的函数被注册时,它被插入到TableEnvironment的函数目录中,这样Table API或SQL解析器就可以识别并正确地解释它。


标量函数(Scalar Functions)

用户定义的标量函数,可以将0、1或多个标量值,映射到新的标量值。

为了定义标量函数,必须在org.apache.flink.table.functions中扩展基类Scalar Function,并实现(一个或多个)求值(evaluation,eval)方法。标量函数的行为由求值方法决定,求值方法必须公开声明并命名为eval(直接def声明,没有override)。求值方法的参数类型和返回类型,确定了标量函数的参数和返回类型。

在下面的代码中,我们定义自己的HashCode函数,在TableEnvironment中注册它,并在查询中调用它。

// 自定义一个标量函数
class HashCodeFunction extends ScalarFunction {
private var factor: Int = 0
override def open(context: FunctionContext): Unit = {
// 获取参数 "hashcode_factor"
// 如果不存在,则使用默认值 "12"
factor = context.getJobParameter("hashcode_factor", "12").toInt
}
def eval(s: String): Int = {
s.hashCode * factor
}
}

主函数中调用,计算sensor id的哈希值(前面部分照抄,流环境、表环境、读取source、建表):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.functions.{FunctionContext, ScalarFunction}
import org.apache.flink.types.Row
object ScalarFunctionExample {
def main(args: Array[String]): Unit = {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.addSource(new SensorSource)
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build()
val tEnv = StreamTableEnvironment.create(env, settings)
tEnv.getConfig.addJobParameter("hashcode_factor", "31")
tEnv.createTemporaryView("sensor", stream)
// 在 Table API 里不经注册直接“内联”调用函数
tEnv.from("sensor").select(call(classOf[HashCodeFunction], $"id"))
// sql 写法
// 注册函数
tEnv.createTemporarySystemFunction("hashCode", classOf[HashCodeFunction])
// 在 Table API 里调用注册好的函数
tEnv.from("sensor").select(call("hashCode", $"id"))
tEnv
.sqlQuery("SELECT id, hashCode(id) FROM sensor")
.toAppendStream[Row]
.print()
env.execute()
}
class HashCodeFunction extends ScalarFunction {
private var factor: Int = 0
override def open(context: FunctionContext): Unit = {
// 获取参数 "hashcode_factor"
// 如果不存在,则使用默认值 "12"
factor = context.getJobParameter("hashcode_factor", "12").toInt
}
def eval(s: String): Int = {
s.hashCode * factor
}
}
}

表函数(Table Functions)

与用户定义的标量函数类似,用户定义的表函数,可以将0、1或多个标量值作为输入参数;与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值。

为了定义一个表函数,必须扩展org.apache.flink.table.functions中的基类TableFunction并实现(一个或多个)求值方法。表函数的行为由其求值方法决定,求值方法必须是public的,并命名为eval。求值方法的参数类型,决定表函数的所有有效参数。

返回表的类型由TableFunction的泛型类型确定。求值方法使用protected collect(T)方法发出输出行。

在Table API中,Table函数需要与.joinLateral或.leftOuterJoinLateral一起使用。

joinLateral算子,会将外部表中的每一行,与表函数(TableFunction,算子的参数是它的表达式)计算得到的所有行连接起来。

而leftOuterJoinLateral算子,则是左外连接,它同样会将外部表中的每一行与表函数计算生成的所有行连接起来;并且,对于表函数返回的是空表的外部行,也要保留下来。

在SQL中,则需要使用Lateral Table(),或者带有ON TRUE条件的左连接。

下面的代码中,我们将定义一个表函数,在表环境中注册它,并在查询中调用它。

自定义TableFunction:

// 自定义TableFunction
@FunctionHint(output = new DataTypeHint("ROW"))
class SplitFunction extends TableFunction[Row] {
def eval(str: String): Unit = {
// use collect(...) to emit a row
str.split("#").foreach(s => collect(Row.of(s, Int.box(s.length))))
}
}

完整代码:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row
object TableFunctionExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env
.fromElements(
"hello#world",
"atguigu#bigdata"
)
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build()
val tEnv = StreamTableEnvironment.create(env, settings)
tEnv.createTemporaryView("MyTable", stream, $"s")
// 注册函数
tEnv.createTemporarySystemFunction("SplitFunction", classOf[SplitFunction])
// 在 Table API 里调用注册好的函数
tEnv
.from("MyTable")
.joinLateral(call("SplitFunction", $"s"))
.select($"s", $"word", $"length")
.toAppendStream[Row]
.print()
tEnv
.from("MyTable")
.leftOuterJoinLateral(call("SplitFunction", $"s"))
.select($"s", $"word", $"length")
// 在 SQL 里调用注册好的函数
tEnv.sqlQuery(
"SELECT s, word, length " +
"FROM MyTable, LATERAL TABLE(SplitFunction(s))")
tEnv.sqlQuery(
"SELECT s, word, length " +
"FROM MyTable " +
"LEFT JOIN LATERAL TABLE(SplitFunction(s)) ON TRUE")
env.execute()
}
@FunctionHint(output = new DataTypeHint("ROW"))
class SplitFunction extends TableFunction[Row] {
def eval(str: String): Unit = {
// use collect(...) to emit a row
str.split("#").foreach(s => collect(Row.of(s, Int.box(s.length))))
}
}
}

聚合函数(Aggregate Functions)

用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值。用户定义的聚合函数,是通过继承AggregateFunction抽象类实现的。

上图中显示了一个聚合的例子。

假设现在有一张表,包含了各种饮料的数据。该表由三列(id、name和price)、五行组成数据。现在我们需要找到表中所有饮料的最高价格,即执行max()聚合,结果将是一个数值。

AggregateFunction的工作原理如下。



  • 首先,它需要一个累加器,用来保存聚合中间结果的数据结构(状态)。可以通过调用AggregateFunction的createAccumulator()方法创建空累加器。

  • 随后,对每个输入行调用函数的accumulate()方法来更新累加器。

  • 处理完所有行后,将调用函数的getValue()方法来计算并返回最终结果。

AggregationFunction要求必须实现的方法:



  • createAccumulator()

  • accumulate()

  • getValue()

除了上述方法之外,还有一些可选择实现的方法。其中一些方法,可以让系统执行查询更有效率,而另一些方法,对于某些场景是必需的。例如,如果聚合函数应用在会话窗口(session group window)的上下文中,则merge()方法是必需的。



  • retract()

  • merge()

  • resetAccumulator()

接下来我们写一个自定义AggregateFunction,计算一下每个sensor的平均温度值。

// 定义AggregateFunction的Accumulator
class AvgTempAcc {
var sum: Double = 0.0
var count: Int = 0
}
class AvgTemp extends AggregateFunction[Double, AvgTempAcc] {
override def getValue(accumulator: AvgTempAcc): Double = accumulator.sum / accumulator.count
override def createAccumulator(): AvgTempAcc = new AvgTempAcc
def accumulate(accumulator: AvgTempAcc, temp: Double): Unit ={
accumulator.sum += temp
accumulator.count += 1
}
}

接下来就可以在代码中调用了。

// 创建一个聚合函数实例
val avgTemp = new AvgTemp()
// Table API的调用
val resultTable = sensorTable
.groupBy($"id")
.aggregate(avgTemp($"temperature") as $"avgTemp")
.select($"id", $"avgTemp")
// SQL的实现
tableEnv.createTemporaryView("sensor", sensorTable)
tableEnv.registerFunction("avgTemp", avgTemp)
val resultSqlTable = tableEnv.sqlQuery(
"""
|SELECT
|id, avgTemp(temperature)
|FROM
|sensor
|GROUP BY id
""".stripMargin)
// 转换成流打印输出
resultTable.toRetractStream[(String, Double)].print("agg temp")
resultSqlTable.toRetractStream[Row].print("agg temp sql")

表聚合函数(Table Aggregate Functions)

用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表。这跟AggregateFunction非常类似,只是之前聚合结果是一个标量值,现在变成了一张表。

比如现在我们需要找到表中所有饮料的前2个最高价格,即执行top2()表聚合。我们需要检查5行中的每一行,得到的结果将是一个具有排序后前2个值的表。

用户定义的表聚合函数,是通过继承TableAggregateFunction抽象类来实现的。

TableAggregateFunction的工作原理如下。



  • 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用TableAggregateFunction的createAccumulator()方法可以创建空累加器。

  • 随后,对每个输入行调用函数的accumulate()方法来更新累加器。

  • 处理完所有行后,将调用函数的emitValue()方法来计算并返回最终结果。

AggregationFunction要求必须实现的方法:



  • createAccumulator()

  • accumulate()

除了上述方法之外,还有一些可选择实现的方法。



  • retract()

  • merge()

  • resetAccumulator()

  • emitValue()

  • emitUpdateWithRetract()

接下来我们写一个自定义TableAggregateFunction,用来提取每个sensor最高的两个温度值。

// 先定义一个 Accumulator
class Top2TempAcc{
var highestTemp: Double = Int.MinValue
var secondHighestTemp: Double = Int.MinValue
}
// 自定义 TableAggregateFunction
class Top2Temp extends TableAggregateFunction[(Double, Int), Top2TempAcc]{
override def createAccumulator(): Top2TempAcc = new Top2TempAcc
def accumulate(acc: Top2TempAcc, temp: Double): Unit ={
if( temp > acc.highestTemp ){
acc.secOndHighestTemp= acc.highestTemp
acc.highestTemp = temp
} else if( temp > acc.secondHighestTemp ){
acc.secOndHighestTemp= temp
}
}
def emitValue(acc: Top2TempAcc, out: Collector[(Double, Int)]): Unit ={
out.collect(acc.highestTemp, 1)
out.collect(acc.secondHighestTemp, 2)
}
}

接下来就可以在代码中调用了。

// 创建一个表聚合函数实例
val top2Temp = new Top2Temp()
// Table API的调用
val resultTable = sensorTable
.groupBy($"id")
.flatAggregate(top2Temp($"temperature") as ($"temp", $"rank"))
.select($"id", $"temp", $"rank")
// 转换成流打印输出
resultTable.toRetractStream[(String, Double, Int)].print("agg temp")
resultSqlTable.toRetractStream[Row].print("agg temp sql")

 



推荐阅读
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 本文讨论了编写可保护的代码的重要性,包括提高代码的可读性、可调试性和直观性。同时介绍了优化代码的方法,如代码格式化、解释函数和提炼函数等。还提到了一些常见的坏代码味道,如不规范的命名、重复代码、过长的函数和参数列表等。最后,介绍了如何处理数据泥团和进行函数重构,以提高代码质量和可维护性。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • Python正则表达式学习记录及常用方法
    本文记录了学习Python正则表达式的过程,介绍了re模块的常用方法re.search,并解释了rawstring的作用。正则表达式是一种方便检查字符串匹配模式的工具,通过本文的学习可以掌握Python中使用正则表达式的基本方法。 ... [详细]
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • 本文详细介绍了如何使用MySQL来显示SQL语句的执行时间,并通过MySQL Query Profiler获取CPU和内存使用量以及系统锁和表锁的时间。同时介绍了效能分析的三种方法:瓶颈分析、工作负载分析和基于比率的分析。 ... [详细]
  • 本文介绍了Oracle存储过程的基本语法和写法示例,同时还介绍了已命名的系统异常的产生原因。 ... [详细]
  • Java SE从入门到放弃(三)的逻辑运算符详解
    本文详细介绍了Java SE中的逻辑运算符,包括逻辑运算符的操作和运算结果,以及与运算符的不同之处。通过代码演示,展示了逻辑运算符的使用方法和注意事项。文章以Java SE从入门到放弃(三)为背景,对逻辑运算符进行了深入的解析。 ... [详细]
  • 从Oracle安全移植到国产达梦数据库的DBA实践与攻略
    随着我国对信息安全和自主可控技术的重视,国产数据库在党政机关、军队和大型央企等行业中得到了快速应用。本文介绍了如何降低从Oracle到国产达梦数据库的技术门槛,保障用户现有业务系统投资。具体包括分析待移植系统、确定移植对象、数据迁移、PL/SQL移植、校验移植结果以及应用系统的测试和优化等步骤。同时提供了移植攻略,包括待移植系统分析和准备移植环境的方法。通过本文的实践与攻略,DBA可以更好地完成Oracle安全移植到国产达梦数据库的工作。 ... [详细]
  • Summarize function is doing alignment without timezone ?
    Hi.Imtryingtogetsummarizefrom00:00otfirstdayofthismonthametric, ... [详细]
  • linux时间字符串转正常时间 ... [详细]
  • TableAPI报一下异常:FieldtypesofqueryresultandregisteredTableSink
    报错信息如下:Exceptioninthread“main”org.apache.flink.table.api.ValidationException:Fieldtypesofq ... [详细]
  • MySQL 数据库基础学习 一、SQL的作用及分类 二、数据类型 三、存储引擎  (建库建表、数据插入等))
    MySQL 数据库基础学习 一、SQL的作用及分类 二、数据类型 三、存储引擎 (建库建表、数据插入等)) ... [详细]
  • ConsumerConfiguration在kafka0.9使用JavaConsumer替代了老版本的scalaConsumer。新版的配置如下:bootstrap. ... [详细]
  • ZendCache:让网站飞起来
    php教程|php手册zend,ZendCache,PHP,Cache,comphp教程-php手册对商业网站来说,效率是十分重要的。在一些电子商务的站点上,服务器的响应速度决定了 ... [详细]
author-avatar
咪咪的小天地计_395
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有