热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink基于Table的窗口、聚合操作

我们知道,窗口可以将无界流切割成大小有限的“桶”(bucket)来做计算,通过截取有限数据集来处理无限的流数据。在DataStream API中提供了对不同类型的窗口进行定义和处理

我们知道,窗口可以将无界流切割成大小有限的“桶”(bucket)来做计算,通过截取有限数据集来处理无限的流数据。在 DataStream API 中提供了对不同类型的窗口进行定义和处理的接口,而在 Table API 和 SQL 中,类似的功能也都可以实现。

 


1.窗口


1.1分组窗口(Group Window,1.12版本之前)

在 Flink 1.12 之前的版本中,Table API 和 SQL 提供了一组“分组窗口”(Group Window)函数,常用的时间窗口如滚动窗口、滑动窗口、会话窗口都有对应的实现;具体在 SQL 中就是调用 TUMBLE()、HOP()、SESSION(),传入时间属性字段、窗口大小等参数就可以了。以滚动窗口为例:

TUMBLE(ts, INTERVAL '1' HOUR)

这里的 ts 是定义好的时间属性字段,窗口大小用“时间间隔”INTERVAL 来定义。在进行窗口计算时,分组窗口是将窗口本身当作一个字段对数据进行分组的,可以对组内的数据进行聚合。基本使用方式如下:

val result = tableEnv.sqlQuery(
"SELECT " +
"user, " +
"TUMBLE_END(ts, INTERVAL '1' HOUR) as endT, " +
"COUNT(url) AS cnt " +
"FROM EventTable " +
"GROUP BY " + // 使用窗口和用户名进行分组
"user, " +
"TUMBLE(ts, INTERVAL '1' HOUR)" // 定义 1 小时滚动窗口
)

这里定义了 1 小时的滚动窗口,将窗口和用户 user 一起作为分组的字段。用聚合函数

COUNT()对分组数据的个数进行了聚合统计,并将结果字段重命名为cnt;用TUPMBLE_END()函数获取滚动窗口的结束时间,重命名为 endT 提取出来。

分组窗口的功能比较有限,只支持窗口聚合,所以目前已经处于弃用(deprecated)的状态。

 


1.2窗口表值函数(Window TVFs,1.13之后)

从 1.13 版本开始,Flink 开始使用窗口表值函数(Windowing table-valued functions,Windowing TVFs)来定义窗口。窗口表值函数是 Flink 定义的多态表函数(PTF),可以将表进行扩展后返回。表函数(table function)可以看作是返回一个表的函数

目前 Flink 提供了以下几个窗口 TVF:



  • 滚动窗口(Tumbling Windows);

  • 滑动窗口(Hop Windows,跳跃窗口);

  • 累积窗口(Cumulate Windows);

  • 会话窗口(Session Windows,目前尚未完全支持)。

窗口表值函数可以完全替代传统的分组窗口函数。窗口表值函数更符合 SQL 标准,性能得到了优化,拥有更强大的功能;可以支持基于窗口的复杂计算,例如窗口 Top-N、窗口联结(window join)等等。当然,目前窗口 TVF 的功能还不完善,会话窗口和很多高级功能还不支持,不过正在快速地更新完善。可以预见在未来的版本中,窗口 TVF 将越来越强大,将会是窗口处理的唯一入口。

在窗口表值函数的返回值中,除去原始表中的所有列,还增加了用来描述窗口的额外 3个列:“窗口起始点”(window_start)、“窗口结束点”(window_end)、“窗口时间”(window_time)。起始点和结束点比较好理解,这里的“窗口时间”指的是窗口中的时间属性,它的值等于window_end - 1ms,所以相当于是窗口中能够包含数据的最大时间戳。

窗口表值函数在 SQL 中的声明方式,与以前的分组窗口是类似的,直接调用TUMBLE()、HOP()、CUMULATE()就可以实现滚动、滑动和累积窗口,不过传入的参数会有所不同。


1.2.1滚动窗口(TUMBLE)

滚动窗口在 SQL 中的概念与 DataStream API 中的定义完全一样,是长度固定、时间对齐、无重叠的窗口,一般用于周期性的统计计算。

