首页
技术博客
PHP教程
数据库技术
前端开发
HTML5
Nginx
php论坛
新用户注册
|
会员登录
PHP教程
技术博客
编程问答
PNG素材
编程语言
前端技术
Android
PHP教程
HTML5教程
数据库
Linux技术
Nginx技术
PHP安全
WebSerer
职场攻略
JavaScript
开放平台
业界资讯
大话程序猿
登录
极速注册
取消
热门标签 | HotTags
loops
export
byte
shell
integer
config
post
uml
utf-8
future
hook
jar
typescript
controller
copy
scala
regex
install
web
char
settings
usb
runtime
js
erlang
replace
split
rsa
ascii
vba
command
datetime
instance
object
bash
python3
email
dagger
php7
merge
case
expression
tree
hashtable
数组
search
testing
io
text
filter
cookie
ip
emoji
vbscript
c语言
get
request
java
heap
chat
netty
cpython
bitmap
select
jsp
metadata
php8
buffer
less
iostream
web3
plugins
client
input
cPlusPlus
express
php
node.js
const
当前位置:
开发笔记
>
编程语言
> 正文
FlinkSQL实时消费Kafka并写入到MySQL
作者:白开水 | 来源:互联网 | 2023-06-24 15:08
长按二维码关注大数据领域必关注的公众号1.需求Flink SQL实时消费Kafka中的数据,然后做聚合分析,最后将聚合结果写入MySQL数据库。2.添加依赖添加Flink SQL客户端实时消费Kafk
长按二维码关注
大数据领域必关注的公众号
1.需求
Flink SQL实时消费Kafka中的数据,然后做聚合分析,最后将聚合结果写入MySQL数据库。
2.添加依赖
添加Flink SQL客户端实时消费Kafka并写入MySQL的相关依赖。
flink-connector-jdbc_2.11-1.13.5.jar
flink-sql-connector-kafka_2.11-1.13.5.jar
mysql-connector-java-5.1.38.jar
依赖包冲突:
错误:Cannot load user class:org.apache.flink.
connector.jdbc.internal.GenericJdbcSinkFunction
解决思路:
修改flink集群的配置中加载包的顺序
解决步骤:
在flink-conf.yaml中添加如下内容:
classloader.resolve-order: parent-first
3.启动相关服务
(1)启动Flink 集群服务
bin/start-cluster.sh
(2)进入Flink SQL CLI
./sql-client.sh
(3)启动Kafka集群
bin/kafka-server-start.sh -daemon config/server.
properties
(4)启动MySQL服务
service mysqld start
4.FlinkSQL创建表
(1)在Flink SQL 客户端使用如下命令创建kafka source表。
CREATE TABLE sourceTable (
`user` STRING,
url STRING,
cTime STRING
) WITH (
'connector' = 'kafka',
'topic' = 'clicklog_input',
'properties.bootstrap.servers' = 'hadoop1:9092',
'properties.group.id' = 'clicklog',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
(2)在Flink SQL 客户端使用如下命令创建jdbc sink表。
CREATE TABLE sinkTable (
`user` STRING,
cnt BIGINT,
PRIMARY KEY (`user`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop1:3306/test',
'username' = 'hive',
'password' = 'hive',
'table-name' = 'clickcount'
);
(3)在MySQL的test数据库中创建clickcount表
DROP TABLE IF EXISTS `clickcount`;
CREATE TABLE `clickcount` (
`user` varchar(20) NOT NULL DEFAULT '',
`cnt` bigint(11) DEFAULT NULL,
PRIMARY KEY (`user`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
5.Flink SQL进行聚合分析
(1)Flink SQL客户端从Kafka表中实时读取数据,然后做数据聚合,最后再写入MySQL数据库。
#Flink SQL读取kafka 属于流式计算模式
SET 'execution.runtime-mode' = 'streaming';
INSERT INTO sinkTable
SELECT user,count(url) as cnt
FROM sourceTable
group by user;
(2)打开Kafka生产者模拟产生数据
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic clicklog_input
{"user":"Mary","url":"./home","cTime":"2022-02-02 12:00:00"}
(3)查看聚合分析结果
欢迎点赞 + 收藏 + 在看
素质三连
完
▼
往期精彩回顾
▼
程序员,如何避免内卷
Apache 架构师总结的 30 条架构原则
【全网首发】Hadoop 3.0分布式集群安装
大数据运维工程师经典面试题汇总(附带答案)
大数据面试130题
某集团大数据平台整体架构及实施方案完整目录
大数据凉凉了?Apache将一众大数据开源项目束之高阁!
实战企业数据湖,抢先数仓新玩法
Superset制作智慧数据大屏,看它就够了
Apache Flink 在快手的过去、现在和未来
华为云-基于Ambari构建大数据平台(上)
华为云-基于Ambari构建大数据平台(下)
【HBase调优】Hbase万亿级存储性能优化总结
【Python精华】100个Python练手小程序
【HBase企业应用开发】工作中自己总结的Hbase笔记,非常全面!
【剑指Offer】近50个常见算法面试题的Java实现代码
长按识别左侧二维码
关注领福利
领10本经典大数据书
flink
kafka
mysql
sql
二维码
数据库
jar
java
apache
写下你的评论吧 !
吐个槽吧,看都看了
会员登录
|
用户注册
推荐阅读
install
Amoeba 优化 MySQL 读写分离性能
Amoeba 通过优化 MySQL 的读写分离功能显著提升了数据库性能。作为一款基于 MySQL 协议的代理工具,Amoeba 能够高效地处理应用程序的请求,并根据预设的规则将 SQL 请求智能地分配到不同的数据库实例,从而实现负载均衡和高可用性。该方案不仅提高了系统的并发处理能力,还有效减少了主数据库的负担,确保了数据的一致性和可靠性。 ...
[详细]
蜡笔小新 2024-11-08 19:19:47
char
oracle c3p0 dword 60,web_day10 dbcp c3p0 dbutils
createdatabasemydbcharactersetutf8;alertdatabasemydbcharactersetutf8;1.自定义连接池为了不去经常创建连接和释放 ...
[详细]
蜡笔小新 2024-11-12 19:26:15
char
InfluxDB、collectd与Grafana的详细安装与配置指南
本文详细介绍了 InfluxDB、collectd 和 Grafana 的安装与配置流程。首先,按照启动顺序依次安装并配置 InfluxDB、collectd 和 Grafana。InfluxDB 作为时序数据库,用于存储时间序列数据;collectd 负责数据的采集与传输;Grafana 则用于数据的可视化展示。文中提供了 collectd 的官方文档链接,便于用户参考和进一步了解其配置选项。通过本指南,读者可以轻松搭建一个高效的数据监控系统。 ...
[详细]
蜡笔小新 2024-11-11 19:54:24
web
基于CXF框架的Web服务开发详细示例
在Java Web服务开发中,Apache CXF 和 Axis2 是两个广泛使用的框架。CXF 由于其与 Spring 框架的无缝集成能力,以及更简便的部署方式,成为了许多开发者的首选。本文将详细介绍如何使用 CXF 框架进行 Web 服务的开发,包括环境搭建、服务发布和客户端调用等关键步骤,为开发者提供一个全面的实践指南。 ...
[详细]
蜡笔小新 2024-11-08 18:43:17
runtime
Kafka 版本不兼容引发 TaskExecutor 启动故障分析与解决
在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ...
[详细]
蜡笔小新 2024-11-08 14:13:56
copy
基于iSCSI的SQL Server 2012群集测试(一)SQL群集安装
一、测试需求介绍与准备公司计划服务器迁移过程计划同时上线SQLServer2012,引入SQLServer2012群集提高高可用性,需要对SQLServ ...
[详细]
蜡笔小新 2024-11-13 15:49:49
utf-8
使用ArcGIS for Java和Flex浏览自定义ArcGIS Server 9.3地图
本文介绍了如何在Flex应用程序中实现浏览自定义ArcGIS Server 9.3发布的地图。这是一个基本的入门示例,适用于初学者。 ...
[详细]
蜡笔小新 2024-11-13 14:40:13
web
用阿里云的免费 SSL 证书让网站从 HTTP 换成 HTTPS
HTTP协议是不加密传输数据的,也就是用户跟你的网站之间传递数据有可能在途中被截获,破解传递的真实内容,所以使用不加密的HTTP的网站是不 ...
[详细]
蜡笔小新 2024-11-13 14:02:50
config
com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例
com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ...
[详细]
蜡笔小新 2024-11-13 10:47:33
char
Spring Boot 使用 JPA 删除数据时 SQL 错误解决方案
本文介绍了在 Spring Boot 中使用 JPA 进行数据删除操作时遇到的 SQL 错误及其解决方法。错误表现为:删除操作失败,原因是无法打开 JPA EntityManager 以进行事务处理。 ...
[详细]
蜡笔小新 2024-11-12 18:01:11
config
Spring详解(六)AOP
原文网址:https:www.cnblogs.comysoceanp7476379.html目录1、AOP什么?2、需求3、解决办法1:使用静态代理4 ...
[详细]
蜡笔小新 2024-11-12 14:40:40
js
解决Bootstrap DataTable Ajax请求重复问题
在最近的一个项目中,我们使用了JQuery DataTable进行数据展示,虽然使用起来非常方便,但在测试过程中发现了一个问题:当查询条件改变时,有时查询结果的数据不正确。通过FireBug调试发现,点击搜索按钮时,会发送两次Ajax请求,一次是原条件的请求,一次是新条件的请求。 ...
[详细]
蜡笔小新 2024-11-12 13:59:27
char
如何在Java中使用DButils类
这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ...
[详细]
蜡笔小新 2024-11-12 13:46:11
replace
JavaWeb文件上传:前端实现与后端处理详解
在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ...
[详细]
蜡笔小新 2024-11-11 19:50:46
char
MySQL Decimal 类型的最大值解析及其在数据处理中的应用艺术
在关系型数据库中,表的设计与SQL语句的编写对性能的影响至关重要,甚至可占到90%以上。本文将重点探讨MySQL中Decimal类型的最大值及其在数据处理中的应用技巧,通过实例分析和优化建议,帮助读者深入理解并掌握这一重要知识点。 ...
[详细]
蜡笔小新 2024-11-11 19:36:19
白开水
这个家伙很懒,什么也没留下!
Tags | 热门标签
loops
export
byte
shell
integer
config
post
uml
utf-8
future
hook
jar
typescript
controller
copy
scala
regex
install
web
char
settings
usb
runtime
js
erlang
replace
split
rsa
ascii
vba
RankList | 热门文章
1
Maven没有正确地选择JAVA_HOME - Maven not picking JAVA_HOME correctly
2
重新命名和删除约束
3
Java Performance Tuning笔记
4
ASP.NET获取IP与MAC[using C#]
5
DJango_Django 模板
6
Mysql存储级别_mysql的权限级别
7
windows443端口和80端口被占用怎么解决
8
正则表达式 php 模板,PHP中正则表达式回顾(4)编写一个非常简单而且山寨的smarty模板引擎...
9
基础_Jsp编程的基础模型
10
ObjectiveC中有没有办法比较两个方法的地址
11
NBIoTDTU对比于3G/4G DTU的区别和优势
12
图文实践给***KVM架构配置IPV6的过程
13
gitpod.io,云端开发调试工具。
14
iOS 12 beta8如何更新? iOS 12 beta8更新方法攻略介绍!
15
引入 ServletContextListener @Autowired null 解决办法
PHP1.CN | 中国最专业的PHP中文社区 |
DevBox开发工具箱
|
json解析格式化
|
PHP资讯
|
PHP教程
|
数据库技术
|
服务器技术
|
前端开发技术
|
PHP框架
|
开发工具
|
在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved |
京公网安备 11010802041100号
|
京ICP备19059560号-4
| PHP1.CN 第一PHP社区 版权所有