热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Kafka自定义生产者

第一步:在终端启动一个消费都等待生产者生产出来的数据代码实现创建Maven项目添加依赖junit
第一步:在终端启动一个消费都等待生产者生产出来的数据

在这里插入图片描述

代码实现

创建Maven项目


  • 添加依赖

<dependency><groupId>junitgroupId><artifactId>junitartifactId><version>4.12version>dependency><dependency><groupId>org.apache.kafkagroupId><artifactId>kafka-clientsartifactId><version>2.4.1version>dependency><dependency><groupId>org.slf4jgroupId><artifactId>slf4j-apiartifactId><version>1.7.25version>dependency><dependency><groupId>org.slf4jgroupId><artifactId>slf4j-log4j12artifactId><version>1.7.25version>dependency><dependency><groupId>org.apache.kafkagroupId><artifactId>kafka_2.12artifactId><version>2.4.1version>dependency>

  • 在resources目录下添加log4j.properties

### 设置###
log4j.rootLogger &#61; debug,stdout,D,E### 输出信息到控制抬 ###
log4j.appender.stdout &#61; org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target &#61; System.out
log4j.appender.stdout.layout &#61; org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern &#61; [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n### 输出DEBUG 级别以上的日志到&#61;E://logs/error.log ###
log4j.appender.D &#61; org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File &#61; E://logs/log.log
log4j.appender.D.Append &#61; true
log4j.appender.D.Threshold &#61; DEBUG
log4j.appender.D.layout &#61; org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern &#61; %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n### 输出ERROR 级别以上的日志到&#61;E://logs/error.log ###
log4j.appender.E &#61; org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File &#61;E://logs/error.log
log4j.appender.E.Append &#61; true
log4j.appender.E.Threshold &#61; ERROR
log4j.appender.E.layout &#61; org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern &#61; %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n

情况一&#xff1a;创建生产者

public class CustomerProducer {// 配置信息来源&#xff1a;ProducerConfigpublic static void main(String[] args) {Properties props &#61; new Properties();// Kafka服务端的主机名和端口号props.put("bootstrap.servers", "hcmaster:9092");// 应答级别&#xff1a;等待所有副本节点的应答props.put("acks", "all");// 消息发送最大尝试次数props.put("retries", 0);// 一批消息处理大小props.put("batch.size", 16384);//16k// 请求延时props.put("linger.ms", 1);// 发送缓存区内存大小props.put("buffer.memory", 33554432);//32M// key序列化props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value序列化props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer &#61; new KafkaProducer<>(props);//循环发送数据for (int i &#61; 0; i < 8; i&#43;&#43;) {ProducerRecord<String, String> data &#61; new ProducerRecord<>("first", Integer.toString(i), "haha-" &#43; i);producer.send(data);}producer.close();}
}

运行程序&#xff0c;在Consumer终端上查看结果&#xff1a;
在这里插入图片描述

情况二&#xff1a;创建带回调的生产者

public class CallBackProducer {public static void main(String[] args) throws InterruptedException {Properties props &#61; new Properties();// Kafka服务端的主机名和端口号props.put("bootstrap.servers", "hcmaster:9092,hcslave1:9092,hcslave2:9092");// 等待所有副本节点的应答props.put("acks", "all");// 消息发送最大尝试次数props.put("retries", 0);// 一批消息处理大小props.put("batch.size", 16384);// 增加服务端请求延时props.put("linger.ms", 1);// 发送缓存区内存大小props.put("buffer.memory", 33554432);// key序列化props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value序列化props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> kafkaProducer &#61; new KafkaProducer<>(props);for (int i &#61; 0; i < 8; i&#43;&#43;) {Thread.sleep(500);ProducerRecord<String, String> pr &#61; new ProducerRecord<>("first", Integer.toString(i), "hehe-" &#43; i);kafkaProducer.send(pr, new Callback() {&#64;Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(exception !&#61; null){System.out.println("发送失败");}else {System.out.print("发送成功&#xff1a; ");if (metadata !&#61; null) {System.out.println(metadata.topic()&#43;" - "&#43;metadata.partition() &#43; " - " &#43; metadata.offset());}}}});}kafkaProducer.close();}}

在Intellij控制中结果&#xff1a;
在这里插入图片描述
在counsumer终端中查看结果&#xff1a;
在这里插入图片描述

情况三&#xff1a;创建自定义分区的生产者


  • 自定义Partitioner

public class CustomPartitioner implements Partitioner {&#64;Overridepublic void configure(Map<String, ?> configs) {}&#64;Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 控制分区return 1;}&#64;Overridepublic void close() {}
}

