热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flink入门(十三到十四)

Flink读取Kafka数据批量写入到MySQL准备你需要将这两个依赖添加到pom.xm

Flink 读取 Kafka 数据批量写入到 MySQL

准备

你需要将这两个依赖添加到 pom.xml 中


<dependency>
   <groupId>mysqlgroupId>
   <artifactId>mysql-connector-javaartifactId>
   <version>5.1.34version>
dependency>

读取 kafka 数据

这里我依旧用的以前的 student 类,自己本地起了 kafka 然后造一些测试数据,这里我们测试发送一条数据则 sleep 10s,意味着往 kafka 中一分钟发 6 条数据。


package com.zhisheng.connectors.mysql.utils;

import com.zhisheng.common.utils.GsonUtil;
import com.zhisheng.connectors.mysql.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
* Desc: 往kafka中写数据,可以使用这个main函数进行测试
* Created by zhisheng on 2019-02-17
* Blog: http://www.54tianzhisheng.cn/tags/Flink/
*/
public class KafkaUtil {
   public static final String broker_list = "localhost:9092";
   public static final String topic = "student";  //kafka topic 需要和 flink 程序用同一个 topic

   public static void writeToKafka() throws InterruptedException {
       Properties props = new Properties();
       props.put("bootstrap.servers", broker_list);
       props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       KafkaProducer producer = new KafkaProducer(props);

       for (int i = 1; i <= 100; i++) {
           Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);
           ProducerRecord record = new ProducerRecord(topic, null, null, GsonUtil.toJson(student));
           producer.send(record);
           System.out.println("发送数据: " + GsonUtil.toJson(student));
           Thread.sleep(10 * 1000); //发送一条数据 sleep 10s,相当于 1 分钟 6 条
       }
       producer.flush();
   }

   public static void main(String[] args) throws InterruptedException {
       writeToKafka();
   }
}

从 kafka 中读取数据,然后序列化成 student 对象。


final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");

SingleOutputStreamOperator student = env.addSource(new FlinkKafkaConsumer011<>(
       "student",   //这个 kafka topic 需要和上面的工具类的 topic 一致
       new SimpleStringSchema(),
       props)).setParallelism(1)
       .map(string -> GsonUtil.fromJson(string, Student.class)); //,解析字符串成 student 对象

因为 RichSinkFunction 中如果 sink 一条数据到 mysql 中就会调用 invoke 方法一次,所以如果要实现批量写的话,我们最好在 sink 之前就把数据聚合一下。那这里我们开个一分钟的窗口去聚合 Student 数据。


student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction, TimeWindow>() {
@Override
   public void apply(TimeWindow window, Iterable values, Collector> out) throws Exception {
       ArrayList students = Lists.newArrayList(values);
       if (students.size() > 0) {
           System.out.println("1 分钟内收集到 student 的数据条数是:" + students.size());
           out.collect(students);
       }
   }
});

写入数据库

这里使用 DBCP 连接池连接数据库 mysql,pom.xml 中添加依赖:


<dependency>
   <groupId>org.apache.commonsgroupId>
   <artifactId>commons-dbcp2artifactId>
   <version>2.1.1version>
dependency>

如果你想使用其他的数据库连接池请加入对应的依赖。

这里将数据写入到 MySQL 中,依旧是和之前文章一样继承 RichSinkFunction 类,重写里面的方法:


package com.zhisheng.connectors.mysql.sinks;

import com.zhisheng.connectors.mysql.model.Student;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;

