热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flink引出的kafka不同版本的兼容性Kafka协议兼容性改进

参考:官网协议介绍:http:kafka.apache.orgprotocol.html#The_Messages_Fetchkafka协议兼容性  http:www.cnblog

参考:

官网协议介绍:http://kafka.apache.org/protocol.html#The_Messages_Fetch

kafka协议兼容性  http://www.cnblogs.com/huxi2b/p/6784795.html 

 最近在使用flink的时候,在flink的官网对flink-connect-kafka有这样的一个版本对照:

Maven Dependency Supported since Consumer and 
Producer Class name
Kafka version Notes
flink-connector-kafka-0.8_2.11 1.0.0 FlinkKafkaConsumer08
FlinkKafkaProducer08
0.8.x Uses the SimpleConsumer API of Kafka internally. Offsets are committed to ZK by Flink.
flink-connector-kafka-0.9_2.11 1.0.0 FlinkKafkaConsumer09
FlinkKafkaProducer09
0.9.x Uses the new Consumer API Kafka.
flink-connector-kafka-0.10_2.11 1.2.0 FlinkKafkaConsumer010
FlinkKafkaProducer010
0.10.x This connector supports Kafka messages with timestamps both for producing and consuming.
flink-connector-kafka-0.11_2.11 1.4.0 FlinkKafkaConsumer011
FlinkKafkaProducer011
0.11.x

Since 0.11.x Kafka does not support scala 2.10. This connector

supports Kafka transactional messaging to provide exactly once semantic for the producer.

flink-connector-kafka_2.11 1.7.0 FlinkKafkaConsumer
FlinkKafkaProducer
>= 1.0.0

This universal Kafka connector attempts to track the latest version of the Kafka client.

The version of the client it uses may change between Flink releases.

Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later.

However for Kafka 0.11.x and 0.10.x versions, we recommend using dedicated

flink-connector-kafka-0.11_2.11 and flink-connector-kafka-0.10_2.11 respectively.

Attention: as of Flink 1.7 the universal Kafka connector is considered to be in a BETA 
status and might not be as stable as the 0.11 connector. In case of problems with
the universal connector, you can try to use flink-connector-kafka-0.11_2.11 which s
hould be compatible with all of the Kafka versions starting from 0.11.

就是每个版本的flink,对应的flink-connector-kafka(010 是kafka版本,2.11 是scala版本,这里忽略scala版本) 的版本,和connector 对应的kafka 版本。

最后一行的意思是,flin 1.7.0 对应 flink-connector-kafka,然后对应的kafka 版本是 >= 1.0.0。

这样有个问题是:我们使用flink 1.7.2 ,connector 是 flink-connector-kafka_2.11,maven 下 connector 自动就拉了 kafka-client-2.0.1 下来,但是我们的kafka版本是1.0.0,所以就想用kafka-clents-1.0.0,高版本的当然也是可以的,不过总是觉得对应版本最好了(就像,小孩子,穿自己的衣服和穿他爸爸的衣服一样)。

flink引出的kafka不同版本的兼容性
    




Kafka协议兼容性改进

就对connector处理一下,排除了kafka-clients,重新引用了一个kafka-clents-1.0.0 的进来。如下:

<dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}artifactId>
            <version>${flink.version}version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafkagroupId>
                    <artifactId>kafka-clentsartifactId>
                exclusion>
            exclusions>
            <scope>providedscope>
        dependency>
        <dependency>
            <groupId>org.apache.kafkagroupId>
            <artifactId>kafka-clentsartifactId>
            <version>1.0.0version>
        dependency>

这样一个问题,就这么机智的解决了。

-----------------分割线------------------------

 以为这样就结束的看官,可能比较年轻,问题是永远解决不完的。

新的问题是这样的,我们需要读其他的kafka,版本是0.10.x,同时使用两个不同版本的kafka, WTF???

回忆一下整个事件:flink 1.7.2 对应的kafka connector 的kafka 默认版本是2.0.1,我们换成了 1.0.0,同样去读kafka 0.10.x,也是可以的(实测可以),这样一来,把kafka-clents 版本换了1.0.0 就没有太大的意义了(穿他爸的衣服和穿他哥的衣服,都差不多了),所以我们把之前的改动去掉了。