在 SQL 中通过调用 TUMBLE()函数就可以声明一个滚动窗口,只有一个核心参数就是窗口大小(size)。在 SQL 中不考虑计数窗口,所以滚动窗口就是滚动时间窗口,参数中还需要将当前的时间属性字段传入;另外,窗口 TVF 本质上是表函数,可以对表进行扩展,所以还应该把当前查询的表作为参数整体传入。具体声明如下:

TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)

这里基于时间字段 ts,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将表中的每一行数据,按照它们 ts 的值分配到一个指定的窗口中。


1.2.2滑动窗口(HOP)

滑动窗口的使用与滚动窗口类似,可以通过设置滑动步长来控制统计输出的频率。在 SQL中通过调用 HOP()来声明滑动窗口;除了也要传入表名、时间属性外,还需要传入窗口大(size)和滑动步长(slide)两个参数。HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));

这里我们基于时间属性 ts,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟滑动一次。需要注意的是,紧跟在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)。


1.2.3累计窗口(COMULATE)

滚动窗口和滑动窗口,可以用来计算大多数周期性的统计指标。不过在实际应用中还会遇到这样一类需求:我们的统计周期可能较长,因此希望中间每隔一段时间就输出一次当前的统计值;与滑动窗口不同的是,在一个统计周期内,我们会多次输出统计值,它们应该是不断叠加累积的。

例如,我们按天来统计网站的 PV(Page View,页面浏览量),如果用 1 天的滚动窗口,那需要到每天 24 点才会计算一次,输出频率太低;如果用滑动窗口,计算频率可以更高,但统计的就变成了“过去 24 小时的 PV”。所以我们真正希望的是,还是按照自然日统计每天的PV,不过需要每隔 1 小时就输出一次当天到目前为止的 PV 值。这种特殊的窗口就叫作“累积窗口”(Cumulate Window)。



 

累积窗口是窗口 TVF 中新增的窗口功能,它会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)。所谓的最大窗口长度其实就是我们所说的“统计周期”,最终目的就是统计这段时间内的数据。如图所示,开始时,创建的第一个窗口大小就是步长 step;之后的每个窗口都会在之前的基础上再扩展 step 的长度,直到达到最大窗口长度。在 SQL 中可以用 CUMULATE()函数来定义,具体如下:

CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))

这里我们基于时间属性 ts,在表 EventTable 上定义了一个统计周期为 1 天、累积步长为 1小时的累积窗口。注意第三个参数为步长 step,第四个参数则是最大窗口长度。

 


2.聚合查询(Aggregation)

在 SQL 中,一个很常见的功能就是对某一列的多条数据做一个合并统计,得到一个或多个结果值;比如求和、最大最小值、平均值等等,这种操作叫作聚合(Aggregation)查询。Flink 中的 SQL 是流处理与标准 SQL 结合的产物,所以聚合查询也可以分成两种:流处理中特有的聚合(主要指窗口聚合),以及 SQL 原生的聚合查询方式。

 


2.1分组聚合

SQL 中一般所说的聚合我们都很熟悉,主要是通过内置的一些聚合函数来实现的,比如SUM()、MAX()、MIN()、AVG()以及 COUNT()。它们的特点是对多条输入数据进行计算,得到一个唯一的值,属于“多对一”的转换。比如我们可以通过下面的代码计算输入数据的个数:

val eventCountTable = tableEnv.sqlQuery("select COUNT(*) from EventTable")

而更多的情况下,我们可以通过 GROUP BY 子句来指定分组的键(key),从而对数据按照某个字段做一个分组统计。比如,可以按照用户名进行分组,统计每个用户点击 url 的次数:

SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user

这种聚合方式,就叫作“分组聚合”(group aggregation)。从概念上讲,SQL 中的分组聚合可以对应 DataStream API 中 keyBy()之后的聚合转换,它们都是按照某个 key 对数据进行了划分,各自维护状态来进行聚合统计的。在流处理中,分组聚合同样是一个持续查询,而且是一个更新查询,得到的是一个动态表;每当流中有一个新的数据到来时,都会导致结果表的更新操作。因此,想要将结果表转换成流或输出到外部系统,必须采用撤回流(retract stream)或更新插入流(upsert stream)的编码方式;如果在代码中直接转换成 DataStream 打印输出,需要调用 toChangelogStream()。

