Flink 读取 Kafka 数据批量写入到 MySQL准备 你需要将这两个依赖添加到 pom.xml 中
<dependency > <groupId > mysqlgroupId > <artifactId > mysql-connector-javaartifactId > <version > 5.1.34version > dependency >
读取 kafka 数据 这里我依旧用的以前的 student 类,自己本地起了 kafka 然后造一些测试数据,这里我们测试发送一条数据则 sleep 10s,意味着往 kafka 中一分钟发 6 条数据。
package com.zhisheng.connectors.mysql.utils;import com.zhisheng.common.utils.GsonUtil;import com.zhisheng.connectors.mysql.model.Student;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/** * Desc: 往kafka中写数据,可以使用这个main函数进行测试 * Created by zhisheng on 2019-02-17 * Blog: http://www.54tianzhisheng.cn/tags/Flink/ */ public class KafkaUtil { public static final String broker_list = "localhost:9092" ; public static final String topic = "student" ; //kafka topic 需要和 flink 程序用同一个 topic public static void writeToKafka () throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers" , broker_list); props.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer" ); props.put("value.serializer" , "org.apache.kafka.common.serialization.StringSerializer" ); KafkaProducer producer = new KafkaProducer(props); for (int i = 1 ; i <= 100 ; i++) { Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i); ProducerRecord record = new ProducerRecord(topic, null , null , GsonUtil.toJson(student)); producer.send(record); System.out.println("发送数据: " + GsonUtil.toJson(student)); Thread.sleep(10 * 1000 ); //发送一条数据 sleep 10s,相当于 1 分钟 6 条 } producer.flush(); } public static void main (String[] args) throws InterruptedException { writeToKafka(); } }
从 kafka 中读取数据,然后序列化成 student 对象。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties props = new Properties(); props.put("bootstrap.servers" , "localhost:9092" ); props.put("zookeeper.connect" , "localhost:2181" ); props.put("group.id" , "metric-group" ); props.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); props.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); props.put("auto.offset.reset" , "latest" ); SingleOutputStreamOperator student = env.addSource(new FlinkKafkaConsumer011<>( "student" , //这个 kafka topic 需要和上面的工具类的 topic 一致 new SimpleStringSchema(), props)).setParallelism(1 ) .map(string -> GsonUtil.fromJson(string, Student.class)); //,解析字符串成 student 对象
因为 RichSinkFunction 中如果 sink 一条数据到 mysql 中就会调用 invoke 方法一次,所以如果要实现批量写的话,我们最好在 sink 之前就把数据聚合一下。那这里我们开个一分钟的窗口去聚合 Student 数据。
student.timeWindowAll(Time.minutes(1 )).apply(new AllWindowFunction, TimeWindow>() { @Override public void apply (TimeWindow window, Iterable values, Collector> out)
throws Exception { ArrayList students = Lists.newArrayList(values); if (students.size() > 0 ) { System.out.println("1 分钟内收集到 student 的数据条数是:" + students.size()); out.collect(students); } } });
写入数据库 这里使用 DBCP 连接池连接数据库 mysql,pom.xml 中添加依赖:
<dependency > <groupId > org.apache.commonsgroupId > <artifactId > commons-dbcp2artifactId > <version > 2.1.1version > dependency >
如果你想使用其他的数据库连接池请加入对应的依赖。
这里将数据写入到 MySQL 中,依旧是和之前文章一样继承 RichSinkFunction 类,重写里面的方法:
package com.zhisheng.connectors.mysql.sinks;import com.zhisheng.connectors.mysql.model.Student;import org.apache.commons.dbcp2.BasicDataSource;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import javax.sql.DataSource;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.util.List;/** * Desc: 数据批量 sink 数据到 mysql * Created by zhisheng_tian on 2019-02-17 * Blog: http://www.54tianzhisheng.cn/tags/Flink/ */ public class SinkToMySQL extends RichSinkFunction <List <Student >> { PreparedStatement ps; BasicDataSource dataSource; private Connection connection; /** * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接 * * @param parameters * @throws Exception */ @Override public void open (Configuration parameters) throws Exception { super .open(parameters); dataSource = new BasicDataSource(); cOnnection= getConnection(dataSource); String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);" ; ps = this .connection.prepareStatement(sql); } @Override public void close () throws Exception { super .close(); //关闭连接和释放资源 if (connection != null ) { connection.close(); } if (ps != null ) { ps.close(); } } /** * 每条数据的插入都要调用一次 invoke() 方法 * * @param value * @param context * @throws Exception */ @Override public void invoke (List value, Context context) throws Exception { //遍历数据集合 for (Student student : value) { ps.setInt(1 , student.getId()); ps.setString(2 , student.getName()); ps.setString(3 , student.getPassword()); ps.setInt(4 , student.getAge()); ps.addBatch(); } int [] count = ps.executeBatch();//批量后执行 System.out.println("成功了插入了" + count.length + "行数据" ); } private static Connection getConnection (BasicDataSource dataSource) { dataSource.setDriverClassName("com.mysql.jdbc.Driver" ); //注意,替换成自己本地的 mysql 数据库地址和用户名、密码 dataSource.setUrl("jdbc:mysql://localhost:3306/test" ); dataSource.setUsername("root" ); dataSource.setPassword("root123456" ); //设置连接池的一些参数 dataSource.setInitialSize(10 ); dataSource.setMaxTotal(50 ); dataSource.setMinIdle(2 ); Connection con = null ; try { con = dataSource.getConnection(); System.out.println("创建连接池:" + con); } catch (Exception e) { System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage()); } return con; } }
核心类 Main 核心程序如下:
public class Main { public static void main (String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.put("bootstrap.servers" , "localhost:9092" ); props.put("zookeeper.connect" , "localhost:2181" ); props.put("group.id" , "metric-group" ); props.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); props.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); props.put("auto.offset.reset" , "latest" ); SingleOutputStreamOperator student = env.addSource(new FlinkKafkaConsumer011<>( "student" , //这个 kafka topic 需要和上面的工具类的 topic 一致 new SimpleStringSchema(), props)).setParallelism(1 ) .map(string -> GsonUtil.fromJson(string, Student.class)); // student.timeWindowAll(Time.minutes(1 )).apply(new AllWindowFunction, TimeWindow>() { @Override public void apply (TimeWindow window, Iterable values, Collector> out)
throws Exception { ArrayList students = Lists.newArrayList(values); if (students.size() > 0 ) { System.out.println("1 分钟内收集到 student 的数据条数是:" + students.size()); out.collect(students); } } }).addSink(new SinkToMySQL()); env.execute("flink learning connectors kafka" ); } }
运行项目 运行 Main 类后再运行 KafkaUtils.java 类!
下图是往 Kafka 中发送的数据:
下图是运行 Main 类的日志,会创建 4 个连接池是因为默认的 4 个并行度,你如果在 addSink 这个算子设置并行度为 1 的话就会创建一个连接池:
下图是批量插入数据库的结果:
总结 本文从知识星球一位朋友的疑问来写的,应该都满足了他的条件(批量/数据库连接池/写入mysql),的确网上很多的例子都是简单的 demo 形式,都是单条数据就创建数据库连接插入 MySQL,如果要写的数据量很大的话,会对 MySQL 的写有很大的压力
Flink读取Kafka 数据写到 RabbitMQ
前提准备 安装 RabbitMQ 这里我直接用 docker 命令安装吧,先把 docker 在 mac 上启动起来。
在命令行中执行下面的命令:
1
docker run -d -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management
对这个命令不懂的童鞋可以看看我以前的文章:http://www.54tianzhisheng.cn/2018/01/26/SpringBoot-RabbitMQ/
登录用户名和密码分别是:admin admin ,登录进去是这个样子就代表安装成功了:
依赖 pom.xml 中添加 Flink connector rabbitmq 的依赖如下:
<dependency > <groupId > org.apache.flinkgroupId > <artifactId > flink-connector-rabbitmq_${scala.binary.version}artifactId > <version > ${flink.version}version > dependency >
生产者 这里我们依旧自己写一个工具类一直的往 RabbitMQ 中的某个 queue 中发数据,然后由 Flink 去消费这些数据。
注意按照我的步骤来一步步操作,否则可能会出现一些错误!
RabbitMQProducerUtil.java
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class RabbitMQProducerUtil { public final static String QUEUE_NAME = "zhisheng" ; public static void main (String[] args) throws Exception { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ相关信息 factory.setHost("localhost" ); factory.setUsername("admin" ); factory.setPassword("admin" ); factory.setPort(5672 ); //创建一个新的连接 Connection cOnnection= factory.newConnection(); //创建一个通道 Channel channel = connection.createChannel(); // 声明一个队列 // channel.queueDeclare(QUEUE_NAME, false, false, false, null); //发送消息到队列中 String message = "Hello zhisheng" ; //我们这里演示发送一千条数据 for (int i = 0 ; i <1000 ; i++) { channel.basicPublish("" , QUEUE_NAME, null , (message + i).getBytes("UTF-8" )); System.out.println("Producer Send +'" + message + i); } //关闭通道和连接 channel.close(); connection.close(); } }
Flink 主程序
import com.zhisheng.common.utils.ExecutionEnvUtil;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/** * 从 rabbitmq 读取数据 */ public class Main { public static void main (String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL; //这些配置建议可以放在配置文件中,然后通过 parameterTool 来获取对应的参数值 final RMQConnectionConfig cOnnectionConfig= new RMQConnectionConfig .Builder().setHost("localhost" ).setVirtualHost("/" ) .setPort(5672 ).setUserName("admin" ).setPassword("admin" ) .build(); DataStreamSource zhisheng = env.addSource(new RMQSource<>(connectionConfig, "zhisheng" , true , new SimpleStringSchema())) .setParallelism(1 ); zhisheng.print(); //如果想保证 exactly-once 或 at-least-once 需要把 checkpoint 开启 // env.enableCheckpointing(10000); env.execute("flink learning connectors rabbitmq" ); } }
运行 RabbitMQProducerUtil 类,再运行 Main 类!
注意 ⚠️:
1、RMQConnectionConfig 中设置的用户名和密码要设置成 admin/admin,如果你换成是 guest/guest,其实是在 RabbitMQ 里面是没有这个用户名和密码的,所以就会报这个错误:
nested exception is com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.
不出意外的话应该你运行 RabbitMQProducerUtil 类后,立马两个运行的结果都会出来,速度还是很快的。
2、如果你在 RabbitMQProducerUtil 工具类中把注释的那行代码打开的话:
// 声明一个队列 // channel.queueDeclare(QUEUE_NAME, false, false, false, null);
就会出现这种错误:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'zhisheng' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
这是因为你打开那个注释的话,一旦你运行了该类就会创建一个叫做 zhisheng
的 Queue,当你再运行 Main 类中的时候,它又会创建这样一个叫 zhisheng
的 Queue,然后因为已经有同名的 Queue 了,所以就有了冲突,解决方法就是把那行代码注释就好了。
3、该 connector(连接器)中提供了 RMQSource 类去消费 RabbitMQ queue 中的消息和确认 checkpoints 上的消息,它提供了三种不一样的保证:
Exactly-once(只消费一次): 前提条件有,1 是要开启 checkpoint,因为只有在 checkpoint 完成后,才会返回确认消息给 RabbitMQ(这时,消息才会在 RabbitMQ 队列中删除);2 是要使用 Correlation ID,在将消息发往 RabbitMQ 时,必须在消息属性中设置 Correlation ID。数据源根据 Correlation ID 把从 checkpoint 恢复的数据进行去重;3 是数据源不能并行,这种限制主要是由于 RabbitMQ 将消息从单个队列分派给多个消费者。
At-least-once(至少消费一次): 开启了 checkpoint,但未使用相 Correlation ID 或 数据源是并行的时候,那么就只能保证数据至少消费一次了
No guarantees(无法保证): Flink 接收到数据就返回确认消息给 RabbitMQ
Sink 数据到 RabbitMQ RabbitMQ 除了可以作为数据源,也可以当作下游,Flink 消费数据做了一些处理之后也能把数据发往 RabbitMQ,下面演示下 Flink 消费 Kafka 数据后写入到 RabbitMQ。
public class Main1 { public static void main (String[] args) throws Exception { final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args); StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool); DataStreamSource data = KafkaConfigUtil.buildSource(env); final RMQConnectionConfig cOnnectionConfig= new RMQConnectionConfig .Builder().setHost("localhost" ).setVirtualHost("/" ) .setPort(5672 ).setUserName("admin" ).setPassword("admin" ) .build(); //注意,换一个新的 queue,否则也会报错 data.addSink(new RMQSink<>(connectionConfig, "zhisheng001" , new MetricSchema())); env.execute("flink learning connectors rabbitmq" ); } }
是不是很简单?但是需要注意的是,要换一个之前不存在的 queue,否则是会报错的。
不出意外的话,你可以看到 RabbitMQ 的监控页面会出现新的一个 queue 出来,如下图:
总结 本文先把 RabbitMQ 作为数据源,写了个 Flink 消费 RabbitMQ 队列里面的数据进行打印出来,然后又写了个 Flink 消费 Kafka 数据后写入到 RabbitMQ 的例子!