热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

快速手上FlinkSQL——Table与DataStream之间的互转

教你快速使用FlinkSQL连接kafka,以及将Table与DataStream之间的类型转换


前言


         上述讲到,成功将一个文件里的内容使用SQL
进行了一解析(快速入门Flink SQL —— 介绍及入门)本篇文章主要会跟大家分享如何连接kafka,MySQL,作为输入流和数出的操作,以及Table与DataStream进行互转。

一、将kafka作为输入流


          kafka 的连接器 flink-kafka-connector
中,1.10
版本的已经提供了 Table API 的支持。我们可以在  connect
方法中直接传入一个叫做 Kafka
的类,这就是 kafka 连接器的描述器ConnectorDescriptor

准备数据:

1,语数
2,英物
3,化生
4,文学
5,语理
6,学物

创建kafka主题

./kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic FlinkSqlTest

通过命令行的方式启动一个生产者

[root@node01 bin]# ./kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic FlinkSqlTest
>1,语数
>2,英物 
>3,化生
>4,文学
>5,语理\
>6,学物

编写Flink代码连接到kafka

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{CsvKafkaSchema}

/**
 * @Package
 * @author 大数据老哥
 * @date 2020/12/17 0:35
 * @version V1.0
 */


object FlinkSQLSourceKafka {
  def main(args: Array[String]): Unit = {
    // 获取流处理的运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 获取table的运行环境
    val tableEnv = StreamTableEnvironment.create(env)
    tableEnv.connect(
      new Kafka()
        .version("0.11")  // 设置kafka的版本
          .topic("FlinkSqlTest"// 设置要连接的主题
        .property("zookeeper.connect","node01:2181,node02:2181,node03:2181")  //设置zookeeper的连接地址跟端口号
        .property("bootstrap.servers","node01:9092,node02:9092,node03:9092"//设置kafka的连接地址跟端口号
    ).withFormat(new Csv()) // 设置格式
      .withSchema(new Schema()  // 设置元数据信息
        .field("id",DataTypes.STRING())
        .field("name",DataTypes.STRING())
      ).createTemporaryTable("kafkaInputTable"// 创建临时表
     //定义要查询的sql语句
    val result = tableEnv.sqlQuery("select * from  kafkaInputTable ")
    //打印数据
    result.toAppendStream[(String,String)].print()
    // 开启执行
    env.execute("source kafkaInputTable")
  }
}


运行结果图          当然也可以连接到 ElasticSearch、MySql、HBase、Hive 等外部系统,实现方式基本上是类似的。

二、表的查询

         利用外部系统的连接器 connector
,我们可以读写数据,并在环境的 Catalog 中注册表。接下来就可以对表做查询转换了。Flink 给我们提供了两种查询方式:Table API
SQL

三、Table API 的调用

         Table API 是集成在 Scala 和 Java 语言内的查询 API。与 SQL
不同,Table API 的查询不会用字符串表示,而是在宿主语言中一步一步调用完成的。         Table API 基于代表一张
的 Table 类,并提供一整套操作处理的方法 API。这些方法会返回一个新的 Table 对象
,这个对象就表示对输入表应用转换操作的结果。有些关系型转换操作,可以由多个方法调用组成,构成链式调用结构。例如 table.select(…).filter(…) ,其中 select(…) 表示选择表中指定的字段,filter(…)表示筛选条件。代码中的实现如下:

 val kafkaInputTable = tableEnv.from("kafkaInputTable")
    kafkaInputTable.select("*")
      .filter('id !=="1")

四、SQL查询

          Flink 的 SQL 集成,基于的是 ApacheCalcite,它实现了 SQL 标准
。在 Flink 中,用常规字符串来定义 SQL 查询语句。SQL 查询的结果,是一个新的 Table

代码实现如下:

  val result = tableEnv.sqlQuery("select * from  kafkaInputTable ")

         当然,也可以加上聚合操作,比如我们统计每个用户的个数

调用 table API

     val result: Table = tableEnv.from("kafkaInputTable")
       result.groupBy("user")
       .select('name,'name.count as 'count)

调用SQL

    val result = tableEnv.sqlQuery("select  name ,count(1) as count from kafkaInputTable group by name ")

         这里 Table API 里指定的字段,前面加了一个单引号
,这是 Table API 中定义的 Expression
类型的写法,可以很方便地表示一个表中的字段。          字段可以直接全部用双引号引起来
,也可以用半边单引号+字段名
的方式。以后的代码中,一般都用后一种形式

五、将DataStream 转成Table

         Flink 允许我们把 Table
DataStream
做转换:我们可以基于一个 DataStream,先流式地读取数据源,然后 map 成样例类,再把它转成 Table。Table 的列字段(column fields),就是样例类里的字段,这样就不用再麻烦地定义 schema 了。

5.1、代码实现

         代码中实现非常简单,直接用  tableEnv.fromDataStream() 就可以了。默认转换后的 Table schema
DataStream
中的字段定义一一对应,也可以单独指定出来。

       这就允许我们更换字段的顺序、重命名,或者只选取某些字段出来,相当于做了一次 map 操作(或者 Table API 的 select 操作)。

代码具体如下:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

/**
 * @Package
 * @author 大数据老哥
 * @date 2020/12/17 21:21
 * @version V1.0
 */

object FlinkSqlReadFileTable {

  def main(args: Array[String]): Unit = {
    // 构建流处理运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 构建table运行环境
    val tableEnv = StreamTableEnvironment.create(env)
    // 使用流处理来读取数据
    val readData = env.readTextFile("./data/word.txt")
    // 使用flatMap进行切分
    val word: DataStream[String] = readData.flatMap(_.split(" "))
    // 将word 转为 table
    val table = tableEnv.fromDataStream(word)
    // 计算wordcount
    val wordCount = table.groupBy("f0").select('f0'f0.count as 'count)
    wordCount.printSchema()
    //转换成流处理打印输出
    tableEnv.toRetractStream[(String,Long)](wordCount).print()
    env.execute("FlinkSqlReadFileTable")
  }
}

5.2 数据类型与 Table schema 的对应

         DataStream 中的数据类型,与表的 Schema
之间的对应关系,是按照样例类中的字段名来对应的(name-based mapping
),所以还可以用 as 做重命名。  

           另外一种对应方式是,直接按照字段的位置来对应(position-based mapping),对应的过程中,就可以直接指定新的字段名了。

基于名称的对应:

val userTable = tableEnv.fromDataStream(dataStream,'username as 'name,'id as 'myid)

基于位置的对应:

val userTable = tableEnv.fromDataStream(dataStream, 'name'id)

         Flink 的 DataStream 和 DataSet API 支持多种类型。组合类型,比如元组
(内置 Scala 和 Java 元组)、POJO
Scala case
类和 Flink 的 Row
类型等,允许具有多个字段的嵌套数据结构,这些字段可以在 Table 的表达式中访问。其他类型,则被视为原子类型。

        元组类型和原子类型,一般用位置对应会好一些;如果非要用名称对应,也是可以的:元组类型,默认的名称是_1
, _2
;而原子类型,默认名称是 f0

六、创建临时视图(Temporary View)

         创建临时视图的第一种方式,就是直接从 DataStream 转换而来。同样,可以直接对应字段转换;也可以在转换的时候,指定相应的字段。代码如下:

tableEnv.createTemporaryView("sensorView", dataStream) 
tableEnv.createTemporaryView("sensorView", dataStream, 'id'temperature,'timestamp as 'ts)

另外,当然还可以基于 Table 创建视图:

tableEnv.createTemporaryView("sensorView", sensorTable)

         View 和 Table 的 Schema 完全相同。事实上,在 Table API 中,可以认为 View 和 Table是等价的。

总结

          上述文章了主要讲解了以kafka方式作为输入流进行流失处理,其实我也可以设置MySQL、ES、MySQL 等,都是类似的,以及table API 与sql之间的区别,还讲解了DataStream转换位Table 或者Table 转换为DataStream这样的或我们后面在做数据分析的时候就非常简单了,这篇文章到这里就结束了,喜欢的朋友可以给个一键三连
。到现在为止只分享了作为输入流还没分享输出流,后面会持续发布,我们下期见。

微信公众号搜索【大数据老哥】可以获取 200个为你定制的简历模板、大数据面试题、企业面试
题.....等等。

资源获取



往期推荐

十分钟入门Fink SQL


做为程序员必备的十软件(让你的工作效率瞬间爆棚)


如何在面试中介绍自己的项目经验


点个 在看 你最好看



推荐阅读
  • 电商高并发解决方案详解
    本文以京东为例,详细探讨了电商中常见的高并发解决方案,包括多级缓存和Nginx限流技术,旨在帮助读者更好地理解和应用这些技术。 ... [详细]
  • Python3爬虫入门:pyspider的基本使用[python爬虫入门]
    Python学习网有大量免费的Python入门教程,欢迎大家来学习。本文主要通过爬取去哪儿网的旅游攻略来给大家介绍pyspid ... [详细]
  • 阿里巴巴终面技术挑战:如何利用 UDP 实现 TCP 功能?
    在阿里巴巴的技术面试中,技术总监曾提出一道关于如何利用 UDP 实现 TCP 功能的问题。当时回答得不够理想,因此事后进行了详细总结。通过与总监的进一步交流,了解到这是一道常见的阿里面试题。面试官的主要目的是考察应聘者对 UDP 和 TCP 在原理上的差异的理解,以及如何通过 UDP 实现类似 TCP 的可靠传输机制。 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • 本文将详细探讨 Python 编程语言中 sys.argv 的使用方法及其重要性。通过实际案例,我们将了解如何在命令行环境中传递参数给 Python 脚本,并分析这些参数是如何被处理和使用的。 ... [详细]
  • 如何在Django框架中实现对象关系映射(ORM)
    本文介绍了Django框架中对象关系映射(ORM)的实现方式,通过ORM,开发者可以通过定义模型类来间接操作数据库表,从而简化数据库操作流程,提高开发效率。 ... [详细]
  • Logging all MySQL queries into the Slow Log
    MySQLoptionallylogsslowqueriesintotheSlowQueryLog–orjustSlowLog,asfriendscallit.However,Thereareseveralreasonstologallqueries.Thislistisnotexhaustive:Belowyoucanfindthevariablestochange,astheyshouldbewritteninth ... [详细]
  • Kafka入门指南
    本文将详细介绍如何在CentOS 7上安装和配置Kafka,包括必要的环境准备、JDK和Zookeeper的配置步骤。 ... [详细]
  • 深入解析Unity3D游戏开发中的音频播放技术
    在游戏开发中,音频播放是提升玩家沉浸感的关键因素之一。本文将探讨如何在Unity3D中高效地管理和播放不同类型的游戏音频,包括背景音乐和效果音效,并介绍实现这些功能的具体步骤。 ... [详细]
  • ArcBlock 发布 ABT 节点 1.0.31 版本更新
    2020年11月9日,ArcBlock 区块链基础平台发布了 ABT 节点开发平台的1.0.31版本更新,此次更新带来了多项功能增强与性能优化。 ... [详细]
  • 本文详细介绍如何安装和配置DedeCMS的移动端站点,包括新版本安装、老版本升级、模板适配以及必要的代码修改,以确保移动站点的正常运行。 ... [详细]
  • Zabbix自定义监控与邮件告警配置实践
    本文详细介绍了如何在Zabbix中添加自定义监控项目,配置邮件告警功能,并解决测试告警时遇到的邮件不发送问题。 ... [详细]
  • 本文探讨了如何在PHP与MySQL环境中实现高效的分页查询,包括基本的分页实现、性能优化技巧以及高级的分页策略。 ... [详细]
  • 本文详细介绍了在Linux操作系统上安装和部署MySQL数据库的过程,包括必要的环境准备、安装步骤、配置优化及安全设置等内容。 ... [详细]
  • 本文探讨了在Windows系统中运行Apache服务器时频繁出现崩溃的问题,并提供了多种可能的解决方案和建议。错误日志显示多个子进程因达到最大请求限制而退出。 ... [详细]
author-avatar
fffas2010_734_196
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有