然后,这个事情看起来就很奇怪了:

  高版本的kafka-clent 可以读写 低版本的broker(2.0.1 -> 1.0.0)

  低版本的kafka-clent 可以读写 高版本的broker(0.10.x -> 1.0.0)

不得不说,kafka这么通用,唯一的理由,就是“好用

这个时候,看到这个博客: 0.10.2.0 版本的broker 支持的 Fetch(1)的 版本是: 0 to 3 【usable: 3 】

 

kafka client 和broker 的交互都是http协议(为什么是http可以看这里:Some Common Philosophical Questions (一些哲学问题)),kafka的每个api,都有个api key

The following are the numeric codes that the ApiKey in the request can take for each of the below request types.

NAME KEY
Produce 0
Fetch 1
ListOffsets 2
Metadata 3
LeaderAndIsr 4
StopReplica 5
UpdateMetadata 6
ControlledShutdown 7
OffsetCommit 8
OffsetFetch 9
FindCoordinator 10
JoinGroup 11
Heartbeat 12
LeaveGroup 13
SyncGroup 14
DescribeGroups 15
ListGroups 16
SaslHandshake 17
ApiVersions 18
CreateTopics 19
DeleteTopics 20
DeleteRecords 21
InitProducerId 22
OffsetForLeaderEpoch 23
AddPartitionsToTxn 24
AddOffsetsToTxn 25
EndTxn 26
WriteTxnMarkers 27
TxnOffsetCommit 28
DescribeAcls 29
CreateAcls 30
DeleteAcls 31
DescribeConfigs 32
AlterConfigs 33
AlterReplicaLogDirs 34
DescribeLogDirs 35
SaslAuthenticate 36
CreatePartitions 37
CreateDelegationToken 38
RenewDelegationToken 39
ExpireDelegationToken 40
DescribeDelegationToken 41
DeleteGroups 42
ElectPreferredLeaders 43

其中 fetch 的key 是: 1

fetch 请求和返回都是有固定格式的(不然也不认识),这就是kafka 内部的fetch 协议,不同的格式,就是不同的版本,以 fetch request (有请求协议,就有响应协议 )v0 和v1 举例

Fetch API (Key: 1):
Requests:
Fetch Request (Version: 0) => replica_id max_wait_time min_bytes [topics] 
  replica_id => INT32
  max_wait_time => INT32
  min_bytes => INT32
  topics => topic [partitions] 
    topic => STRING
    partitiOns=> partition fetch_offset partition_max_bytes 
      partition => INT32
      fetch_offset => INT64
      partition_max_bytes => INT32
Fetch Request (Version: 1) => replica_id max_wait_time min_bytes [topics] replica_id => INT32 max_wait_time => INT32 min_bytes => INT32 topics => topic [partitions] topic => STRING partitiOns=> partition fetch_offset partition_max_bytes partition => INT32 fetch_offset => INT64 partition_max_bytes => INT32 Responses: Fetch Response (Version: 0) => [responses] respOnses=> topic [partition_responses] topic => STRING partition_respOnses=> partition_header record_set partition_header => partition error_code high_watermark partition => INT32 error_code => INT16 high_watermark => INT64 record_set => RECORDS Fetch Response (Version: 1) => throttle_time_ms [responses] throttle_time_ms => INT32 respOnses=> topic [partition_responses] topic => STRING partition_respOnses=> partition_header record_set partition_header => partition error_code high_watermark partition => INT32 error_code => INT16 high_watermark => INT64 record_set => RECORDS

注: 不要找了,我也没找到reques 0 和 1 有什么区别。

  更多协议详情 查看官网介绍

执行  kafka-broker-api-versions.sh 查看服务端的版本:

./bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092 (我的kafka版本是:kafka_2.11-2.2.0)
localhost:9092 (id: 0 rack: null) -> (
    Produce(0): 0 to 7 [usable: 7],
    Fetch(1): 0 to 10 [usable: 10],
    ListOffsets(2): 0 to 5 [usable: 5],
    Metadata(3): 0 to 7 [usable: 7],
    LeaderAndIsr(4): 0 to 2 [usable: 2],
    StopReplica(5): 0 to 1 [usable: 1],
    UpdateMetadata(6): 0 to 5 [usable: 5],
    ControlledShutdown(7): 0 to 2 [usable: 2],
    OffsetCommit(8): 0 to 6 [usable: 6],
    OffsetFetch(9): 0 to 5 [usable: 5],
    FindCoordinator(10): 0 to 2 [usable: 2],
    JoinGroup(11): 0 to 4 [usable: 4],
    Heartbeat(12): 0 to 2 [usable: 2],
    LeaveGroup(13): 0 to 2 [usable: 2],
    SyncGroup(14): 0 to 2 [usable: 2],
    DescribeGroups(15): 0 to 2 [usable: 2],
    ListGroups(16): 0 to 2 [usable: 2],
    SaslHandshake(17): 0 to 1 [usable: 1],
    ApiVersions(18): 0 to 2 [usable: 2],
    CreateTopics(19): 0 to 3 [usable: 3],
    DeleteTopics(20): 0 to 3 [usable: 3],
    DeleteRecords(21): 0 to 1 [usable: 1],
    InitProducerId(22): 0 to 1 [usable: 1],
    OffsetForLeaderEpoch(23): 0 to 2 [usable: 2],
    AddPartitionsToTxn(24): 0 to 1 [usable: 1],
    AddOffsetsToTxn(25): 0 to 1 [usable: 1],
    EndTxn(26): 0 to 1 [usable: 1],
    WriteTxnMarkers(27): 0 [usable: 0],
    TxnOffsetCommit(28): 0 to 2 [usable: 2],
    DescribeAcls(29): 0 to 1 [usable: 1],
    CreateAcls(30): 0 to 1 [usable: 1],
    DeleteAcls(31): 0 to 1 [usable: 1],
    DescribeConfigs(32): 0 to 2 [usable: 2],
    AlterConfigs(33): 0 to 1 [usable: 1],
    AlterReplicaLogDirs(34): 0 to 1 [usable: 1],
    DescribeLogDirs(35): 0 to 1 [usable: 1],
    SaslAuthenticate(36): 0 to 1 [usable: 1],
    CreatePartitions(37): 0 to 1 [usable: 1],
    CreateDelegationToken(38): 0 to 1 [usable: 1],
    RenewDelegationToken(39): 0 to 1 [usable: 1],
    ExpireDelegationToken(40): 0 to 1 [usable: 1],
    DescribeDelegationToken(41): 0 to 1 [usable: 1],
    DeleteGroups(42): 0 to 1 [usable: 1],
    ElectPreferredLeaders(43): 0 [usable: 0]
)

 

 可以看到 Fetch(1): 0 to 10 [usable: 10] , 支持的 fetch版本是 0 到 10(最新版本)(请求和响应协议是一一对应的).

 服务端的协议版本找到了,就该找客户端的协议版本了。

直接打开kafka-clents 的源码:

flink引出的kafka不同版本的兼容性
    




Kafka协议兼容性改进

进入 方法: FetchRequest.schemaVersions()

public static Schema[] schemaVersions() {
        return new Schema[]{FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4,
            FETCH_REQUEST_V5, FETCH_REQUEST_V6, FETCH_REQUEST_V7, FETCH_REQUEST_V8};
    };
FetchResponse.schemaVersions()
public static Schema[] schemaVersions() {
        return new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2,
            FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, FETCH_RESPONSE_V6,
            FETCH_RESPONSE_V7, FETCH_RESPONSE_V8};
    }

可以看到,kafka-clients 2.0.1(flink 1.7.2 默认的 kafka client 版本) 支持的 fetch reques 和 response 的协议版本 是 0 - 8.

kafka 0.10.x 支持的版本是 0 - 3

kafka 1.0.0 支持的版本是: 0 - 6

更多请参考官网:http://kafka.apache.org/protocol.html#The_Messages_Fetch

总结: kafka 做的真的是好,服务的和客户端的各个版本的完全兼容性非常好,新版本支持旧版本,旧版本也可以和新版一起使用,只有支持的协议版本又交叉