/**
* Desc: 数据批量 sink 数据到 mysql
* Created by zhisheng_tian on 2019-02-17
* Blog: http://www.54tianzhisheng.cn/tags/Flink/
*/
public class SinkToMySQL extends RichSinkFunction<List<Student>> {
   PreparedStatement ps;
   BasicDataSource dataSource;
   private Connection connection;

/**
    * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
    *
    * @param parameters
    * @throws Exception
    */
@Override
   public void open(Configuration parameters) throws Exception {
       super.open(parameters);
       dataSource = new BasicDataSource();
       cOnnection= getConnection(dataSource);
       String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
       ps = this.connection.prepareStatement(sql);
   }

@Override
   public void close() throws Exception {
       super.close();
//关闭连接和释放资源
       if (connection != null) {
           connection.close();
       }
       if (ps != null) {
           ps.close();
       }
   }

/**
    * 每条数据的插入都要调用一次 invoke() 方法
    *
    * @param value
    * @param context
    * @throws Exception
    */
@Override
   public void invoke(List value, Context context) throws Exception {
//遍历数据集合
       for (Student student : value) {
           ps.setInt(1, student.getId());
           ps.setString(2, student.getName());
           ps.setString(3, student.getPassword());
           ps.setInt(4, student.getAge());
           ps.addBatch();
       }
       int[] count = ps.executeBatch();//批量后执行
       System.out.println("成功了插入了" + count.length + "行数据");
   }


   private static Connection getConnection(BasicDataSource dataSource) {
       dataSource.setDriverClassName("com.mysql.jdbc.Driver");
//注意,替换成自己本地的 mysql 数据库地址和用户名、密码
       dataSource.setUrl("jdbc:mysql://localhost:3306/test");
       dataSource.setUsername("root");
       dataSource.setPassword("root123456");
//设置连接池的一些参数
       dataSource.setInitialSize(10);
       dataSource.setMaxTotal(50);
       dataSource.setMinIdle(2);

       Connection con = null;
       try {
           con = dataSource.getConnection();
           System.out.println("创建连接池:" + con);
       } catch (Exception e) {
           System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
       }
       return con;
   }
}

核心类 Main

核心程序如下:


public class Main {
   public static void main(String[] args) throws Exception{
       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       Properties props = new Properties();
       props.put("bootstrap.servers", "localhost:9092");
       props.put("zookeeper.connect", "localhost:2181");
       props.put("group.id", "metric-group");
       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
       props.put("auto.offset.reset", "latest");

       SingleOutputStreamOperator student = env.addSource(new FlinkKafkaConsumer011<>(
               "student",   //这个 kafka topic 需要和上面的工具类的 topic 一致
               new SimpleStringSchema(),
               props)).setParallelism(1)
               .map(string -> GsonUtil.fromJson(string, Student.class)); //
       student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction, TimeWindow>() {
@Override
           public void apply(TimeWindow window, Iterable values, Collector> out) throws Exception {
               ArrayList students = Lists.newArrayList(values);
               if (students.size() > 0) {
                   System.out.println("1 分钟内收集到 student 的数据条数是:" + students.size());
                   out.collect(students);
               }
           }
       }).addSink(new SinkToMySQL());

       env.execute("flink learning connectors kafka");
   }
}

运行项目

运行 Main 类后再运行 KafkaUtils.java 类!

下图是往 Kafka 中发送的数据:

下图是运行 Main 类的日志,会创建 4 个连接池是因为默认的 4 个并行度,你如果在 addSink 这个算子设置并行度为 1 的话就会创建一个连接池:

下图是批量插入数据库的结果:

总结

本文从知识星球一位朋友的疑问来写的,应该都满足了他的条件(批量/数据库连接池/写入mysql),的确网上很多的例子都是简单的 demo 形式,都是单条数据就创建数据库连接插入 MySQL,如果要写的数据量很大的话,会对 MySQL 的写有很大的压力


Flink读取Kafka 数据写到 RabbitMQ


前提准备

安装 RabbitMQ

这里我直接用 docker 命令安装吧,先把 docker 在 mac 上启动起来。

在命令行中执行下面的命令:

1

docker run -d  -p 15672:15672  -p  5672:5672  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin --name rabbitmq rabbitmq:3-management

对这个命令不懂的童鞋可以看看我以前的文章:http://www.54tianzhisheng.cn/2018/01/26/SpringBoot-RabbitMQ/

登录用户名和密码分别是:admin admin ,登录进去是这个样子就代表安装成功了:

依赖

pom.xml 中添加 Flink connector rabbitmq 的依赖如下:


<dependency>
   <groupId>org.apache.flinkgroupId>
   <artifactId>flink-connector-rabbitmq_${scala.binary.version}artifactId>
   <version>${flink.version}version>
dependency>

生产者

