热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flink实战开发flinkSQL入门大全

flinkSQL概念介绍TableAPI&SQLApacheFlink具有两个关系API–表API和SQL–用于统一流和批处理。TableAPI是Scala和Java的语言集成查询
flinkSQL概念介绍

Table API & SQL

Apache Flink具有两个关系API – 表API和SQL – 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。您可以轻松地在基于API构建的所有API和库之间切换。例如,您可以使用CEP库从DataStream中提取模式,然后使用Table API分析模式,或者可以在预处理上运行Gelly图算法之前使用SQL查询扫描,过滤和聚合批处理表数据。

TableEnvironment

职责

TableEnvironment是Table API和SQL集成的核心概念。它负责:

  • 在内部目录中注册一个表
  • 注册外部目录
  • 执行SQL查询
  • 注册用户定义的(标量,表或聚合)函数
  • 把一个DataStreamDataSet转换为一个表Table
  • 持有对ExecutionEnvironmentStreamExecutionEnvironment的引用

创建一个TableEnvironment

一个TableEnvironment是通过调用静态创建TableEnvironment.getTableEnvironment()用的方法StreamExecutionEnvironmentExecutionEnvironment与可选的TableConfig。该TableConfig可用于配置TableEnvironment或定制查询优化和翻译过程

代码:

//获取table
val tableEnv = TableEnvironment.getTableEnvironment(env)

TableEnvironment中注册表

TableEnvironment维护一个表的目录,这些表是按名称注册的。有两种类型的表、输入表和输出表。输入表可以在表API和SQL查询中引用,并提供输入数据。输出表可以用来将表API或SQL查询的结果发送到外部系统。可以从各种来源注册输入表:

  • 现有Table对象,通常是Table API或SQL查询的结果。
  • TableSource,访问外部数据,例如文件,数据库或消息传递系统
  •  DataStreamDataSet来自DataStream或DataSet程序。注册一个DataStreamDataSet

一个输出表可以被注册使用TableSink

代码:

val tableEnv = TableEnvironment.getTableEnvironment(env)
val projTable: Table = tableEnv.scan("X").select(...)
//注册表
tableEnv.registerTable("projectedTable", projTable)

注册一个TableSink

一个已注册的表可以用来将表API或SQL查询的结果发送到外部存储系统,比如数据库、键值存储、消息队列或文件系统(在不同的编码中,例如CSV、Apache Parquet、Avro……)。

说白了就是:table sink的作用就是如何将flink sql查询的数据保存到外部系统,如hdfs或者本地文件,数据库,hbase等。

val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
SQL 语句

FlinkSQL,它实现了SQL标准。SQL查询被指定为常规字符串。

SQL文档描述了Flink对流式和批处理表的SQL支持。

主要包括:sqlQuery和sqlUpdate

sqlQuery:主要用于sql查询

sqlUpdate:用于删除,更新等操作

案例一:如何指定一个查询并将结果作为一张表返回

val tableEnv = TableEnvironment.getTableEnvironment(env)
val revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)

案例二:指定一个更新查询,该查询将其结果插入到已注册表中