推荐阅读
  • 在对WordPress Duplicator插件0.4.4版本的安全评估中,发现其存在跨站脚本(XSS)攻击漏洞。此漏洞可能被利用进行恶意操作,建议用户及时更新至最新版本以确保系统安全。测试方法仅限于安全研究和教学目的,使用时需自行承担风险。漏洞编号:HTB23162。 ... [详细]
  • Maven Web项目创建时JSP文件常见错误及解决方案
    Maven Web项目创建时JSP文件常见错误及解决方案 ... [详细]
  • 在多线程并发环境中,普通变量的操作往往是线程不安全的。本文通过一个简单的例子,展示了如何使用 AtomicInteger 类及其核心的 CAS 无锁算法来保证线程安全。 ... [详细]
  • php更新数据库字段的函数是,php更新数据库字段的函数是 ... [详细]
  • 开机自启动的几种方式
    0x01快速自启动目录快速启动目录自启动方式源于Windows中的一个目录,这个目录一般叫启动或者Startup。位于该目录下的PE文件会在开机后进行自启动 ... [详细]
  • 秒建一个后台管理系统?用这5个开源免费的Java项目就够了
    秒建一个后台管理系统?用这5个开源免费的Java项目就够了 ... [详细]
  • 为了确保iOS应用能够安全地访问网站数据,本文介绍了如何在Nginx服务器上轻松配置CertBot以实现SSL证书的自动化管理。通过这一过程,可以确保应用始终使用HTTPS协议,从而提升数据传输的安全性和可靠性。文章详细阐述了配置步骤和常见问题的解决方法,帮助读者快速上手并成功部署SSL证书。 ... [详细]
  • IOS Run loop详解
    为什么80%的码农都做不了架构师?转自http:blog.csdn.netztp800201articledetails9240913感谢作者分享Objecti ... [详细]
  • 本文详细介绍了在 CentOS 7 系统中配置 fstab 文件以实现开机自动挂载 NFS 共享目录的方法,并解决了常见的配置失败问题。 ... [详细]
  • 解决Only fullscreen opaque activities can request orientation错误的方法
    本文介绍了在使用PictureSelectorLight第三方框架时遇到的Only fullscreen opaque activities can request orientation错误,并提供了一种有效的解决方案。 ... [详细]
  • 如何在Linux服务器上配置MySQL和Tomcat的开机自动启动
    在Linux服务器上部署Web项目时,通常需要确保MySQL和Tomcat服务能够随系统启动而自动运行。本文将详细介绍如何在Linux环境中配置MySQL和Tomcat的开机自启动,以确保服务的稳定性和可靠性。通过合理的配置,可以有效避免因服务未启动而导致的项目故障。 ... [详细]
  • XAMPP 遇到 404 错误:无法找到请求的对象
    在使用 XAMPP 时遇到 404 错误,表示请求的对象未找到。通过详细分析发现,该问题可能由以下原因引起:1. `httpd-vhosts.conf` 文件中的配置路径错误;2. `public` 目录下缺少 `.htaccess` 文件。建议检查并修正这些配置,以确保服务器能够正确识别和访问所需的文件路径。 ... [详细]
  • 在项目开发中,我们搭建了私有的Maven仓库服务器,以方便管理和下载所需的JAR包。然而,某些外部JAR包可能无法从公共Maven仓库获取,或者我们自行开发了一些仅供公司内部使用的插件,这些都需要上传到私有仓库中进行共享。本文详细介绍了如何使用Maven命令行工具将这些第三方JAR包部署至Nexus仓库服务器,确保团队成员能够轻松访问和使用这些资源。 ... [详细]
  • 本文详细解析了 Yii2 框架中视图和布局的各种函数,并综述了它们在实际开发中的应用场景。通过深入探讨每个函数的功能和用法,为开发者提供了全面的参考,帮助他们在项目中更高效地利用这些工具。 ... [详细]
  • 本文介绍了如何利用Shell脚本高效地部署MHA(MySQL High Availability)高可用集群。通过详细的脚本编写和配置示例,展示了自动化部署过程中的关键步骤和注意事项。该方法不仅简化了集群的部署流程,还提高了系统的稳定性和可用性。 ... [详细]
author-avatar
魅由心生先_941
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有