另外,在持续查询的过程中,由于用于分组的 key 可能会不断增加,因此计算结果所需要维护的状态也会持续增长。为了防止状态无限增长耗尽资源,Flink Table API 和 SQL 可以在表环境中配置状态的生存时间(TTL):

val tableEnv = ...
// 获取表环境的配置
val tableCOnfig= tableEnv.getConfig();
// 配置状态保持时间
tableConfig.setIdleStateRetention(Duration.ofMinutes(60))
或者也可以直接设置配置项 table.exec.state.ttl:
val tableEnv
= ...
val configuration
= tableEnv.getConfig().getConfiguration()
configuration.setString(
"table.exec.state.ttl", "60 min")

 

这两种方式是等效的。需要注意,配置 TTL 有可能会导致统计结果不准确,这其实是以牺牲正确性为代价换取了资源的释放。

此外,在 Flink SQL 的分组聚合中同样可以使用 DISTINCT 进行去重的聚合处理;可以使用 HAVING 对聚合结果进行条件筛选;还可以使用 GROUPING SETS(分组集)设置多个分组情况分别统计。这些语法跟标准 SQL 中的用法一致,这里就不再详细展开了。

可以看到,分组聚合既是 SQL 原生的聚合查询,也是流处理中的聚合操作,这是实际应用中最常见的聚合方式。当然,使用的聚合函数一般都是系统内置的,如果希望实现特殊需求也可以进行自定义


2.2窗口聚合

在流处理中,往往需要将无限数据流划分成有界数据集,这就是所谓的“窗口”。上边已经介绍了窗口的声明方式,这相当于 DataStream API 中的窗口分配器(window assigner),只是明确了窗口的形式以及数据如何分配;而窗口具体的计算处理操作,在DataStream API 中还需要窗口函数(window function)来进行定义。

在 Flink 的 Table API 和 SQL 中,窗口的计算是通过“窗口聚合”(window aggregation)来实现的。与分组聚合类似,窗口聚合也需要调用 SUM()、MAX()、MIN()、COUNT()一类的聚合函数,通过 GROUP BY 子句来指定分组的字段。只不过窗口聚合时,需要将窗口信息作为分组 key 的一部分定义出来。在 Flink 1.12 版本之前,是直接把窗口自身作为分组 key 放在GROUP BY 之后的,所以也叫“分组窗口聚合”;而 1.13 版本开始使用了“窗口表值函数”(Windowing TVF),窗口本身返回的是就是一张表,所以窗口会出现在 FROM后面,GROUP BY 后面的则是窗口新增的字段 window_start 和 window_end。

比如,我们将分组窗口的聚合,用窗口 TVF 重新实现一下:

val result = tableEnv.sqlQuery(
"SELECT " +
"user, " +
"window_end AS endT, " +
"COUNT(url) AS cnt " +
"FROM TABLE( " +
"TUMBLE( TABLE EventTable, " +
"DESCRIPTOR(ts), " +
"INTERVAL '1' HOUR)) " +
"GROUP BY user, window_start, window_end "
)

这里我们以 ts 作为时间属性字段、基于 EventTable 定义了 1 小时的滚动窗口,希望统计出每小时每个用户点击 url 的次数。用来分组的字段是用户名 user,以及表示窗口的window_start 和 window_end;而 TUMBLE()是表值函数,所以得到的是一个表(Table),我们的聚合查询就是在这个 Table 中进行的。这就是 11.3.3 小节中窗口聚合的实现方式。

Flink SQL 目前提供了滚动窗口 TUMBLE()、滑动窗口 HOP()和累积窗口(CUMULATE)三种表值函数(TVF)。在具体应用中,我们还需要提前定义好时间属性。下面是一段窗口聚合的完整代码,以累积窗口为例:

