热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

深入理解Kafka核心设计及原理(二):生产者

转载请注明出处:2.1Kafka生产者客户端架构2.2Kafka进行消息生产发送代码示例及ProducerRecord对象kafka进行消息生产发送代码示例:publicclass

转载请注明出处:

2.1Kafka生产者客户端架构

                                             

2.2 Kafka 进行消息生产发送代码示例及ProducerRecord对象

  kafka进行消息生产发送代码示例:

public class KafkaProducerAnalysis {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static Properties initConfig() (
         Properties props = new Properties();
         props.put("bootstrap.servers", brokerList);
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
       props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
      properties. put ("client. id", "producer. client. id. demo");
         return props;
    }
    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaProducer producer = new KafkaProducer<>(props);
        ProducerRecord record = new ProducerRecord<> (topic, "hello, Kafka1 ");
        try {
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 }

  构建的消息对象ProducerRecord, 它并不是单纯意义上的消息,它包含了多个属性,原本需要发送的与业务相关的消息体只是其中的一个value属性,比如"Hello, Kafka!"只是ProducerRecord对象中的一个属性。 ProducerRecord类的定义如下:

public class ProducerRecord {
        private final String topic; //主题
        private final Integer partition; //分区号
        private final Headers headers; //消息头部
        private final K key; //
        private final V value; //
        private final Long timestamp; //消息的时间戳
  //省略其他成员方法和构造方法
}

    其中topic和 partition字段分别代表消息要发往的主题和分区号。headers字段是消息的头部,它大多用来设定 一些与应用相关的信息,如无需要也可以不用设置。key是用来指定消息的键, 它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。

    key可以让消息再进行二次归类, 同 一个key的消息会被划分到同 一个分区中, 有key的消息还可以支持日志压缩的功能,value是指消息体,一般不为空,如果为空则表示特定的消息 一墓碑消息;timestamp是指消息的时间戳,它有 CreateTime 和 LogAppendTime 两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间.

    KafkaProducer是线程安全的, 可以在多个线程***享单个KafkaProducer实例,也 可以将KafkaProducer实例进行池化来供其他线程调用。

2.3 发送消息的三种模式及实现区别

  发送消息主要有三种模式: 发后即忘(fire-and-forget)、同步(sync)及异步Casync)。

  发后即忘,它只管往Kafka中发送消息而并不关心消息是否正确到达。 在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。 这种发送方式的性能最高, 可靠性也最差。

  KafkaProducer的 send()方法并非是void类型, 而是Future类型, send()方法有2个重载方法,具体定义如下:

public Future send(ProducerRecord record)
public Future send(ProducerRecord record,Callback callback)

  实现同步的发送方式, 可以利用返回的 Future 对象实现:

try {
    producer.send(record) .get();
} catch (ExecutionException I InterruptedException e) {
    e.printStackTrace();
}

  send()方法本身就是异步的,send()方法返回的Future对象可以使调用方稍后获得发送的结果。 示例中在执行send()方法之后直接链式调用了get()方法来阻塞等待Kaflca的响应,直到消息发送成功, 或者发生异常。 如果发生异常,那么就需要捕获异常并交由外层逻辑处理。

try {
    Future future = producer.send{record);
    RecordMetadata metadata= future.get();
    System.out.println(metadata.topic() + "-" +metadata.partition() + ":" + metadata.offset());
    } catch (ExecutionException I InterruptedException e) {
    e.printStackTrace () ;
}

  这样可以获取一个RecordMetadata对象, 在RecordMetadata对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、 时间戳等。

 

2.4 序列化

  生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。 而在对侧, 消费者需要用反序列化器(Deserializer)把从Kafka 中收到的字节数组转换成相应的对象。

  为 了方便, 消息的key和value都使用了字符串, 对应程序中的序列化器也使用了客户端自带的org.apache.kafka. common. serialization. StringSerializer, 除了用于String 类型的序列化器,还有ByteArray、ByteBuffer、 Bytes、 Double、Integer、 Long这几种类型, 它们都实现了org.apache.kafka. common. serialization. Serializer接口

 

