
Kafka to Flink to MySQL: Aggregating a Data Stream with Flink SQL (Time Windows, EventTime)


There isn't much material about this online, so I'm sharing my setup :)

For the simple case (Kafka sends data to Flink, Flink writes to MySQL), see:

Writing data from Kafka to MySQL with a Flink stream

Maven dependencies:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.xxr</groupId>
  <artifactId>flink</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.4.1</flink.version>
  </properties>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <configuration>
            <source>1.8</source>
            <target>1.8</target>
            <encoding>UTF-8</encoding>
            <compilerArgs>
              <arg>-extdirs</arg>
              <arg>${project.basedir}/src/lib</arg>
            </compilerArgs>
          </configuration>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>2.5.5</version>
          <configuration>
            <archive>
              <manifest>
                <mainClass>com.xxr.flink.stream_sql</mainClass>
              </manifest>
            </archive>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.8.0-beta1</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.8.0-beta1</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>2.11.1</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang.modules</groupId>
      <artifactId>scala-xml_2.11</artifactId>
      <version>1.0.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-wikiedits_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-jdbc</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.39</version>
    </dependency>
  </dependencies>
</project>

Configuration constants and the SQL statements; the time window is one minute. TUMBLE(rowtime, INTERVAL '1' MINUTE) assigns each row to a one-minute bucket based on its event time, and TUMBLE_END returns that bucket's closing timestamp:

public class JDBCTestBase {

    // Every minute, compute the MAX/AVG/MIN of num within that minute, using rowtime as the time attribute
    public static final String SQL_MAX = "SELECT MAX(num), TUMBLE_END(rowtime, INTERVAL '1' MINUTE) AS wEnd FROM wiki_table GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)";
    public static final String SQL_AVG = "SELECT AVG(num), TUMBLE_END(rowtime, INTERVAL '1' MINUTE) AS wEnd FROM wiki_table GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)";
    public static final String SQL_MIN = "SELECT MIN(num), TUMBLE_END(rowtime, INTERVAL '1' MINUTE) AS wEnd FROM wiki_table GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)";

    public static final String kafka_group = "test-consumer-group";
    public static final String kafka_zookper = "localhost:2181";
    public static final String kafka_hosts = "localhost:9092";
    public static final String kafka_topic = "wiki-result";

    public static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
    public static final String DB_URL = "jdbc:mysql://localhost:3306/db?user=user&password=password";
}

 

MySQL table definition:

CREATE TABLE wiki (
  Id int(11) NOT NULL AUTO_INCREMENT,
  avg double DEFAULT NULL,
  time timestamp NULL DEFAULT NULL,
  PRIMARY KEY (Id)
);

 

To send data to Kafka I use the wiki producer from the Flink examples:

Monitoring the Wikipedia Edit Stream

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;

public class WikipediaAnalysis {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

        KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
                .keyBy(new KeySelector<WikipediaEditEvent, String>() {
                    @Override
                    public String getKey(WikipediaEditEvent event) {
                        return event.getUser();
                    }
                });

        DataStream<Tuple3<String, Long, Long>> result = keyedEdits
                .timeWindow(Time.seconds(10))
                .fold(new Tuple3<>("", 0L, 0L), new FoldFunction<WikipediaEditEvent, Tuple3<String, Long, Long>>() {
                    @Override
                    public Tuple3<String, Long, Long> fold(Tuple3<String, Long, Long> acc, WikipediaEditEvent event) {
                        acc.f0 = event.getUser().trim();
                        acc.f1 += event.getByteDiff();
                        acc.f2 = System.currentTimeMillis();
                        return acc;
                    }
                });

        result.map(new MapFunction<Tuple3<String, Long, Long>, String>() {
            @Override
            public String map(Tuple3<String, Long, Long> tuple) {
                return tuple.toString();
            }
        }).addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));

        result.print();
        see.execute();
    }
}
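For reference: the producer serializes each Tuple3 with toString(), so the messages landing on the wiki-result topic look like "(user,byteSum,timestampMillis)", and that is exactly the layout the consumer job parses later. A minimal stand-alone sketch of that round trip (the class name and sample values here are made up for illustration):

import org.apache.flink.api.java.tuple.Tuple3;

// Hypothetical helper showing the message layout the downstream job expects.
public class WikiRecordFormat {
    public static void main(String[] args) {
        // What the producer emits: Tuple3.toString() wraps the fields in parentheses.
        Tuple3<String, Long, Long> produced = new Tuple3<>("someUser", 42L, 1523456789000L);
        String message = produced.toString();          // "(someUser,42,1523456789000)"

        // What the consumer's map() does: strip the parentheses and split on commas.
        String[] fields = message.replaceAll("(\\(|\\))", "").split(",");
        String user = fields[0].trim();                // edit author
        long byteSum = Long.valueOf(fields[1]);        // summed byte diff in the 10 s window
        long eventTime = Long.valueOf(fields[2]);      // wall-clock millis, later used as EventTime

        System.out.println(user + " " + byteSum + " " + eventTime);
    }
}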

Extend RichSinkFunction to write the stream into MySQL:

 

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class WikiSQLSink extends RichSinkFunction<Tuple3<String, Long, Long>> {

    private static final long serialVersionUID = 1L;
    private Connection connection;
    private PreparedStatement preparedStatement;
    String drivername = JDBCTestBase.DRIVER_CLASS;
    String dburl = JDBCTestBase.DB_URL;

    @Override
    public void invoke(Tuple3<String, Long, Long> value) throws Exception {
        Class.forName(drivername);
        connection = DriverManager.getConnection(dburl);
        String sql = "INSERT INTO wiki(name,avg,time) VALUES(?,?,?)";
        preparedStatement = connection.prepareStatement(sql);
        preparedStatement.setString(1, value.f0);
        preparedStatement.setLong(2, value.f1);
        preparedStatement.setLong(3, value.f2);
        // preparedStatement.setTimestamp(4, new Timestamp(System.currentTimeMillis()));
        preparedStatement.executeUpdate();
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
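Note that this sink opens and closes a JDBC connection for every record, which is expensive. A sketch of a more idiomatic variant, assuming the same JDBCTestBase constants and target table (the class name PooledWikiSQLSink is made up), opens the connection once in open() and releases it in close(); it also converts the epoch-millis field to a java.sql.Timestamp before it hits the timestamp column:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Sketch: reuse one connection and one prepared statement for the lifetime of the sink.
public class PooledWikiSQLSink extends RichSinkFunction<Tuple3<String, Long, Long>> {

    private static final long serialVersionUID = 1L;
    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(JDBCTestBase.DRIVER_CLASS);
        connection = DriverManager.getConnection(JDBCTestBase.DB_URL);
        statement = connection.prepareStatement("INSERT INTO wiki(name,avg,time) VALUES(?,?,?)");
    }

    @Override
    public void invoke(Tuple3<String, Long, Long> value) throws Exception {
        statement.setString(1, value.f0);
        statement.setLong(2, value.f1);
        // value.f2 carries epoch millis; convert it to a SQL timestamp for the `time` column
        statement.setTimestamp(3, new Timestamp(value.f2));
        statement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (statement != null) {
            statement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}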

 

The Flink streaming job below uses EventTime, aggregates the data with a SQL query, and writes the results to MySQL. The SQL syntax comes from another open-source project, Apache Calcite:

Diagram: see the time-attributes section of the Flink docs:

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#time-attributes

package com.xxr.flink;

import java.sql.Timestamp;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.TimestampAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.WindowedTable;
import org.apache.flink.table.api.java.StreamTableEnvironment;
// Time attribute documentation:
//https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#event-time
//Concepts & Common API
//https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/common.html#register-a-table
// SQL syntax:
//https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html
public class stream_sql {

    public static void main(String[] args) throws Exception {
        Properties pro = new Properties();
        pro.put("bootstrap.servers", JDBCTestBase.kafka_hosts);
        pro.put("zookeeper.connect", JDBCTestBase.kafka_zookper);
        pro.put("group.id", JDBCTestBase.kafka_group);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        // env.getConfig().disableSysoutLogging(); // uncomment to suppress stdout logging
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(5000);

        DataStream<String> sourceStream = env
                .addSource(new FlinkKafkaConsumer08<String>(JDBCTestBase.kafka_topic, new SimpleStringSchema(), pro));

        DataStream<Tuple3<Long, String, Long>> sourceStreamTra = sourceStream
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return StringUtils.isNotBlank(value);
                    }
                })
                .map(new MapFunction<String, Tuple3<Long, String, Long>>() {
                    @Override
                    public Tuple3<Long, String, Long> map(String value) throws Exception {
                        String temp = value.replaceAll("(\\(|\\))", "");
                        String[] fields = temp.split(",");
                        try {
                            return new Tuple3<Long, String, Long>(Long.valueOf(fields[2]), fields[0].trim(), Long.valueOf(fields[1]));
                        } catch (Exception e) {
                            e.printStackTrace();
                            return new Tuple3<Long, String, Long>(System.currentTimeMillis(), fields[0].trim(), 0L);
                        }
                    }
                });

        // Choose which field supplies the EventTime and assign watermarks
        DataStream<Tuple3<Long, String, Long>> withTimestampsAndWatermarks = sourceStreamTra
                .assignTimestampsAndWatermarks(new FirstTandW());

        // The built-in attribute rowtime.rowtime is the EventTime; proctime.proctime would be ProcessingTime
        tableEnv.registerDataStream("wiki_table", withTimestampsAndWatermarks, "etime, name, num, rowtime.rowtime");
        withTimestampsAndWatermarks.print();

        // Define the JDBC sink for the aggregated rows
        JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
                .setDrivername(JDBCTestBase.DRIVER_CLASS)
                .setDBUrl(JDBCTestBase.DB_URL)
                .setQuery("INSERT INTO wiki (avg,time) VALUES (?,?)")
                .setParameterTypes(Types.LONG, Types.SQL_TIMESTAMP)
                .build();

        // Run the aggregation query
        Table result = tableEnv.sqlQuery(JDBCTestBase.SQL_MIN);

        // Write to CSV instead:
        // result.writeToSink(new CsvTableSink("D:/a.csv", // output path
        //         "|",                                    // optional: delimit files by '|'
        //         1,                                      // optional: write to a single file
        //         WriteMode.OVERWRITE));                  // optional: override existing files

        // Write to the database
        result.writeToSink(sink);
        env.execute();
    }
}
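If event-time semantics are not needed, a processing-time variant avoids watermarks altogether: register the stream with a proctime attribute and tumble on that instead. A sketch follows; the class name stream_sql_proctime, the table name wiki_table_proc, and the hard-coded sample elements are made up, and in the real job the stream would be the parsed Kafka stream (sourceStreamTra):

package com.xxr.flink;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Sketch of a processing-time variant of stream_sql; no watermark assigner is involved.
public class stream_sql_proctime {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Stand-in for the parsed Kafka stream: (etime, name, num)
        DataStream<Tuple3<Long, String, Long>> parsed = env.fromElements(
                new Tuple3<>(System.currentTimeMillis(), "someUser", 7L),
                new Tuple3<>(System.currentTimeMillis(), "otherUser", 3L));
        parsed.print();

        // proctime.proctime appends a processing-time attribute to the schema
        tableEnv.registerDataStream("wiki_table_proc", parsed, "etime, name, num, proctime.proctime");

        // Same one-minute tumbling window, but keyed on proctime instead of rowtime
        Table result = tableEnv.sqlQuery(
                "SELECT MIN(num), TUMBLE_END(proctime, INTERVAL '1' MINUTE) AS wEnd "
                        + "FROM wiki_table_proc GROUP BY TUMBLE(proctime, INTERVAL '1' MINUTE)");

        result.printSchema(); // or wire it to the same JDBCAppendTableSink as above
        env.execute();
    }
}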

Implement AssignerWithPeriodicWatermarks to set the watermark. This is required when the time characteristic is EventTime; for ProcessingTime it can be skipped.

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class FirstTandW implements AssignerWithPeriodicWatermarks<Tuple3<Long, String, Long>> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Tuple3<Long, String, Long> element, long previousElementTimestamp) {
        long timestamp = element.f0;
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
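Flink also ships a helper that implements exactly this pattern, BoundedOutOfOrdernessTimestampExtractor. A sketch of an equivalent assigner built on it, assuming the same Tuple3 layout with the event time in f0 (the class name FirstTandWBuiltIn is made up):

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Sketch: the built-in extractor tracks the max timestamp and subtracts the allowed lateness,
// which is what FirstTandW does by hand above.
public class FirstTandWBuiltIn extends BoundedOutOfOrdernessTimestampExtractor<Tuple3<Long, String, Long>> {

    private static final long serialVersionUID = 1L;

    public FirstTandWBuiltIn() {
        super(Time.milliseconds(3500)); // allow 3.5 seconds of out-of-orderness
    }

    @Override
    public long extractTimestamp(Tuple3<Long, String, Long> element) {
        return element.f0; // event time is carried in the first tuple field
    }
}

Usage would be identical: sourceStreamTra.assignTimestampsAndWatermarks(new FirstTandWBuiltIn()).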

Build the jar with the maven-assembly plugin and submit it to Flink. If you are not sure how to build the jar, see my other blog posts.

Background reading:

Flink's Window operations

Flink stream programming: understanding EventTime vs. ProcessingTime in a WindowedStream

The Flink documentation is actually very good. I didn't read it carefully when I started, and ran into quite a few pitfalls.

Git repo: https://github.com/xxrznj/flink-kafka-sql


Reposted from: https://www.cnblogs.com/34fj/p/8820094.html