package com.zhen.flink.table
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
* @Author FengZhen
* @Date 10/11/22 1:57 PM
* @Description TODO
*/
object TimeAndWindowTest {
def main(args: Array[String]): Unit
= {
val env
= StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(
1)
// 创建表环境
val tableEnv = StreamTableEnvironment.create(env)
// 1.在创建表的DDL中指定时间属性字段
tableEnv.executeSql("CREATE TABLE eventTableTest (" +
" uid STRING," +
" url STRING," +
" ts BIGINT," +
" et AS TO_TIMESTAMP( FROM_UNIXTIME(ts/1000))," +
// " proc_time AS PROCTIME(), " + //处理时间
" WATERMARK FOR et AS et - INTERVAL '3' SECOND " +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = '/Users/FengZhen/Desktop/accumulate/0_project/flink_learn/src/main/resources/data/input/clicks.txt', " +
" 'format' = 'csv' " +
")")
// 2.在将流转换成表的时候指定时间属性字段
// 读取数据源,并分配时间戳、生成水位线
val eventStream = env
.fromElements(
Event(
"Alice", "./home", 1000L),
Event(
"Bob", "./cart", 1000L),
Event(
"Alice", "./prod?id=1", 25 * 60 * 1000L),
Event(
"Alice", "./prod?id=4", 55 * 60 * 1000L),
Event(
"Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
Event(
"Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
Event(
"Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
)
//如果数据为标准的升序数据,直接assignAscendingTimestamps
//.assignAscendingTimestamps(_.timestamp)
//如果数据为乱序数据,则assignTimestampsAndWatermarks
.assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(
2))
.withTimestampAssigner(
new SerializableTimestampAssigner[Event] {
override def extractTimestamp(element: Event, recordTimestamp: Long): LOng= element.timestamp
})
)
// 将数据流转换成表,并指定时间属性
// val eventTable = tableEnv.fromDataStream(
// eventStream,
// $("url"),
// $("user").as("uid"),
// $("timestamp").as("ts"),
// $("et").rowtime() //新增一个字段,表示为当前的时间属性字段
// )
//可以直接将原有字段指定位rowtime
val eventTable = tableEnv.fromDataStream(
eventStream,
$(
"url"),
$(
"user").as("uid"),
$(
"timestamp").rowtime().as("ts"),
// $("proc_time").proctime() // 处理时间
)
/**
* 1.
* (
* `uid` STRING,
* `url` STRING,
* `ts` BIGINT,
* `et` TIMESTAMP(3) *ROWTIME* AS TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000)),
* `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME(),
* WATERMARK FOR `et`: TIMESTAMP(3) AS `et` - INTERVAL '3' SECOND
* )
*/
tableEnv.
from("eventTableTest").printSchema()
/**
* 2.
* (
* `url` STRING,
* `uid` STRING,
* `ts` TIMESTAMP(3) *ROWTIME*,
* `proc_time` TIMESTAMP_LTZ(3) *PROCTIME*
* )
*/
eventTable.printSchema()
// 3.测试累计窗口
tableEnv.createTemporaryView("eventTable", eventTable)
val resultTable
= tableEnv.sqlQuery(
"""
|SELECT
| uid, window_end as endT, count(url) as cnt
|FROM TABLE(
| CUMULATE(
| TABLE eventTable,
| DESCRIPTOR(ts),
| INTERVAL '30' MINUTE,
| INTERVAL '1' HOUR
| )
|)
|GROUP BY uid, window_start, window_end
|""".stripMargin)
// 转换成流打印输出
val resultDataStream = tableEnv.toDataStream(resultTable)
resultDataStream.print(
"resultDataStream")
env.execute()
}
}

 


2.3开窗聚合(Over)

在标准 SQL 中还有另外一类比较特殊的聚合方式,可以针对每一行计算一个聚合值。比如说,我们可以以每一行数据为基准,计算它之前 1 小时内所有数据的平均值;也可以计算它之前 10 个数的平均值。就好像是在每一行上打开了一扇窗户、收集数据进行统计一样,这就是所谓的“开窗函数”。开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口 TVF聚合都是“多对一”的关系,将数据分组之后每组只会得到一个聚合结果;而开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个“多对多”的关系。

与标准 SQL 中一致,Flink SQL 中的开窗函数也是通过 OVER 子句来实现的,所以有时开窗聚合也叫作“OVER 聚合”(Over Aggregation)。基本语法如下:

SELECT
<聚合函数> OVER (
[PARTITION BY
<字段 1>[, <字段 2>, ...]]
ORDER BY
<时间属性字段>
<开窗范围>),
...
FROM ...

这里 OVER 关键字前面是一个聚合函数,它会应用在后面 OVER 定义的窗口上。在 OVER子句中主要有以下几个部分:



