热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

批量读取_Flink1.11版本读取kafka数据批量写入mysql

主要代码packagecn.nanxiuzi.kafka.kafka2mysql;importcn.nanxiuzi.kafka.KafkaDic;importcom.google

主要代码

package cn.nanxiuzi.kafka.kafka2mysql;import cn.nanxiuzi.kafka.KafkaDic;import com.google.common.collect.Lists;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.util.Collector;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.ArrayList;import java.util.List;import java.util.Properties;/** * 读取kafka之后写入mysql */public class Kafka2Mysql { public static final Logger logger &#61; LoggerFactory.getLogger(Kafka2Mysql.class); public static void main(String[] args) throws Exception { StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment(); //checkpoint的设置 //每隔10s进行启动一个检查点【设置checkpoint的周期】 env.enableCheckpointing(10000); //设置模式为&#xff1a;exactly_one&#xff0c;仅一次语义 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //确保检查点之间有1s的时间间隔【checkpoint最小间隔】 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); //检查点必须在10s之内完成&#xff0c;或者被丢弃【checkpoint超时时间】 env.getCheckpointConfig().setCheckpointTimeout(10000); //同一时间只允许进行一次检查点 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //表示一旦Flink程序被cancel后&#xff0c;会保留checkpoint数据&#xff0c;以便根据实际需要恢复到指定的checkpoint //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //设置statebackend,将检查点保存在hdfs上面&#xff0c;默认保存在内存中。这里先保存到本地// env.setStateBackend(new FsStateBackend("file:///Users/temp/cp/")); Properties ppt &#61; new Properties(); ppt.setProperty("bootstrap.servers", KafkaDic.Kafka_ADDRESS_COLLECTION); ppt.setProperty("group.id", KafkaDic.CONSUMER_GROUP_ID); ppt.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); ppt.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); FlinkKafkaConsumer flinkKafkaConsumer &#61; new FlinkKafkaConsumer<>(KafkaDic.CONSUMER_TOPIC, new SimpleStringSchema(), ppt); flinkKafkaConsumer.setStartFromLatest(); DataStreamSource kafkaSource &#61; env.addSource(flinkKafkaConsumer).setParallelism(1); kafkaSource.map(new MapFunction() { &#64;Override public String map(String o) throws Exception { return o.split(",")[0]; } }).timeWindowAll(Time.seconds(10)).process(new ProcessAllWindowFunction, TimeWindow>() { &#64;Override public void process(Context context, Iterable elements, Collector> out) throws Exception { ArrayList strings &#61; Lists.newArrayList(elements); if (strings.size() > 0) { out.collect(strings); } } }).addSink(new MysqlSink()).setParallelism(1); env.execute("MysqlSink"); }}

Kafka生产者代码

package cn.nanxiuzi.kafka;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class MyKafkaProducer { public static void main(String[] args) { Properties props &#61; new Properties(); props.put("bootstrap.servers", KafkaDic.Kafka_ADDRESS_COLLECTION); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer kafkaProducer &#61; new KafkaProducer<>(props); for(int i&#61;0;i<10000;i&#43;&#43;){ String messageContext&#61;String.format("姓名%s,广东深圳%s,身高%s,体重%s,电话%s",Integer.toString(i),Integer.toString(i),Integer.toString(i),Integer.toString(i),Integer.toString(i)); kafkaProducer.send(new ProducerRecord(KafkaDic.PRODUCER_TOPIC,Integer.toString(i),messageContext)); System.out.println("sented:"&#43;messageContext); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } kafkaProducer.close(); }}

DBConnectUtil

package cn.nanxiuzi.kafka.kafka2mysql;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.sql.DriverManager;import java.sql.SQLException;import java.sql.Connection;/** * Created with IntelliJ IDEA. * User: zzy * Date: 2019/5/28 * Time: 8:58 PM * To change this template use File | Settings | File Templates. */public class DBConnectUtil { private static final Logger log &#61; LoggerFactory.getLogger(DBConnectUtil.class); /** * 获取连接 * * &#64;param url * &#64;param user * &#64;param password * &#64;return * &#64;throws SQLException */ public static Connection getConnection(String url, String user, String password) throws SQLException { Connection conn &#61; null; try { Class.forName("com.mysql.jdbc.Driver"); } catch (ClassNotFoundException e) { log.error("获取mysql.jdbc.Driver失败"); e.printStackTrace(); } try { conn &#61; DriverManager.getConnection(url, user, password); log.info("获取连接:{} 成功...",conn); }catch (Exception e){ log.error("获取连接失败&#xff0c;url:" &#43; url &#43; ",user:" &#43; user); } //设置手动提交 //conn.setAutoCommit(false); return conn; } /** * 提交事物 */ public static void commit(Connection conn) { if (conn !&#61; null) { try { conn.commit(); } catch (SQLException e) { log.error("提交事物失败,Connection:" &#43; conn); e.printStackTrace(); } finally { close(conn); } } } /** * 事物回滚 * * &#64;param conn */ public static void rollback(Connection conn) { if (conn !&#61; null) { try { conn.rollback(); } catch (SQLException e) { log.error("事物回滚失败,Connection:" &#43; conn); e.printStackTrace(); } finally { close(conn); } } } /** * 关闭连接 * * &#64;param conn */ public static void close(Connection conn) { if (conn !&#61; null) { try { conn.close(); } catch (SQLException e) { log.error("关闭连接失败,Connection:" &#43; conn); e.printStackTrace(); } } }}

输出

6ee8645fcce5c2101d4b9d79d52d256c.png

2d49fdf0dbb71d1d50f0cf9f750fec67.png

89044223c9a74b21a0050a8b88df18ee.png

pom.xml

4.0.0modelVersion> cn.nanxiuzigroupId> myflinkartifactId> 1.0-SNAPSHOTversion> 1.11.2flink-version> 1.2.17log4j-version> properties> org.apache.flinkgroupId> flink-javaartifactId> ${flink-version}version> providedscope> dependency> org.apache.flinkgroupId> flink-coreartifactId> ${flink-version}version> providedscope> dependency> org.apache.flinkgroupId> flink-streaming-java_2.11artifactId> ${flink-version}version> providedscope> dependency> org.apache.flinkgroupId> flink-connector-kafka_2.11artifactId> ${flink-version}version> dependency> org.apache.flinkgroupId> flink-hbase_2.11artifactId> 1.10.2version> dependency> org.apache.flinkgroupId> flink-connector-filesystem_2.11artifactId> ${flink-version}version> dependency> org.apache.flinkgroupId> flink-connector-jdbc_2.11artifactId> ${flink-version}version> dependency> org.apache.flinkgroupId> flink-clients_2.11artifactId> ${flink-version}version> dependency> com.github.javafakergroupId> javafakerartifactId> 1.0.2version> dependency> log4jgroupId> log4jartifactId> ${log4j-version}version> dependency> com.fasterxml.jackson.coregroupId> jackson-databindartifactId> 2.11.2version> dependency> com.alibabagroupId> fastjsonartifactId> 1.2.73version> dependency> mysqlgroupId> mysql-connector-javaartifactId> 5.1.25version> dependency> dependencies> org.apache.maven.pluginsgroupId> maven-shade-pluginartifactId> 3.1.1version> packagephase> shadegoal> goals> com.google.code.findbugs:jsr305exclude> org.slf4j:*exclude> log4j:*exclude> excludes> artifactSet> *:*artifact> META-INF/*.SFexclude> META-INF/*.DSAexclude> META-INF/*.RSAexclude> excludes> filter> filters> my.programs.main.clazzmainClass> transformer> transformers> configuration> execution> executions> plugin> org.apache.maven.pluginsgroupId> maven-compiler-pluginartifactId> 8source> 8target> configuration> plugin> plugins> build>project>




推荐阅读
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • 在PHP中如何正确调用JavaScript变量及定义PHP变量的方法详解 ... [详细]
  • 本文探讨了如何利用Java代码获取当前本地操作系统中正在运行的进程列表及其详细信息。通过引入必要的包和类,开发者可以轻松地实现这一功能,为系统监控和管理提供有力支持。示例代码展示了具体实现方法,适用于需要了解系统进程状态的开发人员。 ... [详细]
  • 我在使用 AngularJS 的路由功能开发单页应用 (SPA),但需要支持 IE7(包括 IE8 的 IE7 兼容模式)。我希望浏览器的历史记录功能能够正常工作,即使需要使用 jQuery 插件。 ... [详细]
  • com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ... [详细]
  • 本文探讨了如何在 Java 中将多参数方法通过 Lambda 表达式传递给一个接受 List 的 Function。具体分析了 `OrderUtil` 类中的 `runInBatches` 方法及其使用场景。 ... [详细]
  • 在JavaWeb开发中,文件上传是一个常见的需求。无论是通过表单还是其他方式上传文件,都必须使用POST请求。前端部分通常采用HTML表单来实现文件选择和提交功能。后端则利用Apache Commons FileUpload库来处理上传的文件,该库提供了强大的文件解析和存储能力,能够高效地处理各种文件类型。此外,为了提高系统的安全性和稳定性,还需要对上传文件的大小、格式等进行严格的校验和限制。 ... [详细]
  • 在处理 XML 数据时,如果需要解析 `` 标签的内容,可以采用 Pull 解析方法。Pull 解析是一种高效的 XML 解析方式,适用于流式数据处理。具体实现中,可以通过 Java 的 `XmlPullParser` 或其他类似的库来逐步读取和解析 XML 文档中的 `` 元素。这样不仅能够提高解析效率,还能减少内存占用。本文将详细介绍如何使用 Pull 解析方法来提取 `` 标签的内容,并提供一个示例代码,帮助开发者快速解决问题。 ... [详细]
  • 字节流(InputStream和OutputStream),字节流读写文件,字节流的缓冲区,字节缓冲流
    字节流抽象类InputStream和OutputStream是字节流的顶级父类所有的字节输入流都继承自InputStream,所有的输出流都继承子OutputStreamInput ... [详细]
  • JComponentJLabel的setBorder前言用例2205262241前言setBorder(Border边框)实现自JComponentjava.awt.Insets ... [详细]
  • 在 Ubuntu 中遇到 Samba 服务器故障时,尝试卸载并重新安装 Samba 发现配置文件未重新生成。本文介绍了解决该问题的方法。 ... [详细]
  • 文章目录Golang定时器Timer和Tickertime.Timertime.NewTimer()实例time.AfterFunctime.Tickertime.NewTicke ... [详细]
  • 为了在Hadoop 2.7.2中实现对Snappy压缩和解压功能的原生支持,本文详细介绍了如何重新编译Hadoop源代码,并优化其Native编译过程。通过这一优化,可以显著提升数据处理的效率和性能。此外,还探讨了编译过程中可能遇到的问题及其解决方案,为用户提供了一套完整的操作指南。 ... [详细]
  • Nginx 反向代理配置与应用指南
    本文详细介绍了 Nginx 反向代理的配置与应用方法。首先,用户可以从官方下载页面(http://nginx.org/en/download.html)获取最新稳定版 Nginx,推荐使用 1.14.2 版本。下载并解压后,通过双击 `nginx.exe` 文件启动 Nginx 服务。文章进一步探讨了反向代理的基本原理及其在实际应用场景中的配置技巧,包括负载均衡、缓存管理和安全设置等,为用户提供了一套全面的实践指南。 ... [详细]
  • 深入探索HTTP协议的学习与实践
    在初次访问某个网站时,由于本地没有缓存,服务器会返回一个200状态码的响应,并在响应头中设置Etag和Last-Modified等缓存控制字段。这些字段用于后续请求时验证资源是否已更新,从而提高页面加载速度和减少带宽消耗。本文将深入探讨HTTP缓存机制及其在实际应用中的优化策略,帮助读者更好地理解和运用HTTP协议。 ... [详细]
author-avatar
345877103_b54cd7
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有