热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

FlinkKafkaConnectorFlink结合Kafka实战

flink,kafka,connector,flink,结

简介

Flink-kafka-connector用来做什么?

Kafka中的partition机制和Flink的并行度机制结合,实现数据恢复
Kafka可以作为Flink的source和sink
任务失败,通过设置kafka的offset来恢复应用

kafka简单介绍

关于kafka,我们会有专题文章介绍,这里简单介绍几个必须知道的概念。

1.生产者(Producer)

顾名思义,生产者就是生产消息的组件,它的主要工作就是源源不断地生产出消息,然后发送给消息队列。生产者可以向消息队列发送各种类型的消息,如狭义的字符串消息,也可以发送二进制消息。生产者是消息队列的数据源,只有通过生产者持续不断地向消息队列发送消息,消息队列才能不断处理消息。

2.消费者(Consumer)

所谓消费者,指的是不断消费(获取)消息的组件,它获取消息的来源就是消息队列(即Kafka本身)。换句话说,生产者不断向消息队列发送消息,而消费者则不断从消息队列中获取消息。

3.主题(Topic)

主题是Kafka中一个极为重要的概念。首先,主题是一个逻辑上的概念,它用于从逻辑上来归类与存储消息本身。多个生产者可以向一个Topic发送消息,同时也可以有多个消费者消费一个Topic中的消息。Topic还有分区和副本的概念。Topic与消息这两个概念之间密切相关,Kafka中的每一条消息都归属于某一个Topic,而一个Topic下面可以有任意数量的消息。 

kafka简单操作

启动zk:nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

启动server: nohup bin/kafka-server-start.sh config/server.properties &

创建一个topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看topic:bin/kafka-topics.sh --list --zookeeper localhost:2181

发送数据:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

启动一个消费者:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

删除topic: bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topn

Flink消费Kafka注意事项

  • setStartFromGroupOffsets()【默认消费策略】

    默认读取上次保存的offset信息 如果是应用第一次启动,读取不到上次的offset信息,则会根据这个参数auto.offset.reset的值来进行消费数据 
  • setStartFromEarliest()
    从最早的数据开始进行消费,忽略存储的offset信息
  • setStartFromLatest()
    从最新的数据进行消费,忽略存储的offset信息
  • setStartFromSpecificOffsets(Map)
    从指定位置进行消费
  • 当checkpoint机制开启的时候,KafkaConsumer会定期把kafka的offset信息还有其他operator的状态信息一块保存起来。当job失败重启的时候,Flink会从最近一次的checkpoint中进行恢复数据,重新消费kafka中的数据。
  • 为了能够使用支持容错的kafka Consumer,需要开启checkpoint
    env.enableCheckpointing(5000); // 每5s checkpoint一次

搭建Kafka单机环境

我本地安装了一个kafka_2.11-2.1.0版本的kafka

image

启动Zookeeper和kafka server:

启动zk:nohup bin/zookeeper-server-start.sh config/zookeeper.properties & 启动server: nohup bin/kafka-server-start.sh config/server.properties &

创建一个topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

image

实战案例

Kafka作为Flink Sink

首先pom依赖:

 org.apache.flink flink-connector-kafka_2.11 1.7.0 

向kafka写入数据:

public class KafkaProducer { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource text = env.addSource(new MyNoParalleSource()).setParallelism(1); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE); FlinkKafkaProducer producer = new FlinkKafkaProducer("test",new SimpleStringSchema(),properties); /* //event-timestamp事件的发生时间 producer.setWriteTimestampToKafka(true); */ text.addSink(producer); env.execute(); } }//

大家这里特别注意,我们实现了一个并行度为1的MyNoParalleSource来生产数据,代码如下:

//使用并行度为1的source public class MyNoParalleSource implements SourceFunction {//1 //private long count = 1L; private boolean isRunning = true; /** * 主要的方法 * 启动一个source * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了 * * @param ctx * @throws Exception */ @Override public void run(SourceContext ctx) throws Exception { while(isRunning){ //图书的排行榜 List books = new ArrayList<>(); books.add("Pyhton从入门到放弃");//10 books.add("Java从入门到放弃");//8 books.add("Php从入门到放弃");//5 books.add("C++从入门到放弃");//3 books.add("Scala从入门到放弃");//0-4 int i = new Random().nextInt(5); ctx.collect(books.get(i)); //每2秒产生一条数据 Thread.sleep(2000); } } //取消一个cancel的时候会调用的方法 @Override public void cancel() { isRunning = false; } } 

