热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

批量读取_Flink1.11版本读取kafka数据批量写入mysql

主要代码packagecn.nanxiuzi.kafka.kafka2mysql;importcn.nanxiuzi.kafka.KafkaDic;importcom.google

主要代码

package cn.nanxiuzi.kafka.kafka2mysql;import cn.nanxiuzi.kafka.KafkaDic;import com.google.common.collect.Lists;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.util.Collector;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.ArrayList;import java.util.List;import java.util.Properties;/** * 读取kafka之后写入mysql */public class Kafka2Mysql { public static final Logger logger &#61; LoggerFactory.getLogger(Kafka2Mysql.class); public static void main(String[] args) throws Exception { StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment(); //checkpoint的设置 //每隔10s进行启动一个检查点【设置checkpoint的周期】 env.enableCheckpointing(10000); //设置模式为&#xff1a;exactly_one&#xff0c;仅一次语义 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //确保检查点之间有1s的时间间隔【checkpoint最小间隔】 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); //检查点必须在10s之内完成&#xff0c;或者被丢弃【checkpoint超时时间】 env.getCheckpointConfig().setCheckpointTimeout(10000); //同一时间只允许进行一次检查点 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //表示一旦Flink程序被cancel后&#xff0c;会保留checkpoint数据&#xff0c;以便根据实际需要恢复到指定的checkpoint //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //设置statebackend,将检查点保存在hdfs上面&#xff0c;默认保存在内存中。这里先保存到本地// env.setStateBackend(new FsStateBackend("file:///Users/temp/cp/")); Properties ppt &#61; new Properties(); ppt.setProperty("bootstrap.servers", KafkaDic.Kafka_ADDRESS_COLLECTION); ppt.setProperty("group.id", KafkaDic.CONSUMER_GROUP_ID); ppt.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); ppt.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); FlinkKafkaConsumer flinkKafkaConsumer &#61; new FlinkKafkaConsumer<>(KafkaDic.CONSUMER_TOPIC, new SimpleStringSchema(), ppt); flinkKafkaConsumer.setStartFromLatest(); DataStreamSource kafkaSource &#61; env.addSource(flinkKafkaConsumer).setParallelism(1); kafkaSource.map(new MapFunction() { &#64;Override public String map(String o) throws Exception { return o.split(",")[0]; } }).timeWindowAll(Time.seconds(10)).process(new ProcessAllWindowFunction, TimeWindow>() { &#64;Override public void process(Context context, Iterable elements, Collector> out) throws Exception { ArrayList strings &#61; Lists.newArrayList(elements); if (strings.size() > 0) { out.collect(strings); } } }).addSink(new MysqlSink()).setParallelism(1); env.execute("MysqlSink"); }}

Kafka生产者代码

package cn.nanxiuzi.kafka;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class MyKafkaProducer { public static void main(String[] args) { Properties props &#61; new Properties(); props.put("bootstrap.servers", KafkaDic.Kafka_ADDRESS_COLLECTION); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer kafkaProducer &#61; new KafkaProducer<>(props); for(int i&#61;0;i<10000;i&#43;&#43;){ String messageContext&#61;String.format("姓名%s,广东深圳%s,身高%s,体重%s,电话%s",Integer.toString(i),Integer.toString(i),Integer.toString(i),Integer.toString(i),Integer.toString(i)); kafkaProducer.send(new ProducerRecord(KafkaDic.PRODUCER_TOPIC,Integer.toString(i),messageContext)); System.out.println("sented:"&#43;messageContext); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } kafkaProducer.close(); }}

DBConnectUtil

package cn.nanxiuzi.kafka.kafka2mysql;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.sql.DriverManager;import java.sql.SQLException;import java.sql.Connection;/** * Created with IntelliJ IDEA. * User: zzy * Date: 2019/5/28 * Time: 8:58 PM * To change this template use File | Settings | File Templates. */public class DBConnectUtil { private static final Logger log &#61; LoggerFactory.getLogger(DBConnectUtil.class); /** * 获取连接 * * &#64;param url * &#64;param user * &#64;param password * &#64;return * &#64;throws SQLException */ public static Connection getConnection(String url, String user, String password) throws SQLException { Connection conn &#61; null; try { Class.forName("com.mysql.jdbc.Driver"); } catch (ClassNotFoundException e) { log.error("获取mysql.jdbc.Driver失败"); e.printStackTrace(); } try { conn &#61; DriverManager.getConnection(url, user, password); log.info("获取连接:{} 成功...",conn); }catch (Exception e){ log.error("获取连接失败&#xff0c;url:" &#43; url &#43; ",user:" &#43; user); } //设置手动提交 //conn.setAutoCommit(false); return conn; } /** * 提交事物 */ public static void commit(Connection conn) { if (conn !&#61; null) { try { conn.commit(); } catch (SQLException e) { log.error("提交事物失败,Connection:" &#43; conn); e.printStackTrace(); } finally { close(conn); } } } /** * 事物回滚 * * &#64;param conn */ public static void rollback(Connection conn) { if (conn !&#61; null) { try { conn.rollback(); } catch (SQLException e) { log.error("事物回滚失败,Connection:" &#43; conn); e.printStackTrace(); } finally { close(conn); } } } /** * 关闭连接 * * &#64;param conn */ public static void close(Connection conn) { if (conn !&#61; null) { try { conn.close(); } catch (SQLException e) { log.error("关闭连接失败,Connection:" &#43; conn); e.printStackTrace(); } } }}

输出

6ee8645fcce5c2101d4b9d79d52d256c.png

2d49fdf0dbb71d1d50f0cf9f750fec67.png

89044223c9a74b21a0050a8b88df18ee.png

pom.xml

4.0.0modelVersion> cn.nanxiuzigroupId> myflinkartifactId> 1.0-SNAPSHOTversion> 1.11.2flink-version> 1.2.17log4j-version> properties> org.apache.flinkgroupId> flink-javaartifactId> ${flink-version}version> providedscope> dependency> org.apache.flinkgroupId> flink-coreartifactId> ${flink-version}version> providedscope> dependency> org.apache.flinkgroupId> flink-streaming-java_2.11artifactId> ${flink-version}version> providedscope> dependency> org.apache.flinkgroupId> flink-connector-kafka_2.11artifactId> ${flink-version}version> dependency> org.apache.flinkgroupId> flink-hbase_2.11artifactId> 1.10.2version> dependency> org.apache.flinkgroupId> flink-connector-filesystem_2.11artifactId> ${flink-version}version> dependency> org.apache.flinkgroupId> flink-connector-jdbc_2.11artifactId> ${flink-version}version> dependency> org.apache.flinkgroupId> flink-clients_2.11artifactId> ${flink-version}version> dependency> com.github.javafakergroupId> javafakerartifactId> 1.0.2version> dependency> log4jgroupId> log4jartifactId> ${log4j-version}version> dependency> com.fasterxml.jackson.coregroupId> jackson-databindartifactId> 2.11.2version> dependency> com.alibabagroupId> fastjsonartifactId> 1.2.73version> dependency> mysqlgroupId> mysql-connector-javaartifactId> 5.1.25version> dependency> dependencies> org.apache.maven.pluginsgroupId> maven-shade-pluginartifactId> 3.1.1version> packagephase> shadegoal> goals> com.google.code.findbugs:jsr305exclude> org.slf4j:*exclude> log4j:*exclude> excludes> artifactSet> *:*artifact> META-INF/*.SFexclude> META-INF/*.DSAexclude> META-INF/*.RSAexclude> excludes> filter> filters> my.programs.main.clazzmainClass> transformer> transformers> configuration> execution> executions> plugin> org.apache.maven.pluginsgroupId> maven-compiler-pluginartifactId> 8source> 8target> configuration> plugin> plugins> build>project>




推荐阅读
  • GetWindowLong函数
    今天在看一个代码里头写了GetWindowLong(hwnd,0),我当时就有点费解,靠,上网搜索函数原型说明,死活找不到第 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • Commit1ced2a7433ea8937a1b260ea65d708f32ca7c95eintroduceda+Clonetraitboundtom ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • 如何使用Java获取服务器硬件信息和磁盘负载率
    本文介绍了使用Java编程语言获取服务器硬件信息和磁盘负载率的方法。首先在远程服务器上搭建一个支持服务端语言的HTTP服务,并获取服务器的磁盘信息,并将结果输出。然后在本地使用JS编写一个AJAX脚本,远程请求服务端的程序,得到结果并展示给用户。其中还介绍了如何提取硬盘序列号的方法。 ... [详细]
  • 不同优化算法的比较分析及实验验证
    本文介绍了神经网络优化中常用的优化方法,包括学习率调整和梯度估计修正,并通过实验验证了不同优化算法的效果。实验结果表明,Adam算法在综合考虑学习率调整和梯度估计修正方面表现较好。该研究对于优化神经网络的训练过程具有指导意义。 ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • 本文讨论了在openwrt-17.01版本中,mt7628设备上初始化启动时eth0的mac地址总是随机生成的问题。每次随机生成的eth0的mac地址都会写到/sys/class/net/eth0/address目录下,而openwrt-17.01原版的SDK会根据随机生成的eth0的mac地址再生成eth0.1、eth0.2等,生成后的mac地址会保存在/etc/config/network下。 ... [详细]
  • 怎么在PHP项目中实现一个HTTP断点续传功能发布时间:2021-01-1916:26:06来源:亿速云阅读:96作者:Le ... [详细]
  • Java自带的观察者模式及实现方法详解
    本文介绍了Java自带的观察者模式,包括Observer和Observable对象的定义和使用方法。通过添加观察者和设置内部标志位,当被观察者中的事件发生变化时,通知观察者对象并执行相应的操作。实现观察者模式非常简单,只需继承Observable类和实现Observer接口即可。详情请参考Java官方api文档。 ... [详细]
  • GreenDAO快速入门
    前言之前在自己做项目的时候,用到了GreenDAO数据库,其实对于数据库辅助工具库从OrmLite,到litePal再到GreenDAO,总是在不停的切换,但是没有真正去了解他们的 ... [详细]
  • 使用eclipse创建一个Java项目的步骤
    本文介绍了使用eclipse创建一个Java项目的步骤,包括启动eclipse、选择New Project命令、在对话框中输入项目名称等。同时还介绍了Java Settings对话框中的一些选项,以及如何修改Java程序的输出目录。 ... [详细]
  • 本文详细介绍了Android中的坐标系以及与View相关的方法。首先介绍了Android坐标系和视图坐标系的概念,并通过图示进行了解释。接着提到了View的大小可以超过手机屏幕,并且只有在手机屏幕内才能看到。最后,作者表示将在后续文章中继续探讨与View相关的内容。 ... [详细]
  • 使用freemaker生成Java代码的步骤及示例代码
    本文介绍了使用freemaker这个jar包生成Java代码的步骤,通过提前编辑好的模板,可以避免写重复代码。首先需要在springboot的pom.xml文件中加入freemaker的依赖包。然后编写模板,定义要生成的Java类的属性和方法。最后编写生成代码的类,通过加载模板文件和数据模型,生成Java代码文件。本文提供了示例代码,并展示了文件目录结构。 ... [详细]
  • ejava,刘聪dejava
    本文目录一览:1、什么是Java?2、java ... [详细]
author-avatar
345877103_b54cd7
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有