热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flink中Lookup维表怎么使用

本篇内容主要讲解“flink中Lookup维表怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flink

本篇内容主要讲解“flink中Look up维表怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flink中Look up维表怎么使用”吧!

背景

在流式计算中,维表是一个很常见的概念,一般用于sql的join中,对流式数据进行数据补全,比如我们的source stream是来自日志的订单数据,但是日志中我们只是记录了订单商品的id,并没有其他的信息,但是我们把数据存入数仓进行数据分析的时候,却需要商品名称、价格等等其他的信息,这种问题我们可以在进行流处理的时候通过查询维表的方式对数据进行数据补全。

维表一般存储在外部存储中,比如mysql、hbase、redis等等,今天我们以mysql为例,讲讲flink中维表的使用。

LookupableTableSource

在flink中提供了一个LookupableTableSource,可以用于实现维表,也就是我们可以通过某几个key列去查询外部存储来获取相关的信息用于补全stream的数据。

public interface LookupableTableSource extends TableSource {

TableFunction getLookupFunction(String[] lookupKeys);

AsyncTableFunction getAsyncLookupFunction(String[] lookupKeys);

boolean isAsyncEnabled();
}

我们看到,LookupableTableSource有三个方法

  • getLookupFunction:用于同步查询维表的数据,返回一个TableFunction,所以本质上还是通过用户自定义 UDTF来实现的。

  • getAsyncLookupFunction:用于异步查询维表的数据,该方法返回一个对象

  • isAsyncEnabled:默认情况下是同步查询,如果要开启异步查询,这个方法需要返回true

在flink里,我们看到实现了这个接口的主要有四个类,JdbcTableSource,HBaseTableSource,CsvTableSource,HiveTableSource,今天我们主要以jdbc为例讲讲如何进行维表查询。

实例讲解

接下来我们讲一个小例子,首先定义一下stream source,我们使用flink 1.11提供的datagen来生成数据。

我们来模拟生成用户的数据,这里只生成的用户的id,范围在1-100之间。

CREATE TABLE datagen (
userid int,
proctime as PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second'='100',
'fields.userid.kind'='random',
'fields.userid.min'='1',
'fields.userid.max'='100'
)

datagen具体的使用方法可以参考:

聊聊flink 1.11 中的随机数据生成器-DataGen connector

然后再创建一个mysql维表信息:

CREATE TABLE dim_mysql (
 id int,
 name STRING,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/test',
  'table-name' = 'userinfo',
  'username' = 'root',
  'password' = 'root'
)

我们这个mysql表中样例数据如下:

flink中Look up维表怎么使用

最后执行sql查询,流表关联维表:

SELECT * FROM datagen LEFT JOIN dim_mysql FOR SYSTEM_TIME AS OF datagen.proctime  ON datagen.userid = dim_mysql.id

结果示例如下:

3> 53,2020-09-03T07:19:34.565,null,null
3> 73,2020-09-03T07:19:34.566,null,null
1> 14,2020-09-03T07:19:34.566,14,aaddda
2> 11,2020-09-03T07:19:34.566,null,null
4> 8,2020-09-03T07:19:34.566,8,name8
1> 61,2020-09-03T07:19:34.567,null,null
3> 12,2020-09-03T07:19:34.567,12,aaa
2> 99,2020-09-03T07:19:34.567,null,null
4> 37,2020-09-03T07:19:34.568,null,null
2> 13,2020-09-03T07:19:34.569,13,aaddda
3> 6,2020-09-03T07:19:34.568,6,name6

我们看到对于维表中存在的数据,已经关联出来了,对于维表中没有的数据,显示为null

源码解析

JdbcTableSource

以jdbc为例,我们来看看flink底层是怎么做的。

JdbcTableSource#isAsyncEnabled方法返回的是false,也就是不支持异步的查询,所以进入JdbcTableSource#getLookupFunction方法。

	@Override
public TableFunction getLookupFunction(String[] lookupKeys) {
final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType);
return JdbcLookupFunction.builder()
.setOptions(options)
.setLookupOptions(lookupOptions)
.setFieldTypes(rowTypeInfo.getFieldTypes())
.setFieldNames(rowTypeInfo.getFieldNames())
.setKeyNames(lookupKeys)
.build();
}

最终是构造了一个JdbcLookupFunction对象,

  • options是连接jdbc的一些参数,比如user、pass、url等。

  • lookupOptions是一些有关维表的参数,主要是缓存的大小、超时时间等。

  • lookupKeys也就是要去关联查询维表的字段。

JdbcLookupFunction

所以我们来看看JdbcLookupFunction类,这个JdbcLookupFunction是一个TableFunction的子类,具体的TableFunction的使用可以参考这个文章:

Flink实战教程-自定义函数之TableFunction

一个TableFunction最核心的就是eval方法,在这个方法里,做的主要的工作就是通过传进来的多个keys拼接成sql去来查询数据,首先查询的是缓存,缓存有数据就直接返回,缓存没有的话再去查询数据库,然后再将查询的结果返回并放入缓存,下次查询的时候直接查询缓存。

为什么要加一个缓存呢?默认情况下是不开启缓存的,每来一个查询,都会给维表发送一个请求去查询,如果数据量比较大的话,势必会给存储维表的系统造成一定的压力,所以flink提供了一个LRU缓存,查询维表的时候,先查询缓存,缓存没有再去查询外部系统,但是如果有一个数据查询频率比较高,一直被命中,就无法获取新数据了。所以缓存还要加一个超时时间,过了这个时间,把这个数据强制删除,去外部系统查询新的数据。

具体的怎么开启缓存呢?我们看下JdbcLookupFunction#open方法

	@Override
