热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

FlinkDataStreamJoin小规模维度数据的简便方法

在编写基于Flink的ETL程序时,我们经常需要用维度数据丰富我们接入的流式数据,如通过商品ID获得商品名称、通过商品分类ID获得分类名称等等。而维度表

在编写基于Flink的ETL程序时,我们经常需要用维度数据丰富我们接入的流式数据,如通过商品ID获得商品名称、通过商品分类ID获得分类名称等等。而维度表基本都位于外部存储,换句话说,就是要解决一个无界的流式表与一个有界的码表或半静态表做join操作的问题。

一般情况下的首选方案是Flink内置的异步I/O机制,必要时还得配合使用高效的缓存(如Guava提供的LoadingCache)减少对外部数据源的请求压力。由于今天时间紧张,所以不深入谈它的原理和用法了,之后会再提。看官如果想了解的话,可以先参考官方文档和FLIP-12给出的设计细节。

但是,异步I/O对于那种变化缓慢并且规模不大的维度数据,就显得有些杀鸡用牛刀了。我们完全可以自己做个轻量级的实现。下面举出一个示例,它从订单日志中取出站点ID、城市ID,然后从存储在MySQL的维度表中获取站点名和城市名,并写回订单日志。

public static final class MapWithSiteInfoFuncextends RichMapFunction {private static final Logger LOGGER &#61; LoggerFactory.getLogger(MapWithSiteInfoFunc.class);private static final long serialVersionUID &#61; 1L;private transient ScheduledExecutorService dbScheduler;private Map siteInfoCache;&#64;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);siteInfoCache &#61; new HashMap<>(1024);dbScheduler &#61; new ScheduledThreadPoolExecutor(1, r -> {Thread thread &#61; new Thread(r, "site-info-update-thread");thread.setUncaughtExceptionHandler((t, e) -> {LOGGER.error("Thread " t " got uncaught exception: " e);});return thread;});dbScheduler.scheduleWithFixedDelay(() -> {try {QueryRunner queryRunner &#61; new QueryRunner(JdbcUtil.getDataSource());List> info &#61; queryRunner.query(SITE_INFO_QUERY_SQL, new MapListHandler());for (Map item : info) {siteInfoCache.put((int) item.get("site_id"), new SiteAndCityInfo((int) item.get("site_id"),(String) item.getOrDefault("site_name", ""),(long) item.get("city_id"),(String) item.getOrDefault("city_name", "")));}LOGGER.info("Fetched {} site info records, {} records in cache", info.size(), siteInfoCache.size());} catch (Exception e) {LOGGER.error("Exception occurred when querying: " e);}}, 0, 10 * 60, TimeUnit.SECONDS);}&#64;Overridepublic String map(String value) throws Exception {JSONObject json &#61; JSON.parseObject(value);int siteId &#61; json.getInteger("site_id");String siteName &#61; "", cityName &#61; "";SiteAndCityInfo info &#61; siteInfoCache.getOrDefault(siteId, null);if (info !&#61; null) {siteName &#61; info.getSiteName();cityName &#61; info.getCityName();}json.put("site_name", siteName);json.put("city_name", cityName);return json.toJSONString();}&#64;Overridepublic void close() throws Exception {siteInfoCache.clear();ExecutorUtils.gracefulShutdown(10, TimeUnit.SECONDS, dbScheduler);JdbcUtil.close();super.close();}private static final String SITE_INFO_QUERY_SQL &#61; "...";}

这段代码的思路很直接&#xff1a;用一个RichMapFunction封装整个join过程&#xff0c;用一个单线程的调度线程池每隔10分钟请求MySQL&#xff0c;拉取想要的维度表数据存入HashMap&#xff0c;再根据日志中的ID查HashMap就完事了。为了安全&#xff0c;在RichMapFunction的close()方法里要记得关闭线程池和连接。

上述代码中的QueryRunner和MapListHandler来自Apache Commons框架里的JDBC工具DBUtils。JdbcUtil中则封装了MySQL连接的参数与DBCP2里的基本连接池BasicDataSource&#xff0c;很简单&#xff0c;看官可以自行实现。



声明&#xff1a;本号所有文章除特殊注明&#xff0c;都为原创&#xff0c;公众号读者拥有优先阅读权&#xff0c;未经作者本人允许不得转载&#xff0c;否则追究侵权责任。

关注我的公众号&#xff0c;后台回复【JAVAPDF】获取200页面试题&#xff01;5万人关注的大数据成神之路&#xff0c;不来了解一下吗&#xff1f;5万人关注的大数据成神之路&#xff0c;真的不来了解一下吗&#xff1f;5万人关注的大数据成神之路&#xff0c;确定真的不来了解一下吗&#xff1f;

欢迎您关注《大数据成神之路》

大数据技术与架构


推荐阅读
  • 普通树(每个节点可以有任意数量的子节点)级序遍历 ... [详细]
  • 本文将介绍如何在混合开发(Hybrid)应用中实现Native与HTML5的交互,包括基本概念、学习目标以及具体的实现步骤。 ... [详细]
  • 本文详细介绍了如何使用Python的多进程技术来高效地分块读取超大文件,并将其输出为多个文件。通过这种方式,可以显著提高读取速度和处理效率。 ... [详细]
  • MySQL初级篇——字符串、日期时间、流程控制函数的相关应用
    文章目录:1.字符串函数2.日期时间函数2.1获取日期时间2.2日期与时间戳的转换2.3获取年月日、时分秒、星期数、天数等函数2.4时间和秒钟的转换2. ... [详细]
  • 机器学习算法:SVM(支持向量机)
    SVM算法(SupportVectorMachine,支持向量机)的核心思想有2点:1、如果数据线性可分,那么基于最大间隔的方式来确定超平面,以确保全局最优, ... [详细]
  • 本文节选自《NLTK基础教程——用NLTK和Python库构建机器学习应用》一书的第1章第1.2节,作者Nitin Hardeniya。本文将带领读者快速了解Python的基础知识,为后续的机器学习应用打下坚实的基础。 ... [详细]
  • JUC(三):深入解析AQS
    本文详细介绍了Java并发工具包中的核心类AQS(AbstractQueuedSynchronizer),包括其基本概念、数据结构、源码分析及核心方法的实现。 ... [详细]
  • 零拷贝技术是提高I/O性能的重要手段,常用于Java NIO、Netty、Kafka等框架中。本文将详细解析零拷贝技术的原理及其应用。 ... [详细]
  • 在分析Android的Audio系统时,我们对mpAudioPolicy->get_input进行了详细探讨,发现其背后涉及的机制相当复杂。本文将详细介绍这一过程及其背后的实现细节。 ... [详细]
  • 在多线程并发环境中,普通变量的操作往往是线程不安全的。本文通过一个简单的例子,展示了如何使用 AtomicInteger 类及其核心的 CAS 无锁算法来保证线程安全。 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 基于Linux开源VOIP系统LinPhone[四]
    ****************************************************************************************** ... [详细]
  • 本文探讨了如何通过编程手段在Linux系统中禁用硬件预取功能。基于Intel® Core™微架构的应用性能优化需求,文章详细介绍了相关配置方法和代码实现,旨在帮助开发人员有效控制硬件预取行为,提升应用程序的运行效率。 ... [详细]
  • 为了在Hadoop 2.7.2中实现对Snappy压缩和解压功能的原生支持,本文详细介绍了如何重新编译Hadoop源代码,并优化其Native编译过程。通过这一优化,可以显著提升数据处理的效率和性能。此外,还探讨了编译过程中可能遇到的问题及其解决方案,为用户提供了一套完整的操作指南。 ... [详细]
  • 深入探索HTTP协议的学习与实践
    在初次访问某个网站时,由于本地没有缓存,服务器会返回一个200状态码的响应,并在响应头中设置Etag和Last-Modified等缓存控制字段。这些字段用于后续请求时验证资源是否已更新,从而提高页面加载速度和减少带宽消耗。本文将深入探讨HTTP缓存机制及其在实际应用中的优化策略,帮助读者更好地理解和运用HTTP协议。 ... [详细]
author-avatar
ayuanliang
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有