热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flinkscalamysql_Flink1.9实战:使用SQL读取Kafka并写入MySQL

上周六在深圳分享了《FlinkSQL1.9.0技术内幕和最佳实践》,会后许多小伙伴对最后演示环节的Demo代码非常感兴趣,迫不及待地想尝试下࿰

上周六在深圳分享了《Flink SQL 1.9.0 技术内幕和最佳实践》,会后许多小伙伴对最后演示环节的 Demo 代码非常感兴趣,迫不及待地想尝试下,所以写了这篇文章分享下这份代码。希望对于 Flink SQL 的初学者能有所帮助。完整分享可以观看 Meetup 视频回顾 :https://developer.aliyun.com/live/1416

这份代码主要由两部分组成:1) 能用来提交 SQL 文件的 SqlSubmit 实现。2) 用于演示的 SQL 示例、Kafka 启动停止脚本、 一份测试数据集、Kafka 数据源生成器。

通过本实战,你将学到:

如何使用 Blink Planner

一个简单的 SqlSubmit 是如何实现的

如何用 DDL 创建一个 Kafka 源表和 MySQL 结果表

运行一个从 Kafka 读取数据,计算 PVUV,并写入 MySQL 的作业

设置调优参数,观察对作业的影响

SqlSubmit 的实现

笔者一开始是想用 SQL Client 来贯穿整个演示环节,但可惜 1.9 版本 SQL CLI 还不支持处理 CREATE TABLE 语句。所以笔者就只好自己写了个简单的提交脚本。后来想想,也挺好的,可以让听众同时了解如何通过 SQL 的方式,和编程的方式使用 Flink SQL。

SqlSubmit 的主要任务是执行和提交一个 SQL 文件,实现非常简单,就是通过正则表达式匹配每个语句块。如果是 CREATE TABLE 或 INSERT INTO 开头,则会调用 tEnv.sqlUpdate(...)。如果是 SET 开头,则会将配置设置到 TableConfig 上。其核心代码主要如下所示:

EnvironmentSettings settings = EnvironmentSettings.newInstance()

.useBlinkPlanner()

.inStreamingMode()

.build();

// 创建一个使用 Blink Planner 的 TableEnvironment, 并工作在流模式

TableEnvironment tEnv = TableEnvironment.create(settings);

// 读取 SQL 文件

List sql = Files.readAllLines(path);

// 通过正则表达式匹配前缀,来区分不同的 SQL 语句

List calls = SqlCommandParser.parse(sql);

// 根据不同的 SQL 语句,调用 TableEnvironment 执行

for (SqlCommandCall call : calls) {

switch (call.command) {

case SET:

String key = call.operands[0];

String value = call.operands[1];

// 设置参数

tEnv.getConfig().getConfiguration().setString(key, value);

break;

case CREATE_TABLE:

String ddl = call.operands[0];

tEnv.sqlUpdate(ddl);

break;

case INSERT_INTO:

String dml = call.operands[0];

tEnv.sqlUpdate(dml);

break;

default:

throw new RuntimeException("Unsupported command: " + call.command);

}

}

// 提交作业

tEnv.execute("SQL Job");

使用 DDL 连接 Kafka 源表

在 flink-sql-submit 项目中,我们准备了一份测试数据集(来自阿里云天池公开数据集,特别鸣谢),位于 src/main/resources/user_behavior.log。数据以 JSON 格式编码,大概长这个样子:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

为了模拟真实的 Kafka 数据源,笔者还特地写了一个 source-generator.sh 脚本(感兴趣的可以看下源码),会自动读取 user_behavior.log 的数据并以默认每毫秒1条的速率灌到 Kafka 的 user_behavior topic 中。

有了数据源后,我们就可以用 DDL 去创建并连接这个 Kafka 中的 topic(详见 src/main/resources/q1.sql)。

CREATE TABLE user_log (

user_id VARCHAR,

item_id VARCHAR,

category_id VARCHAR,

behavior VARCHAR,

ts TIMESTAMP

) WITH (

'connector.type' = 'kafka', -- 使用 kafka connector

'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本

'connector.topic' = 'user_behavior', -- kafka topic

'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取

'connector.properties.0.key' = 'zookeeper.connect', -- 连接信息

'connector.properties.0.value' = 'localhost:2181',

'connector.properties.1.key' = 'bootstrap.servers',

'connector.properties.1.value' = 'localhost:9092',

'update-mode' = 'append',

'format.type' = 'json', -- 数据源格式为 json

'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则

)

