热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

StreamProcessingforEveryonewithSQLandApacheFlink

Wheredidwecomefrom?Withthe 0.9.0-milestone1 release,ApacheFlinkaddedanAPItoproces-s-relati

Where did we come from?

With the 0.9.0-milestone1 release, Apache Flink added an API to process relational data with SQL-like expressions called the Table API. The central concept of this API is a Table, a structured data set or stream on which relational operations can be applied. The Table API is tightly integrated with the DataSet and DataStream API. A Table can be easily created from a DataSet or DataStream and can also be converted back into a DataSet or DataStream as the following example shows

从0.9开始,引入Table API来支持关系型操作,

Stream Processing for Everyone with SQL and Apache Flink
val execEnv = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(execEnv)

// obtain a DataSet from somewhere
val tempData: DataSet[(String, Long, Double)] =

// convert the DataSet to a Table
val tempTable: Table = tempData.toTable(tableEnv, 'location, 'time, 'tempF)
// compute your result
val avgTempCTable: Table = tempTable
 .where('location.like("room%"))
 .select(
   ('time / (3600 * 24)) as 'day, 
   'Location as 'room, 
   (('tempF - 32) * 0.556) as 'tempC
  )
 .groupBy('day, 'room)
 .select('day, 'room, 'tempC.avg as 'avgTempC)
// convert result Table back into a DataSet and print it
avgTempCTable.toDataSet[Row].print()
Stream Processing for Everyone with SQL and Apache Flink

可以看到可以很简单的把dataset转换为Table,指定其元数据即可

然后对于table就可以进行各种关系型操作,

最后还可以把Table再转换回dataset

Although the example shows Scala code, there is also an equivalent Java version of the Table API. The following picture depicts the original architecture of the Table API.

Stream Processing for Everyone with SQL and Apache Flink

对于table的关系型操作,最终通过code generation还是会转换为dataset的逻辑

 

Table API joining forces with SQL

the community was also well aware of the multitude of dedicated “SQL-on-Hadoop” solutions in the open source landscape (Apache Hive, Apache Drill,Apache Impala, Apache Tajo, just to name a few).

Given these alternatives, we figured that time would be better spent improving Flink in other ways than implementing yet another SQL-on-Hadoop solution.

What we came up with was a revised architecture for a Table API that supports SQL (and Table API) queries on streaming and static data sources. 
We did not want to reinvent the wheel and decided to build the new Table API on top of Apache Calcite, a popular SQL parser and optimizer framework. Apache Calcite is used by many projects including Apache Hive, Apache Drill, Cascading, and many more. Moreover, the Calcite community put SQL on streams on their roadmap which makes it a perfect fit for Flink’s SQL interface.

虽然社区已经有很多的Sql-on-Hadoop方案,flink希望把时间花在更有价值的地方,而不是再实现一套

但是当前这样的需要非常强烈,所以在revise Table API的基础上实现对SQL的支持  
对于SQL的支持,借助于Calcite,并且Calcite已经把SQL on streams放在roadmap上,有希望成为streaming sql的标准

Calcite is central in the new design as the following architecture sketch shows:

Stream Processing for Everyone with SQL and Apache Flink

The new architecture features two integrated APIs to specify relational queries, the Table API and SQL. 
Queries of both APIs are validated against a catalog of registered tables and converted into Calcite’s representation for logical plans. 
In this representation, stream and batch queries look exactly the same. 
Next, Calcite’s cost-based optimizer applies transformation rules and optimizes the logical plans. 
Depending on the nature of the sources (streaming or static) we use different rule sets. 
Finally, the optimized plan is translated into a regular Flink DataStream or DataSet program. This step involves again code generation to compile relational expressions into Flink functions.

这里Table API和SQL都统一的转换为Calcite的逻辑plans,然后再通过Calcite Optimizer进行优化,最终通过code generation转换为Flink的函数

With this effort, we are adding SQL support for both streaming and static data to Flink. 
However, we do not want to see this as a competing solution to dedicated, high-performance SQL-on-Hadoop solutions, such as Impala, Drill, and Hive. 
Instead, we see the sweet spot of Flink’s SQL integration primarily in providing access to streaming analytics to a wider audience. 
In addition, it will facilitate integrated applications that use Flink’s API’s as well as SQL while being executed on a single runtime engine

再次说明,支持SQL并不是为了再造一个专用的SQL-on-Hadoop solutions;而是为了让更多的人可以来使用Flink,说白了,这块不是当前的战略重点

 

How will Flink’s SQL on streams look like?

So far we discussed the motivation for and architecture of Flink’s stream SQL interface, but how will it actually look like?

Stream Processing for Everyone with SQL and Apache Flink
// get environments
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(execEnv)

// configure Kafka connection
val kafkaProps = ...
// define a JSON encoded Kafka topic as external table
val sensorSource = new KafkaJsonSource[(String, Long, Double)](
    "sensorTopic",
    kafkaProps,
    ("location", "time", "tempF"))

// register external table
tableEnv.registerTableSource("sensorData", sensorSource)

// define query in external table
val roomSensors: Table = tableEnv.sql(
    "SELECT STREAM time, location AS room, (tempF - 32) * 0.556 AS tempC " +
    "FROM sensorData " +
    "WHERE location LIKE 'room%'"
  )

// define a JSON encoded Kafka topic as external sink
val roomSensorSink = new KafkaJsonSink(...)

// define sink for room sensor data and execute query
roomSensors.toSink(roomSensorSink)
execEnv.execute()
Stream Processing for Everyone with SQL and Apache Flink

跟Table API相比,可以通过纯粹的SQL来做相应的操作

 

当前SQL不支持,windows aggregation,

但是Calcite的Streaming SQL是支持的,比如,

Stream Processing for Everyone with SQL and Apache Flink
SELECT STREAM 
  TUMBLE_END(time, INTERVAL '1' DAY) AS day, 
  location AS room, 
  AVG((tempF - 32) * 0.556) AS avgTempC
FROM sensorData
WHERE location LIKE 'room%'
GROUP BY TUMBLE(time, INTERVAL '1' DAY), location
Stream Processing for Everyone with SQL and Apache Flink

可以用Table API实现,

val avgRoomTemp: Table = tableEnv.ingest("sensorData")
  .where('location.like("room%"))
  .partitionBy('location)
  .window(Tumbling every Days(1) on 'time as 'w)
  .select('w.end, 'location, , (('tempF - 32) * 0.556).avg as 'avgTempCs)

 

What’s up next?

The Flink community is actively working on SQL support for the next minor version Flink 1.1.0. In the first version, SQL (and Table API) queries on streams will be limited to selection, filter, and union operators. Compared to Flink 1.0.0, the revised Table API will support many more scalar functions and be able to read tables from external sources and write them back to external sinks. A lot of work went into reworking the architecture of the Table API and integrating Apache Calcite.

In Flink 1.2.0, the feature set of SQL on streams will be significantly extended. Among other things, we plan to support different types of window aggregates and maybe also streaming joins. For this effort, we want to closely collaborate with the Apache Calcite community and help extending Calcite’s support for relational operations on streaming data when necessary.

1.2会有window aggregates和streaming joins,值得期待。。。

推荐阅读
  • 本文介绍了设计师伊振华受邀参与沈阳市智慧城市运行管理中心项目的整体设计,并以数字赋能和创新驱动高质量发展的理念,建设了集成、智慧、高效的一体化城市综合管理平台,促进了城市的数字化转型。该中心被称为当代城市的智能心脏,为沈阳市的智慧城市建设做出了重要贡献。 ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • 微软评估和规划(MAP)的工具包介绍及应用实验手册
    本文介绍了微软评估和规划(MAP)的工具包,该工具包是一个无代理工具,旨在简化和精简通过网络范围内的自动发现和评估IT基础设施在多个方案规划进程。工具包支持库存和使用用于SQL Server和Windows Server迁移评估,以及评估服务器的信息最广泛使用微软的技术。此外,工具包还提供了服务器虚拟化方案,以帮助识别未被充分利用的资源和硬件需要成功巩固服务器使用微软的Hyper - V技术规格。 ... [详细]
  • 【Mysql】九、Mysql高级篇 索引
    MYSQL索引一、什么是索引?二、索引数据结构1、mysql数据库的四种索引2、BTREE结构三、索引分类、创建索引、查看索引1、单值索引2、复合索引3、函数索引4、 ... [详细]
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • 高质量SQL书写的30条建议
    本文提供了30条关于优化SQL的建议,包括避免使用select *,使用具体字段,以及使用limit 1等。这些建议是基于实际开发经验总结出来的,旨在帮助读者优化SQL查询。 ... [详细]
  • 本文讨论了在数据库打开和关闭状态下,重新命名或移动数据文件和日志文件的情况。针对性能和维护原因,需要将数据库文件移动到不同的磁盘上或重新分配到新的磁盘上的情况,以及在操作系统级别移动或重命名数据文件但未在数据库层进行重命名导致报错的情况。通过三个方面进行讨论。 ... [详细]
  • 本文详细介绍了MySQL表分区的创建、增加和删除方法,包括查看分区数据量和全库数据量的方法。欢迎大家阅读并给予点评。 ... [详细]
  • ALTERTABLE通过更改、添加、除去列和约束,或者通过启用或禁用约束和触发器来更改表的定义。语法ALTERTABLEtable{[ALTERCOLUMNcolu ... [详细]
  • mysql-cluster集群sql节点高可用keepalived的故障处理过程
    本文描述了mysql-cluster集群sql节点高可用keepalived的故障处理过程,包括故障发生时间、故障描述、故障分析等内容。根据keepalived的日志分析,发现bogus VRRP packet received on eth0 !!!等错误信息,进而导致vip地址失效,使得mysql-cluster的api无法访问。针对这个问题,本文提供了相应的解决方案。 ... [详细]
  • ubuntu用sqoop将数据从hive导入mysql时,命令: ... [详细]
  • 先看官方文档TheJavaTutorialshavebeenwrittenforJDK8.Examplesandpracticesdescribedinthispagedontta ... [详细]
  • Spring框架《一》简介
    Spring框架《一》1.Spring概述1.1简介1.2Spring模板二、IOC容器和Bean1.IOC和DI简介2.三种通过类型获取bean3.给bean的属性赋值3.1依赖 ... [详细]
author-avatar
法国警方解放军_847
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有