简介: Flink 社区在集成 Hive 功能方面付出很多,目前进展也比较顺利,最近 Flink 1.10.0 RC1 版本已经发布,感兴趣的读者可以进行调研和验证功能。
作者:Jason
Apache Spark 什么时候开始支持集成 Hive 功能?笔者相信只要使用过 Spark 的读者,应该都会说这是很久以前的事情了。
那 Apache Flink 什么时候支持与 Hive 的集成呢?读者可能有些疑惑,还没有支持吧,没用过?或者说最近版本才支持,但是功能还比较弱。
其实比较也没啥意义,不同社区发展的目标总是会有差异,而且 Flink 在真正的实时流计算方面投入的精力很多。不过笔者想表达的是,Apache Hive 已经成为数据仓库生态系统的焦点,它不仅是一个用于大数据分析和 ETL 的 SQL 引擎,也是一个数据管理平台,所以无论是 Spark,还是 Flink,或是 Impala、Presto 等,都会积极地支持集成 Hive 的功能。
的确,对真正需要使用 Flink 访问 Hive 进行数据读写的读者会发现,Apache Flink 1.9.0 版本才开始提供与 Hive 集成的功能。不过,值得欣慰的是,Flink 社区在集成 Hive 功能方面付出很多,目前进展也比较顺利,最近 Flink 1.10.0 RC1 版本已经发布,感兴趣的读者可以进行调研和验证功能。
架构设计
首先,笔者基于社区公开的资料以及博客,概括性地讲解 Flink 集成 Hive 的架构设计。
Apache Flink 与 Hive 集成的目的,主要包含了元数据和实际表数据的访问。
元数据
为了访问外部系统的元数据,Flink 刚开始提供了 ExternalCatalog 的概念。但是 ExternalCatalog 的定义非常不完整,基本处于不可用的状态。Flink 1.10 版本正式删除了 ExternalCatalog API (FLINK-13697),这包括:
针对 ExternalCatalog 的问题,Flink 社区提出了一套全新的 Catalog 接口(new Catalog API)来取代现有的 ExternalCatalog。新的 Catalog 实现的功能包括:
下图展示了新的 Catalog API 的总体架构:
创建 TableEnvironment 的时候会同时创建一个 CatalogManager,负责管理不同的 Catalog 实例。TableEnvironment 通过 Catalog 来为 Table API 和 SQL Client 用户提供元数据服务。
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build() val tableEnv = TableEnvironment.create(settings) val name = "myhive" val defaultDatabase = "mydatabase" val hiveConfDir = "/opt/hive-conf"// a local path val version = "2.3.4" val hive = newHiveCatalog(name, defaultDatabase, hiveConfDir, version) tableEnv.registerCatalog("myhive", hive) // set the HiveCatalog as the current catalog of the session tableEnv.useCatalog("myhive")
目前 Catalog 有两个实现,GenericInMemoryCatalog 和 HiveCatalog。其中 GenericInMemoryCatalog 保持了原有的 Flink 元数据管理机制,将所有元数据保存在内存中。而 HiveCatalog 会与一个 Hive Metastore 的实例连接,提供元数据持久化的能力。要使用 Flink 与 Hive 进行交互,用户需要配置一个 HiveCatalog,并通过 HiveCatalog 访问 Hive 中的元数据。
另一方面,HiveCatalog 也可以用来处理 Flink 自身的元数据,在这种场景下,HiveCatalog 仅将 Hive Metastore 作为持久化存储使用,写入 Hive Metastore 中的元数据并不一定是 Hive 所支持的格式。一个 HiveCatalog 实例可以同时支持这两种模式,用户无需为管理 Hive 和 Flink 的元数据创建不同的实例。
另外,通过设计 HiveShim 来支持不同版本的 Hive Metastore,具体支持的 Hive 版本列表,请参考官方文档。
表数据
Flink 提供了 Hive Data Connector 来读写 Hive 的表数据。Hive Data Connector 尽可能的复用了 Hive 本身的 Input/Output Format 和 SerDe 等类,这样做的好处一方面是减少了代码重复,更重要的是可以最大程度的保持与 Hive 的兼容,即 Flink 写入的数据 Hive 可以正常读取,并且反之亦然。
集成 Hive 功能
Flink 与 Hive 集成的功能在 1.9.0 版本中作为试用功能发布,存在不少使用的局限性,但是不久将发布的 Flink 1.10 稳定版本会更加完善集成 Hive 的功能并应用到企业场景中。
为了让读者提前体验 Flink 1.10 集成 Hive 的功能,笔者会基于 Cloudera CDH 编译 Flink 1.10.0 RC1 版本并进行较为完整的测试。
环境信息
CDH 版本:cdh5.16.2
Flink 版本:release-1.10.0-rc1
Flink 使用了 RC 版本,仅供测试,不建议用于生产环境。
目前 Cloudera Data Platform 正式集成了 Flink 作为其流计算产品,非常方便用户使用。
CDH 环境开启了 Sentry 和 Kerberos。
下载并编译 Flink
$ wget https://http://github.com/apache/flink/archive/release-1.10.0-rc1.tar.gz $ tar zxvf release-1.10.0-rc1.tar.gz $ cd flink-release-1.10.0-rc1/ $ mvn clean install -DskipTests-Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2
不出意外的话,编译到 flink-hadoop-fs 模块时,会报如下错误:
[ERROR] Failed to execute goal on project flink-hadoop-fs: Could not resolve dependencies for project org.apache.flink:flink-hadoop-fs:jar:1.10.0: Failed to collect dependencies at org.apache.flink:flink-shaded-hadoop-2:jar:2.6.0-cdh5.16.2-9.0: Failed to read artifact descriptor for org.apache.flink:flink-shaded-hadoop-2:jar:2.6.0-cdh5.16.2-9.0: Could not transfer artifact org.apache.flink:flink-shaded-hadoop-2:pom:2.6.0-cdh5.16.2-9.0 from/to HDPReleases (https://repo.hortonworks.com/content/repositories/releases/): Remote host closed connection during handshake: SSL peer shut down incorrectly
编译中遇到 flink-shaded-hadoop-2 找不到的问题,其实查看 Maven 仓库会发现,根本原因是 CDH 的 flink-shaded-hadoop-2 的 jar 包在 Maven 中央仓库是没有对应的编译版本,所以需要先对 Flink 依赖的 flink-shaded-hadoop-2 进行打包,再进行编译。■ 解决 flink-shaded-hadoop-2 问题
git clone https://github.com/apache/flink-shaded.git
根据上面报错时提示缺少的版本切换对应的代码分支,即缺少的是 9.0 版本的 flink-shaded-hadoop-2:
git checkout release-9.0
修改 flink-shaded 项目中的 pom.xml,添加 CDH maven 仓库,否则编译时找不到 CDH 相关的包。
在 ... 中添加如下内容:
开始执行编译:
mvn clean install -DskipTests-Drat.skip=true-Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2
建议通过科学上网方式编译,如果读者遇到一些网络连接的问题,可以试着重试或者更换依赖组件的仓库地址。
编译成功后,就会把 flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar 安装在本地 maven 仓库,如下为编译的最后日志:
Installing /Users/.../source/flink-shaded/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar to /Users/.../.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.0-cdh5.16.2-9.0/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar
Installing /Users/.../source/flink-shaded/flink-shaded-hadoop-2-uber/target/dependency-reduced-pom.xml to /Users/.../.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.6.0-cdh5.16.2-9.0/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.pom
重新编译 Flink
mvn clean install -DskipTests-Pvendor-repos -Dhadoop.version=2.6.0-cdh5.16.2
漫长的等待过程,读者可以并行做其他事情。
编译过程中,如果不出意外的话,会看到类似下面的错误信息:
[INFO] Running 'npm ci --cache-max=0 --no-save' in /Users/xxx/Downloads/Flink/flink-release-1.10.0-rc1/flink-release-1.10.0-rc1/flink-runtime-web/web-dashboard [WARNING] npm WARN prepare removing existing node_modules/ before installation [ERROR] WARN registry Unexpected warning for https://registry.npmjs.org/: Miscellaneous Warning ECONNRESET: request to https://registry.npmjs.org/mime/-/mime-2.4.0.tgz failed, reason: read ECONNRESET [ERROR] WARN registry Using stale package data from https://registry.npmjs.org/ due to a request error during revalidation. [ERROR] WARN registry Unexpected warning for https://registry.npmjs.org/: Miscellaneous Warning ECONNRESET: request to https://registry.npmjs.org/optimist/-/optimist-0.6.1.tgz failed, reason: read ECONNRESET
可以看到, flink-runtime-web 模块引入了对 frontend-maven-plugin 的依赖,需要安装 node、npm 和依赖组件。
如果没有通过科学上网,可以修改 flink-runtime-web/pom.xml 文件,添加 nodeDownloadRoot 和 npmDownloadRoot 的信息:
编译成功后,Flink 安装文件位于 flink-release-1.10.0-rc1/flink-dist/target/flink-1.10.0-bin 目录下,打包并上传到部署到节点:
$ cd flink-dist/target/flink-1.10.0-bin $ tar zcvf flink-1.10.0.tar.gz flink-1.10.0
部署和配置
Flink 部署比较简单,解压缩包即可。另外可以设置软链接、环境变量等,笔者不再介绍。
Flink 的核心配置文件是 flink-conf.yaml,一个典型的配置如下:
jobmanager.rpc.address: localhost jobmanager.rpc.port: 6123 jobmanager.heap.size: 2048m taskmanager.heap.size: 1024m taskmanager.numberOfTaskSlots: 4 parallelism.default: 1 high-availability: zookeeper high-availability.storageDir:hdfs:///user/flink110/recovery high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181 state.backend: filesystem state.checkpoints.dir: hdfs:///user/flink110/checkpoints state.savepoints.dir:hdfs:///user/flink110/savepoints jobmanager.execution.failover-strategy: region rest.port: 8081 taskmanager.memory.preallocate: false classloader.resolve-order: parent-first security.kerberos.login.use-ticket-cache: true security.kerberos.login.keytab:/home/flink_user/flink_user.keytab security.kerberos.login.principal: flink_user jobmanager.archive.fs.dir:hdfs:///user/flink110/completed-jobs historyserver.web.address: 0.0.0.0 historyserver.web.port: 8082 historyserver.archive.fs.dir:hdfs:///user/flink110/completed-jobs historyserver.archive.fs.refresh-interval: 10000
笔者只罗列了一些常见的配置参数,读者根据实际情况修改。配置参数其实还是比较容易理解的,以后结合实战的文章再进行详细讲解。■ 集成 Hive 配置的依赖
如果要使用 Flink 与 Hive 集成的功能,除了上面的配置外,用户还需要添加相应的依赖:
笔者主要介绍使用 SQL Client 的方式,由于使用的 CDH 版本为 5.16.2,其中 Hadoop 版本为 2.6.0,Hive 版本为 1.1.0,所以需要将如下 jar 包拷贝到 flink 部署家目录中的 lib 目录下:
flink-connector-hive2.11-1.10.0.jar
flink-hadoop-compatibility2.11-1.10.0.jar
flink-orc_2.11-1.10.0.jar
flink-release-1.10.0-rc1/flink-connectors/flink-hadoop-compatibility/target/flink-hadoop-compatibility_2.11-1.10.0.jar flink-release-1.10.0-rc1/flink-connectors/flink-connector-hive/target/flink-connector-hive_2.11-1.10.0.jar flink-release-1.10.0-rc1/flink-formats/flink-orc/target/flink-orc_2.11-1.10.0.jar
flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar
flink-shaded/flink-shaded-hadoop-2-uber/target/flink-shaded-hadoop-2-uber-2.6.0-cdh5.16.2-9.0.jar
hive-exec-1.1.0-cdh5.16.2.jar
hive-metastore-1.1.0-cdh5.16.2.jar
libfb303-0.9.3.jar
/opt/cloudera/parcels/CDH/lib/hive/lib/hive-exec-1.1.0-cdh5.16.2.jar /opt/cloudera/parcels/CDH/lib/hive/lib/hive-metastore-1.1.0-cdh5.16.2.jar /opt/cloudera/parcels/CDH/lib/hive/lib/libfb303- 0.9.3.jar
其中 flink-shaded-hadoop-2-uber 包含了 Hive 对于 Hadoop 的依赖。如果不用 Flink 提供的包,用户也可以将集群中使用的 Hadoop 包添加进来,不过需要保证添加的 Hadoop 版本与 Hive 所依赖的版本是兼容的。
依赖的 Hive 包(即 hive-exec 和 hive-metastore)也可以使用用户集群中 Hive 所提供的 jar 包,详情请见支持不同的 Hive 版本。
Flink 部署的节点要添加 Hadoop、Yarn 以及 Hive 的客户端。■ 配置 HiveCatalog
多年来,Hive Metastore 在 Hadoop 生态系统中已发展成为事实上的元数据中心。许多公司在其生产中有一个单独的 Hive Metastore 服务实例,以管理其所有元数据(Hive 元数据或非 Hive 元数据)。
如果同时部署了 Hive 和 Flink,那么通过 HiveCatalog 能够使用 Hive Metastore 来管理 Flink 的元数据。
如果仅部署 Flink,HiveCatalog 就是 Flink 开箱即用提供的唯一持久化的 Catalog。如果没有持久化的 Catalog,那么使用 Flink SQL CREATE DDL 时必须在每个会话中重复创建像 Kafka 表这样的元对象,这会浪费大量时间。HiveCatalog 通过授权用户只需要创建一次表和其他元对象,并在以后的跨会话中非常方便地进行引用和管理。
如果要使用 SQL Client 时,用户需要在 sql-client-defaults.yaml 中指定自己所需的 Catalog,在 sql-client-defaults.yaml 的 catalogs 列表中可以指定一个或多个 Catalog 实例。
以下的示例展示了如何指定一个 HiveCatalog:
execution: planner: blink type: streaming ... current-catalog: myhive # set the HiveCatalog as the current catalog of the session current-database: mydatabase catalogs: - name: myhive type: hive hive-conf-dir: /opt/hive-conf # contains hive-site.xml hive-version:2.3.4
其中:
指定了 HiveCatalog 以后,用户就可以启动 sql-client,并通过以下命令验证 HiveCatalog 已经正确加载。
Flink SQL> show catalogs; default_catalog myhive Flink SQL> use catalog myhive;
其中 show catalogs 会列出加载的所有 Catalog 实例。需要注意的是,除了用户在 sql-client-defaults.yaml 文件中配置的 Catalog 以外,FlinkSQL 还会自动加载一个 GenericInMemoryCatalog 实例作为内置的 Catalog,该内置 Catalog 默认名字为 default_catalog。
读写 Hive 表
设置好 HiveCatalog 以后就可以通过 SQL Client 或者 Table API 来读写 Hive 中的表了。
假设 Hive 中已经有一张名为 mytable 的表,我们可以用以下的 SQL 语句来读写这张表。■ 读数据
Flink SQL> show catalogs; myhive default_catalog Flink SQL> use catalog myhive; Flink SQL> show databases; default Flink SQL> show tables; mytable Flink SQL> describe mytable; root |-- name: name |-- type: STRING |-- name: value |-- type: DOUBLE Flink SQL> SELECT * FROM mytable; name value __________ __________ Tom 4.72 John 8.0 Tom 24.2 Bob. 3.14 Bob 4.72 Tom 34.9 Mary 4.79 Tiff 2.72 Bill 4.33 Mary 77.7■ 写数据
Flink SQL> INSERT INTO mytable SELECT 'Tom', 25; Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25; # 静态分区 Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25; # 动态分区 Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08'; # 静态分区和动态分区 Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08';
总结
在本文中,笔者首先介绍了 Flink 与 Hive 集成功能的架构设计,然后从源码开始编译,解决遇到的一些问题,接着部署和配置 Flink 环境以及集成 Hive 的具体操作过程,最后参考官方的案例,对 Hive 表进行读写操作。
后续,笔者会结合生产环境的实际使用情况,讲解通过 Flink SQL 来操作 Hive。参考:
SQL 分布式计算 Java Hadoop API Apache Maven HIVE 流计算 Spark