val tableEnv = TableEnvironment.getTableEnvironment(env)
tableEnv.sqlUpdate("""
|INSERT INTO RevenueFrance
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
flinkSQL执行计划

表API和SQL查询将转换为DataStreamDataSet程序,具体取决于它们的输入是流式还是批量输入。查询在内部表示为逻辑查询计划,并分为两个阶段:

  1. 优化逻辑计划,
  2. 转换为DataStream或DataSet程序。

table与DataStream和DataSet API集成

表API和SQL查询可以轻松集成并嵌入到DataStreamDataSet程序中。例如,可以查询外部表(例如来自RDBMS),进行一些预处理,例如过滤,预测,聚合或加入元数据,然后使用DataStream或进一步处理数据。相反,Table API或SQL查询也可以应用于DataStream或DataSet程序的结果。这种相互作用可以通过将一个DataStream或DataSet转换为一个Table来实现,反之亦然。

Scala的隐式转换

scala表API功能的隐式转换DataSetDataStream以及Table类。org.apache.flink.table.api.scala._除了org.apache.flink.api.scala._   Scala DataStream API 之外,还可以通过导入包来启用这些转换

注:flink编程必须导入import org.apache.flink.api.scala._,flinkSQL编程必须导入import org.apache.flink.table.api._

将DataStream或DataSet转换为表

我们可以通过TableEnvironment将获得数据源的DataStream或DataSet转化成Table,在使用flinkSQL的时候这样将会十分便捷。

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream> stream = ...
Table table1 = tableEnv.fromDataStream(stream);
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");

将Table转换为DataStream或DataSet

一个Table可以转换为DataStreamDataSet。通过这种方式,可以在Table API或SQL查询的结果上运行自定义DataStream或DataSet程序。

当转换一个TableDataStreamDataSet,需要指定将所得的数据类型DataStreamDataSet,即,数据类型到其中的行Table是要被转换。通常最方便的转换类型是Row。以下列表概述了不同选项的功能:

  • :字段按位置,任意数量的字段映射,支持null值,无类型安全访问。
  • POJO:字段按名称映射(POJO字段必须命名为Table字段),任意数量的字段,支持null值,类型安全访问。
  • 样例Case Class:字段按位置映射,不支持null值,类型安全访问。
  • 元组:字段按位置映射,限制为22(Scala)或25(Java)字段,不支持null值,类型安全访问。
  • 原子类型Table必须具有单个字段,不支持null值,类型安全访问。

将表转换为DataStream

一个Table是流媒体查询的结果将动态更新,即它正在改变,因为新记录的查询的输入流到达。因此,DataStream转换这种动态查询需要对表的更新进行编码。

一个Table转换为一个DataStream有两种模式:

  1. 追加模式:只有在动态Table仅通过INSERT更改修改时才能使用此模式,即它仅附加并且以前发出的结果永远不会更新。
  2. 缩进模式:始终可以使用此模式。它用标志编码INSERTDELETE改变boolean

val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

将表转换为DataSet

val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
flinkSQL语法

SQL查询是使用sqlQuery()方法指定的TableEnvironment。该方法返回SQL查询的结果为Table。A Table可以在后续的SQL和Table API查询中使用,可以转换为DataSet或DataStream,也可以写入TableSink)。要访问SQL查询中的表,必须在TableEnvironment中注册它。可以从TableSourceTableDataStream或DataSet 注册表。或者,用户还可以在TableEnvironment中注册外部目录以指定数据源的位置。

注意: Flink的SQL支持尚未完成。包含不受支持的SQL功能的查询会导致a TableException。以下部分列出了批处理和流表上SQL的受支持功能。

支持的语法

支持标准的ANSI SQL。Flink不支持DDL语句。

入门案例

需求:

使用flinkSQL,获取文本中的用户的姓名

导入依赖

table API和SQL捆绑在flink-tableMaven工件中。必须将以下依赖项添加到项目中才能使用Table API和SQL:


org.apache.flink
flink-table_2.11
1.7.0

此外,您需要为Flink的Scala批处理或流API添加依赖项。对于批处理查询,您需要添加:


org.apache.flink
flink-scala_2.11
1.7.0

对于流式查询,您需要添加:


org.apache.flink
flink-streaming-scala_2.11
1.7.0

数据准备

创建一个person.txt,内容如下:

kebe men
wede men
baby wemen
james men

代码

package flinkSQL
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import scala.language.postfixOps
/**
* Created by ${WangZhiHua} on 2018/11/12
*/
object sql_test {
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//获取table
val tableEnv = TableEnvironment.getTableEnvironment(env)
//读取数据源
val source1 = env.readTextFile("C:/flink_data/person.txt")
val source2: DataStream[Person1] = source1.map(x=>{
val split = x.split(" ")
( Person1(split(0),split(1)))
})
//将DataStream转化成Table
val table1 = tableEnv.fromDataStream(source2)
//注册表,表名为:person
tableEnv.registerTable("person",table1)
//获取表中所有信息
val rs: Table = tableEnv.sqlQuery("select * from person ")
val stream: DataStream[String] = rs
//过滤获取name这一列的数据
.select("name")
//将表转化成DataStream
.toAppendStream[String]
stream.print()
env.execute("flinkSQL")
}
}
/**
* 定义样例类封装数据
*/
case class Person1(name:String ,score:String)

案例二:双流join

数据准备

数据一

2016-07-28 13:00:01.820,000001,10.2
2016-07-28 13:00:01.260,000001,10.2
2016-07-28 13:00:02.980,000001,10.1
2016-07-28 13:00:04.330,000001,10.0
2016-07-28 13:00:05.570,000001,10.0
2016-07-28 13:00:05.990,000001,10.0
2016-07-28 13:00:14.000,000001,10.1
2016-07-28 13:00:20.000,000001,10.2

数据二

2016-07-28 13:00:01.000,000001,10.2
2016-07-28 13:00:04.000,000001,10.1
2016-07-28 13:00:07.000,000001,10.0
2016-07-28 13:00:16.000,000001,10.1

代码

package flinkSQL
import java.text.SimpleDateFormat
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._
/**
* Created by ${WangZhiHua} on 2018/11/13
*/
object JoinDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//获取接口传送的数据
val dataStream1 = env.readTextFile("C:/flink_data/scoket1.txt")
val dataStream2 = env.readTextFile("C:/flink_data/scoket2.txt")
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
//使用样例类StockTransaction封装获取的数据
val dataStreamMap1 = dataStream1.map(f => {
val tokens1 = f.split(",")
StockTransaction(tokens1(0), tokens1(1), tokens1(2).toDouble)
})
.assignAscendingTimestamps(f => format.parse(f.tx_time).getTime)
//使用样例类StockSnapshot封装获取的数据
val dataStreamMap2 = dataStream2.map(f => {
val tokens2 = f.split(",")
StockSnapshot(tokens2(0), tokens2(1), tokens2(2).toDouble)
})
.assignAscendingTimestamps(f => format.parse(f.md_time).getTime)
/**
* 进行双流join
* 限定范围是:3秒钟的Event time时间窗口
*/
val joinStream = dataStreamMap1.coGroup(dataStreamMap2)
.where(_.tx_code)
.equalTo(_.md_code)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
val innerJoinStream = joinStream.apply(new InnerJoinFunction)
innerJoinStream.name("innerJoin").print()
print("===================== end =========================")
env.execute("join demo")
}
}
case class StockTransaction(tx_time:String, tx_code:String,tx_value:Double)
case class StockSnapshot(md_time:String, md_code:String,md_value:Double)
class InnerJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)]{
override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double,String)]): Unit = {
/**
* 将Java中的Iterable对象转换为Scala的Iterable
* scala的集合操作效率高,简洁
*/
import scala.collection.JavaConverters._
val scalaT1 = T1.asScala.toList
val scalaT2 = T2.asScala.toList
/**
* Inner Join要比较的是同一个key下,同一个时间窗口内的数据
*/
if(scalaT1.nonEmpty && scalaT2.nonEmpty){
for(transaction <- scalaT1){
for(snapshot <- scalaT2){
out.collect(transaction.tx_code,transaction.tx_time, snapshot.md_time,transaction.tx_value,snapshot.md_value,"Inner Join Test")
}
}
}
}
}
class LeftJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {
override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {
/**
* 将Java中的Iterable对象转换为Scala的Iterable
* scala的集合操作效率高,简洁
*/
import scala.collection.JavaConverters._
val scalaT1 = T1.asScala.toList
val scalaT2 = T2.asScala.toList
/**
* Left Join要比较的是同一个key下,同一个时间窗口内的数据
*/
if(scalaT1.nonEmpty && scalaT2.isEmpty){
for(transaction <- scalaT1){
out.collect(transaction.tx_code,transaction.tx_time, "",transaction.tx_value,0,"Left Join Test")
}
}
}
}
class RightJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {
override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {
/**
* 将Java中的Iterable对象转换为Scala的Iterable
* scala的集合操作效率高,简洁
*/
import scala.collection.JavaConverters._
val scalaT1 = T1.asScala.toList
val scalaT2 = T2.asScala.toList
/**
* Right Join要比较的是同一个key下,同一个时间窗口内的数据
*/
if(scalaT1.isEmpty && scalaT2.nonEmpty){
for(snapshot <- scalaT2){
out.collect(snapshot.md_code, "",snapshot.md_time,0,snapshot.md_value,"Right Join Test")
}
}
}
}

运行结果

《flink实战开发----flinkSQL入门大全》

扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦

扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦

扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦

《flink实战开发----flinkSQL入门大全》

 

 

 

 

 

 


推荐阅读
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 开机自启动的几种方式
    0x01快速自启动目录快速启动目录自启动方式源于Windows中的一个目录,这个目录一般叫启动或者Startup。位于该目录下的PE文件会在开机后进行自启动 ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • 基于Net Core 3.0与Web API的前后端分离开发:Vue.js在前端的应用
    本文介绍了如何使用Net Core 3.0和Web API进行前后端分离开发,并重点探讨了Vue.js在前端的应用。后端采用MySQL数据库和EF Core框架进行数据操作,开发环境为Windows 10和Visual Studio 2019,MySQL服务器版本为8.0.16。文章详细描述了API项目的创建过程、启动步骤以及必要的插件安装,为开发者提供了一套完整的开发指南。 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • 本文介绍了如何利用Struts1框架构建一个简易的四则运算计算器。通过采用DispatchAction来处理不同类型的计算请求,并使用动态Form来优化开发流程,确保代码的简洁性和可维护性。同时,系统提供了用户友好的错误提示,以增强用户体验。 ... [详细]
  • 本文详细介绍了在CentOS 6.5 64位系统上使用阿里云ECS服务器搭建LAMP环境的具体步骤。首先,通过PuTTY工具实现远程连接至服务器。接着,检查当前系统的磁盘空间使用情况,确保有足够的空间进行后续操作,可使用 `df` 命令进行查看。此外,文章还涵盖了安装和配置Apache、MySQL和PHP的相关步骤,以及常见问题的解决方法,帮助用户顺利完成LAMP环境的搭建。 ... [详细]
  • PHP预处理常量详解:如何定义与使用常量 ... [详细]
  • 如何利用Java 5 Executor框架高效构建和管理线程池
    Java 5 引入了 Executor 框架,为开发人员提供了一种高效管理和构建线程池的方法。该框架通过将任务提交与任务执行分离,简化了多线程编程的复杂性。利用 Executor 框架,开发人员可以更灵活地控制线程的创建、分配和管理,从而提高服务器端应用的性能和响应能力。此外,该框架还提供了多种线程池实现,如固定线程池、缓存线程池和单线程池,以适应不同的应用场景和需求。 ... [详细]
  • 本文总结了JavaScript的核心知识点和实用技巧,涵盖了变量声明、DOM操作、事件处理等重要方面。例如,通过`event.srcElement`获取触发事件的元素,并使用`alert`显示其HTML结构;利用`innerText`和`innerHTML`属性分别设置和获取文本内容及HTML内容。此外,还介绍了如何在表单中动态生成和操作``元素,以便更好地处理用户输入。这些技巧对于提升前端开发效率和代码质量具有重要意义。 ... [详细]
  • 本文探讨了 Java 中 Pair 类的历史与现状。虽然 Java 标准库中没有内置的 Pair 类,但社区和第三方库提供了多种实现方式,如 Apache Commons 的 Pair 类和 JavaFX 的 javafx.util.Pair 类。这些实现为需要处理成对数据的开发者提供了便利。此外,文章还讨论了为何标准库未包含 Pair 类的原因,以及在现代 Java 开发中使用 Pair 类的最佳实践。 ... [详细]
  • 2012年9月12日优酷土豆校园招聘笔试题目解析与备考指南
    2012年9月12日,优酷土豆校园招聘笔试题目解析与备考指南。在选择题部分,有一道题目涉及中国人的血型分布情况,具体为A型30%、B型20%、O型40%、AB型10%。若需确保在随机选取的样本中,至少有一人为B型血的概率不低于90%,则需要选取的最少人数是多少?该问题不仅考察了概率统计的基本知识,还要求考生具备一定的逻辑推理能力。 ... [详细]
  • HBase在金融大数据迁移中的应用与挑战
    随着最后一台设备的下线,标志着超过10PB的HBase数据迁移项目顺利完成。目前,新的集群已在新机房稳定运行超过两个月,监控数据显示,新集群的查询响应时间显著降低,系统稳定性大幅提升。此外,数据消费的波动也变得更加平滑,整体性能得到了显著优化。 ... [详细]
author-avatar
一加一等于贰_661
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有