热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flinksqljoinhbasedemoflinklookupjoinmysqldemo

lookupjoinmysqldemo:  flinklookupjoinmysqldemo##joinrowkey--LookupSource--kafkasourceCREAT

lookup join mysql demo:  flink lookup join mysql demo

## join rowkey

-- Lookup Source
-- kafka source
CREATE TABLE user_log (
user_id STRING
,item_id STRING
,category_id STRING
,behavior STRING
,ts TIMESTAMP(3)
,process_time as proctime()
, WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka'
,'topic' = 'user_behavior'
,'properties.bootstrap.servers' = 'localhost:9092'
,'properties.group.id' = 'user_log'
,'scan.startup.mode' = 'group-offsets'
,'format' = 'json'
);

drop table if exists hbase_behavior_conf ;
CREATE TEMPORARY TABLE hbase_behavior_conf (
rowkey STRING
,cf ROW(item_id STRING
,category_id STRING
,behavior STRING
,ts TIMESTAMP(3))
) WITH (
'connector' = 'hbase-2.2'
,'zookeeper.quorum' = 'thinkpad:12181'
,'table-name' = 'user_log'
,'lookup.cache.max-rows' = '10000'
,'lookup.cache.ttl' = '10 second'
,'lookup.async' = 'true'
);

---sinkTable
CREATE TABLE kakfa_join_mysql_demo (
user_id STRING
,item_id STRING
,category_id STRING
,behavior STRING
,behavior_map STRING
,ts TIMESTAMP(3)
-- ,primary key (user_id) not enforced
) WITH (
'connector' = 'kafka'
,'topic' = 'user_behavior_1'
,'properties.bootstrap.servers' = 'localhost:9092'
,'properties.group.id' = 'user_log'
,'scan.startup.mode' = 'group-offsets'
,'format' = 'json'
);

INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, concat('map_', c.cf.item_id), a.ts
FROM user_log a
left join hbase_behavior_conf FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.user_id = rowkey
where a.behavior is not null;

 

测试 hbase 维表Lookup 功能正常,可以正常缓存数据,缓存也会定时失效,透查Hbase

 

flink sql join hbase demo
    




flink lookup join mysql demo


* 注: 随便测试了一下性能,Hbase 维表有2 万多条数据,输入数据的关联字段都是Hbase 表主键,lookup.cache.ttl 为 1分钟,关联的 TPS 轻松到达: 2W

flink sql join hbase demo
    




flink lookup join mysql demo

## join 非 rowkey

 

join 非主键时,hbase 维表启动时一次性读取 hbase 表全部数据,缓存到内存中,hbase source 状态 finish

INSERT INTO kakfa_join_mysql_demo(user_id, item_id, category_id, behavior, behavior_map, ts)
SELECT a.user_id, a.item_id, a.category_id, a.behavior, concat('map_', c.cf.item_id), a.ts
FROM user_log a
left join hbase_behavior_conf c ON a.item_id = cf.item_id
where a.behavior is not null;

 

 

flink sql join hbase demo
    




flink lookup join mysql demo


* 注: 这种情况在 flink 1.13 版本,不能完成 checkpoint

2021-08-20 09:12:39,313 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, default_database, hbase_behavior_conf, project=[cf]]], fields=[cf]) -> Calc(select=[cf, cf.item_id AS $f2]) (1/1) (55be6048b81e2eef95f25d78b3705a34) switched from RUNNING to FINISHED.
2021-08-20 09:13:30,356 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to trigger checkpoint for job 971bbb75c7334b0c2b9d218005efbf00 since some tasks of job 971bbb75c7334b0c2b9d218005efbf00 has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.

 

Hbase 关联只有在关联键是Hbase 表的主键的时候,才能应用 Lookup 功能,非主键一次性加载,维表数据没办法更新,而且不能做 Checkpoint 影响 Flink job 的一致性。

在之前版本,SQL 功能不完善的时候,我们使用 UDF 的方式关联 Hbase,可以在 UDF 里面自己关联缓存、透查Hbase,也比较灵活。(没找到之前的代码,空了自己写一个,可以再水一篇博客)

 

欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

flink sql join hbase demo
    




flink lookup join mysql demo

 


