热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flink实战开发flinkSQL入门大全

flinkSQL概念介绍TableAPI&SQLApacheFlink具有两个关系API–表API和SQL–用于统一流和批处理。TableAPI是Scala和Java的语言集成查询
flinkSQL概念介绍

Table API & SQL

Apache Flink具有两个关系API – 表API和SQL – 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。您可以轻松地在基于API构建的所有API和库之间切换。例如,您可以使用CEP库从DataStream中提取模式,然后使用Table API分析模式,或者可以在预处理上运行Gelly图算法之前使用SQL查询扫描,过滤和聚合批处理表数据。

TableEnvironment

职责

TableEnvironment是Table API和SQL集成的核心概念。它负责:

  • 在内部目录中注册一个表
  • 注册外部目录
  • 执行SQL查询
  • 注册用户定义的(标量,表或聚合)函数
  • 把一个DataStreamDataSet转换为一个表Table
  • 持有对ExecutionEnvironmentStreamExecutionEnvironment的引用

创建一个TableEnvironment

一个TableEnvironment是通过调用静态创建TableEnvironment.getTableEnvironment()用的方法StreamExecutionEnvironmentExecutionEnvironment与可选的TableConfig。该TableConfig可用于配置TableEnvironment或定制查询优化和翻译过程

代码:

//获取table
val tableEnv = TableEnvironment.getTableEnvironment(env)

TableEnvironment中注册表

TableEnvironment维护一个表的目录,这些表是按名称注册的。有两种类型的表、输入表和输出表。输入表可以在表API和SQL查询中引用,并提供输入数据。输出表可以用来将表API或SQL查询的结果发送到外部系统。可以从各种来源注册输入表:

  • 现有Table对象,通常是Table API或SQL查询的结果。
  • TableSource,访问外部数据,例如文件,数据库或消息传递系统
  •  DataStreamDataSet来自DataStream或DataSet程序。注册一个DataStreamDataSet

一个输出表可以被注册使用TableSink

代码:

val tableEnv = TableEnvironment.getTableEnvironment(env)
val projTable: Table = tableEnv.scan("X").select(...)
//注册表
tableEnv.registerTable("projectedTable", projTable)

注册一个TableSink

一个已注册的表可以用来将表API或SQL查询的结果发送到外部存储系统,比如数据库、键值存储、消息队列或文件系统(在不同的编码中,例如CSV、Apache Parquet、Avro……)。

说白了就是:table sink的作用就是如何将flink sql查询的数据保存到外部系统,如hdfs或者本地文件,数据库,hbase等。

val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
SQL 语句

FlinkSQL,它实现了SQL标准。SQL查询被指定为常规字符串。

SQL文档描述了Flink对流式和批处理表的SQL支持。

主要包括:sqlQuery和sqlUpdate

sqlQuery:主要用于sql查询

sqlUpdate:用于删除,更新等操作

案例一:如何指定一个查询并将结果作为一张表返回

val tableEnv = TableEnvironment.getTableEnvironment(env)
val revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)

案例二:指定一个更新查询,该查询将其结果插入到已注册表中

