热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

开发笔记:Kafka踩坑记录

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Kafka踩坑记录相关的知识,希望对你有一定的参考价值。

篇首语:本文由编程笔记#小编为大家整理,主要介绍了Kafka踩坑记录相关的知识,希望对你有一定的参考价值。






1.生产者发送消息如果没有key值,要设置成null,不能设置成空字符串,否则会认为空字符串是key值,会把所有消息发送到一个分区上。

2.生产者设置消息批量发送,需要设置两个属性:batch.size和linger.ms。前者是消息积攒到多大时,发送给broker。后者是超过多少毫秒时,发送给broker。两者只要触发一个条件,就会把积攒的消息批量发送给broker。
需要注意的是,不管是否批量发送消息,produce都要显式调用close方法,只不过批量发送消息时,KafkaProducer会把消息存起来,触发条件后,才统一发送给broker。

当还没有到达batch.size的阈值,也没有到达linger.ms的阈值时,如果此时线程突然中断了,那么这批次的消息就会丢失,不会发送给broker。当这两个条件都没触发时,但是调用了produce的close方法,会把这个批次的消息提交至broker的。

3.batch.size的值,设置大小要合理,设置太大,会造成阻塞,效率反而更慢,下面复原一下问题。
本来想测试一下消费端的数据处理能力,所以在生产者端,我把batch.size的值设置成了21300000,不要问为什么设置成这个值,随便瞎写的。把linger.ms设置成了20000,即20秒发送一批数据。生产者通过循环产生数据。
运行的时候,产生了令人费解的一幕。生产者生产数据的速度非常慢,几秒钟,才会进入下一次循环。
然后经过了多次测试发现了更神奇的现象。当消息的key值设置为null或动态变化时,生产消息的速率非常慢。而当把消息的key值设置成固定的时,生产消息的速率又恢复了正常。并且,把batch.size的值设置小一些的时候(4096),生产速率也是正常的。
由上面的现象可知。batch.size设置很大,会影响生产者的效率,具体为什么影响,不得而知,需要追溯源码。 而batch.size设置很大时,效率也跟消息的key值有关系。key值固定不变时,效率快的原因猜测是因为不用进行分区的负载计算,只是一个往一个分区发,所以快。而key是null或者动态变化时,效率慢的原因猜测是因为需要计算不同key值的分区负载,所以影响了效率。那为什么当batch.size值小的时候,不会受key值的影响呢?这就说明batch.size和key值之间,有相互关系,但是还不是完全的影响关系。

带着上面的猜想和疑问,我们来看producer的源码。问题的根本,都集中在了producer的send方法里,所以,我们看send方法的源码。

这里,我们看源码的目的是找到为什么生产效率慢。所以,我采用了断点的方式,来定位到底是哪块代码,影响了速率。下面记录一下寻找过程。
由send方法经过一顿点入,进入了doSend方法,经过一番分析,初步猜测,是这个方法影响了效率,如下:
在这里插入图片描述
然后就在这里打个断点,然后再在它的下一行打个断点,用断点法验证是不是这里影响了性能。通过测试发现,从这个方法到下一个方法断点走的速度很快。而下一个断点到返回的send方法的时间,反而很慢。这就说明,我们猜测的这个断点,不是影响效率的地方。而是在这个断点的后面,某段代码,影响了效率。
索性把后面的代码,每行都打上断点,来测试到底哪行代码性能慢,如下图所示:
在这里插入图片描述
然后继续采用断点发,一个断点一个断点走,看哪个断点执行时间长。最终,发现是这个代码执行时间长:
在这里插入图片描述我们点进去这个方法,可以继续采用断点发一个一个排查哪个代码影响了性能,也可以大体看一遍方法,猜测一下可能哪里影响性能,缩小定位范围,最终,定位到是这里影响了性能:

在这里插入图片描述
这个方法就是最终的原因所在,我们把其全部代码贴出来:

