热门标签 | HotTags
当前位置:  开发笔记 > 前端 > 正文

Javakafka如何实现自定义分区类和拦截器

这篇文章主要介绍了Javakafka如何实现自定义分区类和拦截器,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

生产者发送到对应的分区有以下几种方式:

(1)指定了patition,则直接使用;(可以查阅对应的java api, 有多种参数)

(2)未指定patition但指定key,通过对key的value进行hash出一个patition;

(3)patition和key都未指定,使用轮询选出一个patition。

但是kafka提供了,自定义分区算法的功能,由业务手动实现分布:

1、实现一个自定义分区类,CustomPartitioner实现Partitioner

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomPartitioner implements Partitioner {

  /**
   *
   * @param topic 当前的发送的topic
   * @param key  当前的key值
   * @param keyBytes 当前的key的字节数组
   * @param value 当前的value值
   * @param valueBytes 当前的value的字节数组
   * @param cluster
   * @return
   */
  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    //这边根据返回值就是分区号, 这边就是固定发送到三号分区
    return 3;
  }

  @Override
  public void close() {

  }
  @Override
  public void configure(Map configs) {

  }

}

2、producer配置文件指定,具体的分区类

// 具体的分区类
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");

技巧:可以使用ProducerConfig中提供的配置ProducerConfig

kafka producer拦截器

拦截器(interceptor)是在Kafka 0.10版本被引入的。

interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。

许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。

所使用的类为:

org.apache.kafka.clients.producer.ProducerInterceptor

我们可以编码测试下:

1、定义消息拦截器,实现消息处理(可以是加时间戳等等,unid等等。)

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.UUID;

public class MessageInterceptor implements ProducerInterceptor {

  @Override
  public void configure(Map configs) {
    System.out.println("这是MessageInterceptor的configure方法");
  }

  /**
   * 这个是消息发送之前进行处理
   *
   * @param record
   * @return
   */
  @Override
  public ProducerRecord onSend(ProducerRecord record) {
    // 创建一个新的record,把uuid入消息体的最前部
    System.out.println("为消息添加uuid");
    return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
        UUID.randomUUID().toString().replace("-", "") + "," + record.value());
  }

  /**
   * 这个是生产者回调函数调用之前处理
   * @param metadata
   * @param exception
   */
  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    System.out.println("MessageInterceptor拦截器的onAcknowledgement方法");
  }

  @Override
  public void close() {
    System.out.println("MessageInterceptor close 方法");
  }
}

2、定义计数拦截器

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor{
  private int errorCounter = 0;
  private int successCounter = 0;

  @Override
  public void configure(Map configs) {
    System.out.println("这是CounterInterceptor的configure方法");
  }

  @Override
  public ProducerRecord onSend(ProducerRecord record) {
    System.out.println("CounterInterceptor计数过滤器不对消息做任何操作");
    return record;
  }

  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    // 统计成功和失败的次数
    System.out.println("CounterInterceptor过滤器执行统计失败和成功数量");
    if (exception == null) {
      successCounter++;
    } else {
      errorCounter++;
    }
  }

  @Override
  public void close() {
    // 保存结果
    System.out.println("Successful sent: " + successCounter);
    System.out.println("Failed sent: " + errorCounter);
  }
}

3、producer客户端:

import org.apache.kafka.clients.producer.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Producer1 {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Kafka服务端的主机名和端口号
    props.put("bootstrap.servers", "localhost:9092");
    // 等待所有副本节点的应答
    props.put("acks", "all");
    // 消息发送最大尝试次数
    props.put("retries", 0);
    // 一批消息处理大小
    props.put("batch.size", 16384);
    // 请求延时,可能生产数据太快了
    props.put("linger.ms", 1);
    // 发送缓存区内存大小,数据是先放到生产者的缓冲区
    props.put("buffer.memory", 33554432);
    // key序列化
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // value序列化
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // 具体的分区类
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");
    //定义拦截器
    List interceptors = new ArrayList<>();
    interceptors.add("kafka.MessageInterceptor");
    interceptors.add("kafka.CounterInterceptor");
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

    Producer producer = new KafkaProducer<>(props);
    for (int i = 0; i <1; i++) {
      producer.send(new ProducerRecord("test_0515", i + "", "xxx-" + i), new Callback() {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
          System.out.println("这是producer回调函数");
        }
      });
    }
    /*System.out.println("现在执行关闭producer");
    producer.close();*/
    producer.close();
  }
}

总结,我们可以知道拦截器链各个方法的执行顺序,假如有A、B拦截器,在一个拦截器链中:

(1)执行A的configure方法,执行B的configure方法

(2)执行A的onSend方法,B的onSend方法