2.5 分区器

  消息在通过send( )方法发往broker 的过程中,有可能需要经过拦截器(Interceptor)、 序列化器(Serializer)和分区器(Parttitioner)的一系列作用之后才能被真正地发往 broker。拦截器一般不是必需的, 而序列化器是必需的。 消息 经过 序列化 之后就需要确定它发往的分区 ,如果消息ProducerRecord中指定了 partitition字段, 那么就不需要分区器的作用, 因为partition代表的就是所要发往的分区号。

   如果消息ProducerRecord中没有 指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。 分区器的作用 就是为消息 分配分区

  Kafka 中提供的默认分区器是org.apache.kafka.clients.producer.intemals.DefaultPartitioner, 它实现了org.apache.kafka.clients.producer.Partitioner 接口, 这个接口中定义了2个方法, 具体如下所示。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
public void close();

  其中 partition()方法用来计算分区号,返回值为 int 类型。partition()方法中的参数分别表示主题 、键、序列化后的键、值、序列化后的值,以及集群的元数据信息,通过这些信息可以实现功能丰富的分区器。 close()方法在关闭分区器的时候用来回收一些资源 。

  默认的分区器会对key 进行哈希(采用MurmurHash2 算法 ,具备高运算性能及低碰撞率),最终根据得到 的 哈希值来计算分区号, 拥有相同 key 的消息会被写入同一个分区 。 如果 key 为 null ,那么消息将会以轮询的方式发往主题内的各个可用分区。

 

2.6 拦截器

  生产者拦截器既可以用 来在消息发送前做一些准备工作 ,比如按照某个规则过滤不符合要求的消 息、修改消 息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。

  生产者拦截器 的 使用 也 很方便,主要是自定义实现org .apache.kafka. clients. producer.Producerlnterceptor 接口。ProducerInterceptor 接 口中包含 3 个方法 :

public ProducerRecord onSend (ProducerRecord record);
public void onAcknowledgement(RecordMetadata metadata, Excepti on exception );
public void close() ;

  KafkaProducer 在将消息序列化和计算分区之前会调用 生产者拦截器 的 onSend()方法来对消息进行相应 的定制化操作。KafkaProducer 会在消息被应答( Acknowledgement )之前或消息发送失败时调用生产者拦截器的onAcknowledgement()方法,优先于用户设定的Callback 之前执行。

 

