Apache Flink具有两个关系API – 表API和SQL – 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。您可以轻松地在基于API构建的所有API和库之间切换。例如,您可以使用CEP库从DataStream中提取模式,然后使用Table API分析模式,或者可以在预处理上运行Gelly图算法之前使用SQL查询扫描,过滤和聚合批处理表数据。
TableEnvironment这TableEnvironment是Table API和SQL集成的核心概念。它负责:
一个TableEnvironment是通过调用静态创建TableEnvironment.getTableEnvironment()用的方法StreamExecutionEnvironment或ExecutionEnvironment与可选的TableConfig。该TableConfig可用于配置TableEnvironment或定制查询优化和翻译过程
代码:
//获取table
val tableEnv = TableEnvironment.getTableEnvironment(env)
TableEnvironment维护一个表的目录,这些表是按名称注册的。有两种类型的表、输入表和输出表。输入表可以在表API和SQL查询中引用,并提供输入数据。输出表可以用来将表API或SQL查询的结果发送到外部系统。可以从各种来源注册输入表:
一个输出表可以被注册使用TableSink
代码:
val tableEnv = TableEnvironment.getTableEnvironment(env)
val projTable: Table = tableEnv.scan("X").select(...)
//注册表
tableEnv.registerTable("projectedTable", projTable)
一个已注册的表可以用来将表API或SQL查询的结果发送到外部存储系统,比如数据库、键值存储、消息队列或文件系统(在不同的编码中,例如CSV、Apache Parquet、Avro……)。
说白了就是:table sink的作用就是如何将flink sql查询的数据保存到外部系统,如hdfs或者本地文件,数据库,hbase等。
val tableEnv = TableEnvironment.getTableEnvironment(env)
SQL 语句
// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
FlinkSQL,它实现了SQL标准。SQL查询被指定为常规字符串。
SQL文档描述了Flink对流式和批处理表的SQL支持。
主要包括:sqlQuery和sqlUpdate
sqlQuery:主要用于sql查询
sqlUpdate:用于删除,更新等操作
案例一:如何指定一个查询并将结果作为一张表返回
val tableEnv = TableEnvironment.getTableEnvironment(env)
val revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
案例二:指定一个更新查询,该查询将其结果插入到已注册表中
val tableEnv = TableEnvironment.getTableEnvironment(env)
flinkSQL执行计划
tableEnv.sqlUpdate("""
|INSERT INTO RevenueFrance
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
表API和SQL查询将转换为DataStream或DataSet程序,具体取决于它们的输入是流式还是批量输入。查询在内部表示为逻辑查询计划,并分为两个阶段:
表API和SQL查询可以轻松集成并嵌入到DataStream和DataSet程序中。例如,可以查询外部表(例如来自RDBMS),进行一些预处理,例如过滤,预测,聚合或加入元数据,然后使用DataStream或进一步处理数据。相反,Table API或SQL查询也可以应用于DataStream或DataSet程序的结果。这种相互作用可以通过将一个DataStream或DataSet转换为一个Table来实现,反之亦然。
scala表API功能的隐式转换DataSet,DataStream以及Table类。org.apache.flink.table.api.scala._除了org.apache.flink.api.scala._ Scala DataStream API 之外,还可以通过导入包来启用这些转换
注:flink编程必须导入import org.apache.flink.api.scala._,flinkSQL编程必须导入import org.apache.flink.table.api._
我们可以通过TableEnvironment将获得数据源的DataStream或DataSet转化成Table,在使用flinkSQL的时候这样将会十分便捷。
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream
Table table1 = tableEnv.fromDataStream(stream);
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
一个Table可以转换为DataStream或DataSet。通过这种方式,可以在Table API或SQL查询的结果上运行自定义DataStream或DataSet程序。
当转换一个Table成DataStream或DataSet,需要指定将所得的数据类型DataStream或DataSet,即,数据类型到其中的行Table是要被转换。通常最方便的转换类型是Row。以下列表概述了不同选项的功能:
一个Table是流媒体查询的结果将动态更新,即它正在改变,因为新记录的查询的输入流到达。因此,DataStream转换这种动态查询需要对表的更新进行编码。
将一个Table转换为一个DataStream有两种模式:
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
val tableEnv = TableEnvironment.getTableEnvironment(env)
flinkSQL语法
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
SQL查询是使用sqlQuery()方法指定的TableEnvironment。该方法返回SQL查询的结果为Table。A Table可以在后续的SQL和Table API查询中使用,可以转换为DataSet或DataStream,也可以写入TableSink)。要访问SQL查询中的表,必须在TableEnvironment中注册它。可以从TableSource,Table,DataStream或DataSet 注册表。或者,用户还可以在TableEnvironment中注册外部目录以指定数据源的位置。
注意: Flink的SQL支持尚未完成。包含不受支持的SQL功能的查询会导致a TableException。以下部分列出了批处理和流表上SQL的受支持功能。
支持标准的ANSI SQL。Flink不支持DDL语句。
入门案例需求:
使用flinkSQL,获取文本中的用户的姓名
导入依赖
table API和SQL捆绑在flink-table
Maven工件中。必须将以下依赖项添加到项目中才能使用Table API和SQL:
此外,您需要为Flink的Scala批处理或流API添加依赖项。对于批处理查询,您需要添加:
对于流式查询,您需要添加:
数据准备
创建一个person.txt,内容如下:
kebe men
wede men
baby wemen
james men
代码
package flinkSQL
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import scala.language.postfixOps
/**
* Created by ${WangZhiHua} on 2018/11/12
*/
object sql_test {
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//获取table
val tableEnv = TableEnvironment.getTableEnvironment(env)
//读取数据源
val source1 = env.readTextFile("C:/flink_data/person.txt")
val source2: DataStream[Person1] = source1.map(x=>{
val split = x.split(" ")
( Person1(split(0),split(1)))
})
//将DataStream转化成Table
val table1 = tableEnv.fromDataStream(source2)
//注册表,表名为:person
tableEnv.registerTable("person",table1)
//获取表中所有信息
val rs: Table = tableEnv.sqlQuery("select * from person ")
val stream: DataStream[String] = rs
//过滤获取name这一列的数据
.select("name")
//将表转化成DataStream
.toAppendStream[String]
stream.print()
env.execute("flinkSQL")
}
}
/**
* 定义样例类封装数据
*/
case class Person1(name:String ,score:String)
数据一
2016-07-28 13:00:01.820,000001,10.2
2016-07-28 13:00:01.260,000001,10.2
2016-07-28 13:00:02.980,000001,10.1
2016-07-28 13:00:04.330,000001,10.0
2016-07-28 13:00:05.570,000001,10.0
2016-07-28 13:00:05.990,000001,10.0
2016-07-28 13:00:14.000,000001,10.1
2016-07-28 13:00:20.000,000001,10.2
数据二
2016-07-28 13:00:01.000,000001,10.2
2016-07-28 13:00:04.000,000001,10.1
2016-07-28 13:00:07.000,000001,10.0
2016-07-28 13:00:16.000,000001,10.1
package flinkSQL
import java.text.SimpleDateFormat
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._
/**
* Created by ${WangZhiHua} on 2018/11/13
*/
object JoinDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//获取接口传送的数据
val dataStream1 = env.readTextFile("C:/flink_data/scoket1.txt")
val dataStream2 = env.readTextFile("C:/flink_data/scoket2.txt")
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
//使用样例类StockTransaction封装获取的数据
val dataStreamMap1 = dataStream1.map(f => {
val tokens1 = f.split(",")
StockTransaction(tokens1(0), tokens1(1), tokens1(2).toDouble)
})
.assignAscendingTimestamps(f => format.parse(f.tx_time).getTime)
//使用样例类StockSnapshot封装获取的数据
val dataStreamMap2 = dataStream2.map(f => {
val tokens2 = f.split(",")
StockSnapshot(tokens2(0), tokens2(1), tokens2(2).toDouble)
})
.assignAscendingTimestamps(f => format.parse(f.md_time).getTime)
/**
* 进行双流join
* 限定范围是:3秒钟的Event time时间窗口
*/
val joinStream = dataStreamMap1.coGroup(dataStreamMap2)
.where(_.tx_code)
.equalTo(_.md_code)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
val innerJoinStream = joinStream.apply(new InnerJoinFunction)
innerJoinStream.name("innerJoin").print()
print("===================== end =========================")
env.execute("join demo")
}
}
case class StockTransaction(tx_time:String, tx_code:String,tx_value:Double)
case class StockSnapshot(md_time:String, md_code:String,md_value:Double)
class InnerJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)]{
override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double,String)]): Unit = {
/**
* 将Java中的Iterable对象转换为Scala的Iterable
* scala的集合操作效率高,简洁
*/
import scala.collection.JavaConverters._
val scalaT1 = T1.asScala.toList
val scalaT2 = T2.asScala.toList
/**
* Inner Join要比较的是同一个key下,同一个时间窗口内的数据
*/
if(scalaT1.nonEmpty && scalaT2.nonEmpty){
for(transaction <- scalaT1){
for(snapshot <- scalaT2){
out.collect(transaction.tx_code,transaction.tx_time, snapshot.md_time,transaction.tx_value,snapshot.md_value,"Inner Join Test")
}
}
}
}
}
class LeftJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {
override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {
/**
* 将Java中的Iterable对象转换为Scala的Iterable
* scala的集合操作效率高,简洁
*/
import scala.collection.JavaConverters._
val scalaT1 = T1.asScala.toList
val scalaT2 = T2.asScala.toList
/**
* Left Join要比较的是同一个key下,同一个时间窗口内的数据
*/
if(scalaT1.nonEmpty && scalaT2.isEmpty){
for(transaction <- scalaT1){
out.collect(transaction.tx_code,transaction.tx_time, "",transaction.tx_value,0,"Left Join Test")
}
}
}
}
class RightJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {
override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {
/**
* 将Java中的Iterable对象转换为Scala的Iterable
* scala的集合操作效率高,简洁
*/
import scala.collection.JavaConverters._
val scalaT1 = T1.asScala.toList
val scalaT2 = T2.asScala.toList
/**
* Right Join要比较的是同一个key下,同一个时间窗口内的数据
*/
if(scalaT1.isEmpty && scalaT2.nonEmpty){
for(snapshot <- scalaT2){
out.collect(snapshot.md_code, "",snapshot.md_time,0,snapshot.md_value,"Right Join Test")
}
}
}
}
扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦
扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦
扫一扫加入大数据技术交流群,了解更多大数据技术,还有免费资料等你哦