热门标签 | HotTags
当前位置:  开发笔记 > 数据库 > 正文

FlinkSQL实践—时态表/版本表

1.背景在FlinkSQL关联时,必然会涉及到维表,维表又可能是不断变化的(aka时态表或版本表)。版本表:如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,

1. 背景

在FlinkSQL关联时,必然会涉及到维表,维表又可能是不断变化的(aka 时态表 或 版本表)。

版本表: 如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog 可以定义成版本表。
普通表: 如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 的表可以定义成普通表。

2. FlinkSQL中时态表类型


2.1 Debezium

未实战测试


2.2 a compacted Kafka topic

将某一个主题设置为compacted topic

bin/kafka-topics.sh --alter --topic my_topic_name --zookeeper my_zookeeper:2181 --config cleanup.policy=compact

tableEnv.executeSql(
"CREATE TABLE dim_source (" +
" id STRING," +
" name STRING," +
" update_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, " +
" WATERMARK FOR update_time AS update_time, " +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'upsert-kafka'," +
" 'topic' = 'flinksqldim'," +
" 'properties.bootstrap.servers' = 'ip:port'," +
" 'properties.group.id' = 'flinksqlDim'," +
" 'key.format' = 'json'," +
" 'value.format' = 'json')"
);

对应的kafka topic生产者代码

// 创建消息
// DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.nnnnnnnnn");
for (int i = 1; i <5; i++) {
JSONObject json1 = new JSONObject();
json1.put("key", i+"");
//json.put("update_time", dtf.format(LocalDateTime.now()));
JSONObject json = new JSONObject();
json.put("id", i+"");
json.put("name", "name222"+i);
ProducerRecord record = new ProducerRecord(
"flinksqldim",
json1.toJSONString(),
json.toJSONString()
);
// 发送消息
Future future = producer.send(record);
}

3. 遇到的问题


3.1 kafka支持的METADATA

kafka支持的METADATA类型


3.2 key.format

必须指定key.format,可以不指定'scan.startup.mode' = 'earliest-offset'


4. 引用

Flink-1.12 - 之Flink SQL 与 kafka connector实践

Flink动态表和时态表总结

基于 FLINK SQL 的实时数据打宽的三种方式

将某一个主题设置为compacted topic

kafka支持的METADATA类型



推荐阅读
  • 数据库异常智能分析与诊断
    数据库,异常, ... [详细]
  • 马蜂窝数据总监分享:从数仓到数据中台,大数据演进技术选型最优解
    大家好,今天分享的议题主要包括几大内容:带大家回顾一下大数据在国内的发展,从传统数仓到当前数据中台的演进过程;我个人认为数 ... [详细]
  • 目录摘要SQL的现在NoSQL,NotOnlySQL要分布式,也要SQL总结引用摘要毫不夸张的说,关系数据库是企业软件系统的核心,企业形形色色信息行为的背后,都有关系数据库的支撑。 ... [详细]
  • 实践解析可视化开发平台FlinkSever优势
    实践,解析,可,视,化,开发,平台,fli ... [详细]
  • 本文详细介绍了MysqlDump和mysqldump进行全库备份的相关知识,包括备份命令的使用方法、my.cnf配置文件的设置、binlog日志的位置指定、增量恢复的方式以及适用于innodb引擎和myisam引擎的备份方法。对于需要进行数据库备份的用户来说,本文提供了一些有价值的参考内容。 ... [详细]
  • 使用Ubuntu中的Python获取浏览器历史记录原文: ... [详细]
  • 本文由编程笔记小编整理,介绍了PHP中的MySQL函数库及其常用函数,包括mysql_connect、mysql_error、mysql_select_db、mysql_query、mysql_affected_row、mysql_close等。希望对读者有一定的参考价值。 ... [详细]
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • Java String与StringBuffer的区别及其应用场景
    本文主要介绍了Java中String和StringBuffer的区别,String是不可变的,而StringBuffer是可变的。StringBuffer在进行字符串处理时不生成新的对象,内存使用上要优于String类。因此,在需要频繁对字符串进行修改的情况下,使用StringBuffer更加适合。同时,文章还介绍了String和StringBuffer的应用场景。 ... [详细]
  • Oracle分析函数first_value()和last_value()的用法及原理
    本文介绍了Oracle分析函数first_value()和last_value()的用法和原理,以及在查询销售记录日期和部门中的应用。通过示例和解释,详细说明了first_value()和last_value()的功能和不同之处。同时,对于last_value()的结果出现不一样的情况进行了解释,并提供了理解last_value()默认统计范围的方法。该文对于使用Oracle分析函数的开发人员和数据库管理员具有参考价值。 ... [详细]
  • MyBatis错题分析解析及注意事项
    本文对MyBatis的错题进行了分析和解析,同时介绍了使用MyBatis时需要注意的一些事项,如resultMap的使用、SqlSession和SqlSessionFactory的获取方式、动态SQL中的else元素和when元素的使用、resource属性和url属性的配置方式、typeAliases的使用方法等。同时还指出了在属性名与查询字段名不一致时需要使用resultMap进行结果映射,而不能使用resultType。 ... [详细]
  • 本文详细介绍了在ASP.NET中获取插入记录的ID的几种方法,包括使用SCOPE_IDENTITY()和IDENT_CURRENT()函数,以及通过ExecuteReader方法执行SQL语句获取ID的步骤。同时,还提供了使用这些方法的示例代码和注意事项。对于需要获取表中最后一个插入操作所产生的ID或马上使用刚插入的新记录ID的开发者来说,本文提供了一些有用的技巧和建议。 ... [详细]
  • 高质量SQL书写的30条建议
    本文提供了30条关于优化SQL的建议,包括避免使用select *,使用具体字段,以及使用limit 1等。这些建议是基于实际开发经验总结出来的,旨在帮助读者优化SQL查询。 ... [详细]
  • TableAPI报一下异常:FieldtypesofqueryresultandregisteredTableSink
    报错信息如下:Exceptioninthread“main”org.apache.flink.table.api.ValidationException:Fieldtypesofq ... [详细]
author-avatar
伴生约定_879
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有