热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

大数据培训:在flink中使用hiveudf的原因分析

1.序篇废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:背景及应用场景介绍&

1.序篇

废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:

  1. 背景及应用场景介绍:博主期望你能了解到,其实很多场景下实时数仓的建设都是随着离线数仓而建设的(相同的逻辑在实时数仓中重新实现一遍),因此能够在 flink sql 中复用 hive udf 是能够大大提高人效的。
  2. flink 扩展支持 hive 内置 udf:flink sql 提供了扩展 udf 的能力,即 module,并且 flink sql 也内置了 HiveModule(需要你主动加载进环境),来支持一些 hive 内置的 udf (比如 get_json_object)给小伙伴们使用。
  3. flink 扩展支持用户自定义的 hive udf:主要介绍 flink sql 流任务中,不能使用 create temporary function 去引入一个用户自定义的 hive udf。因此博主只能通过 flink sql 提供的 module 插件能力,自定义了 module,来支持引入用户自定义的 hive udf。

2.背景及应用场景介绍

其实大多数公司都是从离线数仓开始建设的。相信大家必然在自己的生产环境中开发了非常多的 hive udf。随着需求对于时效性要求的增高,大数据培训越来越多的公司也开始建设起实时数仓。很多场景下实时数仓的建设都是随着离线数仓而建设的。实时数据使用 flink 产出,离线数据使用 hive\spark 产出。

那么回到我们文章标题的问题:为什么需要 flink 支持 hive udf 呢?

博主分析了下,结论如下:

站在数据需求的角度来说,一般会有以下两种情况:

  1. 以前已经有了离线数据链路,需求方也想要实时数据。如果直接能用已经开发好的 hive udf,则不用将相同的逻辑迁移到 flink udf 中,并且后续无需费时费力维护两个 udf 的逻辑一致性。
  2. 实时和离线的需求都是新的,需要新开发。如果只开发一套 udf,则事半功倍。

因此在 flink 中支持 hive udf 这件事对开发人员提效来说是非常有好处的。

3.在扩展前,你需要知道一些基本概念

flink 支持 hive udf 这件事分为两个部分。

  1. flink 扩展支持 hive 内置 udf
  2. flink 扩展支持用户自定义 hive udf

第一部分:flink 扩展支持 hive 内置 udf,比如 get_json_object,rlike 等等。

有同学问了,这么基本的 udf,flink 都没有吗?

确实没有。关于 flink sql 内置的 udf 见如下链接,大家可以看看 flink 支持了哪些

udf:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/functions/systemfunctions/

那么如果我如果强行使用 get_json_object 这个 udf,会发生啥呢?结果如下图。

直接报错找不到 udf。

 

第二部分:flink 扩展支持用户自定义 hive udf。

内置函数解决不了用户的复杂需求,用户就需要自己写 hive udf,并且这部分自定义 udf 也想在 flink sql 中使用。

下面看看怎么在 flink sql 中进行这两种扩展。

4.hive udf 扩展支持


4.1.flink sql module

涉及到扩展 udf 就不得不提到 flink 提供的 module。见官网下图。

 

从第一句话就可以看到,module 的作用就是让用户去扩展 udf 的。

flink 本身已经内置了一个 module,名字叫 CoreModule,其中已经包含了一些 udf。

那我们要怎么使用 module 这玩意去扩展我们的 hive udf 呢?

4.2.flink 扩展支持 hive 内置 udf

步骤如下:

  1. 引入 hive 的 connector。其中包含了 flink 官方提供的一个 HiveModule。在 HiveModule 中包含了 hive 内置的 udf。

org.apache.flink

flink-connector-hive_${scala.binary.version}

${flink.version}

  1. 在 StreamTableEnvironment 中加载 HiveModule。

String name = "default";

String version = "3.1.2";

tEnv.loadModule(name, new HiveModule(version));

然后在控制台打印一下目前有的 module。

String[] modules = tEnv.listModules();

Arrays.stream(modules).forEach(System.out::println);

然后可以看到除了 core module,还有我们刚刚加载进去的 default module。

default

core

  1. 查看所有 module 的所有 udf。在控制台打印一下。

String[] functions = tEnv.listFunctions();

Arrays.stream(functions).forEach(System.out::println);

就会将 default 和 core module 中的所有包含的 udf 给列举出来,当然也就包含了 hive module 中的 get_json_object。

 

然后我们再去在 flink sql 中使用 get_json_object 这个 udf,就没有报错,能正常输出结果了。

使用 flink hive connector 自带的 HiveModule,已经能够解决很大一部分常见 udf 使用的问题了。

4.2.flink 扩展支持用户自定义 hive udf

原本博主是直接想要使用 flink sql 中的 create temporary function 去执行引入自定义 hive udf 的。

举例如下:

CREATE TEMPORARY FUNCTION test_hive_udf as 'flink.examples.sql._09.udf._02_stream_hive_udf.TestGenericUDF';

发现在执行这句 sql 时,是可以执行成功,将 udf 注册进去的。

但是在后续 udf 初始化时就报错了。具体错误如下图。直接报错 ClassCastException。

 

看了下源码,flink 流环境下(未连接 hive catalog 时)在创建 udf 时会认为这个 udf 是 flink 生态体系中的 udf。

所以在初始化我们引入的 TestGenericUDF 时,默认会按照 flink 的 UserDefinedFunction 强转,因此才会报强转错误。

那么我们就不能使用 hive udf 了吗?

错误,小伙伴萌岂敢有这种想法。博主都把这个标题列出来了(牛逼都吹出去了),还能给不出解决方案嘛。

