在编写基于Flink的ETL程序时,我们经常需要用维度数据丰富我们接入的流式数据,如通过商品ID获得商品名称、通过商品分类ID获得分类名称等等。而维度表基本都位于外部存储,换句话说,就是要解决一个无界的流式表与一个有界的码表或半静态表做join操作的问题。
一般情况下的首选方案是Flink内置的异步I/O机制,必要时还得配合使用高效的缓存(如Guava提供的LoadingCache)减少对外部数据源的请求压力。由于今天时间紧张,所以不深入谈它的原理和用法了,之后会再提。看官如果想了解的话,可以先参考官方文档和FLIP-12给出的设计细节。
但是,异步I/O对于那种变化缓慢并且规模不大的维度数据,就显得有些杀鸡用牛刀了。我们完全可以自己做个轻量级的实现。下面举出一个示例,它从订单日志中取出站点ID、城市ID,然后从存储在MySQL的维度表中获取站点名和城市名,并写回订单日志。
public static final class MapWithSiteInfoFuncextends RichMapFunction {private static final Logger LOGGER &#61; LoggerFactory.getLogger(MapWithSiteInfoFunc.class);private static final long serialVersionUID &#61; 1L;private transient ScheduledExecutorService dbScheduler;private Map siteInfoCache;&#64;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);siteInfoCache &#61; new HashMap<>(1024);dbScheduler &#61; new ScheduledThreadPoolExecutor(1, r -> {Thread thread &#61; new Thread(r, "site-info-update-thread");thread.setUncaughtExceptionHandler((t, e) -> {LOGGER.error("Thread " t " got uncaught exception: " e);});return thread;});dbScheduler.scheduleWithFixedDelay(() -> {try {QueryRunner queryRunner &#61; new QueryRunner(JdbcUtil.getDataSource());List
这段代码的思路很直接&#xff1a;用一个RichMapFunction封装整个join过程&#xff0c;用一个单线程的调度线程池每隔10分钟请求MySQL&#xff0c;拉取想要的维度表数据存入HashMap&#xff0c;再根据日志中的ID查HashMap就完事了。为了安全&#xff0c;在RichMapFunction的close()方法里要记得关闭线程池和连接。
上述代码中的QueryRunner和MapListHandler来自Apache Commons框架里的JDBC工具DBUtils。JdbcUtil中则封装了MySQL连接的参数与DBCP2里的基本连接池BasicDataSource&#xff0c;很简单&#xff0c;看官可以自行实现。
声明&#xff1a;本号所有文章除特殊注明&#xff0c;都为原创&#xff0c;公众号读者拥有优先阅读权&#xff0c;未经作者本人允许不得转载&#xff0c;否则追究侵权责任。
关注我的公众号&#xff0c;后台回复【JAVAPDF】获取200页面试题&#xff01;5万人关注的大数据成神之路&#xff0c;不来了解一下吗&#xff1f;5万人关注的大数据成神之路&#xff0c;真的不来了解一下吗&#xff1f;5万人关注的大数据成神之路&#xff0c;确定真的不来了解一下吗&#xff1f;
欢迎您关注《大数据成神之路》