作者:tantyana428_673 | 来源:互联网 | 2023-09-07 08:05
flinkapi层次结构图 其中,flinksql处于flink最高层的api,相当于api来讲,用法更易理解,但是没有api灵活些,下面简单介绍下flinksql的简单应用。
flinkapi层次结构图
其中,flinksql处于flink最高层的api,相当于api来讲,用法更易理解,但是没有api灵活些,下面简单介绍下flinksql的简单应用。
flinksql样例
备注:使用的是1.13.0版本
消费kafka
CREATE TABLE bg_action (
bg BIGINT,
user_source BIGINT,
uid BIGINT,
action VARCHAR,
__ts BIGINT,
actionp VARCHAR,
actionp2 VARCHAR,
actionp3 VARCHAR,
actionp5 VARCHAR,
actionp8 VARCHAR,
actionp10 VARCHAR,
t as if(__ts is null,cast(TIMESTAMPADD(HOUR,8,current_timestamp) as TIMESTAMP(3)),to_timestamp(from_unixtime(__ts/1000,'yyyy-MM-dd HH:mm:ss'))),
watermark for t as t - interval '3' second
) WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = '***', -- kafka topic
'scan.startup.mode' = 'latest-offset', -- 从起始 offset 开始读取
'properties.group.id' = '***',
'properties.bootstrap.servers' = '***',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);
查询
select action,t from bg_action;
写入kafka
create table sink(
action string
) with(
'connector' = 'kafka',
'topic' = '***',
'properties.bootstrap.servers' = '***',
'sink.partitioner' = 'round-robin',
'format' = 'json'
); --定义sink表作为topic的输出
insert into sink
select action from bg_action
当然,中间处理逻辑也可以通过view进行作为临时表映射。