Author: SCY瑶_450 | Source: Internet | 2023-09-18 18:01
1. Purpose
Flink provides end-to-end exactly-once semantics for data flowing within the scope of an application. Its built-in Kafka sink connector extends the TwoPhaseCommitSinkFunction abstract class, combining a two-phase commit protocol (which requires transactional support from the external system) with Flink's checkpoints to guarantee end-to-end consistency. Since MySQL also supports transactions, can we extend the same abstract class to get end-to-end consistency when writing to MySQL?
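For orientation, these are (abridged) the hooks a TwoPhaseCommitSinkFunction&lt;IN, TXN, CONTEXT&gt; subclass implements in Flink 1.x, where TXN is a user-defined holder for the in-flight transaction:

protected abstract TXN beginTransaction() throws Exception;                                   // open a fresh transaction
protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception;  // write into it
protected abstract void preCommit(TXN transaction) throws Exception;                          // phase 1: runs during the checkpoint
protected abstract void commit(TXN transaction);                                              // phase 2: runs once the checkpoint completes
protected abstract void abort(TXN transaction);                                               // roll back a failed or abandoned transaction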
2. Main program
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;

public class StreamDemoKafka2Mysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpoint every second; EXACTLY_ONCE mode is required for the two-phase commit sink.
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        // Note: a 1 s checkpoint timeout is very tight; checkpoints that exceed it are declared failed.
        env.getCheckpointConfig().setCheckpointTimeout(1000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.191.128:9091,192.168.191.128:9092,192.168.191.128:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group1");
        // Disable Kafka auto-commit; offsets are committed through Flink checkpoints instead.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // The first constructor argument is the topic name (here it happens to match the group id).
        FlinkKafkaConsumer011<ObjectNode> kafkaConsumer011 = new FlinkKafkaConsumer011<>(
                "flink-consumer-group1", new JSONKeyValueDeserializationSchema(true), props);
        kafkaConsumer011.setCommitOffsetsOnCheckpoints(true);
        kafkaConsumer011.setStartFromGroupOffsets();
        System.out.println("kafkaConsumer011:" + kafkaConsumer011);

        DataStreamSource<ObjectNode> streamSource = env.addSource(kafkaConsumer011).setParallelism(1);
        streamSource.print().setParallelism(1);
        streamSource.print("------------>:");
        streamSource.addSink(new MySqlTwoPhaseNewCommitSink2())
                .name("MySqlTwoPhaseCommitSink2").setParallelism(1);
        env.execute(StreamDemoKafka2Mysql.class.getName());
    }
}
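One detail worth calling out: with auto-commit disabled and setCommitOffsetsOnCheckpoints(true), offsets are committed back to Kafka only when a checkpoint completes, so after a failure the job replays from the last successful checkpoint rather than trusting whatever offset Kafka last saw.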
3. Custom sink
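The sink referenced from the main program, MySqlTwoPhaseNewCommitSink2, extends TwoPhaseCommitSinkFunction and drives a MySQL transaction through the DBConnectUtil helper shown in section 4 below. Here is a minimal sketch of such a sink, assuming a t_test(value, insert_time) table; the JDBC URL, credentials, and the ConnectionState holder are illustrative, not a verbatim listing:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Timestamp;

public class MySqlTwoPhaseNewCommitSink2
        extends TwoPhaseCommitSinkFunction<ObjectNode, MySqlTwoPhaseNewCommitSink2.ConnectionState, Void> {

    public MySqlTwoPhaseNewCommitSink2() {
        super(new KryoSerializer<>(ConnectionState.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    protected ConnectionState beginTransaction() throws Exception {
        System.out.println("start beginTransaction...");
        // A fresh connection per transaction; DBConnectUtil disables auto-commit.
        Connection conn = DBConnectUtil.getConnection(
                "jdbc:mysql://192.168.191.128:3306/test?useSSL=false", "root", "123456");
        return new ConnectionState(conn);
    }

    @Override
    protected void invoke(ConnectionState transaction, ObjectNode value, Context context) throws Exception {
        // Writes accumulate inside the open, uncommitted transaction.
        try (PreparedStatement ps = transaction.connection.prepareStatement(
                "INSERT INTO t_test (`value`, insert_time) VALUES (?, ?)")) {
            ps.setString(1, value.get("value").toString());
            ps.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
            ps.execute();
        }
    }

    @Override
    protected void preCommit(ConnectionState transaction) throws Exception {
        // Phase 1: nothing extra to flush; the pending rows already sit in the JDBC transaction.
        System.out.println("start preCommit...");
    }

    @Override
    protected void commit(ConnectionState transaction) {
        // Phase 2: runs after the checkpoint completes; makes the buffered rows visible.
        System.out.println("start commit...");
        DBConnectUtil.commit(transaction.connection);
    }

    @Override
    protected void abort(ConnectionState transaction) {
        System.out.println("start abort...");
        DBConnectUtil.rollback(transaction.connection);
    }

    // Holder for the connection that carries the open transaction.
    public static class ConnectionState {
        final Connection connection;
        ConnectionState(Connection connection) {
            this.connection = connection;
        }
    }
}

One known pitfall of this pattern: TwoPhaseCommitSinkFunction snapshots the pending TXN object (here ConnectionState) into checkpoint state using the serializer passed to the constructor, and a live java.sql.Connection does not survive that serialization round trip, which is a common reason examples written this way fail to recover correctly.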
4. Utility class
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DBConnectUtil {

    public static Connection getConnection(String url, String user, String password) throws SQLException {
        try {
            // Load the legacy MySQL JDBC driver.
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        Connection conn = DriverManager.getConnection(url, user, password);
        // Disable auto-commit: the sink commits or rolls back explicitly.
        conn.setAutoCommit(false);
        return conn;
    }

    public static void commit(Connection conn) {
        if (conn != null) {
            try {
                conn.commit();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                // The connection is closed after committing.
                close(conn);
            }
        }
    }

    public static void rollback(Connection conn) {
        if (conn != null) {
            try {
                conn.rollback();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                close(conn);
            }
        }
    }

    public static void close(Connection conn) {
        if (conn != null) {
            try {
                conn.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
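Because commit() and rollback() both close the connection in their finally blocks, every transaction must start from a fresh connection obtained via getConnection(), which is why the sink's beginTransaction() opens a new one each time. A brief usage sketch (URL and credentials are placeholders):

Connection conn = DBConnectUtil.getConnection(
        "jdbc:mysql://192.168.191.128:3306/test?useSSL=false", "root", "123456");
// ... run INSERT/UPDATE statements on conn; nothing is visible to other sessions yet ...
DBConnectUtil.commit(conn);   // makes the writes visible, then closes the connection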
5. Result
After consumption starts, invoke() runs repeatedly until all the data has been consumed; after that, "start preCommit..." and "start beginTransaction..." print in an endless loop. The cycling itself is expected, since on every checkpoint the sink pre-commits the current transaction and immediately opens a new one even when no records arrive; what is telling is that "start commit..." never appears, which suggests notifyCheckpointComplete never triggers the second phase. I found many examples online written in essentially this way, but I could never get it to succeed.