热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flink实践教程:入门(6):读取PG数据写入ClickHouse

作者:腾讯云流计算Oceanus团队流计算Oceanus简介流计算Oceanus是大数据产品生态体系的实时化分析利器,是基于ApacheFlink构建的具备一站开发、无缝连接、亚秒

作者:腾讯云流计算 Oceanus 团队


流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
本文将向您详细介绍如何获取 PostgreSQL 表数据,并使用字符串函数进行转换,最后将数据输出到 ClickHouse 中。


操作视频


前置准备


创建流计算 Oceanus 集群

进入流计算 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考流计算 Oceanus 官方文档 创建独享集群 [2]。


创建 PostgreSQL 实例

进入 PostgreSQL 控制台 [3],点击左上角【新建】创建实例,具体参考 创建 PostgreSQL 实例 [4]。


数据准备:

进入实例数据库,创建 test1 表,并手动插入数据。

-- 建表语句create table public.test1 ( id INT, str_one VARCHAR(50), str_two VARCHAR(50), str_thr VARCHAR(50), PRIMARY key(id));-- 插入语句INSERT INTO public.test1 VALUES (1, 'hello world', 'b', 'Oceanus-1');INSERT INTO public.test1 VALUES (2, 'good job', 'c', 'Oceanus-2');INSERT INTO public.test1 VALUES (3, 'hello oceanus', 'd', 'Oceanus-3');

笔者这里使用 DBeaver 进行外网连接,更多连接方式参考官网文档 连接 PostgreSQL 实例 [5]



创建 ClickHouse 集群

进入 ClickHouse 控制台 [6],点击左上角【新建集群】,完成 ClickHouse 集群创建,具体可参考 ClickHouse 快速入门 [7]。创建 ClickHouse 表:  登陆 ClickHouse 集群(登入方式参考 ClickHouse 快速入门 [7]),并建表。

CREATE TABLE default.pg_to_ck on cluster default_cluster ( id Int8, str_one String, str_two String, str_thr String, Sign Int8 )ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/default/pg_to_ck', '{replica}',Sign)ORDER BY (id);

注:流计算 Oceanus 集群、PostgreSQL 实例、ClickHouse 集群需在同一 VPC 下。



流计算 Oceanus 作业


1. 创建 Source

