热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Kafka原理及应用(一)

一.Kafka简介(1)消息中间件的两种实现模式JMS(JavaMessageService)对消息的发送和接收定义了两种模式:点对点模式:消息的生产和消费者均只有

一. Kafka简介

(1) 消息中间件的两种实现模式

JMS (Java Message Service) 对消息的发送和接收定义了两种模式:

  1. 点对点模式:消息的生产和消费者均只有一个,消息由生产者将消息发送到消息队列(queue)中,然后消息消费者从队列中取出消息进行消费,消息被取出后,queue中不再保存该消息。

  2. 发布订阅模式:消息的生产和消费者可能有多个,使用主题(Topic)来对消息进行分类,生产者将消息发送到主题,多个消费者均可以对这个主题进行消费。类似于对多个消费者做广播。

    左图简单来讲就是,消息生产者在Kafka集群上订阅主题后,可以并发的向集群发送消息,Kafka集群接受到消息会按机制将消息存在不同的分区,存哪个分区可以由生产者指定,如果生产者未指定则按key来hash或者采用round robin的方式保存保存。

    中间的图是一个左右两图总体概括。

    右图来自kafka官网,旨在说明kafka的消费都是以消费组的方式来消费,即使不指定也会默认创建一个消费组,不同的消费组对同一个主题的消费相互独立,同一消费组内不同消费者不能重复消费某一分区,两种极端的情况就是:

    1. 若消费组内消费者数量和分区数量相同,则每个消费者各自消费一个分区,一个分区一个消费者

    2. 若消费组内只有一个消费者,则该消费者需要消费所有分区,因为主题的完整消息时各分区消息的总和

      假如主题分区数为 N,消费组内消费者数量为 M,且M > N ,可以肯定是组内有 M - N 个消费者无法消费主题。

     

    (3) Kafka 常见使用场景

    1. 消息传输:即用作消息中间件

    2. 行为日志跟踪:

      Kafka 最早就是用于重建用户行为数据追踪系统的。很多网站上的用户操作都会以消息的形式发送到Kafka 的某个对应的topic 上。这些点击流蕴含了巨大的商业价值, 事实上,目前就有很多创业公司使用机器学习或其他实时处理框架来帮助收集并分析用户的点击流数据。鉴于这种点击流数据量是很大的, Kafka 超强的吞吐量特性此时就有了用武之地

    3. 审计数据收集:

      很多企业和组织都需要对关键的操作和运维进行监控和审计。这就需要从各个运维应用程序处实时汇总操作步骤信息进行集中式管理。在这种使用场景下,你会发现Kafka 是非常适合的解决方案,它可以便捷地对多路消息进行实时收集,同时由于其持久化的特性,使得后续离线审计成为可能。

    4. 日志收集:

      这可能是Kafka 最常见的使用方式了一一日志收集汇总解决方案。每个企业都会产生大量的服务日志,这些日志分散在不同的机器上。我们可以使用Kafka 对它们进行全量收集,井集中送往下游的分布式存储中(比如HDF S 等) 。比起其他主流的日志抽取框架Kafka 有更好的性能,而且提供了完备的可靠性解决方案,同时还保持了低延时的特点。

    5. 流处理:

      很多用户接触到Kafka 都是因为它的消息队列功能。自0.10.0.0 版本开始, Kafka 社区推出了一个全新的流式处理组件Kafka Streams 。这标志着Kafka 正式进入流式处理框架俱乐部。相比老牌流式处理框架Apache Storm 、Apache Samza,或是最近风头正劲的Spark Strearnir毡,抑或是Apache Flink, Kafka Streams 的竞争力如何?让我们拭目以待。


     

    二. Kafka工作原理

    (1) 重要术语解释

    1. broker: Kafka把服务器的物理机称为 broker

    2. topic: 发布订阅的消息模式中对消息的分类, 对应某个业务需求的消息。

    3. partition: kakfa在保存主题消息数据时对主题的划分,每个partition分别保存主题的一部分数据,所有分区的数据的总和就是主题的完整消息。

    4. leader & follower: 相当于 master 和 slaver的关系,分别代表分布式系统中的主节点和从节点。当主题的分区有多个副本(replication)时,有且仅有一个replication当选leader,其它的均为follower, follower的数据的直接来源是leader而不是生产者。

    5. replication:分区的备份,当leader节点挂了后, 从replica中选举出新的leader。Kafka中消息的读写都是分区的leader完成的,replica 只通过向leader fench数据保存备份并在leader宕机后从新当选leader,来保证高可用性。

    6. offset:生产者和消费者在写和读数据的时候,对消息写读进度的记录。Kafka服务器将消息数据保存在磁盘log文件上,采用对磁盘的append顺序写读的方式,offset相当于顺序写读的偏移量

    7. 消费组:消费者使用一个消费者组名(即group.id )来标记自己, topic 的每条消息都只会被发送到每个订阅它的消费者组的一个消费者实例上。kafka默认所有消费都使用消费组来消费。

    8. ISR: ISR 的全称是in-sync replica,翻译过来就是与leader replica 保持同步的replica 集合,只有这个集合中的replica 才能被选举为leader,也只有该集合中所有replica 都接收到了同一条消息, Kafka 才会将该消息置于“己提交”状态。

    (2) 消息生产

      生产者在连接kafka服务器的时候一般都会指定如下参数, 通过如下参数的设定来创建KafkaProducer对象,然后使用该producer对象来发送消息。

    1 props.put("bootstrap.servers", "10.118.65.203:9092");
    2 props.put("acks", "all");
    3 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    4 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

      其中当 bootstrap.servers 参数用来指定连接服务器的 地址与端口号, 通常kafka服务器会有多个broker, 该参数只需要指定其中的一个或者几个即可,连接上kafka的任意broker之后可以在zookeeper中的 /brokers/ids/ 下找到所有的id 以及  id对应的主机地址及端口号。

      生产者连接上broker之后, 能够得到所有的broker 的id 及地址端口, 但一个生产者默认情况下只能写该 topic 下的一个partition,这时如果生产者在发送的 ProduceRecord 中指定了消息的 key, kafka会更具该key 来自行计算该写入的partition编号。若生产者在建立连接后发送消息时未指定消息的key 值,可以通过自定义实现Partitioner接口的自定义类来制定写partion编号的规则。然后只需要在连接broker-list 时指定一个"partitioner.class"参数,该参数传自定义类的全路径名,类中覆盖接口的partition方法即可.一种分区策略如下:

     1     @Override
     2     public int partition(String topic, Object keyObj, byte[] keyBytes,
     3                         Object value, byte[] valueBytes, Cluster cluster) {
     4         String key  = (String) keyObj;
     5         List partitiOnInfos= cluster.availablePartitionsForTopic(topic);
     6 
     7         int partitiOnCount= partitionInfos.size();
     8         int myPartition = (1 == partitionCount) ? partitionCount : partitionCount - 1;
     9         boolean cOndition=  (key == null || key.isEmpty() || !key.contains("my"));
    10         return condition ? random.nextInt(partitionCount - 1): myPartition;
    11     }

      若生产者在建立连接时并未指定 partitioner.class 发消息时候也没有指定key, 这时默认情况下kafka会以round robin的机制选择该topic下的分区。

     
      生产者向主题的某个分区写数据, 是向该主题的该分区的leader写的数据,而不向该分区的follower写,但是可以通过设置参数让生产者写leader的同时follower也得到同步,这个参数就是前面提到的"acks"参数, 该参数的取值及对应的写流程如下:
      ① acks = 0            :生产者向leader发送完消息之后, leader不向producer发送生产者发送确认信息
           ② acks = 1            :生产者向leader发送完消息之后, leader无需等follower备份数据,将数据写入本地log后 直接向生产者发送确认信息
           ③ acks = -1或者all :生产者向leader发送完消息之后,leader将数据写入本地log后 ,还需要继续等follower写好各自备份并向leader 发送ack之后才向生产者发送ack。
      当acks = -1 时 , 生产者写数据的流程可参考下图:
          

    (3) 消息消费

       消费者客户端在连接服务器创建consumer对象时,通常需要设置以下四个参数:

    1 props.put("bootstrap.servers", "10.118.65.203:9092");
    2 props.put("group.id", "test");
    3 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    4 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

      以上参数是没有默认值的,需要用户自行指定。其中key,value 的解序列化类要求与生产者指定的序列化类对应。如果消费者不指定groupId,Kafka会自动的为该消费者实例生成一个groupId。

      

      消费者客户端在消费消息时会维护一个offset, 该offset就是当前消费者消费到分组-主题-分区下的什么位置的记录。例如当消费者A在消费完第N条消息后,自动或者手动的,消费者A会向kafka服务器提交一次位移,(注意这里是N,因为offset从0开始计数,属于第N+1条消息了),该offset会提交到log.dirs指定的路径下中的某一个__consumer_offsets中(如下图),这里的__consumer_offsets 其实也是kafka自己创建的一个主题,__consumer_offsets-n 路径里面保存的也是index. log 文件。默认情况下 kafka为该__consumer_offsets创建了50个分区。用来保存多个主题,多个分区,以及多个组的场景下的消费者位移。如下图

      

      kafka服务器在__consumer_offsets 主题下,实际保存的是消费者提交过来的offset的键值对,其中key是 group.id + topic + 分区号, value 为offset的实际取值。每当更新一个key的最新的offest时,该topic就会写入一条含有最新offset的消息,同时kafka也会定期的对topic做清理,即为每个消息key只保存含有最新offset。这样每次消费者在读取消息之前会先读取自己的offset,然后再根据offset的值来读取订阅主题的topic消息,即使在消费者服务器启动时没有指定offset的值也能自动的从上一次消费的地方开始消费。

       

      kafka消费者在消费消息时如果不指定group.id,默认会为单个的消费组指定消费组的ID, 这种意义上来讲,kakfa消费者一定会通过组来消费。之所以这样要求,原因在于消费者组是kafka实现消息队列消费和消费发布订阅两种模式的重要设计,同时从性能角度上来考虑,消费者组也可以提高消费消息的并发能力,且可以实现访问集群的高伸缩和容错能力。
                   消费组内消费者的消费策略是:
                   ①:对于同一个group ,topic的每条消息只能被发送到group下的一个consumer 实例上
                   ②:topic消息可以被发送到多个group 中
                   ③:一个consumer group 可以有若干个consumer 实例
                   
                   首先基于以上消费策略,对消费组内的消费者而言,topic的每条消息都会且仅会被一个消费者消费,这正是点对点模式。而对于不同group , 各个group都可以消费到主题的所有消息。而如果每个group都只包含一个消费者,topic的消息所有消息就会被每一个消费者消费到,这就是发布订阅模式。
                   
                   其次考虑一个消费组内有多个消费者的情况,对于并发量很大的业务而言,如果只采用单台服务器作为消费者,有可能给服务器造成太大的压力从而影响其他业务逻辑的执行,消费组内各消费者通过分别访问不重复的分区可以有效的将访问压力分摊到不同的服务器上。对于很多对顺序要求不高的业务场景可以有效的实现负载均衡,就算业务场景要求按顺序消费也可以通过在主题下为消息分类使用不同的key 或者使用自定义partitioner 来实现带负载均衡的分场景有序消费。
     
                   最后消费组可以实现访问集群的高伸缩和容错能力, 这依赖于消费组的一种叫做rebalance 机制,rebalance 本质上也是一种协议,它规定了一个消费组是如何达成一致来分配订阅topic的所有分区。假设某组有4个consumer 的实例,该组订阅了一个包含20个分区的topic。 正常情况下,kafka会为每个consumer平均分配5个分区。这个分配的过程就称为rebalance(重平衡)。对每个组而言,kafka的某个broker会被选举为组协调者(coordinator),由该coordinator 负责对组的状态进行管理, 它的主要职责就是当新成员到达时促成组内所有成员达成新的分区分配方案。
                    消费组内触发rebalance 的条件有以下几个:
                    ①:组成员发生变更,如新的consumer加入组,或者已有的consumer主动离开组,或者已有的consumer崩溃都会触发rebalance。
                    ②:组订阅的topic 数发生变更,比如使用正则表达式的订阅,当能匹配正则表达式的新topic被创建时就会触发rebalance.
                    ③:   组订阅tipic的分区数发生变更,比如使用命令行脚本增加了订阅topic的分区数。
     
                    真实场景中引发rebalance最常见的就是第一种情况,特别是consumer崩溃的情况,这里的崩溃不一定是consumer进程挂掉,或者consumer服务器宕机,当consumer 无法指定在时间内完成消息的处理,coordinator就认为该consumer已经崩溃,这样也会引发新一轮的rebalance。
                   例如:如果某个消费组group 中包含 5 个consumer, 且消费组订阅了一个有20个分区的topic,按kafka组消费策略,这5个consumer会不带重复的消费这20个分区,每个consumer消费4个分区。现在假设consumer-2 在消费消息的时候由于执行业务逻辑时间超时, 导致组协调者coordinator  认为消费组服务崩溃,这时coordinator  就会对剩下的consumer 进行重rebalance , rebalance 之后,每个consumer 消费5个分区,主题中的所有分区均会被消费到。现在经过一段时间的逻辑执行 consumer-2 又恢复了连接,这个时候coordinator  也会检测到并对所有消费组成员进行新一轮的rebalance。 这样我们就可以通过对消费组的重平衡管理消费者对分区的消费,从而实现服务消费的伸缩。
     

    (4) 消息存储

      Kafka采用将每个分区的消息数据写入磁盘文件的方式来存储, 在config/server.properties 文件中log.dir 指定的路径下,我们可以找到 [topic名-分区ID]格式的路径,选择任意一个路径进入可以看到如下文件列表:

       

      由图可以看到四个文件,其中一个log文件, 两个index文件 和 一个epoch 文件;其中的log文件就是用来记录消息数据的,两个index文件用来对log文件的中的数据建立索引,方便消费者快速读取到需要消费的数据。

      此外我们看到index 和 log文件的文件名是全数字表示的,这是因为默认情况下kafka主题的分区的leader会将数据顺序的存储到log文件中。但是kafka并不是将所有的数据都存到一个log文件中,而是将数据顺序的存入分段的log文件中,每个段(segment) 默认分配的大小为500M(可以设定),当一个segment数据满了之后会创建下一个segment的log文件, 然后在新的segment中继续顺序的保存数据。
     
      
             自0.11.0.0 版本之后, kafka 保存消息时采用了如下图的消息格式. 与此同时还引入消息集合(batch)的概念。单条消息及消息batch的格式如下图两图
                                V2 版本消息格式
      

                                Kafka 消息集格式

      

      

      消息中各分区的含义如下:
              1. 消息总长度: 在计算出消息总字节数后, 会将消息的总长度保存在该区域
              2. 属性:采用固定1 字节保存属性
              3. 时间戳增量:保存相对于 batch起始时间戳的差值(之前是采用固定长度的8字节来保存)
              4. 位移增量:保存消息位移与外层batch起始位移的差值(之前是采用固定长度的8字节来保存 )
              5. key长度: key的长度(key的长度是一个可变长度)
              6. key值:实际key的内容
              7. value长度: value的长度(value的长度也是一个可变长度)
              8. value值: 实际value的内容
              9. header个数:增加头部信息用来做集群间的消息路由的用途,或者用来承载消息的一些特定元数据信息,一般用不到
              10. header内容: 若无则不占用字节

       此外有消息集的格式可以看出,消息集的实际长度 = 61 + 消息长度 。因此我们可以简单的验证一下消息的数据存储是否符合上述描述。

      

    # 创建secondTopic 主题, 设定2个分区
    bin/kafka-topics.sh --create --topic secondTopic --zookeeper localhost:2181 --partitions 2 --replication-factor 1

      生产者向secondTopic 发送消息前的状态:

      

      开启控制台生产者,发送字符串 "1234567"  , 之后再发送 "hello" ,得到两个分区的log文件大小截图如下:

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic secondTopic

      

      从发送时间先后来看,显然第一次发的"1234567" 保存在了分区0,第二次发的保存在分区1,第二次比第一次少了2个字节。根据上面分析的消息集大小计算方式可得"1234567" 保存在刚创建的消息集中的大小为 = 消息体大小 + 61。

      消息体大小 = 1(属性) + 1(时间戳增量) + 1(位移增量) + 1(key 长度) + 1(value 长度) + 7(value内容) + 1(header个数) +1(消息总字节数,需要计算才能确定字节数) = 14 , 因此计算的理论消息集的大小就是 14 + 61 = 75. 可以看到与实际存入log文件字节数一致。

       事实上采用消息集在消息并发量较大时可以有效节省消息存储空间,并且为消息的查询带来便利。

     

     


    三. Kafka的应用 (Demo 及 API介绍)

    (1) Kafka 集群服务搭建

      kafka环境的搭建十分简单,只需要简单的配置即可让服务运行起来;可以分两步

      1. zookeeper 环境搭建:

       ① zookeeper下载:https://www.apache.org/dyn/closer.cgi/zookeeper/(镜像地址)

       ②  zk下载后分别保存到 /opt/bigdata/zookeeper 路径下,解压后修改zookeeper 配置文件 zoo_sample.cfg 重命名为 zoo.cfg 

       ③  编辑zoo.cfg 文件, 加入以下配置

    dataDir=/tmp/data/zookeeper
    server.1=ubuntu:2888:3888
    server.2=ubuntu2:2888:3888
    server.3=ubuntu3:2888:3888

      在三台服务上的上述 dataDir 路径分别保存一个myid文件,文件中分别保存上述配置中主机名对应前面的server的ID即(1,2,3); 然后分别在三台服务器上启动zookeeper。

      

      2. Kafka 环境搭建

      ① 下载地址:http://kafka.apache.org/downloads 选择 kafka_2.12-1.0.2 版本 (下划线后面的2.12为 scala 语言的版本, 1.0.2 为kafka版本)

      ②  修改kafka 解压路径下 config/server.properties 文件

    zookeeper.cOnnect=ubuntu:2181,ubuntu2:2181,ubuntu3:2181

      ③ 至此就可以启动kafka服务器了

    ./kafka-server-start.sh -daemon ../config/server.properties

       ④ 指令指定了执行守护进程,因此启动成功后看不到任何结果, 可以查看9092端口号是否在监听,或者适应jps指令查看是否有kafka服务;接下来就是创建kafka主题了(默认副本数不能大于broker数)

    bin/kafka-topics.sh --create --topic secondTopic --zookeeper ubuntu:2181 --partitions 2 --replication-factor 1

     ⑤ 启动生产者向该主题写消息, (控制台生产者只能发送消息的value ,无法发送key)

    bin/kafka-console-producer.sh --broker-list ubuntu:9092 --topic secondTopic

     ⑥ 启动消费者消费消息

    bin/kafka-console-consumer.sh --topic secondTopic --bootstrap-server ubuntu:9092 --from-beginning

      其中控制台消费组启动的指令中的参数  --bootstrap-server 在老版本中使用的是  --zookeeper host:post, 但是自从消费组位移offset信息不再保存到zookeeper之后,消费者不用再连接zookeeper,而改为直接连接kafka集群。

      下面介绍java 工程连接Kafka服务器实现生产与消费的简单实现。

    (2) Kafka 生产与消费

       客户端连接Kafka以及Zookeeper 实现生产者的发送以及消费者的拉取消费,需要引入如下Maven依赖:

     1         
     2         <dependency>
     3             <groupId>org.apache.curatorgroupId>
     4             <artifactId>curator-clientartifactId>
     5             <version>4.0.1version>
     6         dependency>
     7         
     8         <dependency>
     9             <groupId>org.apache.curatorgroupId>
    10             <artifactId>curator-frameworkartifactId>
    11             <version>4.0.1version>
    12         dependency>
    13         
    14         <dependency>
    15             <groupId>org.apache.curatorgroupId>
    16             <artifactId>curator-recipesartifactId>
    17             <version>4.0.1version>
    18         dependency>
    19 
    20         
    21         <dependency>
    22             <groupId>org.apache.kafkagroupId>
    23             <artifactId>kafka_2.12artifactId>
    24             <version>${kafka.version}version>
    25         dependency>
    26 
    27         
    28         <dependency>
    29             <groupId>org.codehaus.jacksongroupId>
    30             <artifactId>jackson-mapper-aslartifactId>
    31             <version>1.9.13version>
    32         dependency>

      生产者端高级API 实现:

     1       static private final String TOPIC = "firstTopic";
     2       static private final String BROKER_LIST = "192.168.0.102:9092";
     3              ....
     4     
     5         Properties props = new Properties();
     6         props.put("bootstrap.servers",BROKER_LIST);
     7         props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
     8         props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
     9 
    10         // acks 指定了 partition 中leader broker 在接收到producer 的消息后必须写入的 副本数; acks 通常可能的取值有 0,1,all(-1)
    11         //    acks = 0  则表示producer 完全不理睬 leader broker 的处理结果, 在发送完一条消息后不等待leader broker 的返回结果就开始下一次发送
    12         //          由于不等待发送结果得  通常这种方式可以有效提高producer的吞吐率;同时如果发送失败了 producer是不知道的
    13         //    acks = 1 表示设置 leader broker 在接收到producer 的消息并将消息写入本地日志,就可以发送响应结果给producer
    14         //          而无需等待其它ISR中的副本,这样只要leader broker 一直存活,kafka 就能够保证这一条消息不丢失
    15         //    acks = -1(all)  表示 leader broker 在接收到producer 的消息之后 不经需要将记录写入本地日志,同时还要将记录写入ISR中所有的其它成员
    16         //          才会向 producer发送响应结果; 这样只要ISR中存在一个存活的副本,消息记录就不会丢失; 当副本数较多的 producer的吞吐量将变得较低
    17         props.put("acks","1");
    18         // 由于网络抖动或者leader选举等原因, producer 发送的消息可能会失败,可以在properties 参数中设置producer的重发次数
    19         //  retries = 0 表示不做重发; producer 认为的发送失败 有可能并不是真正的发送失败,而是在broker提交后发送响应给producer producer由于某种原因
    20         //  没有成功接收到, 这将导致producer 向broker 发送重复的消息,因此retries > 0 时需要consumer在消费时对消息采取去重处理
    21         props.put("retries","0");
    22         //  producer 将发往同一分区的多条消息封装进一个batch 中,当batch 满了的时候,producer 会发送batch中的所有消息
    23         //  可以通过 配置batch.size 来设置 batch 容量的大小; batch 过大占用过多内存,batch 过小
    24         props.put("batch.size","323840");
    25         // producer 在向broker发送消息时如果是等到 batch已经满了再发送 有可能因为 producer的吞吐量比较小,batch需要等较长时间才能满
    26         // 这个时候如果等待就会话较长时间, linger.ms 参数就是用来设置这种消息发送延时的行为的,linger 设置的较大会让生产者发送消息的延时变大
    27         // linger 设置的较小会让生产者发送消息的吞吐量变小, 吞吐量和延时之间存在矛盾 需要权衡设置
    28         props.put("linger.ms",2);
    29         // buffer.memory 指定producer 用户缓冲消息的内存大小,
    30         props.put("buffer.memory",33554432);
    31         // 设置单条消息最大大小
    32         props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1024*1024);
    33         // 设置请求超时时间,producer 向 broker发送消息后 等待时长,如果超过这个时长 producer就会认为响应超时了
    34         props.put("max.block.ms",3000);
    35 
    36         // 指定使用 topic 下的哪一个分区
    37         props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.partitioner.MyPartitioner");
    38         Producer producer = new KafkaProducer<>(props);
    39 
    40         // 使用 producer 发送后的回调函数 做后续处理
    41 
    42         // 测试 对topic设定partition
    43         ProducerRecord record = new ProducerRecord<>(TOPIC,"my non-test","partition setting");
    44         producer.send(record);
    45 
    46         producer.close();

      消费者高级API实现:

     1      private static final String topicName = "firstTopic";
     2       private static final String groupId = "group1";
     3            ....
     4 
     5      Properties props = new Properties();
     6         // server, group.id, key.deserializer, value.deserializer四个参数无默认值,必须配置
     7         // 注意这里 服务器地址配置的 主机名:端口号, 需要在研发环境修改hosts 文件
     8         props.put("bootstrap.servers","ubuntu1:9092");
     9         props.put("group.id",groupId);
    10         props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    11         props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    12         // 是否允许consumer 位移自动提交
    13         props.put("enable.auto.commit","true");
    14         // consumer 位移自动提交时间间隔
    15         props.put("auto.commit.interval.ms","1000");
    16         // auto.offset.reset 设置为 earliest 指定从最早的位移开始消费,但是如果之前有位移提交,则启动时从位移提交处开始消费
    17         // auto.offset.reset 通常还可以设置为 latest, 设置为latest 指的从最新处位移开始消费
    18         props.put("auto.offset.reset","earliest");
    19 
    20         KafkaConsumer cOnsumer= new KafkaConsumer(props);
    21         consumer.subscribe(Arrays.asList(topicName));
    22 
    23         try {
    24             while(true){
    25                 ConsumerRecords records = consumer.poll(2000);
    26                 for(ConsumerRecord record : records){
    27                     System.out.printf("订阅消息 offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());
    28                 }
    29             }
    30         } catch (Exception e) {
    31             e.printStackTrace();
    32         } finally {
    33             consumer.close();
    34         }

      以上生产者与消费者端的实现虽然简单,但是在很多业务场景下是不满满足需求的,需要我们使用更多定制化的开发,譬如生产者如何设定分区规则,消费什么时候提交位移,这些后续文章再做进一步研究。

     


推荐阅读
  • flowable工作流 流程变量_信也科技工作流平台的技术实践
    1背景随着公司业务发展及内部业务流程诉求的增长,目前信息化系统不能够很好满足期望,主要体现如下:目前OA流程引擎无法满足企业特定业务流程需求,且移动端体 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • 本文介绍了操作系统的定义和功能,包括操作系统的本质、用户界面以及系统调用的分类。同时还介绍了进程和线程的区别,包括进程和线程的定义和作用。 ... [详细]
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • Mac OS 升级到11.2.2 Eclipse打不开了,报错Failed to create the Java Virtual Machine
    本文介绍了在Mac OS升级到11.2.2版本后,使用Eclipse打开时出现报错Failed to create the Java Virtual Machine的问题,并提供了解决方法。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • eclipse学习(第三章:ssh中的Hibernate)——11.Hibernate的缓存(2级缓存,get和load)
    本文介绍了eclipse学习中的第三章内容,主要讲解了ssh中的Hibernate的缓存,包括2级缓存和get方法、load方法的区别。文章还涉及了项目实践和相关知识点的讲解。 ... [详细]
  • 关于我们EMQ是一家全球领先的开源物联网基础设施软件供应商,服务新产业周期的IoT&5G、边缘计算与云计算市场,交付全球领先的开源物联网消息服务器和流处理数据 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • Imtryingtofigureoutawaytogeneratetorrentfilesfromabucket,usingtheAWSSDKforGo.我正 ... [详细]
  • RouterOS 5.16软路由安装图解教程
    本文介绍了如何安装RouterOS 5.16软路由系统,包括系统要求、安装步骤和登录方式。同时提供了详细的图解教程,方便读者进行操作。 ... [详细]
author-avatar
houxue
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有