注:可能有用户会觉得其中的 connector.properties.0.key 等参数比较奇怪,社区计划将在下一个版本中改进并简化 connector 的参数配置。

使用 DDL 连接 MySQL 结果表

连接 MySQL 可以使用 Flink 提供的 JDBC connector。例如

CREATE TABLE pvuv_sink (

dt VARCHAR,

pv BIGINT,

uv BIGINT

) WITH (

'connector.type' = 'jdbc', -- 使用 jdbc connector

'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url

'connector.table' = 'pvuv_sink', -- 表名

'connector.username' = 'root', -- 用户名

'connector.password' = '123456', -- 密码

'connector.write.flush.max-rows' = '1' -- 默认5000条,为了演示改为1条

)

PV UV 计算

假设我们的需求是计算每小时全网的用户访问量,和独立用户数。很多用户可能会想到使用滚动窗口来计算。但这里我们介绍另一种方式。即 Group Aggregation 的方式。

INSERT INTO pvuv_sink

SELECT

DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,

COUNT(*) AS pv,

COUNT(DISTINCT user_id) AS uv

FROM user_log

GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

它使用 DATE_FORMAT 这个内置函数,将日志时间归一化成“年月日小时”的字符串格式,并根据这个字符串进行分组,即根据每小时分组,然后通过 COUNT(*) 计算用户访问量(PV),通过 COUNT(DISTINCT user_id) 计算独立用户数(UV)。这种方式的执行模式是每收到一条数据,便会进行基于之前计算的值做增量计算(如+1),然后将最新结果输出。所以实时性很高,但输出量也大。

我们将这个查询的结果,通过 INSERT INTO 语句,写到了之前定义的 pvuv_sink MySQL 表中。

注:在深圳 Meetup 中,我们有对这种查询的性能调优做了深度的介绍。

实战演示

环境准备

本实战演示环节需要安装一些必须的服务,包括:

Flink 本地集群:用来运行 Flink SQL 任务。

Kafka 本地集群:用来作为数据源。

MySQL 数据库:用来作为结果表。

Flink 本地集群安装

1.下载 Flink 1.9.0 安装包并解压:https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz

2.下载以下依赖 jar 包,并拷贝到 flink-1.9.0/lib/ 目录下。因为我们运行时需要依赖各个 connector 实现。

3.将 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因为我们的演示任务可能会消耗多于1个的 slot。

4.在 flink-1.9.0 目录下执行 ./bin/start-cluster.sh,启动集群。

运行成功的话,可以在 http://localhost:8081 访问到 Flink Web UI。

d7bef2747a41baffedb5b0a7e39fdcb3.png

另外,还需要将 Flink 的安装路径填到 flink-sql-submit 项目的 env.sh 中,用于后面提交 SQL 任务,如我的路径是

FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0

Kafka 本地集群安装

将安装路径填到 flink-sql-submit 项目的 env.sh 中,如我的路径是

KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0

在 flink-sql-submit 目录下运行 ./start-kafka.sh 启动 Kafka 集群。

在命令行执行 jps,如果看到 Kafka 进程和 QuorumPeerMain 进程即表明启动成功。

MySQL 安装

$ docker pull mysql

$ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql

然后在 MySQL 中创建一个 flink-test 的数据库,并按照上文的 schema 创建 pvuv_sink 表。

提交 SQL 任务

1.在 flink-sql-submit 目录下运行 ./source-generator.sh,会自动创建 user_behavior topic,并实时往里灌入数据。

272830285faf0799e65d211ed27e4219.png

2.在 flink-sql-submit 目录下运行 ./run.sh q1, 提交成功后,可以在 Web UI 中看到拓扑。

206c5b5a12f752601a3bc16a3540ba35.png

在 MySQL 客户端,我们也可以实时地看到每个小时的 pv uv 值在不断地变化

96d4c9eb9ad079627d4c6cd3d61b9fc2.png

结尾