  • PARTITION BY(可选)

用来指定分区的键(key),类似于 GROUP BY 的分组,这部分是可选的;



  • ORDER BY

OVER 窗口是基于当前行扩展出的一段数据范围,选择的标准可以基于时间也可以基于数量。无论是哪种定义,数据都应该是以某种顺序排列好的;而表中的数据本身是无序的。所以在 OVER 子句中必须用 ORDER BY 明确地指出数据基于那个字段排序。在 Flink 的流处理中,目前只支持按照时间属性的升序排列,所以这里 ORDER BY 后面的字段必须是定义好的时间属性。



  • 开窗范围

对于开窗函数而言,还有一个必须要指定的就是开窗的范也就是到底要扩展多少行来做聚合。围,这个范围是由 BETWEEN <下界> AND <上界> 来定义的,也就是“从下界到上界”的范围。目前支持的上界只能是 CURRENT ROW,也就是定义一个“从之前某一行到当前行”的范围,所以一般的形式为:

BETWEEN ... PRECEDING AND CURRENT ROW

前面我们提到,开窗选择的范围可以基于时间,也可以基于数据的数量。所以开窗范围还应该在两种模式之间做出选择:范围间隔(RANGE intervals)和行间隔(ROW intervals)。



  • 范围间隔

范围间隔以 RANGE 为前缀,就是基于 ORDER BY 指定的时间字段去选取一个范围,一般就是当前行时间戳之前的一段时间。例如开窗范围选择当前行之前 1 小时的数据:

RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW



  • 行间隔

行间隔以 ROWS 为前缀,就是直接确定要选多少行,由当前行出发向前选取就可以了。例如开窗范围选择当前行之前的 5 行数据(最终聚合会包括当前行,所以一共 6 条数据):

ROWS BETWEEN 5 PRECEDING AND CURRENT ROW

下面是一个具体示例:

SELECT user, ts,
COUNT(url) OVER (
PARTITION BY user
ORDER BY ts
RANGE BETWEEN INTERVAL
'1' HOUR PRECEDING AND CURRENT ROW
) AS cnt
FROM EventTable

这里我们以 ts 作为时间属性字段,对 EventTable 中的每行数据都选取它之前 1 小时的所有数据进行聚合,统计每个用户访问 url 的总次数,并重命名为 cnt。最终将表中每行的 user,ts 以及扩展出 cnt 提取出来。

可以看到,整个开窗聚合的结果,是对每一行数据都有一个对应的聚合值,因此就像将表中扩展出了一个新的列一样。由于聚合范围上界只能到当前行,新到的数据一般不会影响之前数据的聚合结果,所以结果表只需要不断插入(INSERT)就可以了。执行上面 SQL 得到的结果表,可以用 toDataStream()直接转换成流打印输出。

开窗聚合与窗口聚合(窗口 TVF 聚合)本质上不同,不过也还是有一些相似之处的:它们都是在无界的数据流上划定了一个范围,截取出有限数据集进行聚合统计;这其实都是“窗口”的思路。事实上,在 Table API 中确实就定义了两类窗口:分组窗口(GroupWindow)和开窗窗口(OverWindow);而在 SQL 中,也可以用 WINDOW 子句来在 SELECT 外部单独定义一个 OVER 窗口:

SELECT user, ts,
COUNT(url) OVER w AS cnt,
MAX(CHAR_LENGTH(url)) OVER w AS max_url
FROM EventTable
WINDOW w AS (
PARTITION BY user
ORDER BY ts
ROWS BETWEEN
2 PRECEDING AND CURRENT ROW)

上面的 SQL 中定义了一个选取之前 2 行数据的 OVER 窗口,并重命名为 w;接下来就可以基于它调用多个聚合函数,扩展出更多的列提取出来。比如这里除统计 url 的个数外,还统计了 url 的最大长度:首先用 CHAR_LENGTH()函数计算出 url 的长度,再调用聚合函数 MAX()进行聚合统计。这样,我们就可以方便重复引用定义好的 OVER 窗口了,大大增强了代码的可读性。


2.4 TopN


2.4.1 普通TopN

在 Flink SQL 中,是通过 OVER 聚合和一个条件筛选来实现 Top N 的。具体来说,是通过将一个特殊的聚合函数ROW_NUMBER()应用到OVER窗口上,统计出每一行排序后的行号,作为一个字段提取出来;然后再用 WHERE 子句筛选行号小于等于 N 的那些行返回。

基本语法如下:

SELECT ...
FROM (
SELECT ...,
ROW_NUMBER() OVER (
[PARTITION BY
<字段 1>[, <字段 1>...]]
ORDER BY
<排序字段 1> [asc|desc][, <排序字段 2> [asc|desc]...]
) AS row_num
FROM ...)
WHERE row_num
<= N [AND <其它条件>]

这里的 OVER 窗口定义与之前的介绍基本一致,目的就是利用 ROW_NUMBER()函数为每一行数据聚合得到一个排序之后的行号。行号重命名为 row_num,并在外层的查询中以row_num <= N 作为条件进行筛选,就可以得到根据排序字段统计的 Top N 结果了。

需要对关键字额外做一些说明:



