热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

大数据培训:在flink中使用hiveudf的原因分析

1.序篇废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:背景及应用场景介绍&

1.序篇

废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:

  1. 背景及应用场景介绍:博主期望你能了解到,其实很多场景下实时数仓的建设都是随着离线数仓而建设的(相同的逻辑在实时数仓中重新实现一遍),因此能够在 flink sql 中复用 hive udf 是能够大大提高人效的。
  2. flink 扩展支持 hive 内置 udf:flink sql 提供了扩展 udf 的能力,即 module,并且 flink sql 也内置了 HiveModule(需要你主动加载进环境),来支持一些 hive 内置的 udf (比如 get_json_object)给小伙伴们使用。
  3. flink 扩展支持用户自定义的 hive udf:主要介绍 flink sql 流任务中,不能使用 create temporary function 去引入一个用户自定义的 hive udf。因此博主只能通过 flink sql 提供的 module 插件能力,自定义了 module,来支持引入用户自定义的 hive udf。

2.背景及应用场景介绍

其实大多数公司都是从离线数仓开始建设的。相信大家必然在自己的生产环境中开发了非常多的 hive udf。随着需求对于时效性要求的增高,大数据培训越来越多的公司也开始建设起实时数仓。很多场景下实时数仓的建设都是随着离线数仓而建设的。实时数据使用 flink 产出,离线数据使用 hive\spark 产出。

那么回到我们文章标题的问题:为什么需要 flink 支持 hive udf 呢?

博主分析了下,结论如下:

站在数据需求的角度来说,一般会有以下两种情况:

  1. 以前已经有了离线数据链路,需求方也想要实时数据。如果直接能用已经开发好的 hive udf,则不用将相同的逻辑迁移到 flink udf 中,并且后续无需费时费力维护两个 udf 的逻辑一致性。
  2. 实时和离线的需求都是新的,需要新开发。如果只开发一套 udf,则事半功倍。

因此在 flink 中支持 hive udf 这件事对开发人员提效来说是非常有好处的。

3.在扩展前,你需要知道一些基本概念

flink 支持 hive udf 这件事分为两个部分。

  1. flink 扩展支持 hive 内置 udf
  2. flink 扩展支持用户自定义 hive udf

第一部分:flink 扩展支持 hive 内置 udf,比如 get_json_object,rlike 等等。

有同学问了,这么基本的 udf,flink 都没有吗?

确实没有。关于 flink sql 内置的 udf 见如下链接,大家可以看看 flink 支持了哪些

udf:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/functions/systemfunctions/

那么如果我如果强行使用 get_json_object 这个 udf,会发生啥呢?结果如下图。

直接报错找不到 udf。

 

第二部分:flink 扩展支持用户自定义 hive udf。

内置函数解决不了用户的复杂需求,用户就需要自己写 hive udf,并且这部分自定义 udf 也想在 flink sql 中使用。

下面看看怎么在 flink sql 中进行这两种扩展。

4.hive udf 扩展支持


4.1.flink sql module

涉及到扩展 udf 就不得不提到 flink 提供的 module。见官网下图。

 

从第一句话就可以看到,module 的作用就是让用户去扩展 udf 的。

flink 本身已经内置了一个 module,名字叫 CoreModule,其中已经包含了一些 udf。

那我们要怎么使用 module 这玩意去扩展我们的 hive udf 呢?

4.2.flink 扩展支持 hive 内置 udf

步骤如下:

  1. 引入 hive 的 connector。其中包含了 flink 官方提供的一个 HiveModule。在 HiveModule 中包含了 hive 内置的 udf。

org.apache.flink

flink-connector-hive_${scala.binary.version}

${flink.version}

  1. 在 StreamTableEnvironment 中加载 HiveModule。

String name = "default";

String version = "3.1.2";

tEnv.loadModule(name, new HiveModule(version));

然后在控制台打印一下目前有的 module。

String[] modules = tEnv.listModules();

Arrays.stream(modules).forEach(System.out::println);

然后可以看到除了 core module,还有我们刚刚加载进去的 default module。

default

core

  1. 查看所有 module 的所有 udf。在控制台打印一下。

String[] functions = tEnv.listFunctions();

Arrays.stream(functions).forEach(System.out::println);

就会将 default 和 core module 中的所有包含的 udf 给列举出来,当然也就包含了 hive module 中的 get_json_object。

 

然后我们再去在 flink sql 中使用 get_json_object 这个 udf,就没有报错,能正常输出结果了。

使用 flink hive connector 自带的 HiveModule,已经能够解决很大一部分常见 udf 使用的问题了。

4.2.flink 扩展支持用户自定义 hive udf

原本博主是直接想要使用 flink sql 中的 create temporary function 去执行引入自定义 hive udf 的。

举例如下:

CREATE TEMPORARY FUNCTION test_hive_udf as 'flink.examples.sql._09.udf._02_stream_hive_udf.TestGenericUDF';

发现在执行这句 sql 时,是可以执行成功,将 udf 注册进去的。

但是在后续 udf 初始化时就报错了。具体错误如下图。直接报错 ClassCastException。

 

看了下源码,flink 流环境下(未连接 hive catalog 时)在创建 udf 时会认为这个 udf 是 flink 生态体系中的 udf。

所以在初始化我们引入的 TestGenericUDF 时,默认会按照 flink 的 UserDefinedFunction 强转,因此才会报强转错误。