这里我们依旧自己写一个工具类一直的往 RabbitMQ 中的某个 queue 中发数据,然后由 Flink 去消费这些数据。

注意按照我的步骤来一步步操作,否则可能会出现一些错误!

RabbitMQProducerUtil.java


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQProducerUtil {
   public final static String QUEUE_NAME = "zhisheng";

   public static void main(String[] args) throws Exception {
//创建连接工厂
       ConnectionFactory factory = new ConnectionFactory();

//设置RabbitMQ相关信息
       factory.setHost("localhost");
       factory.setUsername("admin");
       factory.setPassword("admin");
       factory.setPort(5672);

//创建一个新的连接
       Connection cOnnection= factory.newConnection();

//创建一个通道
       Channel channel = connection.createChannel();

// 声明一个队列
//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//发送消息到队列中
       String message = "Hello zhisheng";

//我们这里演示发送一千条数据
       for (int i = 0; i <1000; i++) {
           channel.basicPublish("", QUEUE_NAME, null, (message + i).getBytes("UTF-8"));
           System.out.println("Producer Send +'" + message + i);
       }

//关闭通道和连接
       channel.close();
       connection.close();
   }
}

Flink 主程序


import com.zhisheng.common.utils.ExecutionEnvUtil;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

/**
* 从 rabbitmq 读取数据
*/
public class Main {
   public static void main(String[] args) throws Exception {
       final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL;

//这些配置建议可以放在配置文件中,然后通过 parameterTool 来获取对应的参数值
       final RMQConnectionConfig cOnnectionConfig= new RMQConnectionConfig
               .Builder().setHost("localhost").setVirtualHost("/")
               .setPort(5672).setUserName("admin").setPassword("admin")
               .build();

       DataStreamSource zhisheng = env.addSource(new RMQSource<>(connectionConfig,
               "zhisheng",
               true,
               new SimpleStringSchema()))
               .setParallelism(1);
       zhisheng.print();

//如果想保证 exactly-once 或 at-least-once 需要把 checkpoint 开启
//        env.enableCheckpointing(10000);
       env.execute("flink learning connectors rabbitmq");
   }
}

运行 RabbitMQProducerUtil 类,再运行 Main 类!

注意⚠️:

1、RMQConnectionConfig 中设置的用户名和密码要设置成 admin/admin,如果你换成是 guest/guest,其实是在 RabbitMQ 里面是没有这个用户名和密码的,所以就会报这个错误:


nested exception is com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.

不出意外的话应该你运行 RabbitMQProducerUtil 类后,立马两个运行的结果都会出来,速度还是很快的。

2、如果你在 RabbitMQProducerUtil 工具类中把注释的那行代码打开的话:


// 声明一个队列
//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

就会出现这种错误:


Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'zhisheng' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)

这是因为你打开那个注释的话,一旦你运行了该类就会创建一个叫做 zhisheng
 的 Queue,当你再运行 Main 类中的时候,它又会创建这样一个叫 zhisheng
 的 Queue,然后因为已经有同名的 Queue 了,所以就有了冲突,解决方法就是把那行代码注释就好了。

3、该 connector(连接器)中提供了 RMQSource 类去消费 RabbitMQ queue 中的消息和确认 checkpoints 上的消息,它提供了三种不一样的保证:


  • Exactly-once(只消费一次): 前提条件有,1 是要开启 checkpoint,因为只有在 checkpoint 完成后,才会返回确认消息给 RabbitMQ(这时,消息才会在 RabbitMQ 队列中删除);2 是要使用 Correlation ID,在将消息发往 RabbitMQ 时,必须在消息属性中设置 Correlation ID。数据源根据 Correlation ID 把从 checkpoint 恢复的数据进行去重;3 是数据源不能并行,这种限制主要是由于 RabbitMQ 将消息从单个队列分派给多个消费者。


  • At-least-once(至少消费一次): 开启了 checkpoint,但未使用相 Correlation ID 或 数据源是并行的时候,那么就只能保证数据至少消费一次了


  • No guarantees(无法保证): Flink 接收到数据就返回确认消息给 RabbitMQ


Sink 数据到 RabbitMQ

