
Exploring a Flink Two-Phase-Commit Sink (MySQL)


1. Purpose

Flink can provide end-to-end exactly-once data processing within the scope of a Flink application. Its built-in Kafka sink connector extends the TwoPhaseCommitSinkFunction abstract class, combining a two-phase commit (which requires transactional support in the external system) with Flink's checkpoints to guarantee end-to-end consistency. Since MySQL also supports transactions, can we extend the same abstract class to get end-to-end consistency when writing to MySQL as well?
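For reference, these are the hooks that TwoPhaseCommitSinkFunction leaves to a subclass, abridged from the Flink class (exact signatures may differ slightly between Flink versions), with a note on where the checkpoint lifecycle triggers each one:

public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN>
        implements CheckpointedFunction, CheckpointListener {

    // Called at start-up and again after every checkpoint: open a fresh transaction
    protected abstract TXN beginTransaction() throws Exception;

    // Called once per record: write the record inside the current transaction
    protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception;

    // Called when the checkpoint barrier reaches the sink (phase 1): flush / pre-commit
    protected abstract void preCommit(TXN transaction) throws Exception;

    // Called when the checkpoint is reported complete (phase 2): make the writes visible
    protected abstract void commit(TXN transaction);

    // Called when a transaction must be discarded, e.g. on failure or restore: roll it back
    protected abstract void abort(TXN transaction);

    // ... state handling and the recoverAndCommit / recoverAndAbort defaults are omitted here
}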


2. Main program

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;

public class StreamDemoKafka2Mysql {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism is set to 1 so the message order is easy to follow while testing; it can be raised later
        env.setParallelism(1);

        // Checkpoint settings
        // Trigger a checkpoint every 1 s (checkpoint interval)
        env.enableCheckpointing(1000);
        // Exactly-once checkpointing mode
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Keep at least 1 s between two checkpoints (minimum pause)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        // A checkpoint must finish within 1 s or it is discarded (checkpoint timeout)
        env.getCheckpointConfig().setCheckpointTimeout(1000);
        // Allow only one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Retain checkpoint data when the job is cancelled, so it can be restored from a chosen checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // State backend: checkpoints are kept in memory by default; they could go to HDFS, or locally for now
        // env.setStateBackend(new FsStateBackend("file:///F:/kafkaTool/aaa"));

        // Kafka consumer properties
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.191.128:9091,192.168.191.128:9092,192.168.191.128:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        /*
         * SimpleStringSchema only returns the Kafka message body, while JSONKeyValueDeserializationSchema
         * returns the key, value and metadata (topic, partition, offset, ...).
         */
        // FlinkKafkaConsumer011 kafkaConsumer011 = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
        // Note: the first constructor argument is the topic to consume from
        FlinkKafkaConsumer011<ObjectNode> kafkaConsumer011 =
                new FlinkKafkaConsumer011<>("flink-consumer-group1", new JSONKeyValueDeserializationSchema(true), props);
        // kafkaConsumer011.setStartFromLatest(); // this option made the consumer always start from the latest offset
        kafkaConsumer011.setCommitOffsetsOnCheckpoints(true);
        kafkaConsumer011.setStartFromGroupOffsets();
        System.out.println("kafkaConsumer011:" + kafkaConsumer011);

        // Add the Kafka source
        DataStreamSource<ObjectNode> streamSource = env.addSource(kafkaConsumer011).setParallelism(1);
        streamSource.print().setParallelism(1);
        streamSource.print("------------>:");
        // Hand the data to the downstream sink
        streamSource.addSink(new MySqlTwoPhaseNewCommitSink2()).name("MySqlTwoPhaseCommitSink2").setParallelism(1);

        // Trigger execution
        env.execute(StreamDemoKafka2Mysql.class.getName());
    }
}

3. Custom sink

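The sink referenced from the main program, MySqlTwoPhaseNewCommitSink2, extends TwoPhaseCommitSinkFunction and uses a JDBC connection as the transaction handle. Below is a minimal sketch of such a sink in the style of the examples found online; the JDBC URL, the credentials, and the target table t_test with a single value column are assumptions, and the connection handling relies on the DBConnectUtil class shown in the next section.

import java.sql.Connection;
import java.sql.PreparedStatement;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class MySqlTwoPhaseNewCommitSink2
        extends TwoPhaseCommitSinkFunction<ObjectNode, Connection, Void> {

    public MySqlTwoPhaseNewCommitSink2() {
        // The transaction handle (a JDBC Connection) is serialized with Kryo; no context object is used
        super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    protected Connection beginTransaction() throws Exception {
        System.out.println("start beginTransaction...");
        // Assumed JDBC URL, user and password
        String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8";
        return DBConnectUtil.getConnection(url, "root", "123456");
    }

    @Override
    protected void invoke(Connection transaction, ObjectNode value, Context context) throws Exception {
        // Assumed target table t_test(value); the record body comes from JSONKeyValueDeserializationSchema
        String sql = "INSERT INTO t_test (value) VALUES (?)";
        try (PreparedStatement ps = transaction.prepareStatement(sql)) {
            ps.setString(1, value.get("value").toString());
            ps.execute();
        }
    }

    @Override
    protected void preCommit(Connection transaction) throws Exception {
        // Called when the checkpoint barrier reaches the sink; nothing extra to flush for plain JDBC
        System.out.println("start preCommit...");
    }

    @Override
    protected void commit(Connection transaction) {
        // Called once the checkpoint completes: commit and release the connection
        System.out.println("start commit...");
        DBConnectUtil.commit(transaction);
    }

    @Override
    protected void abort(Connection transaction) {
        // Called on failure: roll the transaction back and release the connection
        System.out.println("start abort...");
        DBConnectUtil.rollback(transaction);
    }
}

One caveat with this pattern: the transaction handle is a live java.sql.Connection, which Flink stores as checkpointed state even though a connection cannot really be serialized or restored, so a transaction that spans a failure cannot be recovered and committed. This fragility is a common weak point of the JDBC examples written this way.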

4. Utility class

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DBConnectUtil {

    public static Connection getConnection(String url, String user, String password) throws SQLException {
        Connection conn = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        conn = DriverManager.getConnection(url, user, password);
        // Disable auto-commit so that the transaction is committed manually
        conn.setAutoCommit(false);
        return conn;
    }

    public static void commit(Connection conn) {
        if (conn != null) {
            try {
                conn.commit();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                close(conn);
            }
        }
    }

    public static void rollback(Connection conn) {
        if (conn != null) {
            try {
                conn.rollback();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                close(conn);
            }
        }
    }

    public static void close(Connection conn) {
        if (conn != null) {
            try {
                conn.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

5. Result

Once consumption starts, invoke keeps being called until all of the data has been consumed; after that, "start preCommit…" and "start beginTransaction…" are printed over and over again. I looked at many examples online and they are all written more or less this way, but I never managed to get it to work.