推荐阅读
  • MySQL:不仅仅是数据库那么简单
    MySQL不仅是一款高效、可靠的数据库管理系统,它还具备丰富的功能和扩展性,支持多种存储引擎,适用于各种应用场景。从简单的网站开发到复杂的企业级应用,MySQL都能提供强大的数据管理和优化能力,满足不同用户的需求。其开源特性也促进了社区的活跃发展,为技术进步提供了持续动力。 ... [详细]
  • 这里不需要UDF。Column已提供simpleString方法和simpleString实例:frompyspark.sql.typesimportDoubleTy ... [详细]
  • 多线程基础概览
    本文探讨了多线程的起源及其在现代编程中的重要性。线程的引入是为了增强进程的稳定性,确保一个进程的崩溃不会影响其他进程。而进程的存在则是为了保障操作系统的稳定运行,防止单一应用程序的错误导致整个系统的崩溃。线程作为进程的逻辑单元,多个线程共享同一CPU,需要合理调度以避免资源竞争。 ... [详细]
  • MySQL 5.7 学习指南:SQLyog 中的主键、列属性和数据类型
    本文介绍了 MySQL 5.7 中主键(Primary Key)和自增(Auto-Increment)的概念,以及如何在 SQLyog 中设置这些属性。同时,还探讨了数据类型的分类和选择,以及列属性的设置方法。 ... [详细]
  • 本文是Java并发编程系列的开篇之作,将详细解析Java 1.5及以上版本中提供的并发工具。文章假设读者已经具备同步和易失性关键字的基本知识,重点介绍信号量机制的内部工作原理及其在实际开发中的应用。 ... [详细]
  • 在使用 Cacti 进行监控时,发现已运行的转码机未产生流量,导致 Cacti 监控界面显示该转码机处于宕机状态。进一步检查 Cacti 日志,发现数据库中存在 SQL 查询失败的问题,错误代码为 145。此问题可能是由于数据库表损坏或索引失效所致,建议对相关表进行修复操作以恢复监控功能。 ... [详细]
  • 利用爬虫技术抓取数据,结合Fiddler与Postman在Chrome中的应用优化提交流程
    本文探讨了如何利用爬虫技术抓取目标网站的数据,并结合Fiddler和Postman工具在Chrome浏览器中的应用,优化数据提交流程。通过详细的抓包分析和模拟提交,有效提升了数据抓取的效率和准确性。此外,文章还介绍了如何使用这些工具进行调试和优化,为开发者提供了实用的操作指南。 ... [详细]
  • 本文介绍了UUID(通用唯一标识符)的概念及其在JavaScript中生成Java兼容UUID的代码实现与优化技巧。UUID是一个128位的唯一标识符,广泛应用于分布式系统中以确保唯一性。文章详细探讨了如何利用JavaScript生成符合Java标准的UUID,并提供了多种优化方法,以提高生成效率和兼容性。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • 在Python多进程编程中,`multiprocessing`模块是不可或缺的工具。本文详细探讨了该模块在多进程管理中的核心原理,并通过实际代码示例进行了深入分析。文章不仅总结了常见的多进程编程技巧,还提供了解决常见问题的实用方法,帮助读者更好地理解和应用多进程编程技术。 ... [详细]
  • 在第二课中,我们将深入探讨Scala的面向对象编程核心概念及其在Spark源码中的应用。首先,通过详细的实战案例,全面解析Scala中的类和对象。作为一门纯面向对象的语言,Scala的类设计和对象使用是理解其面向对象特性的关键。此外,我们还将介绍如何通过阅读Spark源码来进一步巩固对这些概念的理解。这不仅有助于提升编程技能,还能为后续的高级应用开发打下坚实的基础。 ... [详细]
  • C#微信开发入门教程第二篇:新手快速上手指南,含详细视频讲解
    在距离上次课程一个多星期后,我们终于带来了第二讲的内容。虽然原计划是一周一次更新,但由于工作繁忙有所延迟。近期在交流群中发现,一些初学者已经能够熟练调用微信接口,但对微信公众平台的消息接收处理机制还不够了解。因此,本次课程将详细介绍如何高效处理微信公众平台的消息接收,并提供详细的视频讲解,帮助大家快速上手。 ... [详细]
  • NoSQL数据库,即非关系型数据库,有时也被称作Not Only SQL,是一种区别于传统关系型数据库的管理系统。这类数据库设计用于处理大规模、高并发的数据存储与查询需求,特别适用于需要快速读写大量非结构化或半结构化数据的应用场景。NoSQL数据库通过牺牲部分一致性来换取更高的可扩展性和性能,支持分布式部署,能够有效应对互联网时代的海量数据挑战。 ... [详细]
  • 适用于 SSR/WASM 的 ZXing Blazor 扫码组件,高效集成与优化
    本项目基于 ZXing 封装了适用于 SSR 和 WASM 的 Blazor 扫码组件,能够高效地集成到 Blazor 应用中,并支持通过手机或桌面电脑的摄像头进行扫码操作。该组件库不仅简化了开发流程,还提供了高性能的扫码体验。项目地址:[链接] ... [详细]
  • centos7.3配置python2、3环境与配置各自pip
    环境:CentOS-7-x86_64-Everything-1611No.1查看CentOS对Python的默认依赖[root@cs~]#lsusrbinpython* ... [详细]
author-avatar
mobiledu2502926703
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有