  • 自定义Procuder

public class CallBackProducer {public static void main(String[] args) throws InterruptedException {Properties props &#61; new Properties();// Kafka服务端的主机名和端口号props.put("bootstrap.servers", "hcmaster:9092,hcslave1:9092,hcslave2:9092");// 等待所有副本节点的应答props.put("acks", "all");// 消息发送最大尝试次数props.put("retries", 0);// 一批消息处理大小props.put("batch.size", 16384);// 增加服务端请求延时props.put("linger.ms", 1);// 发送缓存区内存大小props.put("buffer.memory", 33554432);// key序列化props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value序列化props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 自定义分区props.put("partitioner.class", "com.hc.producer.customerparitioner.CustomPartitioner");KafkaProducer<String, String> kafkaProducer &#61; new KafkaProducer<>(props);for (int i &#61; 0; i < 8; i&#43;&#43;) {Thread.sleep(500);ProducerRecord<String, String> pr &#61; new ProducerRecord<>("first", Integer.toString(i), "hello world-" &#43; i);kafkaProducer.send(pr, (metadata, exception) -> {if (exception !&#61; null) {System.out.println("发送失败");} else {System.out.print("发送成功&#xff1a; ");if (metadata !&#61; null) {System.out.println(metadata.partition() &#43; " --- " &#43; metadata.offset());}}});}kafkaProducer.close();}}

  • 结果

在Intellij控制中结果&#xff1a;
在这里插入图片描述
在counsumer终端中查看结果&#xff1a;
在这里插入图片描述


推荐阅读
  • 如何正确配置Log4j以优化日志记录效果? ... [详细]
  • Mybatis_04日志
    前几天临近期末考试,一直在准备考试,吐槽一下,这个学期的考试真是全背书,服了,背吐了。考完试到元旦又放肆了几天 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • 本文介绍了NHibernate中通过定义接口和实现类来管理会话工厂的方法,包括接口的优势、模型文件夹的结构以及具体的代码示例。 ... [详细]
  • Hadoop平台警告解决:无法加载本机Hadoop库的全面应对方案
    本文探讨了在Hadoop平台上遇到“无法加载本机Hadoop库”警告的多种解决方案。首先,通过修改日志配置文件来忽略该警告,这一方法被证明是有效的。其次,尝试指定本地库的路径,但未能解决问题。接着,尝试不使用Hadoop本地库,同样没有效果。然后,通过替换现有的Hadoop本地库,成功解决了问题。最后,根据Hadoop的源代码自行编译本地库,也达到了预期的效果。以上方法适用于macOS系统。 ... [详细]
  • 在Java应用程序中调用`response.getStatus()`方法时遇到了`NoSuchMethodError`异常,经过分析,初步判断为依赖冲突问题。通过检查项目依赖树发现,当前项目版本与某些库的版本不兼容,导致该方法无法被正确识别。建议通过更新相关依赖版本或使用依赖管理工具(如Maven或Gradle)来解决此问题,确保所有依赖项版本一致且兼容。 ... [详细]
  • 本文探讨了如何利用自定义URI方案和注册表编辑,在Windows操作系统中实现从Web浏览器启动本地应用程序的方法,同时强调了这一过程中的安全考虑。 ... [详细]
  • MyBatis入门指南
    本文详细介绍了MyBatis的基础知识,包括如何整合日志框架(如log4j和logback),使用外部JDBC文件,getMapper()方法的应用,以及别名设置等技巧。 ... [详细]
  • 本文详细探讨了UML用例图中的两种重要关系——包含关系和扩展关系,通过具体示例解析这两种关系的应用场景及其实现方式。 ... [详细]
  • 本文介绍如何使用 Google 开发的 libphonenumber 库在 Java 应用中实现电话号码的有效性验证。该库不仅支持多种国际电话号码的格式化与解析,还提供了一系列强大的验证工具。 ... [详细]
  • PHP 5.4.8 编译安装指南
    本文详细介绍了如何在Linux环境下编译安装PHP 5.4.8,并配置为FastCGI模式运行。包括所需依赖包的安装、源代码下载、编译配置及启动服务等步骤。 ... [详细]
  • 本文探讨了在使用Apache Flink向Kafka发送数据过程中遇到的事务频繁失败问题,并提供了详细的解决方案,包括必要的配置调整和最佳实践。 ... [详细]
  • 如何正确配置与使用日志组件:Log4j、SLF4J及Logback的连接与整合方法
    在当前的软件开发实践中,无论是开源项目还是日常工作中,日志框架都是不可或缺的工具之一。本文详细探讨了如何正确配置与使用Log4j、SLF4J及Logback这三个流行的日志组件,并深入解析了它们之间的连接与整合方法,旨在帮助开发者高效地管理和优化日志记录流程。 ... [详细]
  • 增加Maven构建profile配置在项目最顶层的pom.xml添加common和release两个profile,并声明${app.run.env}作为环境切换变量<profiles> ... [详细]
author-avatar
银仔-zxy
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有