There isn't much material about this online, so I'm sharing my setup :)
For the simple pattern (Kafka feeds data into Flink, which stores it in MySQL), see the article:
"Using a Flink stream to write data from Kafka into MySQL"
Maven dependencies:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.xxr</groupId>
  <artifactId>flink</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.4.1</flink.version>
  </properties>
  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <configuration>
            <source>1.8</source>
            <target>1.8</target>
            <encoding>UTF-8</encoding>
            <compilerArgs>
              <arg>-extdirs</arg>
              <arg>${project.basedir}/src/lib</arg>
            </compilerArgs>
          </configuration>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>2.5.5</version>
          <configuration>
            <archive>
              <manifest>
                <mainClass>com.xxr.flink.stream_sql</mainClass>
              </manifest>
            </archive>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
  <dependencies>
    <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency>
    <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.8.0-beta1</version></dependency>
    <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.8.0-beta1</version></dependency>
    <dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency>
    <dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>2.11.1</version></dependency>
    <dependency><groupId>org.scala-lang.modules</groupId><artifactId>scala-xml_2.11</artifactId><version>1.0.2</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime_2.11</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-wikiedits_2.11</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.8_2.11</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table_2.11</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc</artifactId><version>${flink.version}</version></dependency>
    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.39</version></dependency>
  </dependencies>
</project>
Configuration and SQL statements (the time window is one minute):
public class JDBCTestBase {
    // Every minute, compute the max/avg/min of num within that minute, using rowtime as the time attribute
    public static final String SQL_MAX = "SELECT MAX(num), TUMBLE_END(rowtime, INTERVAL '1' MINUTE) AS wEnd FROM wiki_table GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)";
    public static final String SQL_AVG = "SELECT AVG(num), TUMBLE_END(rowtime, INTERVAL '1' MINUTE) AS wEnd FROM wiki_table GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)";
    public static final String SQL_MIN = "SELECT MIN(num), TUMBLE_END(rowtime, INTERVAL '1' MINUTE) AS wEnd FROM wiki_table GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE)";

    public static final String kafka_group = "test-consumer-group";
    public static final String kafka_zookper = "localhost:2181";
    public static final String kafka_hosts = "localhost:9092";
    public static final String kafka_topic = "wiki-result";

    public static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
    public static final String DB_URL = "jdbc:mysql://localhost:3306/db?user=user&password=password";
}
Create the table in MySQL:
CREATE TABLE wiki (
  Id int(11) NOT NULL AUTO_INCREMENT,
  avg double DEFAULT NULL,
  time timestamp NULL DEFAULT NULL,
  PRIMARY KEY (Id)
);
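Before wiring the whole pipeline together, it can be worth sanity-checking the JDBC URL and driver on their own. This little check is my addition (the class name is purely illustrative); it only assumes the constants from JDBCTestBase above:

import java.sql.Connection;
import java.sql.DriverManager;

// Standalone sanity check for the MySQL connection (not part of the Flink job itself)
public class JdbcSmokeTest {
    public static void main(String[] args) throws Exception {
        // Load the driver named in JDBCTestBase and open a throwaway connection
        Class.forName(JDBCTestBase.DRIVER_CLASS);
        try (Connection conn = DriverManager.getConnection(JDBCTestBase.DB_URL)) {
            System.out.println("MySQL connection OK: " + !conn.isClosed());
        }
    }
}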
Producing data into Kafka, using the wikiproducer from the Flink examples ("Monitoring the Wikipedia Edit Stream"):
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;

public class WikipediaAnalysis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

        // Key the edit stream by user name
        KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
                .keyBy(new KeySelector<WikipediaEditEvent, String>() {
                    @Override
                    public String getKey(WikipediaEditEvent event) {
                        return event.getUser();
                    }
                });

        // Per user, sum the byte diffs over 10-second windows: (user, totalByteDiff, wallclockMillis)
        DataStream<Tuple3<String, Long, Long>> result = keyedEdits
                .timeWindow(Time.seconds(10))
                .fold(new Tuple3<>("", 0L, 0L), new FoldFunction<WikipediaEditEvent, Tuple3<String, Long, Long>>() {
                    @Override
                    public Tuple3<String, Long, Long> fold(Tuple3<String, Long, Long> acc, WikipediaEditEvent event) {
                        acc.f0 = event.getUser().trim();
                        acc.f1 += event.getByteDiff();
                        acc.f2 = System.currentTimeMillis();
                        return acc;
                    }
                });

        // Write each tuple to Kafka as its toString() form
        result.map(new MapFunction<Tuple3<String, Long, Long>, String>() {
            @Override
            public String map(Tuple3<String, Long, Long> tuple) {
                return tuple.toString();
            }
        }).addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));

        result.print();
        see.execute();
    }
}
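One detail that matters downstream: the sink writes Tuple3.toString(), so each Kafka record is a plain "(user,byteDiff,timestamp)" string that the consuming job has to parse back apart. A tiny sketch of what lands on the topic (class name and values invented):

import org.apache.flink.api.java.tuple.Tuple3;

// Shows the wire format the producer writes to Kafka: Flink's Tuple3.toString()
public class RecordFormatDemo {
    public static void main(String[] args) {
        Tuple3<String, Long, Long> record = new Tuple3<>("SomeUser", 42L, 1514764800000L);
        System.out.println(record); // prints: (SomeUser,42,1514764800000)
    }
}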
Extend RichSinkFunction to write results into MySQL:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class WikiSQLSink extends RichSinkFunction<Tuple3<String, Long, Long>> {

    private static final long serialVersionUID = 1L;
    private Connection connection;
    private PreparedStatement preparedStatement;

    @Override
    public void invoke(Tuple3<String, Long, Long> value) throws Exception {
        // NOTE: the INSERT statement and column mapping here are assumptions; opening a
        // connection per record keeps the example simple, but in practice open()/close()
        // of RichSinkFunction is the right place for connection handling
        Class.forName(JDBCTestBase.DRIVER_CLASS);
        connection = DriverManager.getConnection(JDBCTestBase.DB_URL);
        preparedStatement = connection.prepareStatement("INSERT INTO wiki (avg, time) VALUES (?, ?)");
        preparedStatement.setLong(1, value.f1);
        preparedStatement.setTimestamp(2, new Timestamp(value.f2));
        preparedStatement.executeUpdate();
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
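For reference, this custom sink can be attached directly to the parsed Tuple3 stream, as an alternative to the Table API + JDBC sink path used below. A minimal wiring sketch (the class, method, and parameter names are mine, purely illustrative):

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;

// Illustrative wiring only: bypass the Table API and write each parsed record straight to MySQL
public class SinkWiringExample {
    public static void attach(DataStream<Tuple3<String, Long, Long>> parsed) {
        parsed.addSink(new WikiSQLSink());
    }
}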
The Flink streaming job below uses EventTime and aggregates the data with a SQL query before writing it to MySQL; Flink's SQL syntax comes from another open-source framework, Apache Calcite:
Diagram: time attributes in the Table API (see https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#time-attributes)
package com.xxr.flink;

import java.sql.Timestamp;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.TimestampAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.WindowedTable;
import org.apache.flink.table.api.java.StreamTableEnvironment;
// Time attributes:
// https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#event-time
// Concepts & Common API:
// https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/common.html#register-a-table
// SQL syntax:
// https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html
public class stream_sql {
    public static void main(String[] args) throws Exception {
        Properties pro = new Properties();
        pro.put("bootstrap.servers", JDBCTestBase.kafka_hosts);
        pro.put("zookeeper.connect", JDBCTestBase.kafka_zookper);
        pro.put("group.id", JDBCTestBase.kafka_group);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        // env.getConfig().disableSysoutLogging(); // enable this to suppress sysout logging
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(5000);

        // Consume the "(user,num,timestamp)" strings written to Kafka by WikipediaAnalysis
        DataStream<String> sourceStream = env
                .addSource(new FlinkKafkaConsumer08<String>(JDBCTestBase.kafka_topic, new SimpleStringSchema(), pro));

        // Parse each record back into a Tuple3; the exact parsing here is assumed from the
        // producer's Tuple3.toString() format, falling back to an empty tuple on bad input
        DataStream<Tuple3<String, Long, Long>> sourceStreamTra = sourceStream
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return StringUtils.isNotBlank(value);
                    }
                })
                .map(new MapFunction<String, Tuple3<String, Long, Long>>() {
                    @Override
                    public Tuple3<String, Long, Long> map(String value) throws Exception {
                        String[] splits = value.replace("(", "").replace(")", "").split(",");
                        try {
                            return new Tuple3<String, Long, Long>(splits[0], Long.valueOf(splits[1]), Long.valueOf(splits[2]));
                        } catch (Exception e) {
                            e.printStackTrace();
                            return new Tuple3<String, Long, Long>("", 0L, 0L);
                        }
                    }
                });

        // Assign event-time timestamps/watermarks and register the stream as wiki_table;
        // the third tuple field becomes the rowtime attribute referenced by the SQL
        tableEnv.registerDataStream("wiki_table",
                sourceStreamTra.assignTimestampsAndWatermarks(new FirstTandW()),
                "name, num, rowtime.rowtime");

        Table result = tableEnv.sqlQuery(JDBCTestBase.SQL_AVG);

        // JDBC append sink: each one-minute window result becomes one row in the wiki table
        JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
                .setDrivername(JDBCTestBase.DRIVER_CLASS)
                .setDBUrl(JDBCTestBase.DB_URL)
                .setQuery("INSERT INTO wiki (avg, time) VALUES (?, ?)")
                .setParameterTypes(Types.LONG, Types.SQL_TIMESTAMP)
                .build();

        // result.writeToSink(new CsvTableSink("D:/a.csv", // output path
        //         "|", // optional: delimit files by '|'
        //         1, // optional: write to a single file
        //         WriteMode.OVERWRITE)); // optional: override existing files

        // Write to the database
        result.writeToSink(sink);
        env.execute();
    }
}
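Once the job has run for a few minutes, each one-minute window should have appended one row to the wiki table. This read-back check is my addition (class name illustrative), reusing the JDBCTestBase constants:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Standalone check: read back the most recent window results the job appended
public class ResultCheck {
    public static void main(String[] args) throws Exception {
        Class.forName(JDBCTestBase.DRIVER_CLASS);
        try (Connection conn = DriverManager.getConnection(JDBCTestBase.DB_URL);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT avg, time FROM wiki ORDER BY time DESC LIMIT 5")) {
            while (rs.next()) {
                System.out.println(rs.getDouble("avg") + " @ " + rs.getTimestamp("time"));
            }
        }
    }
}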
Implement AssignerWithPeriodicWatermarks to set the watermark. This is required when the time characteristic is EventTime; with ProcessingTime it can be skipped:
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class FirstTandW implements AssignerWithPeriodicWatermarks<Tuple3<String, Long, Long>> {

    // Allowed out-of-orderness; 3.5 s is an assumed value, tune it for your data
    private final long maxOutOfOrderness = 3500;
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Tuple3<String, Long, Long> element, long previousElementTimestamp) {
        // The third tuple field carries the event timestamp set by the producer
        long timestamp = element.f2;
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // The watermark trails the largest timestamp seen so far by the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
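Since this is a periodic assigner, getCurrentWatermark() is polled at the auto-watermark interval (Flink sets it to 200 ms when the EventTime characteristic is enabled). A small sketch of tuning that interval, with the 1000 ms value chosen only for illustration:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: control how often Flink calls getCurrentWatermark() on periodic assigners
public class WatermarkIntervalExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // defaults the interval to 200 ms
        env.getConfig().setAutoWatermarkInterval(1000); // emit watermarks once per second instead
    }
}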
Package everything into a jar with the Maven assembly plugin and submit it to Flink to run. If you don't know how to build the package, see my blog.
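For completeness, the build-and-submit commands would look roughly like this; the jar name follows from the artifactId/version in the POM above, and the main class from the assembly plugin's manifest:

# Build a fat jar with the assembly plugin, then submit it to a running Flink cluster
mvn clean package assembly:single
flink run -c com.xxr.flink.stream_sql target/flink-0.0.1-SNAPSHOT-jar-with-dependencies.jar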
Background reading:
"Flink's Window operations" / "Flink stream programming: understanding EventTime vs. ProcessingTime in a WindowedStream"
The Flink documentation is very well written. I didn't read it carefully when I started and ran into quite a few pitfalls.
Git: https://github.com/xxrznj/flink-kafka-sql