val tableEnv = TableEnvironment.getTableEnvironment(env)
tableEnv.sqlUpdate("""
|INSERT INTO RevenueFrance
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
flinkSQL执行计划

表API和SQL查询将转换为DataStreamDataSet程序,具体取决于它们的输入是流式还是批量输入。查询在内部表示为逻辑查询计划,并分为两个阶段:

  1. 优化逻辑计划,
  2. 转换为DataStream或DataSet程序。

table与DataStream和DataSet API集成

表API和SQL查询可以轻松集成并嵌入到DataStreamDataSet程序中。例如,可以查询外部表(例如来自RDBMS),进行一些预处理,例如过滤,预测,聚合或加入元数据,然后使用DataStream或进一步处理数据。相反,Table API或SQL查询也可以应用于DataStream或DataSet程序的结果。这种相互作用可以通过将一个DataStream或DataSet转换为一个Table来实现,反之亦然。

Scala的隐式转换

scala表API功能的隐式转换DataSetDataStream以及Table类。org.apache.flink.table.api.scala._除了org.apache.flink.api.scala._   Scala DataStream API 之外,还可以通过导入包来启用这些转换

注:flink编程必须导入import org.apache.flink.api.scala._,flinkSQL编程必须导入import org.apache.flink.table.api._

将DataStream或DataSet转换为表

我们可以通过TableEnvironment将获得数据源的DataStream或DataSet转化成Table,在使用flinkSQL的时候这样将会十分便捷。

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream> stream = ...
Table table1 = tableEnv.fromDataStream(stream);
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");

将Table转换为DataStream或DataSet

一个Table可以转换为DataStreamDataSet。通过这种方式,可以在Table API或SQL查询的结果上运行自定义DataStream或DataSet程序。

当转换一个TableDataStreamDataSet,需要指定将所得的数据类型DataStreamDataSet,即,数据类型到其中的行Table是要被转换。通常最方便的转换类型是Row。以下列表概述了不同选项的功能:

  • :字段按位置,任意数量的字段映射,支持null值,无类型安全访问。
  • POJO:字段按名称映射(POJO字段必须命名为Table字段),任意数量的字段,支持null值,类型安全访问。
  • 样例Case Class:字段按位置映射,不支持null值,类型安全访问。
  • 元组:字段按位置映射,限制为22(Scala)或25(Java)字段,不支持null值,类型安全访问。
  • 原子类型Table必须具有单个字段,不支持null值,类型安全访问。

将表转换为DataStream

一个Table是流媒体查询的结果将动态更新,即它正在改变,因为新记录的查询的输入流到达。因此,DataStream转换这种动态查询需要对表的更新进行编码。

一个Table转换为一个DataStream有两种模式:

  1. 追加模式:只有在动态Table仅通过INSERT更改修改时才能使用此模式,即它仅附加并且以前发出的结果永远不会更新。
  2. 缩进模式:始终可以使用此模式。它用标志编码INSERTDELETE改变boolean

val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

将表转换为DataSet

val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
flinkSQL语法

SQL查询是使用sqlQuery()方法指定的TableEnvironment。该方法返回SQL查询的结果为Table。A Table可以在后续的SQL和Table API查询中使用,可以转换为DataSet或DataStream,也可以写入TableSink)。要访问SQL查询中的表,必须在TableEnvironment中注册它。可以从TableSourceTableDataStream或DataSet 注册表。或者,用户还可以在TableEnvironment中注册外部目录以指定数据源的位置。

注意: Flink的SQL支持尚未完成。包含不受支持的SQL功能的查询会导致a TableException。以下部分列出了批处理和流表上SQL的受支持功能。

支持的语法

支持标准的ANSI SQL。Flink不支持DDL语句。

入门案例

需求:

使用flinkSQL,获取文本中的用户的姓名

导入依赖

table API和SQL捆绑在flink-tableMaven工件中。必须将以下依赖项添加到项目中才能使用Table API和SQL:


org.apache.flink
flink-table_2.11
1.7.0

此外,您需要为Flink的Scala批处理或流API添加依赖项。对于批处理查询,您需要添加:


org.apache.flink
flink-scala_2.11
1.7.0

对于流式查询,您需要添加:


org.apache.flink
flink-streaming-scala_2.11
1.7.0

数据准备

创建一个person.txt,内容如下:

kebe men
wede men
baby wemen
james men

代码

package flinkSQL
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import scala.language.postfixOps
/**
* Created by ${WangZhiHua} on 2018/11/12
*/
object sql_test {
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//获取table
val tableEnv = TableEnvironment.getTableEnvironment(env)
//读取数据源
val source1 = env.readTextFile("C:/flink_data/person.txt")
val source2: DataStream[Person1] = source1.map(x=>{
val split = x.split(" ")
( Person1(split(0),split(1)))
})
//将DataStream转化成Table
val table1 = tableEnv.fromDataStream(source2)
//注册表,表名为:person
tableEnv.registerTable("person",table1)
//获取表中所有信息
val rs: Table = tableEnv.sqlQuery("select * from person ")
val stream: DataStream[String] = rs
//过滤获取name这一列的数据
.select("name")
//将表转化成DataStream
.toAppendStream[String]
stream.print()
env.execute("flinkSQL")
}
}
/**
* 定义样例类封装数据
*/
case class Person1(name:String ,score:String)

案例二:双流join

数据准备

数据一

2016-07-28 13:00:01.820,000001,10.2
2016-07-28 13:00:01.260,000001,10.2
2016-07-28 13:00:02.980,000001,10.1
2016-07-28 13:00:04.330,000001,10.0
2016-07-28 13:00:05.570,000001,10.0
2016-07-28 13:00:05.990,000001,10.0
2016-07-28 13:00:14.000,000001,10.1
2016-07-28 13:00:20.000,000001,10.2

数据二

2016-07-28 13:00:01.000,000001,10.2
2016-07-28 13:00:04.000,000001,10.1
2016-07-28 13:00:07.000,000001,10.0
2016-07-28 13:00:16.000,000001,10.1

代码

package flinkSQL
import java.text.SimpleDateFormat
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._
/**
* Created by ${WangZhiHua} on 2018/11/13
*/
object JoinDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//获取接口传送的数据
val dataStream1 = env.readTextFile("C:/flink_data/scoket1.txt")
val dataStream2 = env.readTextFile("C:/flink_data/scoket2.txt")
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
//使用样例类StockTransaction封装获取的数据
val dataStreamMap1 = dataStream1.map(f => {
val tokens1 = f.split(",")
StockTransaction(tokens1(0), tokens1(1), tokens1(2).toDouble)
})
.assignAscendingTimestamps(f => format.parse(f.tx_time).getTime)
//使用样例类StockSnapshot封装获取的数据
val dataStreamMap2 = dataStream2.map(f => {
val tokens2 = f.split(",")
StockSnapshot(tokens2(0), tokens2(1), tokens2(2).toDouble)
})
.assignAscendingTimestamps(f => format.parse(f.md_time).getTime)
/**
* 进行双流join
* 限定范围是:3秒钟的Event time时间窗口
*/
val joinStream = dataStreamMap1.coGroup(dataStreamMap2)
.where(_.tx_code)
.equalTo(_.md_code)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
val innerJoinStream = joinStream.apply(new InnerJoinFunction)
innerJoinStream.name("innerJoin").print()
print("===================== end =========================")
env.execute("join demo")
}
}
case class StockTransaction(tx_time:String, tx_code:String,tx_value:Double)
case class StockSnapshot(md_time:String, md_code:String,md_value:Double)
class InnerJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)]{
override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double,String)]): Unit = {
/**
* 将Java中的Iterable对象转换为Scala的Iterable
* scala的集合操作效率高,简洁
*/
import scala.collection.JavaConverters._
val scalaT1 = T1.asScala.toList
val scalaT2 = T2.asScala.toList
/**
* Inner Join要比较的是同一个key下,同一个时间窗口内的数据
*/
if(scalaT1.nonEmpty && scalaT2.nonEmpty){
for(transaction <- scalaT1){
for(snapshot <- scalaT2){
out.collect(transaction.tx_code,transaction.tx_time, snapshot.md_time,transaction.tx_value,snapshot.md_value,"Inner Join Test")
}
}
}
}
}
class LeftJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {
override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {
/**
* 将Java中的Iterable对象转换为Scala的Iterable
* scala的集合操作效率高,简洁
*/
import scala.collection.JavaConverters._
val scalaT1 = T1.asScala.toList
val scalaT2 = T2.asScala.toList
/**
* Left Join要比较的是同一个key下,同一个时间窗口内的数据
*/
if(scalaT1.nonEmpty && scalaT2.isEmpty){
for(transaction <- scalaT1){
out.collect(transaction.tx_code,transaction.tx_time, "",transaction.tx_value,0,"Left Join Test")
}
}
}
}
class RightJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {
override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {
/**
* 将Java中的Iterable对象转换为Scala的Iterable
* scala的集合操作效率高,简洁
*/
import scala.collection.JavaConverters._
val scalaT1 = T1.asScala.toList
val scalaT2 = T2.asScala.toList
/**
* Right Join要比较的是同一个key下,同一个时间窗口内的数据
*/
if(scalaT1.isEmpty && scalaT2.nonEmpty){
for(snapshot <- scalaT2){
out.collect(snapshot.md_code, "",snapshot.md_time,0,snapshot.md_value,"Right Join Test")
}
}
}
}

运行结果

《flink实战开发----flinkSQL入门大全》

扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦

扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦

扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦

《flink实战开发----flinkSQL入门大全》

 

 

 

 

 

 


推荐阅读
  • Imtryingtofigureoutawaytogeneratetorrentfilesfromabucket,usingtheAWSSDKforGo.我正 ... [详细]
  • 本文详细介绍了GetModuleFileName函数的用法,该函数可以用于获取当前模块所在的路径,方便进行文件操作和读取配置信息。文章通过示例代码和详细的解释,帮助读者理解和使用该函数。同时,还提供了相关的API函数声明和说明。 ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 在说Hibernate映射前,我们先来了解下对象关系映射ORM。ORM的实现思想就是将关系数据库中表的数据映射成对象,以对象的形式展现。这样开发人员就可以把对数据库的操作转化为对 ... [详细]
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • web.py开发web 第八章 Formalchemy 服务端验证方法
    本文介绍了在web.py开发中使用Formalchemy进行服务端表单数据验证的方法。以User表单为例,详细说明了对各字段的验证要求,包括必填、长度限制、唯一性等。同时介绍了如何自定义验证方法来实现验证唯一性和两个密码是否相等的功能。该文提供了相关代码示例。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • Spring学习(4):Spring管理对象之间的关联关系
    本文是关于Spring学习的第四篇文章,讲述了Spring框架中管理对象之间的关联关系。文章介绍了MessageService类和MessagePrinter类的实现,并解释了它们之间的关联关系。通过学习本文,读者可以了解Spring框架中对象之间的关联关系的概念和实现方式。 ... [详细]
  • 本文介绍了Hive常用命令及其用途,包括列出数据表、显示表字段信息、进入数据库、执行select操作、导出数据到csv文件等。同时还涉及了在AndroidManifest.xml中获取meta-data的value值的方法。 ... [详细]
  • ESXi命令行获取帮助的方法为,常用的命令一般都是以esxcli开头,如果忘记命令可以使用帮助:esxcli-- ... [详细]
  • Flink使用java实现读取csv文件简单实例首先我们来看官方文档中给出的几种方法:首先我们来看官方文档中给出的几种方法:第一种:Da ... [详细]
  • 简介数组、CSV、表格、东西将一个数组转化为逗号为支解符的字符串(CSV)即表格数据。该源码来自于https:30secondsofcode.orgconstarrayToCSV( ... [详细]
author-avatar
一加一等于贰_661
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有