Kafka 是一个分布式的、可水平扩展的、基于发布/订阅模式的、支持容错的消息系统。
Kafka 使用 Zookeeper 来维护集群成员的信息。每个 broker 都有一个唯一标识符,这个标识符可以在配置文件里指定,也可以自动生成。在 broker 启动的时候,它通过创建临时节点把自己的 ID 注册到 Zookeeper。Kafka 组件订阅 Zookeeper 的 /broker/ids 路径,当有 broker 加入集群或退出集群时,这些组件就可以获得通知。
ZooKeeper 两个重要特性:
- 客户端会话结束时,ZooKeeper 就会删除临时节点。
- 客户端注册监听它关心的节点,当节点状态发生变化(数据变化、子节点增减变化)时,ZooKeeper 服务会通知客户端。
复制功能是 Kafka 架构的核心。复制之所以关键,是因为它可以在个别节点失效时仍能保证 Kafka 的可用性和持久性。
Kafka 使用 Topic 来组织数据,每个 Topic 被分为若干个 Partition。每个 Partition 有多个副本。每个 broker 可以保存成百上千个属于不同 Topic 和 Partition 的副本。Kafka 副本的本质是一个只能追加写入的提交日志。
副本有两种类型:
为了与 Leader 保持同步,Follower 向 Leader 发起异步请求,拉取数据。请求消息里包含了 Follower 想要获取消息的偏移量,而这些偏移量总是有序的。通过查看每个 Follower 请求的最新偏移量,Leader 就会知道每个 Follower 复制的进度。
ISR 即 In-sync Replicas,表示同步副本。Follower 副本不提供服务,只是定期地异步拉取领导者副本中的数据而已。既然是异步的,说明和 Leader 并非数据强一致性的。
判断 Follower 是否与 Leader 同步的标准:
Kafka Broker 端参数 replica.lag.time.max.ms 参数,指定了 Follower 副本能够落后 Leader 副本的最长时间间隔,默认为 10s。这意味着:只要一个 Follower 副本落后 Leader 副本的时间不连续超过 10 秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的,即使此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
ISR 是一个动态调整的集合,会不断将同步副本加入集合,将不同步副本移除集合。Leader 副本天然就在 ISR 中。
控制器组件(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 ZooKeeper 的帮助下管理和协调整个 Kafka 集群。
集群中任意一台 Broker 都能充当控制器的角色,但是,在运行过程中,只能有一个 Broker 成为控制器,行使其管理和协调的职责。实际上,Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。Kafka 当前选举控制器的规则是:第一个成功创建 /controller 临时节点的 Broker 会被指定为控制器。
选举控制器详细流程:
(1)第一个在 ZooKeeper 中成功创建 /controller 临时节点的 Broker 会被指定为控制器。
(2)其他 broker 在控制器节点上创建 Zookeeper watch 对象。
(3)如果控制器被关闭或者与 Zookeeper 断开连接,Zookeeper 临时节点就会消失。集群中的其他 broker 通过 watch 对象得到状态变化的通知,它们会尝试让自己成为新的控制器。
(4)第一个在 Zookeeper 里创建一个临时节点 /controller 的 broker 成为新控制器。其他 broker 在新控制器节点上创建 Zookeeper watch 对象。
(5)每个新选出的控制器通过 Zookeeper 的条件递增操作获得一个全新的、数值更大的 controller epoch。其他节点会忽略旧的 epoch 的消息。
(6)当一个 broker 离开集群,并且这个 broker 是某些 Partition 的 Leader。此时,控制器会遍历这些 Partition,并用轮询方式确定谁应该成为新 Leader,随后,新 Leader 开始处理生产者和消费者的请求,而 Follower 开始从 Leader 那里复制消息。
简而言之,Kafka 使用 Zookeeper 的临时节点来选举控制器,并在节点加入集群或退出集群时通知控制器。控制器负责在节点加入或离开集群时进行 Partition Leader 选举。控制器使用 epoch 来避免“脑裂”,“脑裂”是指两个节点同时被认为自己是当前的控制器。
主题管理(创建、删除、增加分区)
这里的主题管理,就是指控制器帮助我们完成对 Kafka 主题的创建、删除以及分区增加的操作。换句话说,当我们执行kafka-topics 脚本时,大部分的后台工作都是控制器来完成的。
分区重分配
分区重分配主要是指,kafka-reassign-partitions 脚本(关于这个脚本,后面我也会介绍)提供的对已有主题分区进行细粒度的分配功能。这部分功能也是控制器实现的。
领导者选举
Preferred 领导者选举主要是 Kafka 为了避免部分 Broker 负载过重而提供的一种换 Leader 的方案。在后面说到工具的时候,我们再详谈 Preferred 领导者选举,这里你只需要了解这也是控制器的职责范围就可以了。
集群成员管理
集群成员管理,包括自动检测新增 Broker、Broker 主动关闭及被动宕机。这种自动检测是依赖于前面提到的 Watch 功能和 ZooKeeper 临时节点组合实现的。
比如,控制器组件会利用Watch 机制检查 ZooKeeper 的 /brokers/ids 节点下的子节点数量变更。目前,当有新 Broker 启动后,它会在 /brokers 下创建专属的 znode 节点。一旦创建完毕,ZooKeeper 会通过 Watch 机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化,进而开启后续的新增 Broker 作业。
侦测 Broker 存活性则是依赖于刚刚提到的另一个机制:临时节点。每个 Broker 启动后,会在 /brokers/ids 下创建一个临时 znode。当 Broker 宕机或主动关闭后,该 Broker 与 ZooKeeper 的会话结束,这个 znode 会被自动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能知道有 Broker 关闭或宕机了,从而进行“善后”。
数据服务
控制器的最后一大类工作,就是向其他 Broker 提供数据服务。控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
控制器中保存了多种数据,比较重要的的数据有:
值得注意的是,这些数据其实在 ZooKeeper 中也保存了一份。每当控制器初始化时,它都会从 ZooKeeper 上读取对应的元数据并填充到自己的缓存中。有了这些数据,控制器就能对外提供数据服务了。这里的对外主要是指对其他 Broker 而言,控制器通过向这些 Broker 发送请求的方式将这些数据同步到其他 Broker 上。
因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader。
Kafka 把所有不在 ISR 中的存活副本都称为非同步副本。通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。Broker 端参数 unclean.leader.election.enable 控制是否允许 Unclean 领导者选举。
开启 Unclean 领导者选举可能会造成数据丢失,但好处是:它使得 Partition Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。
broker 的大部分工作是处理客户端、Partition 副本和控制器发送给 Partition Leader 的请求。Kafka 提供了一个二进制协议(基于 TCP),指定了请求消息的格式以及 broker 如何对请求作出响应。
如果我们来为Kafka画一张类似的图的话,那它应该是这个样子的:
Kafka的Broker端有个SocketServer组件,类似于Reactor模式中的Dispatcher,它也有对应的Acceptor线程和一个工作线程池,只不过在Kafka中,这个工作线程池有个专属的名字,叫网络线程池。Kafka提供了Broker端参数num.network.threads,用于调整该网络线程池的线程数。其默认值是3,表示每台Broker启动时会创建3个网络线程,专门处理客户端发送的请求。
Acceptor线程采用轮询的方式将入站请求公平地发到所有网络线程中,因此,在实际使用过程中,这些线程通常都有相同的几率被分配到待处理请求。这种轮询策略编写简单,同时也避免了请求处理的倾斜,有利于实现较为公平的请求处理调度。
既然客户端发来的请求会被Broker端的Acceptor线程分发到任意一个网络线程中,由它们来进行处理,那么当网络线程接收到请求后,它是怎么处理的呢?实际上,Kafka在这个环节又做了一层异步线程池的处理,我们一起来看一看下面这张图。
当网络线程拿到请求后,它不是自己处理,而是将请求放入到一个共享请求队列中。Broker端还有个IO线程池,负责从该队列中取出请求,执行真正的处理。如果是PRODUCE生产请求,则将消息写入到底层的磁盘日志中;如果是FETCH请求,则从磁盘或页缓存中读取消息。
IO线程池处中的线程才是执行请求逻辑的线程。Broker端参数num.io.threads控制了这个线程池中的线程数。目前该参数默认值是8,表示每台Broker启动后自动创建8个IO线程处理请求。
如果你的机器上CPU资源非常充裕,你完全可以调大该参数,允许更多的并发请求被同时处理。当IO线程处理完请求后,会将生成的响应发送到网络线程池的响应队列中,然后由对应的网络线程负责将Response返还给客户端。
客户端怎么知道哪个是主副本呢?客户端通过使用另一种类型的请求来实现,那就是元数据请求(metadata request)。这种请求包含了客户端感兴趣的 Topic 列表。broker 的响应消息指明了这些 Topic 所包含的 Partition、Partition 有哪些副本,以及那个副本是 Leader。元数据请求可以发给任意一个 broker,因为所有 broker 都缓存了这些信息。
客户端会把这些信息缓存起来,并直接往目标 broker 上发送生产请求和获取请求。它们需要时不时地通过发送元数据请求来刷新这些信息(刷新的时间间隔通过 metadata.max.age.ms 来配置),从而知道元数据是否发生了变化。
acks 参数控制多少个副本确认写入成功后生产者才认为消息生产成功。这个参数的取值可以为:
如果主副本收到生产消息,它会执行一些检查逻辑,包含:
之后,消息被写入到本地磁盘。一旦消息本地持久化后,如果 acks 被设为 0 或 1 那么会返回结果给客户端,如果 acks 被设为 all 那么会将请求放置在一个称为 purgatory 的缓冲区中等待其他的副本写入完成。
主副本处理拉取请求和处理生产请求的方式很相似:
客户端可以指定返回的最大数据量,防止数据量过大造成客户端内存溢出。同时,客户端也可以指定返回的最小数据量,当消息数据量没有达到最小数据量时,请求会一直阻塞直到有足够的数据返回。指定最小的数据量在负载不高的情况下非常有用,通过这种方式可以减轻网络往返的额外开销。当然请求也不能永远的阻塞,客户端可以指定最大的阻塞时间,如果到达指定的阻塞时间,即便没有足够的数据也会返回。
Kafka 使用零复制(zero-copy)来提高性能。也就是说,Kafka 将文件(更准确的说,是文件系统缓存)的消息直接传给网络通道,并没有使用中间的 buffer。这避免了内存的字节拷贝和 buffer 维护,极大地提高了性能。
不是所有主副本的数据都能够被读取。当数据被所有同步副本写入成功后,它才能被客户端读取。主副本知道每个消息会被复制到哪个副本上,在消息还没有被写入到所有同步副本之前,是不会发送给消费者的。
因为还没有被足够的副本持久化的消息,被认为是不安全的——如果主副本发生故障,另一个副本成为新的主副本,这些消息就丢失了。如果允许读取这些消息,就可能会破坏数据一致性。
这也意味着,如果 broker 间的消息复制因为某些原因变慢了,那么消息到达消费者的时间也会随之边长。延迟时间可以通过 replica.lag.time.max.ms 来配置,它指定了副本在复制消息时可被允许的最大延迟时间。
我们讨论了 Kafka 中最常见的三种请求类型:元信息请求,生产请求和拉取请求。这些请求都是使用的是 Kafka 的自定义二进制协议。集群中 broker 间的通信请求也是使用同样的协议,这些请求是内部使用的,客户端不能发送。比如在选举 Partition 主副本过程中,控制器会发送 LeaderAndIsr 请求给新的主副本和其他跟随副本。
这个协议目前已经支持 20 种请求类型,并且仍然在演进以支持更多的类型。
Kafka 的基本存储单元是 Partition。Partition 无法在多个 broker 间进行再细分。
当创建一个新 Topic 时,Kafka 首先需要决定如何分配 Partition 到不同的 broker。分配策略主要的考虑因素如下:
Kafka 不会一直保留数据,也不会等待所有的消费者读取了消息才删除消息。只要数据量达到上限或者数据达到过期时间,Kafka 会删除老的消息数据。
因为在一个大文件中查找和删除消息是非常耗时且容易出错的。所以,Kafka 将每个 Partition 切割成段(segment)。默认每个段大小不超过 1G,且只包含 7 天的数据。如果段的消息量达到 1G,那么该段会关闭,同时打开一个新的段进行写入。
正在写入的段称为活跃段(active segment),活跃段不会被删除。
对于每个 Partition 的每个段(包括不活跃的段),broker 都会维护文件句柄,因此打开的文件句柄数通常会比较多,这个需要适度调整系统的进程文件句柄参数。
Kafka 的消息和偏移量保存在文件里。保存在磁盘上的数据格式和从生产者发送过来或消费者读取的数据格式是一样的。使用相同的数据格式使得 Kafka 可以进行零复制技术给消费者发送消息,同时避免了压缩和解压。
除了键、值和偏移量外,消息里还包含了消息大小、校验和(检测数据损坏)、魔数(标识消息格式版本)、压缩算法(Snappy、GZip 或者 LZ4)和时间戳(0.10.0 新增)。时间戳可以是生产者发送消息的时间,也可以是消息到达 broker 的时间,这个是可配的。
如果发送者发送压缩的消息,那么批量发送的消息会压缩在一起,以“包装消息”(wrapper message)来发送,如下所示:
如果生产者使用压缩功能,那么发送更大的批量消息可以得到更好的网络传输效率,并且节省磁盘存储空间。
Kafka 允许消费者从任意有效的偏移量位置开始读取消息。Kafka 为每个 Partition 都维护了一个索引,该索引将偏移量映射到片段文件以及偏移量在文件里的位置。
索引也被分成片段,所以在删除消息时,也可以删除相应的索引。Kafka 不维护索引的校验和。如果索引出现损坏,Kafka 会通过重读消息并录制偏移量和位置来重新生成索引。
每个日志片段可以分为以下两个部分:
如果在 Kafka 启动时启用了清理功能(通过 log.cleaner.enabled 配置),每个 broker 会启动一个清理管理器线程和若干个清理线程,每个线程负责一个 Partition。
清理线程会读取污浊的部分,并在内存里创建一个 map。map 的 key 是消息键的哈希码,value 是消息的偏移量。对于相同的键,只保留最新的位移。其中 key 的哈希大小为 16 字节,位移大小为 8 个字节。也就是说,一个映射只有 24 字节,假设消息大小为 1KB,那么 1GB 的段有 1 百万条消息,建立这个段的映射只需要 24MB 的内存,映射的内存效率是非常高效的。
在配置 Kafka 时,管理员需要设置这些清理线程可以使用的总内存。如果设置 1GB 的总内存同时有 5 个清理线程,那么每个线程只有 200MB 的内存可用。在清理线程工作时,它不需要把所有脏的段文件都一起在内存中建立上述映射,但需要保证至少能够建立一个段的映射。如果不能同时处理所有脏的段,Kafka 会一次清理最老的几个脏段,然后在下一次再处理其他的脏段。
一旦建立完脏段的键与位移的映射后,清理线程会从最老的干净的段开始处理。如果发现段中的消息的键没有在映射中出现,那么可以知道这个消息是最新的,然后简单的复制到一个新的干净的段中;否则如果消息的键在映射中出现,这条消息需要抛弃,因为对于这个键,已经有新的消息写入。处理完会将产生的新段替代原始段,并处理下一个段。
对于一个段,清理前后的效果如下:
对于只保留最新消息的清理策略来说,Kafka 还支持删除相应键的消息操作(而不仅仅是保留最新的消息内容)。这是通过生产者发送一条特殊的消息来实现的,该消息包含一个键以及一个 null 的消息内容。当清理线程发现这条消息时,它首先仍然进行一个正常的清理并且保留这个包含 null 的特殊消息一段时间,在这段时间内消费者消费者可以获取到这条消息并且知道消息内容已经被删除。过了这段时间,清理线程会删除这条消息,这个键会从 Partition 中消失。这段时间是必须的,因为它可以使得消费者有一定的时间余地来收到这条消息。
Kafka 由 Scala 语言和 Java 语言编写而成,编译之后的源代码就是普通的“.class”文件。本来部署到哪个操作系统应该都是一样的,但是不同操作系统的差异还是给 Kafka 集群带来了相当大的影响。
目前常见的操作系统有 3 种:Linux、Windows 和 macOS。应该说部署在 Linux 上的生产环境是最多的。主要是在下面这三个方面上,Linux 的表现更胜一筹。
在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的快速数据传输特性。
Kafka 大量使用磁盘不假,可它使用的方式多是顺序读写操作(通过追加的方式写文件),一定程度上规避了机械磁盘最大的劣势,即随机读写操作慢。
磁盘容量如何估算?
规划磁盘容量时需要考虑的元素:
每天 1 亿条 1KB 大小的消息,保存两份且留存两周的时间,那么总的空间大小就等于 1 亿 _ 1KB _ 2 / 1000 / 1000 = 200GB。一般情况下 Kafka 集群除了消息数据还有其他类型的数据,比如索引数据等,故我们再为这些数据预留出 10% 的磁盘空间,因此总的存储容量就是 220GB。既然要保存两周,那么整体容量即为 220GB _ 14,大约 3TB 左右。Kafka 支持数据的压缩,假设压缩比是 0.75,那么最后你需要规划的存储空间就是 0.75 _ 3 = 2.25TB。
普通的以太网络,带宽也主要有两种:1Gbps 的千兆网络和 10Gbps 的万兆网络。
通常情况下只能假设 Kafka 会用到 70% 的带宽资源,因为总要为其他应用或进程留一些资源。根据实际使用经验,超过 70% 的阈值就有网络丢包的可能性了,故 70% 的设定是一个比较合理的值,也就是说单台 Kafka 服务器最多也就能使用大约 700Mb 的带宽资源。
通常要再额外预留出 2/3 的资源,即单台服务器使用带宽 700Mb / 3 ≈ 240Mbps。可以计算 1 小时内处理 1TB 数据所需的服务器数量了。根据这个目标,我们每秒需要处理 2336Mb 的数据,除以 240,约等于 10 台服务器。如果消息还需要额外复制两份,那么总的服务器台数还要乘以 3,即 30 台。