热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

JavaAPI方式调用Kafka各种协议的方法

众所周知,Kafka自己实现了一套二进制协议(binary protocol)用于各种功能的实现,比如发送消息,获取消息,提交位移以及创建t

众所周知,Kafka自己实现了一套二进制协议(binary protocol)用于各种功能的实现,比如发送消息,获取消息,提交位移以及创建topic等。具体协议规范参见:Kafka协议  这套协议的具体使用流程为:

1.客户端创建对应协议的请求

2.客户端发送请求给对应的broker

3.broker处理请求,并发送response给客户端

虽然Kafka提供的大量的脚本工具用于各种功能的实现,但很多时候我们还是希望可以把某些功能以编程的方式嵌入到另一个系统中。这时使用Java API的方式就显得异常地灵活了。本文我将尝试给出Java API底层框架的一个范例,同时也会针对“创建topic”和“查看位移”这两个主要功能给出对应的例子。 需要提前说明的是,本文给出的范例并没有考虑Kafka集群开启安全的情况。另外Kafka的KIP4应该一直在优化命令行工具以及各种管理操作,有兴趣的读者可以关注这个KIP。

本文中用到的API依赖于kafka-clients,所以如果你使用Maven构建的话,请加上:


  org.apache.kafka
  kafka-clients
  0.10.2.0

如果是gradle,请加上:

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.2.0'

底层框架

/**
   * 发送请求主方法
   * @param host     目标broker的主机名
   * @param port     目标broker的端口
   * @param request    请求对象
   * @param apiKey    请求类型
   * @return       序列化后的response
   * @throws IOException
   */
  public ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKey) throws IOException {
    Socket socket = connect(host, port);
    try {
      return send(request, apiKey, socket);
    } finally {
      socket.close();
    }
  }

  /**
   * 发送序列化请求并等待response返回
   * @param socket      连向目标broker的socket
   * @param request      序列化后的请求
   * @return         序列化后的response
   * @throws IOException
   */
  private byte[] issueRequestAndWaitForResponse(Socket socket, byte[] request) throws IOException {
    sendRequest(socket, request);
    return getResponse(socket);
  }

  /**
   * 发送序列化请求给socket
   * @param socket      连向目标broker的socket
   * @param request      序列化后的请求
   * @throws IOException
   */
  private void sendRequest(Socket socket, byte[] request) throws IOException {
    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
    dos.writeInt(request.length);
    dos.write(request);
    dos.flush();
  }

  /**
   * 从给定socket处获取response
   * @param socket      连向目标broker的socket
   * @return         获取到的序列化后的response
   * @throws IOException
   */
  private byte[] getResponse(Socket socket) throws IOException {
    DataInputStream dis = null;
    try {
      dis = new DataInputStream(socket.getInputStream());
      byte[] respOnse= new byte[dis.readInt()];
      dis.readFully(response);
      return response;
    } finally {
      if (dis != null) {
        dis.close();
      }
    }
  }

  /**
   * 创建Socket连接
   * @param hostName     目标broker主机名
   * @param port       目标broker服务端口, 比如9092
   * @return         创建的Socket连接
   * @throws IOException
   */
  private Socket connect(String hostName, int port) throws IOException {
    return new Socket(hostName, port);
  }

  /**
   * 向给定socket发送请求
   * @param request    请求对象
   * @param apiKey    请求类型, 即属于哪种请求
   * @param socket    连向目标broker的socket
   * @return       序列化后的response
   * @throws IOException
   */
  private ByteBuffer send(AbstractRequest request, ApiKeys apiKey, Socket socket) throws IOException {
    RequestHeader header = new RequestHeader(apiKey.id, request.version(), "client-id", 0);
    ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
    header.writeTo(buffer);
    request.writeTo(buffer);
    byte[] serializedRequest = buffer.array();
    byte[] respOnse= issueRequestAndWaitForResponse(socket, serializedRequest);
    ByteBuffer respOnseBuffer= ByteBuffer.wrap(response);
    ResponseHeader.parse(responseBuffer);
    return responseBuffer;
  }

有了这些方法的铺垫,我们就可以创建具体的请求了。

创建topic

/**
   * 创建topic
   * 由于只是样例代码,有些东西就硬编码写到程序里面了(比如主机名和端口),各位看官自行修改即可
   * @param topicName       topic名
   * @param partitions      分区数
   * @param replicationFactor   副本数
   * @throws IOException
   */
  public void createTopics(String topicName, int partitions, short replicationFactor) throws IOException {
    Map topics = new HashMap<>();
    // 插入多个元素便可同时创建多个topic
    topics.put(topicName, new CreateTopicsRequest.TopicDetails(partitions, replicationFactor));
    int creatiOnTimeoutMs= 60000;
    CreateTopicsRequest request = new CreateTopicsRequest.Builder(topics, creationTimeoutMs).build();
    ByteBuffer respOnse= send("localhost", 9092, request, ApiKeys.CREATE_TOPICS);
    CreateTopicsResponse.parse(response, request.version());
  }

查看位移