/**
* Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool
* is configured with blocking mode.
*
* @param size The buffer size to allocate in bytes
* @param maxTimeToBlockMs The maximum time in milliseconds to block for buffer memory to be available
* @return The buffer
* @throws InterruptedException If the thread is interrupted while blocked
* @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block
* forever)
*/

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
ByteBuffer buffer = null;
this.lock.lock();
try {
// check if we have a free buffer of the right size pooled
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = freeSize() * this.poolableSize;
if (this.nonPooledAvailableMemory + freeListSize >= size) {
// we have enough unallocated or pooled memory to immediately
// satisfy the request, but need to allocate the buffer
freeUp(size);
this.nonPooledAvailableMemory -= size;
} else {
// we are out of memory and will have to block
int accumulated = 0;
Condition moreMemory = this.lock.newCondition();
try {
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);
// loop over and over until we have a buffer or have reserved
// enough memory to allocate one
while (accumulated < size) {
long startWaitNs &#61; time.nanoseconds();
long timeNs;
boolean waitingTimeElapsed;
try {
waitingTimeElapsed &#61; !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
} finally {
long endWaitNs &#61; time.nanoseconds();
timeNs &#61; Math.max(0L, endWaitNs - startWaitNs);
recordWaitTime(timeNs);
}
if (waitingTimeElapsed) {
throw new TimeoutException("Failed to allocate memory within the configured max blocking time " &#43; maxTimeToBlockMs &#43; " ms.");
}
remainingTimeToBlockNs -&#61; timeNs;
// check if we can satisfy this request from the free list,
// otherwise allocate memory
if (accumulated &#61;&#61; 0 && size &#61;&#61; this.poolableSize && !this.free.isEmpty()) {
// just grab a buffer from the free list
buffer &#61; this.free.pollFirst();
accumulated &#61; size;
} else {
// we&#39;ll need to allocate memory, but we may only get
// part of what we need on this iteration
freeUp(size - accumulated);
int got &#61; (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
this.nonPooledAvailableMemory -&#61; got;
accumulated &#43;&#61; got;
}
}
// Don&#39;t reclaim memory on throwable since nothing was thrown
accumulated &#61; 0;
} finally {
// When this loop was not able to successfully terminate don&#39;t loose available memory
this.nonPooledAvailableMemory &#43;&#61; accumulated;
this.waiters.remove(moreMemory);
}
}
} finally {
// signal any additional waiters if there is more memory left
// over for them
try {
if (!(this.nonPooledAvailableMemory &#61;&#61; 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
this.waiters.peekFirst().signal();
} finally {
// Another finally... otherwise find bugs complains
lock.unlock();
}
}
if (buffer &#61;&#61; null)
return safeAllocateByteBuffer(size);
else
return buffer;
}

结合注释看代码&#xff0c;可以得知&#xff0c;核心原因如下图:
在这里插入图片描述
当超出size时&#xff0c;会进入阻塞&#xff0c;这就解释了为什么batch.size很大时&#xff0c;效率反而慢了。

那么还有一个疑问没解决&#xff0c;为什么key值设置成固定值时&#xff0c;效率很快呢&#xff0c;它没有阻塞吗&#xff1f;

通过断点可知&#xff0c;当key值固定时&#xff0c;代码没走进阻塞方法&#xff0c;它走了其他分支&#xff0c;所以效率很快&#xff0c;如下图:
在这里插入图片描述


总结

batch.size的值需要合理设置&#xff0c;否则会进入阻塞&#xff0c;效率很慢。






推荐阅读
  • 本文详细介绍了 `org.apache.tinkerpop.gremlin.structure.VertexProperty` 类中的 `key()` 方法,并提供了多个实际应用的代码示例。通过这些示例,读者可以更好地理解该方法在图数据库操作中的具体用途。 ... [详细]
  • Asynchronous JavaScript and XML (AJAX) 的流行很大程度上得益于 Google 在其产品如 Google Suggest 和 Google Maps 中的应用。本文将深入探讨 AJAX 在 .NET 环境下的工作原理及其实现方法。 ... [详细]
  • 问题场景用Java进行web开发过程当中,当遇到很多很多个字段的实体时,最苦恼的莫过于编辑字段的查看和修改界面,发现2个页面存在很多重复信息,能不能写一遍?有没有轮子用都不如自己造。解决方式笔者根据自 ... [详细]
  • 本文详细介绍了如何使用C#实现不同类型的系统服务账户(如Windows服务、计划任务和IIS应用池)的密码重置方法。 ... [详细]
  • 1、编写一个Java程序在屏幕上输出“你好!”。programmenameHelloworld.javapublicclassHelloworld{publicst ... [详细]
  • 本文详细介绍了 Redis 中的主要数据类型,包括 String、Hash、List、Set、ZSet、Geo 和 HyperLogLog,并提供了每种类型的基本操作命令和应用场景。 ... [详细]
  • 解决Win10 1709版本文件共享安全警告问题
    每当Windows 10发布新版本时,由于兼容性问题往往会出现各种故障。近期,一些用户在升级至1709版本后遇到了无法访问共享文件夹的问题,系统提示‘文件共享不安全,无法连接’。本文将提供多种解决方案,帮助您轻松解决这一难题。 ... [详细]
  • 本文深入探讨了WPF框架下的数据验证机制,包括内置验证规则的使用、自定义验证规则的实现方法、错误信息的有效展示策略以及验证时机的选择,旨在帮助开发者构建更加健壮和用户友好的应用程序。 ... [详细]
  • Zabbix自定义监控与邮件告警配置实践
    本文详细介绍了如何在Zabbix中添加自定义监控项目,配置邮件告警功能,并解决测试告警时遇到的邮件不发送问题。 ... [详细]
  • td{border:1pxsolid#808080;}参考:和FMX相关的类(表)TFmxObjectIFreeNotification ... [详细]
  • Maven + Spring + MyBatis + MySQL 环境搭建与实例解析
    本文详细介绍如何使用MySQL数据库进行环境搭建,包括创建数据库表并插入示例数据。随后,逐步指导如何配置Maven项目,整合Spring框架与MyBatis,实现高效的数据访问。 ... [详细]
  • 本文详细介绍了如何在Oracle VM VirtualBox中实现主机与虚拟机之间的数据交换,包括安装Guest Additions增强功能,以及如何利用这些功能进行文件传输、屏幕调整等操作。 ... [详细]
  • 利用 Calcurse 在 Linux 终端高效管理日程与任务
    对于喜爱使用 Linux 终端进行日常操作的系统管理员来说,Calcurse 提供了一种强大的方式来管理日程安排、待办事项及会议。本文将详细介绍如何在 Linux 上安装和使用 Calcurse,帮助用户更有效地组织工作。 ... [详细]
  • 本文深入探讨了Go语言中的接口型函数,通过实例分析其灵活性和强大功能,帮助开发者更好地理解和运用这一特性。 ... [详细]
  • 本文详细介绍了 Java 中 org.apache.jena.atlas.lib.ByteBufferLib 类下的 acopyArray 方法,并提供了多个实际应用中的代码示例,帮助开发者更好地理解和使用该方法。 ... [详细]
author-avatar
坐着驴车追宝马
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有