关于使用flinksql实现数据覆盖功能,我个人不是很推荐,我也只是在local模式下实现的,目前支持数据覆盖的只要fliesystem、hive两种数据源
我这里以filesystem为例子,filesystem要使用insert overwrite语句,是针对分区表
可以参考文档:https://www.bookstack.cn/read/ApacheFlink-1.13-zh/518d1aeb4e76005f.md
一、创建作业
我这里用的filesystem是基于csv格式的数据格式,具有需要引用那些依赖可以参考:https://www.cnblogs.com/braveym/p/16803764.html
二、编写flinksql代码
CREATE TABLE fs_table (
`id` INT,
`title` STRING,
`source` STRING,
`head_img` STRING,
`url` STRING,
`keyword` STRING,
`category` STRING
) PARTITIONED BY (category) WITH (
'connector'='filesystem',
'path'='file:///home/data/filesystem',
'format'='csv'
);
CREATE TABLE source_news(
`id` INT,
`timestamp` TIMESTAMP,
`title` STRING,
`source` STRING,
`head_img` STRING,
`url` STRING,
`category` STRING,
`keyword` STRING,
`tag` STRING,
`description` STRING,
`content` STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://172.16.119.50:3306/test?createDatabaseIfNotExist=true&useSSL=false&characterEncoding=UTF-8',
'username' = 'root',
'password' = 'Tj@20220710',
'table-name' = 'source_news'
);
-- INSERT overwrite fs_table PARTITION (category='军事')
-- SELECT id,title,source,head_img,url,keyword FROM source_news where category='军事';
-- INSERT overwrite fs_table PARTITION (category='科技')
-- SELECT id,title,source,head_img,url,keyword FROM source_news where category='科技';
--全量覆盖
INSERT overwrite fs_table PARTITION (category='文化')
SELECT id,title,source,head_img,url,keyword FROM source_news where category='文化' ;
三、运行作业
这个运行的时候要使用批模式
运行成功,这个是实现全量覆盖的,个人感觉这个功能对flink来说意义不是很大,毕竟flink主要是处理流数据的