RabbitMQ 除了可以作为数据源,也可以当作下游,Flink 消费数据做了一些处理之后也能把数据发往 RabbitMQ,下面演示下 Flink 消费 Kafka 数据后写入到 RabbitMQ。


public class Main1 {
   public static void main(String[] args) throws Exception {
       final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
       StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
       DataStreamSource data = KafkaConfigUtil.buildSource(env);

       final RMQConnectionConfig cOnnectionConfig= new RMQConnectionConfig
               .Builder().setHost("localhost").setVirtualHost("/")
               .setPort(5672).setUserName("admin").setPassword("admin")
               .build();

//注意,换一个新的 queue,否则也会报错
       data.addSink(new RMQSink<>(connectionConfig, "zhisheng001", new MetricSchema()));
       env.execute("flink learning connectors rabbitmq");
   }
}

是不是很简单?但是需要注意的是,要换一个之前不存在的 queue,否则是会报错的。

不出意外的话,你可以看到 RabbitMQ 的监控页面会出现新的一个 queue 出来,如下图:

总结

本文先把 RabbitMQ 作为数据源,写了个 Flink 消费 RabbitMQ 队列里面的数据进行打印出来,然后又写了个 Flink 消费 Kafka 数据后写入到 RabbitMQ 的例子!





推荐阅读
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • r2dbc配置多数据源
    R2dbc配置多数据源问题根据官网配置r2dbc连接mysql多数据源所遇到的问题pom配置可以参考官网,不过我这样配置会报错我并没有这样配置将以下内容添加到pom.xml文件d ... [详细]
  • 代理模式的详细介绍及应用场景
    代理模式是一种在软件开发中常用的设计模式,通过在客户端和目标对象之间增加一层中间层,让代理对象代替目标对象进行访问,从而简化系统的复杂性。代理模式可以根据不同的使用目的分为远程代理、虚拟代理、Copy-on-Write代理、保护代理、防火墙代理、智能引用代理和Cache代理等几种。本文将详细介绍代理模式的原理和应用场景。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • 安装mysqlclient失败解决办法
    本文介绍了在MAC系统中,使用django使用mysql数据库报错的解决办法。通过源码安装mysqlclient或将mysql_config添加到系统环境变量中,可以解决安装mysqlclient失败的问题。同时,还介绍了查看mysql安装路径和使配置文件生效的方法。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • CentOS 6.5安装VMware Tools及共享文件夹显示问题解决方法
    本文介绍了在CentOS 6.5上安装VMware Tools及解决共享文件夹显示问题的方法。包括清空CD/DVD使用的ISO镜像文件、创建挂载目录、改变光驱设备的读写权限等步骤。最后给出了拷贝解压VMware Tools的操作。 ... [详细]
  • 在Xamarin XAML语言中如何在页面级别构建ControlTemplate控件模板
    本文介绍了在Xamarin XAML语言中如何在页面级别构建ControlTemplate控件模板的方法和步骤,包括将ResourceDictionary添加到页面中以及在ResourceDictionary中实现模板的构建。通过本文的阅读,读者可以了解到在Xamarin XAML语言中构建控件模板的具体操作步骤和语法形式。 ... [详细]
  • Activiti7流程定义开发笔记
    本文介绍了Activiti7流程定义的开发笔记,包括流程定义的概念、使用activiti-explorer和activiti-eclipse-designer进行建模的方式,以及生成流程图的方法。还介绍了流程定义部署的概念和步骤,包括将bpmn和png文件添加部署到activiti数据库中的方法,以及使用ZIP包进行部署的方式。同时还提到了activiti.cfg.xml文件的作用。 ... [详细]
  • Android日历提醒软件开源项目分享及使用教程
    本文介绍了一款名为Android日历提醒软件的开源项目,作者分享了该项目的代码和使用教程,并提供了GitHub项目地址。文章详细介绍了该软件的主界面风格、日程信息的分类查看功能,以及添加日程提醒和查看详情的界面。同时,作者还提醒了读者在使用过程中可能遇到的Android6.0权限问题,并提供了解决方法。 ... [详细]
author-avatar
菌挥发油
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有