官网地址:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/
源码地址:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
1,第一步指定HiveCatalog
解释:
其中 name 是用户给每个 Catalog 实例指定的名字, Catalog 名字和 DB 名字构成了 FlinkSQL 中元数据的命名空间,因此需要保证每个 Catalog 的名字是唯一的。type 表示 Catalog 的类型,对于 HiveCatalog 而言,type 应该指定为 hive。hive-conf-dir 用于读取 Hive 的配置文件,用户可以将其设定为集群中 Hive 的配置文件目录。hive-version 用于指定所使用的 Hive 版本,可以设定为 2.3.4 或者 1.2.1。
2,通过 TableAPI 来创建 HiveCatalog,并注册到 TableEnvironment。
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive_conf_dir";
String version = "2.3.4";
TableEnvironment tableEnv = …; // create TableEnvironment
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);
tableEnv.registerCatalog(name, hiveCatalog);
tableEnv.useCatalog(name);
3,注册为table,直接使用:
TableEnvironment tableEnv = …; // create TableEnvironment
tableEnv.registerCatalog("myhive", hiveCatalog);
// set myhive as current catalog
tableEnv.useCatalog("myhive");
Table src = tableEnv.sqlQuery("select * from src");
// write src into a sink or do further analysis
……
tableEnv.sqlUpdate("insert into src values ('newKey', 'newVal')");
tableEnv.execute("insert into src");
4,注意Hive版本问题:
Flink 1.9.0 中支持的 Hive 版本是 2.3.4 和 1.2.1,目前我们只针对这两个版本进行了测试。使用 SQL Client 时,如果用户没有在 sql-client-defaults.yaml 文件中指定 Hive 版本,我们会自动检测 classpath 中的 Hive 版本。如果检测到的 Hive 版本不是 2.3.4 或 1.2.1 就会报错。
借助 Hive 兼容性的保证,其它不同的小版本也比较可能是可以正常工作的。因此,如果用户使用的 Hive 小版本与我们所支持的不同,可以指定一个支持的版本来试用与 Hive 集成的功能。比如用户使用的 Hive 版本是 2.3.3,可以在 sql-client-defaults.yaml 文件或者代码中将 Hive 版本指定为 2.3.4。
5,相关链接:
https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ==&mid=2247484555&idx=1&sn=bd348f9543951eaa6ea852ea68ee9402&chksm=fd3b8ac9ca4c03df058fcf0ffb9a0977ef49d8192d9f4bc81ef664f775a992ad6d1e02b4dff0&token=1027247201&lang=zh_CN#rd