
Kafka to Flink to MySQL: Aggregating a Data Stream with Flink SQL (Time Windows, EventTime)


There isn't much material on this online, so I'm sharing my notes. :)

Simple mode (Kafka to Flink to MySQL, no SQL aggregation) is covered here:

Using Flink streams to write data from Kafka to MySQL

Maven dependencies:

<project xmlns&#61;"http://maven.apache.org/POM/4.0.0" xmlns:xsi&#61;"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation&#61;"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0modelVersion><groupId>com.xxrgroupId><artifactId>flinkartifactId><version>0.0.1-SNAPSHOTversion><packaging>jarpackaging><properties><project.build.sourceEncoding>UTF-8project.build.sourceEncoding><flink.version>1.4.1flink.version>properties><build><pluginManagement><plugins>
<plugin><groupId>org.apache.maven.pluginsgroupId><artifactId>maven-compiler-pluginartifactId><configuration><source>1.8source><target>1.8target><encoding>UTF-8encoding><compilerArgs><arg>-extdirsarg><arg>${project.basedir}/src/libarg>compilerArgs>configuration>plugin><plugin><groupId>org.apache.maven.pluginsgroupId><artifactId>maven-assembly-pluginartifactId><version>2.5.5version><configuration><archive><manifest><mainClass>com.xxr.flink.stream_sqlmainClass>manifest>archive><descriptorRefs><descriptorRef>jar-with-dependenciesdescriptorRef>descriptorRefs>configuration>plugin>plugins>pluginManagement>build><dependencies><dependency><groupId>junitgroupId><artifactId>junitartifactId><version>3.8.1version><scope>testscope>dependency><dependency><groupId>org.slf4jgroupId><artifactId>slf4j-log4j12artifactId><version>1.8.0-beta1version>dependency><dependency><groupId>org.slf4jgroupId><artifactId>slf4j-apiartifactId><version>1.8.0-beta1version>dependency><dependency><groupId>log4jgroupId><artifactId>log4jartifactId><version>1.2.17version>dependency><dependency><groupId>org.scala-langgroupId><artifactId>scala-compilerartifactId><version>2.11.1version>dependency><dependency><groupId>org.scala-lang.modulesgroupId><artifactId>scala-xml_2.11artifactId><version>1.0.2version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-javaartifactId><version>${flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-streaming-java_2.11artifactId><version>${flink.version}version><scope>providedscope>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-streaming-scala_2.11artifactId><version>${flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-clients_2.11artifactId><version>${flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-coreartifactId><version>${flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-runtime_2.11artifactId><version>${flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-connector-wikiedits_2.11artifactId><version>${flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-connector-kafka-0.8_2.11artifactId><version>${flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-table_2.11artifactId><version>${flink.version}version>dependency><dependency><groupId>org.apache.flinkgroupId><artifactId>flink-jdbcartifactId><version>${flink.version}version>dependency><dependency><groupId>mysqlgroupId><artifactId>mysql-connector-javaartifactId><version>5.1.39version>dependency>dependencies>
project>

Configuration and SQL statements; the time window is one minute:

public class JDBCTestBase {
    // Every minute, compute the max/avg/min of num within that minute, using rowtime as the time attribute
    public static final String SQL_MAX = "SELECT MAX(num), TUMBLE_END(rowtime, INTERVAL '1' MINUTE) AS wEnd FROM wiki_table GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)";
    public static final String SQL_AVG = "SELECT AVG(num), TUMBLE_END(rowtime, INTERVAL '1' MINUTE) AS wEnd FROM wiki_table GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)";
    public static final String SQL_MIN = "SELECT MIN(num), TUMBLE_END(rowtime, INTERVAL '1' MINUTE) AS wEnd FROM wiki_table GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)";
    public static final String kafka_group = "test-consumer-group";
    public static final String kafka_zookper = "localhost:2181";
    public static final String kafka_hosts = "localhost:9092";
    public static final String kafka_topic = "wiki-result";
    public static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
    public static final String DB_URL = "jdbc:mysql://localhost:3306/db?user=user&password=password";
}
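To make the TUMBLE semantics concrete: every record falls into exactly one one-minute bucket keyed by its event time, and TUMBLE_END is the bucket's end. Here is a rough plain-Java sketch of what SQL_MIN computes (my own illustration, not Flink code):

import java.util.Map;
import java.util.TreeMap;

public class TumbleMinSketch {
    public static void main(String[] args) {
        // {eventTimeMillis, num} sample records
        long[][] events = { {60_500L, 7}, {70_000L, 3}, {125_000L, 9} };
        Map<Long, Long> minPerWindow = new TreeMap<>();
        for (long[] e : events) {
            // One-minute tumbling window: TUMBLE_END is the next minute boundary
            long windowEnd = (e[0] / 60_000 + 1) * 60_000;
            minPerWindow.merge(windowEnd, e[1], Math::min);
        }
        // Prints {120000=3, 180000=9}: the first two events share a window, so MIN(num) = 3
        minPerWindow.forEach((wEnd, min) -> System.out.println("wEnd=" + wEnd + " min=" + min));
    }
}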

Create the MySQL table:

CREATE TABLE wiki (
  Id int(11) NOT NULL AUTO_INCREMENT,
  avg double DEFAULT NULL,
  time timestamp NULL DEFAULT NULL,
  PRIMARY KEY (Id)
)
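If you'd rather create the table from code, a minimal JDBC sketch reusing the constants above (my addition, not in the original project) looks like this:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateWikiTable {
    public static void main(String[] args) throws Exception {
        Class.forName(JDBCTestBase.DRIVER_CLASS);
        try (Connection conn = DriverManager.getConnection(JDBCTestBase.DB_URL);
             Statement st = conn.createStatement()) {
            // Same DDL as above, guarded so reruns are harmless
            st.execute("CREATE TABLE IF NOT EXISTS wiki ("
                    + "Id int(11) NOT NULL AUTO_INCREMENT,"
                    + "avg double DEFAULT NULL,"
                    + "time timestamp NULL DEFAULT NULL,"
                    + "PRIMARY KEY (Id))");
        }
    }
}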

The data is sent to Kafka using the Wikipedia producer from the Flink examples:

Monitoring the Wikipedia Edit Stream

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;

public class WikipediaAnalysis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
        KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
                .keyBy(new KeySelector<WikipediaEditEvent, String>() {
                    @Override
                    public String getKey(WikipediaEditEvent event) {
                        return event.getUser();
                    }
                });
        DataStream<Tuple3<String, Long, Long>> result = keyedEdits
                .timeWindow(Time.seconds(10))
                .fold(new Tuple3<>("", 0L, 0L), new FoldFunction<WikipediaEditEvent, Tuple3<String, Long, Long>>() {
                    @Override
                    public Tuple3<String, Long, Long> fold(Tuple3<String, Long, Long> acc, WikipediaEditEvent event) {
                        acc.f0 = event.getUser().trim();
                        acc.f1 += event.getByteDiff();
                        acc.f2 = System.currentTimeMillis();
                        return acc;
                    }
                });
        result.map(new MapFunction<Tuple3<String, Long, Long>, String>() {
            @Override
            public String map(Tuple3<String, Long, Long> tuple) {
                return tuple.toString();
            }
        }).addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
        result.print();
        see.execute();
    }
}
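The producer publishes each window result as Tuple3.toString(), so the wiki-result topic carries strings like "(SomeUser,512,1523456789000)". A tiny standalone sketch of the parsing (the sample value is made up; the logic mirrors the consumer job further below):

public class MessageFormatDemo {
    public static void main(String[] args) {
        String sample = "(SomeUser,512,1523456789000)"; // hypothetical record from the wiki-result topic
        // Strip the parentheses that Tuple3.toString() adds, then split the fields
        String[] parts = sample.replaceAll("(\\(|\\))", "").split(",");
        System.out.println("user=" + parts[0] + ", byteDiff=" + parts[1] + ", eventTimeMillis=" + parts[2]);
    }
}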

Next, extend RichSinkFunction to write into MySQL (note: this sink inserts a name column, which the CREATE TABLE above does not define; add one, e.g. name varchar(255), if you wire this sink in):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class WikiSQLSink extends RichSinkFunction<Tuple3<String, Long, Long>> {
    private static final long serialVersionUID = 1L;
    private Connection connection;
    private PreparedStatement preparedStatement;
    String drivername = JDBCTestBase.DRIVER_CLASS;
    String dburl = JDBCTestBase.DB_URL;

    @Override
    public void invoke(Tuple3<String, Long, Long> value) throws Exception {
        Class.forName(drivername);
        connection = DriverManager.getConnection(dburl);
        String sql = "INSERT INTO wiki (name, avg, time) VALUES (?, ?, ?)";
        preparedStatement = connection.prepareStatement(sql);
        preparedStatement.setString(1, value.f0);
        preparedStatement.setLong(2, value.f1);
        preparedStatement.setLong(3, value.f2);
        // preparedStatement.setTimestamp(4, new Timestamp(System.currentTimeMillis()));
        preparedStatement.executeUpdate();
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
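The main job below actually writes through a JDBCAppendTableSink instead. If you wanted to use WikiSQLSink directly, you'd attach it to a DataStream<Tuple3<String, Long, Long>> such as the producer's result stream, roughly:

// Hypothetical wiring, not part of the original job:
result.addSink(new WikiSQLSink());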

The Flink streaming job below uses EventTime and aggregates the stream with a SQL query before writing the results to MySQL. The SQL syntax comes from another open-source framework, Apache Calcite:

Time attributes are illustrated in the Flink docs: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#time-attributes

package com.xxr.flink;

import java.sql.Timestamp;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.TimestampAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.WindowedTable;
import org.apache.flink.table.api.java.StreamTableEnvironment;
// Time attribute docs:
// https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#event-time
// Concepts & Common API:
// https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/common.html#register-a-table
// SQL syntax:
// https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html
public class stream_sql {
    public static void main(String[] args) throws Exception {
        Properties pro = new Properties();
        pro.put("bootstrap.servers", JDBCTestBase.kafka_hosts);
        pro.put("zookeeper.connect", JDBCTestBase.kafka_zookper);
        pro.put("group.id", JDBCTestBase.kafka_group);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        // env.getConfig().disableSysoutLogging(); // enable this to silence sysout logging
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(5000);

        DataStream<String> sourceStream = env
                .addSource(new FlinkKafkaConsumer08<String>(JDBCTestBase.kafka_topic, new SimpleStringSchema(), pro));
        DataStream<Tuple3<Long, String, Long>> sourceStreamTra = sourceStream
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return StringUtils.isNotBlank(value);
                    }
                })
                .map(new MapFunction<String, Tuple3<Long, String, Long>>() {
                    @Override
                    public Tuple3<Long, String, Long> map(String value) throws Exception {
                        String temp = value.replaceAll("(\\(|\\))", "");
                        String[] args = temp.split(",");
                        try {
                            return new Tuple3<Long, String, Long>(Long.valueOf(args[2]), args[0].trim(), Long.valueOf(args[1]));
                        } catch (Exception e) {
                            e.printStackTrace();
                            return new Tuple3<Long, String, Long>(System.currentTimeMillis(), args[0].trim(), 0L);
                        }
                    }
                });
        // Choose which tuple field supplies the event time
        DataStream<Tuple3<Long, String, Long>> withTimestampsAndWatermarks = sourceStreamTra
                .assignTimestampsAndWatermarks(new FirstTandW());
        // Built-in attributes: rowtime.rowtime is EventTime, proctime.proctime is ProcessingTime
        tableEnv.registerDataStream("wiki_table", withTimestampsAndWatermarks, "etime, name, num, rowtime.rowtime");
        withTimestampsAndWatermarks.print();

        // Define the JDBC sink
        JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
                .setDrivername(JDBCTestBase.DRIVER_CLASS)
                .setDBUrl(JDBCTestBase.DB_URL)
                .setQuery("INSERT INTO wiki (avg, time) VALUES (?, ?)")
                .setParameterTypes(Types.LONG, Types.SQL_TIMESTAMP)
                .build();
        // Run the query
        Table result = tableEnv.sqlQuery(JDBCTestBase.SQL_MIN);
        // Or write to CSV instead:
        // result.writeToSink(new CsvTableSink("D:/a.csv", // output path
        //         "|",                   // optional: delimit fields by '|'
        //         1,                     // optional: write to a single file
        //         WriteMode.OVERWRITE)); // optional: override existing files
        // Write to the database
        result.writeToSink(sink);
        env.execute();
    }
}

Implement AssignerWithPeriodicWatermarks to generate watermarks. This is required when the time characteristic is EventTime; for ProcessingTime it can be skipped:

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class FirstTandW implements AssignerWithPeriodicWatermarks<Tuple3<Long, String, Long>> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Tuple3<Long, String, Long> element, long previousElementTimestamp) {
        long timestamp = element.f0;
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // The watermark trails the largest timestamp seen by the allowed out-of-orderness
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
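To see the watermark behaviour, here's a tiny driver (my own sketch) that pushes a few out-of-order timestamps through the assigner; the watermark trails the largest timestamp seen by 3.5 seconds and never moves backwards:

import org.apache.flink.api.java.tuple.Tuple3;

public class WatermarkDemo {
    public static void main(String[] args) {
        FirstTandW assigner = new FirstTandW();
        long[] eventTimes = {1000L, 5000L, 3000L}; // the 3000 event arrives late
        for (long t : eventTimes) {
            assigner.extractTimestamp(new Tuple3<>(t, "user", 1L), -1L);
            System.out.println("event=" + t
                    + " watermark=" + assigner.getCurrentWatermark().getTimestamp());
        }
        // Prints watermarks -2500, 1500, 1500: the late event does not pull the watermark back
    }
}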

Package everything into a fat jar with the maven-assembly plugin (e.g. mvn clean package assembly:single) and submit it to Flink. If you're unsure how to build the jar, see my blog.

Background reading:

Flink's Window operations

Flink stream programming: understanding EventTime vs. ProcessingTime on WindowedStream

The Flink documentation is actually very good. I didn't read it carefully when I started, and ran into quite a few pitfalls.

git: https://github.com/xxrznj/flink-kafka-sql


Reposted from: https://www.cnblogs.com/34fj/p/8820094.html

