热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

FlinkSQL实时消费Kafka并写入到MySQL

长按二维码关注大数据领域必关注的公众号1.需求Flink SQL实时消费Kafka中的数据,然后做聚合分析,最后将聚合结果写入MySQL数据库。2.添加依赖添加Flink SQL客户端实时消费Kafk

长按二维码关注

大数据领域必关注的公众号



1.需求
Flink SQL实时消费Kafka中的数据,然后做聚合分析,最后将聚合结果写入MySQL数据库。

2.添加依赖
添加Flink SQL客户端实时消费Kafka并写入MySQL的相关依赖。
flink-connector-jdbc_2.11-1.13.5.jar
flink-sql-connector-kafka_2.11-1.13.5.jar
mysql-connector-java-5.1.38.jar

依赖包冲突:
错误:Cannot load user class:org.apache.flink.
connector.jdbc.internal.GenericJdbcSinkFunction

解决思路:
修改flink集群的配置中加载包的顺序

解决步骤:
在flink-conf.yaml中添加如下内容:
classloader.resolve-order: parent-first

3.启动相关服务
(1)启动Flink 集群服务
bin/start-cluster.sh

(2)进入Flink SQL CLI
./sql-client.sh

(3)启动Kafka集群
bin/kafka-server-start.sh -daemon config/server.
properties

(4)启动MySQL服务
service mysqld start

4.FlinkSQL创建表
(1)在Flink SQL 客户端使用如下命令创建kafka source表。
CREATE TABLE sourceTable (
`user` STRING,
 url STRING,
 cTime STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'clicklog_input',
 'properties.bootstrap.servers' = 'hadoop1:9092',
 'properties.group.id' = 'clicklog',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
);

(2)在Flink SQL 客户端使用如下命令创建jdbc sink表。
CREATE TABLE sinkTable (
 `user` STRING,
 cnt BIGINT,
 PRIMARY KEY (`user`) NOT ENFORCED
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://hadoop1:3306/test',
 'username' = 'hive',
 'password' = 'hive',
 'table-name' = 'clickcount'
);

(3)在MySQL的test数据库中创建clickcount表
DROP TABLE IF EXISTS `clickcount`;
CREATE TABLE `clickcount` (
  `user` varchar(20) NOT NULL DEFAULT '',
  `cnt` bigint(11) DEFAULT NULL,
  PRIMARY KEY (`user`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5.Flink SQL进行聚合分析
(1)Flink SQL客户端从Kafka表中实时读取数据,然后做数据聚合,最后再写入MySQL数据库。
#Flink SQL读取kafka 属于流式计算模式 
SET 'execution.runtime-mode' = 'streaming';
INSERT INTO sinkTable
SELECT user,count(url) as cnt
FROM sourceTable
group by user;

(2)打开Kafka生产者模拟产生数据
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic clicklog_input
{"user":"Mary","url":"./home","cTime":"2022-02-02 12:00:00"}

(3)查看聚合分析结果

欢迎点赞 + 收藏 + 在看  素质三连 


往期精彩回顾
程序员,如何避免内卷
Apache 架构师总结的 30 条架构原则
【全网首发】Hadoop 3.0分布式集群安装
大数据运维工程师经典面试题汇总(附带答案)
大数据面试130题
某集团大数据平台整体架构及实施方案完整目录
大数据凉凉了?Apache将一众大数据开源项目束之高阁!
实战企业数据湖,抢先数仓新玩法
Superset制作智慧数据大屏,看它就够了
Apache Flink 在快手的过去、现在和未来
华为云-基于Ambari构建大数据平台(上)
华为云-基于Ambari构建大数据平台(下)
【HBase调优】Hbase万亿级存储性能优化总结
【Python精华】100个Python练手小程序
【HBase企业应用开发】工作中自己总结的Hbase笔记,非常全面!
【剑指Offer】近50个常见算法面试题的Java实现代码

长按识别左侧二维码

     关注领福利    

  领10本经典大数据书


推荐阅读
  • Amoeba 通过优化 MySQL 的读写分离功能显著提升了数据库性能。作为一款基于 MySQL 协议的代理工具,Amoeba 能够高效地处理应用程序的请求,并根据预设的规则将 SQL 请求智能地分配到不同的数据库实例,从而实现负载均衡和高可用性。该方案不仅提高了系统的并发处理能力,还有效减少了主数据库的负担,确保了数据的一致性和可靠性。 ... [详细]
  • oracle c3p0 dword 60,web_day10 dbcp c3p0 dbutils
    createdatabasemydbcharactersetutf8;alertdatabasemydbcharactersetutf8;1.自定义连接池为了不去经常创建连接和释放 ... [详细]
  • 本文详细介绍了 InfluxDB、collectd 和 Grafana 的安装与配置流程。首先,按照启动顺序依次安装并配置 InfluxDB、collectd 和 Grafana。InfluxDB 作为时序数据库,用于存储时间序列数据;collectd 负责数据的采集与传输;Grafana 则用于数据的可视化展示。文中提供了 collectd 的官方文档链接,便于用户参考和进一步了解其配置选项。通过本指南,读者可以轻松搭建一个高效的数据监控系统。 ... [详细]
  • 在Java Web服务开发中,Apache CXF 和 Axis2 是两个广泛使用的框架。CXF 由于其与 Spring 框架的无缝集成能力,以及更简便的部署方式,成为了许多开发者的首选。本文将详细介绍如何使用 CXF 框架进行 Web 服务的开发,包括环境搭建、服务发布和客户端调用等关键步骤,为开发者提供一个全面的实践指南。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 基于iSCSI的SQL Server 2012群集测试(一)SQL群集安装
    一、测试需求介绍与准备公司计划服务器迁移过程计划同时上线SQLServer2012,引入SQLServer2012群集提高高可用性,需要对SQLServ ... [详细]
  • 使用ArcGIS for Java和Flex浏览自定义ArcGIS Server 9.3地图
    本文介绍了如何在Flex应用程序中实现浏览自定义ArcGIS Server 9.3发布的地图。这是一个基本的入门示例,适用于初学者。 ... [详细]
  • 用阿里云的免费 SSL 证书让网站从 HTTP 换成 HTTPS
    HTTP协议是不加密传输数据的,也就是用户跟你的网站之间传递数据有可能在途中被截获,破解传递的真实内容,所以使用不加密的HTTP的网站是不 ... [详细]
  • com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ... [详细]
  • 本文介绍了在 Spring Boot 中使用 JPA 进行数据删除操作时遇到的 SQL 错误及其解决方法。错误表现为:删除操作失败,原因是无法打开 JPA EntityManager 以进行事务处理。 ... [详细]
  • 原文网址:https:www.cnblogs.comysoceanp7476379.html目录1、AOP什么?2、需求3、解决办法1:使用静态代理4 ... [详细]
  • 解决Bootstrap DataTable Ajax请求重复问题
    在最近的一个项目中,我们使用了JQuery DataTable进行数据展示,虽然使用起来非常方便,但在测试过程中发现了一个问题:当查询条件改变时,有时查询结果的数据不正确。通过FireBug调试发现,点击搜索按钮时,会发送两次Ajax请求,一次是原条件的请求,一次是新条件的请求。 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • MySQL Decimal 类型的最大值解析及其在数据处理中的应用艺术
    在关系型数据库中,表的设计与SQL语句的编写对性能的影响至关重要,甚至可占到90%以上。本文将重点探讨MySQL中Decimal类型的最大值及其在数据处理中的应用技巧,通过实例分析和优化建议,帮助读者深入理解并掌握这一重要知识点。 ... [详细]
author-avatar
白开水
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有