  • WHERE

用来指定 Top N 选取的条件,这里必须通过 row_num <= N 或者 row_num


  • PARTITION BY

是可选的,用来指定分区的字段,这样我们就可以针对不同的分组分别统计 Top N 了。



  • ORDER BY

指定了排序的字段,因为只有排序之后,才能进行前 N 个最大/最小的选取。每个排序字段后可以用 asc 或者 desc 来指定排序规则:asc 为升序排列,取出的就是最小的 N 个值;desc为降序排序,对应的就是最大的 N 个值。默认情况下为升序,asc 可以省略。

这是因为 OVER 窗口目前并不完善,不过针对 Top N 这样一个经典应用场景,Flink SQL专门用 OVER 聚合做了优化实现。所以只有在 Top N 的应用场景中,OVER 窗口 ORDER BY后才可以指定其他排序字段;而要想实现 Top N,就必须按照上面的格式进行定义,否则 Flink SQL 的优化器将无法正常解析。而且,目前 Table API 中并不支持 ROW_NUMBER()函数,所以也只有 SQL 中这一种通用的 Top N 实现方式。

另外要注意,Top N 的实现必须写成上面的嵌套查询形式。这是因为行号 row_num 是内部子查询聚合的结果,不可能在内部作为筛选条件,只能放在外层的 WHERE 子句中。

下面是一个具体的示例,我们统计每个用户的访问事件中,按照字符长度排序的前两个url:

SELECT user, url, ts, row_num
FROM (
SELECT
*,
ROW_NUMBER() OVER (
PARTITION BY user
ORDER BY CHAR_LENGTH(url) desc
) AS row_num
FROM EventTable)
WHERE row_num
<= 2

这里我们以用户来分组,以访问 url 的字符长度作为排序的字段,降序排列后用聚合统计出每一行的行号,这样就相当于在 EventTable 基础上扩展出了一列 row_num。而后筛选出行号小于等于 2 的所有数据,就得到了每个用户访问的长度最长的两个 url。

需要特别说明的是,这里的 Top N 聚合是一个更新查询。新数据到来后,可能会改变之前数据的排名,所以会有更新(UPDATE)操作。这是 ROW_NUMBER()聚合函数的特性决定的。因此,如果执行上面的 SQL 得到结果表,需要调用 toChangelogStream()才能转换成流打印输出。


2.4.2窗口TopN

除了直接对数据进行 Top N 的选取,我们也可以针对窗口来做 Top N。

例如电商行业,实际应用中往往有这样的需求:统计一段时间内的热门商品。这就需要先开窗口,在窗口中统计每个商品的点击量;然后将统计数据收集起来,按窗口进行分组,并按点击量大小降序排序,选取前 N 个作为结果返回。

我们已经知道,Top N 聚合本质上是一个表聚合函数,这和窗口表值函数(TVF)有天然的联系。尽管如此,想要基于窗口 TVF 实现一个通用的 Top N 聚合函数还是比较麻烦的,目前Flink SQL尚不支持。不过我们同样可以借鉴之前的思路,使用OVER窗口统计行号来实现。

具体来说,可以先做一个窗口聚合,将窗口信息 window_start、window_end 连同每个商品的点击量一并返回,这样就得到了聚合的结果表,包含了窗口信息、商品和统计的点击量。

接下来就可以像一般的 Top N 那样定义 OVER 窗口了,按窗口分组,按点击量排序,用ROW_NUMBER()统计行号并筛选前 N 行就可以得到结果。所以窗口 Top N 的实现就是窗口聚合与 OVER 聚合的结合使用。

下面是一个具体案例的代码实现。由于用户访问事件 Event 中没有商品相关信息,因此我们统计的是每小时内有最多访问行为的用户,取前两名,相当于是一个每小时活跃用户的查询。

package com.zhen.flink.table
import java.time.Duration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.$
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
* @Author FengZhen
* @Date 10/17/22 3:15 PM
* @Description 窗口TopN
*/
object TopNWindowExample {
def main(args: Array[String]): Unit
= {
val env
= StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(
1)
// 创建表环境
val tableEnv = StreamTableEnvironment.create(env)
// 读取数据源,并分配时间戳、生成水位线
val eventStream = env
.fromElements(
Event(
"Alice", "./home", 1000L),
Event(
"Bob", "./cart", 1000L),
Event(
"Alice", "./prod?id=1", 25 * 60 * 1000L),
Event(
"Alice", "./prod?id=4", 55 * 60 * 1000L),
Event(
"Bob", "./prod?id=5", 3600 * 1000L + 60 * 1000L),
Event(
"Cary", "./home", 3600 * 1000L + 30 * 60 * 1000L),
Event(
"Cary", "./prod?id=7", 3600 * 1000L + 59 * 60 * 1000L)
)
//如果数据为标准的升序数据,直接assignAscendingTimestamps
//.assignAscendingTimestamps(_.timestamp)
//如果数据为乱序数据,则assignTimestampsAndWatermarks
.assignTimestampsAndWatermarks(
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(
2))
.withTimestampAssigner(
new SerializableTimestampAssigner[Event] {
override def extractTimestamp(element: Event, recordTimestamp: Long): LOng= element.timestamp
})
)
// 将数据流转换成表,并指定时间属性
val eventTable = tableEnv.fromDataStream(
eventStream,
$(
"url"),
$(
"user").as("uid"),
$(
"timestamp").as("ts"),
$(
"et").rowtime() //新增一个字段,表示为当前的时间属性字段
)
tableEnv.createTemporaryView(
"eventTable", eventTable)
// 窗口TOP N. 选取每小时内活跃度最大的前两个用户
// 1.进行窗口聚合统计,统计每个用户的访问量
val urlCountWindowTable = tableEnv.sqlQuery(
"""
|SELECT uid, COUNT(url) AS cnt, window_start, window_end
|FROM TABLE(
| TUMBLE(TABLE eventTable, DESCRIPTOR(et), INTERVAL '1' HOUR)
|)
|GROUP BY uid, window_start, window_end
|
|""".stripMargin)
tableEnv.createTemporaryView("urlCountWindowTable", urlCountWindowTable)
// 2.提取count值最大的前两个用户
val top2ResultTable = tableEnv.sqlQuery(
"""
|SELECT
| window_start, window_end, uid, cnt, row_num
|FROM(
| SELECT
| *,
| ROW_NUMBER() OVER(
| PARTITION BY window_start, window_end
| ORDER BY cnt DESC
| ) AS row_num
| FROM urlCountWindowTable
|) WHERE row_num <= 2
|""".stripMargin)

val top2ResultDataStream
= tableEnv.toChangelogStream(top2ResultTable)
top2ResultDataStream.print()
env.execute()
}
}

+I[1970-01-01T00:00, 1970-01-01T01:00, Alice, 3, 1]
+I[1970-01-01T00:00, 1970-01-01T01:00, Bob, 1, 2]
+I[1970-01-01T01:00, 1970-01-01T02:00, Cary, 2, 1]
+I[1970-01-01T01:00, 1970-01-01T02:00, Bob, 1, 2]

 



推荐阅读
author-avatar
帅帅考拉_955
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有