(3)生产者发送完毕后,执行A的onAcknowledgement方法,B的onAcknowledgement方法。

(4)执行producer自身的callback回调函数。

(5)执行A的close方法,B的close方法。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


推荐阅读
  • window下kafka的安装以及测试
    目录一、安装JDK(需要安装依赖javaJDK)二、安装Kafka三、测试参考在Windows系统上安装消息队列kafka一、安装JDKÿ ... [详细]
  • 构建Filebeat-Kafka-Logstash-ElasticSearch-Kibana日志收集体系
    本文介绍了如何使用Filebeat、Kafka、Logstash、ElasticSearch和Kibana构建一个高效、可扩展的日志收集与分析系统。各组件分别承担不同的职责,确保日志数据能够被有效收集、处理、存储及可视化。 ... [详细]
  • 本文探讨了Hive中内部表和外部表的区别及其在HDFS上的路径映射,详细解释了两者的创建、加载及删除操作,并提供了查看表详细信息的方法。通过对比这两种表类型,帮助读者理解如何更好地管理和保护数据。 ... [详细]
  • HBase运维工具全解析
    本文深入探讨了HBase常用的运维工具,详细介绍了每种工具的功能、使用场景及操作示例。对于HBase的开发人员和运维工程师来说,这些工具是日常管理和故障排查的重要手段。 ... [详细]
  • 在本周的白板演练中,Apache Flink 的 PMC 成员及数据工匠首席技术官 Stephan Ewen 深入探讨了如何利用保存点功能进行流处理中的数据重新处理、错误修复、系统升级和 A/B 测试。本文将详细解释保存点的工作原理及其应用场景。 ... [详细]
  • 本文详细介绍了 Flink 和 YARN 的交互机制。YARN 是 Hadoop 生态系统中的资源管理组件,类似于 Spark on YARN 的配置方式。我们将基于官方文档,深入探讨如何在 YARN 上部署和运行 Flink 任务。 ... [详细]
  • 利用决策树预测NBA比赛胜负的Python数据挖掘实践
    本文通过使用2013-14赛季NBA赛程与结果数据集以及2013年NBA排名数据,结合《Python数据挖掘入门与实践》一书中的方法,展示如何应用决策树算法进行比赛胜负预测。我们将详细讲解数据预处理、特征工程及模型评估等关键步骤。 ... [详细]
  • 深入解析Java虚拟机(JVM)架构与原理
    本文旨在为读者提供对Java虚拟机(JVM)的全面理解,涵盖其主要组成部分、工作原理及其在不同平台上的实现。通过详细探讨JVM的结构和内部机制,帮助开发者更好地掌握Java编程的核心技术。 ... [详细]
  • 本文探讨了2019年前端技术的发展趋势,包括工具化、配置化和泛前端化等方面,并提供了详细的学习路线和职业规划建议。 ... [详细]
  • 本文深入探讨了CART(分类与回归树)的基本原理及其在随机森林中的应用。重点介绍了CART的分裂准则、防止过拟合的方法、处理样本不平衡的策略以及其在回归问题中的应用。此外,还详细解释了随机森林的构建过程、样本均衡处理、OOB估计及特征重要性的计算。 ... [详细]
  • Kafka Topic 数据管理与清理策略
    本文探讨了在生产环境中如何有效管理和定期清理Kafka Topic中的数据。介绍了基于时间、日志大小和日志起始偏移量三种清除方式,并重点讲解了基于时间的清除策略及其配置方法。 ... [详细]
  • 一面问题:MySQLRedisKafka线程算法mysql知道哪些存储引擎,它们的区别mysql索引在什么情况下会失效mysql在项目中的优化场景&# ... [详细]
  • 深入理解Kafka架构
    本文将详细介绍Kafka的内部工作机制,包括其工作流程、文件存储机制、生产者与消费者的具体实现,以及如何通过高效读写技术和Zookeeper支持来确保系统的高性能和稳定性。 ... [详细]
  • 58同城的Elasticsearch应用与平台构建实践
    本文由58同城高级架构师于伯伟分享,由陈树昌编辑整理,内容源自DataFunTalk。文章探讨了Elasticsearch作为分布式搜索和分析引擎的应用,特别是在58同城的实施案例,包括集群优化、典型应用实例及自动化平台建设等方面。 ... [详细]
  • 获得头条Offer后,我感激的七个技术公众号
    是否感觉订阅的公众号过多,浏览时缺乏目标性,未能获取实质性的知识?本文将介绍如何精简公众号列表,提升信息吸收效率,并推荐几个高质量的技术公众号。 ... [详细]
author-avatar
我爱你2602912303
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有