代码实现了一个发送器,来发送书名等...

然后右键运行我们的程序,控制台输出如下:

image

开始源源不断的生产数据了。

然后我们用命令去查看一下 kafka test这个topic:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

输出如下:

image

Kafka作为Flink Source

直接上代码:

public class KafkaConsumer { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); FlinkKafkaConsumer cOnsumer= new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties); //从最早开始消费 consumer.setStartFromEarliest(); DataStream stream = env .addSource(consumer); stream.print(); //stream.map(); env.execute(); } }//

控制台输出如下:

image
将我们之前发往kafka的消息全部打印出来了。


推荐阅读
  • com.sun.javadoc.PackageDoc.exceptions()方法的使用及代码示例 ... [详细]
  • Ihavetwomethodsofgeneratingmdistinctrandomnumbersintherange[0..n-1]我有两种方法在范围[0.n-1]中生 ... [详细]
  • 如何将Python与Excel高效结合:常用操作技巧解析
    本文深入探讨了如何将Python与Excel高效结合,涵盖了一系列实用的操作技巧。文章内容详尽,步骤清晰,注重细节处理,旨在帮助读者掌握Python与Excel之间的无缝对接方法,提升数据处理效率。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • 本文详细介绍了在 CentOS 7 系统中配置 fstab 文件以实现开机自动挂载 NFS 共享目录的方法,并解决了常见的配置失败问题。 ... [详细]
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • poj 3352 Road Construction ... [详细]
  • 本文详细介绍了MySQL数据库的基础语法与核心操作,涵盖从基础概念到具体应用的多个方面。首先,文章从基础知识入手,逐步深入到创建和修改数据表的操作。接着,详细讲解了如何进行数据的插入、更新与删除。在查询部分,不仅介绍了DISTINCT和LIMIT的使用方法,还探讨了排序、过滤和通配符的应用。此外,文章还涵盖了计算字段以及多种函数的使用,包括文本处理、日期和时间处理及数值处理等。通过这些内容,读者可以全面掌握MySQL数据库的核心操作技巧。 ... [详细]
  • 如何使用 `org.opencb.opencga.core.results.VariantQueryResult.getSource()` 方法及其代码示例详解 ... [详细]
  • 本文详细探讨了二元Probit模型及其在实际应用中的重要性。作为一种广义线性模型,Probit模型主要用于处理二分类问题,与Logistic模型类似,但其假设误差项服从标准正态分布。尽管Probit模型在某些领域应用较少,但在特定情境下仍具有独特优势。文章不仅介绍了模型的基本原理,还通过实例分析展示了其在经济学、社会学等领域的具体应用。 ... [详细]
  • 基于Net Core 3.0与Web API的前后端分离开发:Vue.js在前端的应用
    本文介绍了如何使用Net Core 3.0和Web API进行前后端分离开发,并重点探讨了Vue.js在前端的应用。后端采用MySQL数据库和EF Core框架进行数据操作,开发环境为Windows 10和Visual Studio 2019,MySQL服务器版本为8.0.16。文章详细描述了API项目的创建过程、启动步骤以及必要的插件安装,为开发者提供了一套完整的开发指南。 ... [详细]
  • 如何使用 `org.eclipse.rdf4j.query.impl.MapBindingSet.getValue()` 方法及其代码示例详解 ... [详细]
  • 分布式开源任务调度框架 TBSchedule 深度解析与应用实践
    本文深入解析了分布式开源任务调度框架 TBSchedule 的核心原理与应用场景,并通过实际案例详细介绍了其部署与使用方法。首先,从源码下载开始,详细阐述了 TBSchedule 的安装步骤和配置要点。接着,探讨了该框架在大规模分布式环境中的性能优化策略,以及如何通过灵活的任务调度机制提升系统效率。最后,结合具体实例,展示了 TBSchedule 在实际项目中的应用效果,为开发者提供了宝贵的实践经验。 ... [详细]
  • 【并发编程】全面解析 Java 内存模型,一篇文章带你彻底掌握
    本文深入解析了 Java 内存模型(JMM),从基础概念到高级特性进行全面讲解,帮助读者彻底掌握 JMM 的核心原理和应用技巧。通过详细分析内存可见性、原子性和有序性等问题,结合实际代码示例,使开发者能够更好地理解和优化多线程并发程序。 ... [详细]
author-avatar
1911530988com
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有