/**
   * 获取某个consumer group下的某个topic分区的位移
   * @param groupID      group id
   * @param topic       topic名
   * @param parititon     分区号
   * @throws IOException
   */
  public void getOffsetForPartition(String groupID, String topic, int parititon) throws IOException {
    TopicPartition tp = new TopicPartition(topic, parititon);
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, singletonList(tp))
        .setVersion((short)2).build();
    ByteBuffer respOnse= send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    OffsetFetchResponse.PartitionData partitiOnData= resp.responseData().get(tp);
    System.out.println(partitionData.offset);
  }
/**
   * 获取某个consumer group下所有topic分区的位移信息
   * @param groupID      group id
   * @return         (topic分区 --> 分区信息)的map
   * @throws IOException
   */
  public Map getAllOffsetsForGroup(String groupID) throws IOException {
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, null).setVersion((short)2).build();
    ByteBuffer respOnse= send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    return resp.responseData();
  }

okay, 上面就是“创建topic”和“查看位移”的样例代码,各位看官可以参考着这两个例子构建其他类型的请求。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


推荐阅读
  • 本文探讨了Hive中内部表和外部表的区别及其在HDFS上的路径映射,详细解释了两者的创建、加载及删除操作,并提供了查看表详细信息的方法。通过对比这两种表类型,帮助读者理解如何更好地管理和保护数据。 ... [详细]
  • 本文详细介绍了如何在Linux系统上安装和配置Smokeping,以实现对网络链路质量的实时监控。通过详细的步骤和必要的依赖包安装,确保用户能够顺利完成部署并优化其网络性能监控。 ... [详细]
  • 本文详细分析了Hive在启动过程中遇到的权限拒绝错误,并提供了多种解决方案,包括调整文件权限、用户组设置以及环境变量配置等。 ... [详细]
  • 本文探讨了如何优化和正确配置Kafka Streams应用程序以确保准确的状态存储查询。通过调整配置参数和代码逻辑,可以有效解决数据不一致的问题。 ... [详细]
  • 网络运维工程师负责确保企业IT基础设施的稳定运行,保障业务连续性和数据安全。他们需要具备多种技能,包括搭建和维护网络环境、监控系统性能、处理突发事件等。本文将探讨网络运维工程师的职业前景及其平均薪酬水平。 ... [详细]
  • 深入解析 Apache Shiro 安全框架架构
    本文详细介绍了 Apache Shiro,一个强大且灵活的开源安全框架。Shiro 专注于简化身份验证、授权、会话管理和加密等复杂的安全操作,使开发者能够更轻松地保护应用程序。其核心目标是提供易于使用和理解的API,同时确保高度的安全性和灵活性。 ... [详细]
  • 探讨了小型企业在构建安全网络和软件时所面临的挑战和机遇。本文介绍了如何通过合理的方法和工具,确保小型企业能够有效提升其软件的安全性,从而保护客户数据并增强市场竞争力。 ... [详细]
  • 本文详细介绍如何通过修改配置文件来隐藏Apache、Nginx和PHP的版本号,从而增强网站的安全性。我们将提供具体的配置步骤,并解释这些设置的重要性。 ... [详细]
  • 本文详细介绍如何利用已搭建的LAMP(Linux、Apache、MySQL、PHP)环境,快速创建一个基于WordPress的内容管理系统(CMS)。WordPress是一款流行的开源博客平台,适用于个人或小型团队使用。 ... [详细]
  • PHP 过滤器详解
    本文深入探讨了 PHP 中的过滤器机制,包括常见的 $_SERVER 变量、filter_has_var() 函数、filter_id() 函数、filter_input() 函数及其数组形式、filter_list() 函数以及 filter_var() 和其数组形式。同时,详细介绍了各种过滤器的用途和用法。 ... [详细]
  • 在成功安装和测试MySQL及Apache之后,接下来的步骤是安装PHP。为了确保安全性和配置的一致性,建议在安装PHP前先停止MySQL和Apache服务,并将MySQL集成到PHP中。 ... [详细]
  • 阿里云ecs怎么配置php环境,阿里云ecs配置选择 ... [详细]
  • Netflix利用Druid实现高效实时数据分析
    本文探讨了全球领先的在线娱乐公司Netflix如何通过采用Apache Druid,实现了高效的数据采集、处理和实时分析,从而显著提升了用户体验和业务决策的准确性。文章详细介绍了Netflix在系统架构、数据摄取、管理和查询方面的实践,并展示了Druid在大规模数据处理中的卓越性能。 ... [详细]
  • 本文深入探讨了MySQL中常见的面试问题,包括事务隔离级别、存储引擎选择、索引结构及优化等关键知识点。通过详细解析,帮助读者在面对BAT等大厂面试时更加从容。 ... [详细]
  • 构建Filebeat-Kafka-Logstash-ElasticSearch-Kibana日志收集体系
    本文介绍了如何使用Filebeat、Kafka、Logstash、ElasticSearch和Kibana构建一个高效、可扩展的日志收集与分析系统。各组件分别承担不同的职责,确保日志数据能够被有效收集、处理、存储及可视化。 ... [详细]
author-avatar
Mr-Leo-Chan
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有