那么我们就不能使用 hive udf 了吗?

错误,小伙伴萌岂敢有这种想法。博主都把这个标题列出来了(牛逼都吹出去了),还能给不出解决方案嘛。

4.3.flink 扩展支持用户自定义 hive udf 的增强 module

其实思路很简单。

使用 flink sql 中的 create temporary function 虽然不能执行,但是 flink 提供了插件化的自定义 module。

我们可以扩展一个支持用户自定义 hive udf 的 module,使用这个 module 来支持自定义的 hive udf。

实现的代码也非常简单。简单的把 flink hive connector 提供的 HiveModule 做一个增强即可,即下图中的 HiveModuleV2。

使用方式如下图所示:

 

然后程序就正常跑起来了。

5.总结与展望

本文主要介绍了如果在 flink sql 使用 hive 内置 udf 及用户自定义 hive udf,总结如下:

  1. 背景及应用场景介绍:博主期望你能了解到,其实很多场景下实时数仓的建设都是随着离线数仓而建设的(相同的逻辑在实时数仓中重新实现一遍),因此能够在 flink sql 中复用 hive udf 是能够大大提高人效的。
  2. flink 扩展支持 hive 内置 udf:flink sql 提供了扩展 udf 的能力,即 module,并且 flink sql 也内置了 HiveModule(需要你主动加载进环境),来支持一些 hive 内置的 udf (比如 get_json_object)给小伙伴们使用。
  3. flink 扩展支持用户自定义的 hive udf:主要介绍 flink sql 流任务中,不能使用 create temporary function 去引入一个用户自定义的 hive udf。因此博主只能通过 flink sql 提供的 module 插件能力,自定义了 module,来支持引入用户自定义的 hive udf。

文章来源于大数据羊说


推荐阅读
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 包含phppdoerrorcode的词条 ... [详细]
  • 本文详细介绍了 Pentaho Kettle 中 RowMetaInterface.writeMeta 方法的使用,并提供了多个代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • Spark与HBase结合处理大规模流量数据结构设计
    本文将详细介绍如何利用Spark和HBase进行大规模流量数据的分析与处理,包括数据结构的设计和优化方法。 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • 您的数据库配置是否安全?DBSAT工具助您一臂之力!
    本文探讨了Oracle提供的免费工具DBSAT,该工具能够有效协助用户检测和优化数据库配置的安全性。通过全面的分析和报告,DBSAT帮助用户识别潜在的安全漏洞,并提供针对性的改进建议,确保数据库系统的稳定性和安全性。 ... [详细]
  • 属性类 `Properties` 是 `Hashtable` 类的子类,用于存储键值对形式的数据。该类在 Java 中广泛应用于配置文件的读取与写入,支持字符串类型的键和值。通过 `Properties` 类,开发者可以方便地进行配置信息的管理,确保应用程序的灵活性和可维护性。此外,`Properties` 类还提供了加载和保存属性文件的方法,使其在实际开发中具有较高的实用价值。 ... [详细]
  • Spring框架中枚举参数的正确使用方法与技巧
    本文详细阐述了在Spring Boot框架中正确使用枚举参数的方法与技巧,旨在帮助开发者更高效地掌握和应用枚举类型的数据传递,适合对Spring Boot感兴趣的读者深入学习。 ... [详细]
  • 在ElasticStack日志监控系统中,Logstash编码插件自5.0版本起进行了重大改进。插件被独立拆分为gem包,每个插件可以单独进行更新和维护,无需依赖Logstash的整体升级。这不仅提高了系统的灵活性和可维护性,还简化了插件的管理和部署过程。本文将详细介绍这些编码插件的功能、配置方法,并通过实际生产环境中的应用案例,展示其在日志处理和监控中的高效性和可靠性。 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • 在处理 XML 数据时,如果需要解析 `` 标签的内容,可以采用 Pull 解析方法。Pull 解析是一种高效的 XML 解析方式,适用于流式数据处理。具体实现中,可以通过 Java 的 `XmlPullParser` 或其他类似的库来逐步读取和解析 XML 文档中的 `` 元素。这样不仅能够提高解析效率,还能减少内存占用。本文将详细介绍如何使用 Pull 解析方法来提取 `` 标签的内容,并提供一个示例代码,帮助开发者快速解决问题。 ... [详细]
  • 使用Maven JAR插件将单个或多个文件及其依赖项合并为一个可引用的JAR包
    本文介绍了如何利用Maven中的maven-assembly-plugin插件将单个或多个Java文件及其依赖项打包成一个可引用的JAR文件。首先,需要创建一个新的Maven项目,并将待打包的Java文件复制到该项目中。通过配置maven-assembly-plugin,可以实现将所有文件及其依赖项合并为一个独立的JAR包,方便在其他项目中引用和使用。此外,该方法还支持自定义装配描述符,以满足不同场景下的需求。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 在第二课中,我们将深入探讨Scala的面向对象编程核心概念及其在Spark源码中的应用。首先,通过详细的实战案例,全面解析Scala中的类和对象。作为一门纯面向对象的语言,Scala的类设计和对象使用是理解其面向对象特性的关键。此外,我们还将介绍如何通过阅读Spark源码来进一步巩固对这些概念的理解。这不仅有助于提升编程技能,还能为后续的高级应用开发打下坚实的基础。 ... [详细]
author-avatar
余陈辉syllabear
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有