-- PostgreSQL CDC Source。CREATE TABLE PostgreSourceTable ( id INT, str_one VARCHAR, str_two VARCHAR, str_thr VARCHAR, PRIMARY KEY (id) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ( 'connector' = 'postgres-cdc', -- 必须为 'postgres-cdc' 'hostname' = '10.0.0.236', -- 数据库的 IP 'port' = '5432', -- 数据库的访问端口 'username' = 'root', -- 数据库访问使用的用户名(需要提供 REPLICATION 权限, 日志级别必须大于等于 logical, 且设置后需要重启实例) 'password' = 'xxxxxxxxxxx', -- 数据库访问使用的密码 'database-name' = 'postgres', -- 需要同步的数据库名 'schema-name' = 'public', -- 需要同步的数据库模式 (Schema) 'table-name' = 'test1' -- 需要同步的数据表名);

2. 创建 Sink

-- ClickHouse Sink (不完全支持upsert,详见说明文档)。配合 flink-connector-clickhouse 使用。CREATE TABLE clickhouse_sink ( id INT, str_one VARCHAR, str_two VARCHAR, str_thr VARCHAR, PRIMARY KEY (id) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ( 'connector' = 'clickhouse', -- connector 类型为 clickhouse 'url' = 'clickhouse://10.0.0.178:8123', -- 指定数据库链接 url 'database-name' = 'default', -- 需要写入的 clickhouse 库名 'table-name' = 'pg_to_ck', -- 需要写入的 clickhouse 表名 'table.collapsing.field' = 'Sign' -- 采用 CollapsingMergeTree 引擎的 clickhouse 表,Collapsing 列字段的名称);

3. 编写业务 SQL

INSERT INTO clickhouse_sinkSELECT id,--INITCAP:将 str_one 中的单词转为大写开头,例如 INITCAP('i have a dream') 返回 'I Have A Dream'。 INITCAP(str_one) AS str_one,--TO_BASE64:将 string 表示的字符串编码为 Base64 字符串。 TO_BASE64(str_two) AS str_two,--REPLACE:将 string1 字符串中所有的 string2 替换为 string3。例如 REPLACE('banana', 'a', 'A') 返回 'bAnAnA'。 REPLACE(str_thr,'Oceanus','Hello Oceanus') AS str_thr FROM PostgreSourceTable;

这里我们使用 Flink 1.13 集群,旧版 Flink 集群需选择相应的内置 Connector



总结


    使用 Postgres-CDC 连接器:


用于同步的 Postgres 用户至少需要开启 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT 权限。可以进入 PostgreSQL 数据库进行授权操作。

CREATE ROLE debezium_user REPLICATION LOGIN;GRANT USAGE ON DATABASE database_name TO debezium_user;GRANT USAGE ON SCHEMA schema_name TO debezium_user;GRANT SELECT ON scheam_name.table_name, scheam_name.table_name TO debezium_user;

日志级别必须大于等于 logical, 且设置后需要重启实例。进入数据库实例,单击【参数设置】,单击【WAL】,修改【wal_level】的【参数运行值】为 "logical"。修改成功后点击右上角【重启】。


    更多字符串操作函数请参考流计算 Oceanus 官方文档 字符串函数[8]。



参考链接

[1] 流计算 Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298

[3] PostgreSQL 控制台:https://console.cloud.tencent.com/postgres/index

[4] 创建 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/56961

[5] 连接 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/40429

[6] ClickHouse 控制台:https://console.cloud.tencent.com/cdwch?region=ap-guangzhou

[7] ClickHouse 快速入门:https://cloud.tencent.com/document/product/1299/49824

[8] 流计算 Oceanus 字符串函数:https://cloud.tencent.com/document/product/849/18073


关注“腾讯云大数据”公众号,技术交流、最新活动、服务专享一站 Get~

流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓




Flink 实践教程:入门(6):读取 PG 数据写入 ClickHouse的相关教程结束。



推荐阅读
  • Python SQLAlchemy库的使用方法详解
    本文详细介绍了Python中使用SQLAlchemy库的方法。首先对SQLAlchemy进行了简介,包括其定义、适用的数据库类型等。然后讨论了SQLAlchemy提供的两种主要使用模式,即SQL表达式语言和ORM。针对不同的需求,给出了选择哪种模式的建议。最后,介绍了连接数据库的方法,包括创建SQLAlchemy引擎和执行SQL语句的接口。 ... [详细]
  • Java String与StringBuffer的区别及其应用场景
    本文主要介绍了Java中String和StringBuffer的区别,String是不可变的,而StringBuffer是可变的。StringBuffer在进行字符串处理时不生成新的对象,内存使用上要优于String类。因此,在需要频繁对字符串进行修改的情况下,使用StringBuffer更加适合。同时,文章还介绍了String和StringBuffer的应用场景。 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • 本文介绍了南邮ctf-web的writeup,包括签到题和md5 collision。在CTF比赛和渗透测试中,可以通过查看源代码、代码注释、页面隐藏元素、超链接和HTTP响应头部来寻找flag或提示信息。利用PHP弱类型,可以发现md5('QNKCDZO')='0e830400451993494058024219903391'和md5('240610708')='0e462097431906509019562988736854'。 ... [详细]
  • 电话号码的字母组合解题思路和代码示例
    本文介绍了力扣题目《电话号码的字母组合》的解题思路和代码示例。通过使用哈希表和递归求解的方法,可以将给定的电话号码转换为对应的字母组合。详细的解题思路和代码示例可以帮助读者更好地理解和实现该题目。 ... [详细]
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 本文介绍了在rhel5.5操作系统下搭建网关+LAMP+postfix+dhcp的步骤和配置方法。通过配置dhcp自动分配ip、实现外网访问公司网站、内网收发邮件、内网上网以及SNAT转换等功能。详细介绍了安装dhcp和配置相关文件的步骤,并提供了相关的命令和配置示例。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • 本文介绍了如何在给定的有序字符序列中插入新字符,并保持序列的有序性。通过示例代码演示了插入过程,以及插入后的字符序列。 ... [详细]
  • C# 7.0 新特性:基于Tuple的“多”返回值方法
    本文介绍了C# 7.0中基于Tuple的“多”返回值方法的使用。通过对C# 6.0及更早版本的做法进行回顾,提出了问题:如何使一个方法可返回多个返回值。然后详细介绍了C# 7.0中使用Tuple的写法,并给出了示例代码。最后,总结了该新特性的优点。 ... [详细]
  • CF:3D City Model(小思维)问题解析和代码实现
    本文通过解析CF:3D City Model问题,介绍了问题的背景和要求,并给出了相应的代码实现。该问题涉及到在一个矩形的网格上建造城市的情景,每个网格单元可以作为建筑的基础,建筑由多个立方体叠加而成。文章详细讲解了问题的解决思路,并给出了相应的代码实现供读者参考。 ... [详细]
  • 高质量SQL书写的30条建议
    本文提供了30条关于优化SQL的建议,包括避免使用select *,使用具体字段,以及使用limit 1等。这些建议是基于实际开发经验总结出来的,旨在帮助读者优化SQL查询。 ... [详细]
  • 如何在php中将mysql查询结果赋值给变量
    本文介绍了在php中将mysql查询结果赋值给变量的方法,包括从mysql表中查询count(学号)并赋值给一个变量,以及如何将sql中查询单条结果赋值给php页面的一个变量。同时还讨论了php调用mysql查询结果到变量的方法,并提供了示例代码。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
author-avatar
用户cnhr0qjy0s
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有