4.3.flink 扩展支持用户自定义 hive udf 的增强 module

其实思路很简单。

使用 flink sql 中的 create temporary function 虽然不能执行,但是 flink 提供了插件化的自定义 module。

我们可以扩展一个支持用户自定义 hive udf 的 module,使用这个 module 来支持自定义的 hive udf。

实现的代码也非常简单。简单的把 flink hive connector 提供的 HiveModule 做一个增强即可,即下图中的 HiveModuleV2。

使用方式如下图所示:

 

然后程序就正常跑起来了。

5.总结与展望

本文主要介绍了如果在 flink sql 使用 hive 内置 udf 及用户自定义 hive udf,总结如下:

  1. 背景及应用场景介绍:博主期望你能了解到,其实很多场景下实时数仓的建设都是随着离线数仓而建设的(相同的逻辑在实时数仓中重新实现一遍),因此能够在 flink sql 中复用 hive udf 是能够大大提高人效的。
  2. flink 扩展支持 hive 内置 udf:flink sql 提供了扩展 udf 的能力,即 module,并且 flink sql 也内置了 HiveModule(需要你主动加载进环境),来支持一些 hive 内置的 udf (比如 get_json_object)给小伙伴们使用。
  3. flink 扩展支持用户自定义的 hive udf:主要介绍 flink sql 流任务中,不能使用 create temporary function 去引入一个用户自定义的 hive udf。因此博主只能通过 flink sql 提供的 module 插件能力,自定义了 module,来支持引入用户自定义的 hive udf。

文章来源于大数据羊说


推荐阅读
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 使用Ubuntu中的Python获取浏览器历史记录原文: ... [详细]
  • 热血合击脚本辅助工具及随机数生成器源码分享
    本文分享了一个热血合击脚本辅助工具及随机数生成器源码。游戏脚本能够实现类似真实玩家的操作,但信息量有限且操作不可控。热血合击脚本辅助工具可以帮助玩家自动刷图、换图拉怪等操作,并提供了雷电云手机的扩展服务。此外,还介绍了使用mt_rand函数作为随机数生成器的代码示例。 ... [详细]
  • 闭包一直是Java社区中争论不断的话题,很多语言都支持闭包这个语言特性,闭包定义了一个依赖于外部环境的自由变量的函数,这个函数能够访问外部环境的变量。本文以JavaScript的一个闭包为例,介绍了闭包的定义和特性。 ... [详细]
  • mac php错误日志配置方法及错误级别修改
    本文介绍了在mac环境下配置php错误日志的方法,包括修改php.ini文件和httpd.conf文件的操作步骤。同时还介绍了如何修改错误级别,以及相应的错误级别参考链接。 ... [详细]
  • python限制递归次数(python最大公约数递归)
    本文目录一览:1、python为什么要进行递归限制 ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 本文介绍了在Windows环境下如何配置php+apache环境,包括下载php7和apache2.4、安装vc2015运行时环境、启动php7和apache2.4等步骤。希望对需要搭建php7环境的读者有一定的参考价值。摘要长度为169字。 ... [详细]
  • flowable工作流 流程变量_信也科技工作流平台的技术实践
    1背景随着公司业务发展及内部业务流程诉求的增长,目前信息化系统不能够很好满足期望,主要体现如下:目前OA流程引擎无法满足企业特定业务流程需求,且移动端体 ... [详细]
  • Linux如何安装Mongodb的详细步骤和注意事项
    本文介绍了Linux如何安装Mongodb的详细步骤和注意事项,同时介绍了Mongodb的特点和优势。Mongodb是一个开源的数据库,适用于各种规模的企业和各类应用程序。它具有灵活的数据模式和高性能的数据读写操作,能够提高企业的敏捷性和可扩展性。文章还提供了Mongodb的下载安装包地址。 ... [详细]
  • 单点登录原理及实现方案详解
    本文详细介绍了单点登录的原理及实现方案,其中包括共享Session的方式,以及基于Redis的Session共享方案。同时,还分享了作者在应用环境中所遇到的问题和经验,希望对读者有所帮助。 ... [详细]
  • 本文讨论了Kotlin中扩展函数的一些惯用用法以及其合理性。作者认为在某些情况下,定义扩展函数没有意义,但官方的编码约定支持这种方式。文章还介绍了在类之外定义扩展函数的具体用法,并讨论了避免使用扩展函数的边缘情况。作者提出了对于扩展函数的合理性的质疑,并给出了自己的反驳。最后,文章强调了在编写Kotlin代码时可以自由地使用扩展函数的重要性。 ... [详细]
  • 使用正则表达式爬取36Kr网站首页新闻的操作步骤和代码示例
    本文介绍了使用正则表达式来爬取36Kr网站首页所有新闻的操作步骤和代码示例。通过访问网站、查找关键词、编写代码等步骤,可以获取到网站首页的新闻数据。代码示例使用Python编写,并使用正则表达式来提取所需的数据。详细的操作步骤和代码示例可以参考本文内容。 ... [详细]
  • 本文讨论了在openwrt-17.01版本中,mt7628设备上初始化启动时eth0的mac地址总是随机生成的问题。每次随机生成的eth0的mac地址都会写到/sys/class/net/eth0/address目录下,而openwrt-17.01原版的SDK会根据随机生成的eth0的mac地址再生成eth0.1、eth0.2等,生成后的mac地址会保存在/etc/config/network下。 ... [详细]
author-avatar
余陈辉syllabear
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有