本文带大家搭建基础集群环境,并使用 SqlSubmit 提交纯 SQL 任务来学习了解如何连接外部系统。flink-sql-submit/src/main/resources/q1.sql 中还有一些注释掉的调优参数,感兴趣的同学可以将参数打开,观察对作业的影响。关于这些调优参数的原理,可以看下我在 深圳 Meetup 上的分享《Flink SQL 1.9.0 技术内幕和最佳实践》。

原文链接

本文为云栖社区原创内容,未经允许不得转载。



推荐阅读
  • 本文介绍了如何利用Shell脚本高效地部署MHA(MySQL High Availability)高可用集群。通过详细的脚本编写和配置示例,展示了自动化部署过程中的关键步骤和注意事项。该方法不仅简化了集群的部署流程,还提高了系统的稳定性和可用性。 ... [详细]
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • 本文介绍如何使用 Python 的 DOM 和 SAX 方法解析 XML 文件,并通过示例展示了如何动态创建数据库表和处理大量数据的实时插入。 ... [详细]
  • 原文网址:https:www.cnblogs.comysoceanp7476379.html目录1、AOP什么?2、需求3、解决办法1:使用静态代理4 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 您的数据库配置是否安全?DBSAT工具助您一臂之力!
    本文探讨了Oracle提供的免费工具DBSAT,该工具能够有效协助用户检测和优化数据库配置的安全性。通过全面的分析和报告,DBSAT帮助用户识别潜在的安全漏洞,并提供针对性的改进建议,确保数据库系统的稳定性和安全性。 ... [详细]
  • 在Cisco IOS XR系统中,存在提供服务的服务器和使用这些服务的客户端。本文深入探讨了进程与线程状态转换机制,分析了其在系统性能优化中的关键作用,并提出了改进措施,以提高系统的响应速度和资源利用率。通过详细研究状态转换的各个环节,本文为开发人员和系统管理员提供了实用的指导,旨在提升整体系统效率和稳定性。 ... [详细]
  • 触发器的稳态数量分析及其应用价值
    本文对数据库中的SQL触发器进行了稳态数量的详细分析,探讨了其在实际应用中的重要价值。通过研究触发器在不同场景下的表现,揭示了其在数据完整性和业务逻辑自动化方面的关键作用。此外,还介绍了如何在Ubuntu 22.04环境下配置和使用触发器,以及在Tomcat和SQLite等平台上的具体实现方法。 ... [详细]
  • 深入解析HTML5字符集属性:charset与defaultCharset
    本文将详细介绍HTML5中新增的字符集属性charset和defaultCharset,帮助开发者更好地理解和应用这些属性,以确保网页在不同环境下的正确显示。 ... [详细]
  • Linux CentOS 7 安装PostgreSQL 9.5.17 (源码编译)
    近日需要将PostgreSQL数据库从Windows中迁移到Linux中,LinuxCentOS7安装PostgreSQL9.5.17安装过程特此记录。安装环境&#x ... [详细]
  • 在 Ubuntu 中遇到 Samba 服务器故障时,尝试卸载并重新安装 Samba 发现配置文件未重新生成。本文介绍了解决该问题的方法。 ... [详细]
  • 本文介绍了如何使用Python的Paramiko库批量更新多台服务器的登录密码。通过示例代码展示了具体实现方法,确保了操作的高效性和安全性。Paramiko库提供了强大的SSH2协议支持,使得远程服务器管理变得更加便捷。此外,文章还详细说明了代码的各个部分,帮助读者更好地理解和应用这一技术。 ... [详细]
  • DVWA学习笔记系列:深入理解CSRF攻击机制
    DVWA学习笔记系列:深入理解CSRF攻击机制 ... [详细]
  • SecureCRT是一款功能强大的终端仿真软件,支持SSH1和SSH2协议,适用于在Windows环境下高效连接和管理Linux服务器。该工具不仅提供了稳定的连接性能,还具备丰富的配置选项,能够满足不同用户的需求。通过SecureCRT,用户可以轻松实现对远程Linux系统的安全访问和操作。 ... [详细]
  • Linux基础知识:Vi与Vim编辑器详解
    Linux基础知识:Vi与Vim编辑器详解 ... [详细]
author-avatar
书友8649571
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有