public void open(FunctionContext context) throws Exception {
try {
establishConnectionAndStatement();
this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
.maximumSize(cacheMaxSize)
.build();
} catch (SQLException sqe) {
throw new IllegalArgumentException("open() failed.", sqe);
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
}
}

也就是说cacheMaxSize和cacheExpireMs需要同时设置,就会构造一个缓存对象cache来缓存数据.这两个参数对应的DDL的属性就是lookup.cache.max-rows和lookup.cache.ttl

对于具体的缓存的大小和超时时间的设置,用户需要根据自身的情况来自己定义,在数据的准确性和系统的吞吐量之间做一个权衡。

到此,相信大家对“flink中Look up维表怎么使用”有了更深的了解,不妨来实际操作一番吧!这里是编程笔记网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


推荐阅读
  • 本文详细介绍了使用 Python 进行 MySQL 和 Redis 数据库操作的实战技巧。首先,针对 MySQL 数据库,通过 `pymysql` 模块展示了如何连接和操作数据库,包括建立连接、执行查询和更新等常见操作。接着,文章深入探讨了 Redis 的基本命令和高级功能,如键值存储、列表操作和事务处理。此外,还提供了多个实际案例,帮助读者更好地理解和应用这些技术。 ... [详细]
  • SQLite数据库CRUD操作实例分析与应用
    本文通过分析和实例演示了SQLite数据库中的CRUD(创建、读取、更新和删除)操作,详细介绍了如何在Java环境中使用Person实体类进行数据库操作。文章首先阐述了SQLite数据库的基本概念及其在移动应用开发中的重要性,然后通过具体的代码示例,逐步展示了如何实现对Person实体类的增删改查功能。此外,还讨论了常见错误及其解决方法,为开发者提供了实用的参考和指导。 ... [详细]
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • PHP中元素的计量单位是什么? ... [详细]
  • 全面解析JavaScript代码注释技巧与标准规范
    在Web前端开发中,JavaScript代码的可读性和维护性至关重要。本文将详细介绍如何有效地使用注释来提高代码的可读性,并探讨JavaScript代码注释的最佳实践和标准规范。通过合理的注释,开发者可以更好地理解和维护复杂的代码逻辑,提升团队协作效率。 ... [详细]
  • 本文介绍了如何利用Shell脚本高效地部署MHA(MySQL High Availability)高可用集群。通过详细的脚本编写和配置示例,展示了自动化部署过程中的关键步骤和注意事项。该方法不仅简化了集群的部署流程,还提高了系统的稳定性和可用性。 ... [详细]
  • ### 优化后的摘要本学习指南旨在帮助读者全面掌握 Bootstrap 前端框架的核心知识点与实战技巧。内容涵盖基础入门、核心功能和高级应用。第一章通过一个简单的“Hello World”示例,介绍 Bootstrap 的基本用法和快速上手方法。第二章深入探讨 Bootstrap 与 JSP 集成的细节,揭示两者结合的优势和应用场景。第三章则进一步讲解 Bootstrap 的高级特性,如响应式设计和组件定制,为开发者提供全方位的技术支持。 ... [详细]
  • 在处理 XML 数据时,如果需要解析 `` 标签的内容,可以采用 Pull 解析方法。Pull 解析是一种高效的 XML 解析方式,适用于流式数据处理。具体实现中,可以通过 Java 的 `XmlPullParser` 或其他类似的库来逐步读取和解析 XML 文档中的 `` 元素。这样不仅能够提高解析效率,还能减少内存占用。本文将详细介绍如何使用 Pull 解析方法来提取 `` 标签的内容,并提供一个示例代码,帮助开发者快速解决问题。 ... [详细]
  • 开发日志:201521044091 《Java编程基础》第11周学习心得与总结
    开发日志:201521044091 《Java编程基础》第11周学习心得与总结 ... [详细]
  • 在深入掌握Spring框架的事务管理之前,了解其背后的数据库事务基础至关重要。Spring的事务管理功能虽然强大且灵活,但其核心依赖于数据库自身的事务处理机制。因此,熟悉数据库事务的基本概念和特性是必不可少的。这包括事务的ACID属性、隔离级别以及常见的事务管理策略等。通过这些基础知识的学习,可以更好地理解和应用Spring中的事务管理配置。 ... [详细]
  • Presto:高效即席查询引擎的深度解析与应用
    本文深入解析了Presto这一高效的即席查询引擎,详细探讨了其架构设计及其优缺点。Presto通过内存到内存的数据处理方式,显著提升了查询性能,相比传统的MapReduce查询,不仅减少了数据传输的延迟,还提高了查询的准确性和效率。然而,Presto在大规模数据处理和容错机制方面仍存在一定的局限性。本文还介绍了Presto在实际应用中的多种场景,展示了其在大数据分析领域的强大潜力。 ... [详细]
  • PHP网站日志深度解析与数据洞察分析
    通过对PHP网站日志进行深入解析与数据洞察分析,可以有效提升网站性能和用户体验。由于网站日志数据量庞大,通常需要借助专业的日志分析工具来处理。常用的工具包括光年日志分析工具和WebLog Expert等,这些工具能够帮助技术人员快速识别并解决网站运行中的各种问题,从而优化SEO效果和提升整体运营效率。 ... [详细]
  • Java中处理NullPointerException:getStackTrace()方法详解与实例代码 ... [详细]
  • 字节跳动深圳研发中心安全业务团队正在火热招募人才! ... [详细]
  • 如何在 Node.js 环境中将 CSV 数据转换为标准的 JSON 文件格式? ... [详细]
author-avatar
--Zqf
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有