2.6 消息累加器

  整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程 (发送线程)。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器( RecordAccumulator,也称为消息收集器〉中。Sender 线程负责从RecordAccumulator 中 获取消息并将其发送到 Kafka 中 。

  RecordAccumulator 主要用来缓存消息 以便Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能 。RecordAccumulator 缓存的大 小可以通过生产者客户端参数buffer. memory 配置,默认值为 33554432B ,即32MB 。 如果生产者发送消息的速度超过发送到服务器的速度 ,则会导致生产者空间不足,这个时候 KafkaProducer 的 send()方法调用要么被阻塞,要么抛出异常,这个取决于参数 max. block . ms 的配置,此参数的默认值为 6 0000,即 60 秒 。

   Sender 从RecordAccumulator 中 获取缓存的消息之后,会进一 步将原本<分区,Deque

  元数据是指 Kafka 集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的 leader 副本分配在哪个节点上,follower 副本分配在哪些节点上,哪些副本在 AR 、ISR 等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。

 

2.7 重要的生产者参数

  1.acks

     这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。acks 是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。  acks 参数有 3 种类型的值(都是字符串类型)。

    acks =1 : 默认值即为l 。生产者发送消息之后,只要分区的leader 副本成功写入消息,那么它就会收到来自服务端的成功响应 。 如果消息无法写入 leader 副本,比如在leader 副本崩溃、重新选举新的leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息 。如果消息写入 leader 副本并返回成功响应给生产者,且在被其他 follower 副本拉取之前 leader 副本崩溃,那么此时消息还是会丢失,因为新选举的 leader 副本中并没有这条对应的消息 。 acks 设置为l ,是消息可靠性和吞吐量之间的折中方案。

     acks = 0 :生产者发送消 息之后不需要等待任何服务端的响应。如果在消息从发送到写入 Kafka 的过程中出现某些异常,导致 Kafka 并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks 设置为 0 可以达到最大的吞吐量。

     acks =- l 或 acks =all : 生产者在消 息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为-1(all )可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为 ISR 中可能只有 leader 副本,这样就退化成了 acks= l 的情况。

  2.max.request.size

     这个参数用来限制生产者客户端能发送的消息的最大值,默认值为1048576B ,即lMB 。一般情况下,这个默认值就可以满足大多数的应用场景了。

  3.retries 和 retry. backoff.ms

     retries 参数用来配置生产者重试的次数,默认值为 0,即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、leader 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries大于 0 的值,以此通过 内 部重试来恢复而不是一昧地将异常抛给生产者的应用程序。 如果重试达到设定的次数 ,那么生产者就会放弃重试并返回异常。

    不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过 max.request.size 参数配置的值时,这种方式就不可行了。 重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为100 ,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置 retries 和retry.backoff.ms之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试 。

  4.compression.type

     这个参数用来指定消息的压缩方式,默认值为“ none ”,即默认情况下,消息不会被压缩。该参数还可以配置为“ gzip ”,“ snappy ” 和“ lz4 ”。 对消息进行压缩可以极大地减少网络传输量 、降低网络 IO ,从而提高整体的性能 。消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩 。

  5. request.timeout.ms

     这个参数用来配置 Producer 等待请求响应的最长时间,默认值为 3 0000( ms )。请求超时之后可以选择进行重试。注意这个参数需要 比 broker 端参数 replica.lag.time.max.ms 的值要大 ,这样可以减少因客户端重试而引起的消息重复的概率。

 


推荐阅读
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • Nginx使用(server参数配置)
    本文介绍了Nginx的使用,重点讲解了server参数配置,包括端口号、主机名、根目录等内容。同时,还介绍了Nginx的反向代理功能。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • 本文介绍了为什么要使用多进程处理TCP服务端,多进程的好处包括可靠性高和处理大量数据时速度快。然而,多进程不能共享进程空间,因此有一些变量不能共享。文章还提供了使用多进程实现TCP服务端的代码,并对代码进行了详细注释。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 本文介绍了计算机网络的定义和通信流程,包括客户端编译文件、二进制转换、三层路由设备等。同时,还介绍了计算机网络中常用的关键词,如MAC地址和IP地址。 ... [详细]
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • Linux如何安装Mongodb的详细步骤和注意事项
    本文介绍了Linux如何安装Mongodb的详细步骤和注意事项,同时介绍了Mongodb的特点和优势。Mongodb是一个开源的数据库,适用于各种规模的企业和各类应用程序。它具有灵活的数据模式和高性能的数据读写操作,能够提高企业的敏捷性和可扩展性。文章还提供了Mongodb的下载安装包地址。 ... [详细]
  • 本文讨论了在使用PHP cURL发送POST请求时,请求体在node.js中没有定义的问题。作者尝试了多种解决方案,但仍然无法解决该问题。同时提供了当前PHP代码示例。 ... [详细]
  • 本文整理了Java中org.gwtbootstrap3.client.ui.Icon.addDomHandler()方法的一些代码示例,展示了Icon.ad ... [详细]
  • 目录1、将mysql数据导出到SQL文件中(数据库存在的情况)2、将现有的sql文件数据导入到数据库中(前提数据库存在) 3、利用Navicat导出SQL文件和导入SQL文件1)从 ... [详细]
  • 视图分区_组复制常规操作网络分区amp;混合使用IPV6与IPV4 | 全方位认识 MySQL 8.0 Group Replication...
    网络分区对于常规事务而言,每当组内有事务数据需要被复制时,组内的成员需要达成共识(要么都提交,要么都回滚)。对于组成员资格的变更也和保持组 ... [详细]
  • Spark Streaming和Kafka整合之路(最新版本)
    2019独角兽企业重金招聘Python工程师标准最近完成了SparkStreaming和Kafka的整合工作,耗时虽然不长,但是当中还是遇到了不少 ... [详细]
author-avatar
才女我是周宇
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有