点击上方“芋道源码”,选择“设为星标”
管她前浪,还是后浪?
能浪的浪,才是好浪!
每天 10:33 更新文章,每天掉亿点点头发...
源码精品专栏
原创 | Java 2021 超神之路,很肝~
中文详细注释的开源项目
RPC 框架 Dubbo 源码解析
网络应用框架 Netty 源码解析
消息中间件 RocketMQ 源码解析
数据库中间件 Sharding-JDBC 和 MyCAT 源码解析
作业调度中间件 Elastic-Job 源码解析
分布式事务中间件 TCC-Transaction 源码解析
Eureka 和 Hystrix 源码解析
Java 并发源码
来源:yinzige.com/2020/04/
24/pulsar-survey/
Apache Pulsar
1. Kafka 概述
1.1 现存问题
1.3 优点
1.4 缺点
2. Pulsar 架构
2.1 Pulsar VS Kafka
2.2 Pulsar 架构
2.3 多租户与 Topic Lookup
2.4 Produce / Consume 策略
3. Bookkeeper 架构
3.1 概念
3.2 架构
3.3 写流程
3.4 读流程
4. 水平扩容
4.1 水平扩展 Broker
4.2 水平扩展 Bookie
5. Pulsar Consistency
5.1 一致性机制
5.2 Bookie Auto Recovery:Ensemble Change
5.3 Broker Recovery:Fencing
6. Distributed Log 与 Raft
6.1 概念对比
6.2 流程对比
6.3 总结
7. 总结
7.1 Pulsar 的优点
7.2 Pulsar 的缺点
简要总结下对 Pulsar 的调研。
内容:
Kafka : 优缺点。
Pulsar : 多租户,Topic Lookup,生产消费模式
Bookkeeper : 组件概念与读写流程
Horizontal Scale : Broker 或 Bookie 的横向扩展
Consistency : Broker 或 Bookie crash 后保证日志一致性
Distributed Log & Raft 算法
总结
推荐下自己做的 Spring Boot 的实战项目:
https://github.com/YunaiV/ruoyi-vue-pro
主要问题:
负载均衡需人工介入:手动按异构配置的 broker 对应生成 assignment 执行计划。
故障恢复不可控:broker 重启后需复制分区新数据并重建索引,其上的读写请求转移到其他 broker,流量激增场景下可能会导致集群雪崩。
其他问题:
跨数据中心备份需维护额外组件:MirrorMaker 官方也承认鸡肋,做跨机房的冗余复制依赖第三方组件如 uber 的 uReplicator
注:已脱敏。
生态成熟,易与 Flink 等现有组件集成。
可参考资料多,完善的官方文档和书籍。
模型简单易上手:partition 有 replication,以 segment 和 index 方式存储。
计算与存储耦合
存储节点有状态:读写只能走 Partition Leader,高负载集群中 Broker 重启容易出现单点故障,甚至雪崩。
手动负载均衡:集群扩容必须手动 Assign Partitions 到新 Broker,才能分散读写的负载。
漫画对比:https://jack-vanlightly.com/sketches/2018/10/2/kafka-vs-pulsar-rebalancing-sketch
推荐下自己做的 Spring Cloud 的实战项目:
https://github.com/YunaiV/onemall
Pular | Kafka | |
---|---|---|
数据集合 | Topic, Partition | Topic, Partition |
存储节点及读写组件 | Bookkeeper Bookie | Broker |
Pulsar Broker | Client SDK | |
数据存储单元 | Partition -> Ledgers -> Fragments | Partition -> Segments |
数据一致性保证 | Ensemble Size | metadata.broker.list |
Write Quorum Size(QW) | Replication Factor | |
Ack Quorum Size(QA) | request.required.acks |
注&#xff1a;(QW&#43;1)/2 <&#61; QA <&#61; QW <&#61; Ensemble Size <&#61; Bookies Count
Kafka&#xff1a;topic 切分为多个 partitions&#xff0c;各 partition 以目录形式在 leader broker 及其多副本 brokers 上持久化存储。
Pulsar&#xff1a;同样有多个 partitions&#xff0c;但一个 partition 只由一个 broker 负责读写&#xff08;ownership&#xff09;&#xff0c;而一个 partition 又会均匀分散到多台 bookie 节点上持久化存储。
Kafka&#xff1a;直接持久化到 broker&#xff0c;由 Client SDK 直接读写。
Pulsar&#xff1a;分散持久化到 bookie&#xff0c;由 broker 内嵌的 bookkeeper Client 负责读写。
Kafka&#xff1a;通过多 broker 集群&#xff0c;每个 partition 多副本&#xff0c;producer 指定发送确认机制保证。
Pulsar&#xff1a;通过多 broker 集群&#xff0c;broker Quorum Write 到 bookie&#xff0c;返回 Quorum ACK 保证。
topic 分三层&#xff1a;persistent://tenant/namespace/topic
&#xff0c;对应划分为 department -> app -> topics
&#xff0c;以 namespace 为单位进行过期时间设置&#xff0c;ACL 访问鉴权控制。
优点&#xff1a;按租户进行 topic 资源隔离&#xff0c;并混部在同一集群中&#xff0c;提高集群利用率。
Leader&#xff1a;即 Broker Leader&#xff0c;类似 Kafka Controller&#xff0c;汇总所有 Broker 的负载&#xff0c;合理地分配 topic 分区。
Wroker&#xff1a;等待分配 bundle 内的所有 topic partition
以 Namespace 为单位在 ZK 维护 bundle ring&#xff08;broker 的数量 2~3 倍&#xff09;&#xff0c;topic 分区按 hash(topic_partition)%N
落到 bundle 中。
Broker 唯一绑定到 bundle&#xff0c;就对 bundle 内的所有 topic partition 持有 ownership&#xff0c;用于 Broker Recovery 保证高可用。
上报负载&#xff1a;LoadManager Worker 负责向 ZK 汇报负载指标
zk> get /loadbalance/brokers/localhost:8080{ "pulsarServiceUrl": "pulsar://localhost:6650", "cpu": { "usage": 23, "limit": 50 }, "memory": { "usage": 1, "limit": 10 }, "msgThroughputIn": 100, "msgThroughputOut": 100}
bundle 为单位分配&#xff1a;LoadManager Leader 汇总其他 Brokers 的负载&#xff0c;根据负载分配 bundle
zk> get /loadbalance/leader{"serviceUrl":"http://localhost:8080","leaderReady":false}
分配结果&#xff1a;
zk> ls /namespace/public/default[0x00000000_0x40000000, 0x40000000_0x80000000, 0x80000000_0xc0000000, 0xc0000000_0xffffffff]zk> get /namespace/public/default/0x80000000_0xc0000000{"nativeUrl":"pulsar://localhost:6650","httpUrl":"http://localhost:8080","disabled":false}
不同于 kafka 将所有 topic ISR 等元数据记录到 zk&#xff0c;pulsar 只记录 topic 的分区数&#xff0c;不记录 topic 到 broker 的映射关系&#xff0c;zk 元数据数量极少&#xff0c;所以支持百万量级 topic
zk> get /admin/partitioned-topics/public/default/persistent/partitioned-topic-1{"partitions":2}
Client 向任一 BrokerA 发起 Lookup 请求&#xff0c;如 persistent://public/default/test-topic-1
BrokerA 计算 default namespace 下 hash(topic_partition)%N
的值&#xff0c;得到该 topic partition 对应的 bundle&#xff0c;从而查出 ownership BrokerX
BrokerA 返回 owner BrokerX 地址。
RoundRobinPartition&#xff08;默认&#xff09; &#xff1a;以 batching 为单位&#xff0c;通过轮询将消息均匀发给 brokers&#xff0c;以获得最大吞吐。
SinglePartition
有 KEY 则写固定分区&#xff0c;类似 hash(key) mod len(partitions)
写到指定分区。
无 KEY 则随机选一个分区&#xff0c;写入该 producer 的所有消息。
CustomPartition &#xff1a;用户可自定义针对具体到消息的分区策略&#xff0c;如 Java 实现 MessageRouter
接口。
Exclusive &#xff08;默认&#xff09; &#xff1a;独占消费&#xff0c;一对一&#xff0c;保证有序消费&#xff0c;能批量 ACK&#xff0c;是 Failover 特例&#xff0c;不保证高可用。
Failover &#xff1a;故障转移消费&#xff0c;一对一&#xff0c;备选多&#xff0c;保证有序消费&#xff0c;消费者高可用&#xff0c;能批量 ACK&#xff0c;保证高可用。
Shared &#xff1a;共享消费&#xff0c;多对多
Round Robin 分发消息&#xff0c;类似 Consumer Group 但不保证有序消费。
只能逐条 ACK&#xff1a;Consumer crash 时才能精确控制消息的重发。
水平扩展 Consumer 直接提读吞吐。不像 kafka 必须先扩 Partition 才能扩 Consumer
Key_Shared &#xff1a;按 KEY 共享消费&#xff0c;多对多&#xff0c;Exclusive 和 Shared 的折中模式。
KEY hash 相同的消息会被相同 consumer 消费&#xff0c;保证有序消费。
只能逐条 ACK
水平扩展 Consumer 提高读吞吐。
Consumer 可以同步或异步 p Receive 消息。
Consumer 可以本地注册 MessageListener 接口来等待 Broker Push 消息。
逐条 ACK、批量 ACK
取消 ACK&#xff1a;consumer 消费出错可请求重新消费&#xff0c;发送取消 ACK 后 broker 会重发消息。
exclusive, failover&#xff1a;只能取消上一次提交的 ACK&#xff0c;单个 consumer 可控回滚。
shared, key_shared&#xff1a;类比 ACK&#xff0c;consumers 只能取消上一条发出的 ACK
与 __consumer_offsets
机制类似 &#xff0c;Broker 收到各消费者的 ACK 后&#xff0c;会更新 Consumer 的消费进度 cursor&#xff0c;并持久化到特定的 ledger 中。
默认积极保留&#xff1a;最慢的 subscription 堆积的消息都不能被删除&#xff0c;最坏的情况是某个 subscription 下线后&#xff0c;cursor 依旧会保留在 message streaming 中&#xff0c;会导致消息过期机制失效。
消息过期&#xff1a;时间或大小两个维度设置限制&#xff0c;但只对积极保留之前的消息生效
TTL&#xff1a;强制移动旧慢 cursor 到 TTL 时间点&#xff0c;若 TTL &#61;&#61; Retention&#xff0c;则与 kafka 一样强制过期
两个指标
Topic Backlog&#xff1a;最慢的 subscription 的 cursor 到最新一条消息之间的消息数量。
Storage Size&#xff1a;topic 总空间。
按 segment 粒度删除&#xff0c;以 Last Motify Time 是否早于 Retention 为标准过期&#xff0c;与 kafka 一致
注&#xff1a;bookie 并非同步过期&#xff0c;空间释放是后台进程定期清理
append-only 的分布式 KV 日志系统&#xff0c;K 是 (Ledger_id, Entry_id)
二元组&#xff0c;V 是 (MetaData, RawData)
二进制数据。
高效写&#xff1a;append-only 磁盘顺序写。
高容错&#xff1a;通过 bookie ensemble 对日志进行冗余复制。
高吞吐&#xff1a;直接水平扩展 bookie 提高读写吞吐。
Ensemble Size&#xff1a;指定一段 日志要写的 bookies 数量。
Ensembles&#xff1a;指定写一段 日志的目标 bookies 集合。
Write Quorum&#xff1a;指定一条 日志要写 的 bookie 数量。
ACK Quorum&#xff1a;指定一条 日志要确认已写 入的 bookie 数量。
Segment / Ledger&#xff1a;要写入的一段 日志。
Fragment&#xff1a;写入的一条 日志。
Client 会以 Round Robin 的策略挑选出 bookie&#xff0c;依次顺延写 entry
Client 只等待 ACK Quorum 个 broker 返回 Append ACK 就认为写成功。
一个 Segment / Ledger 包含多个 Fragment
Fragment 内的 entry 呈带状连续 分布在 Ensembles Bookies 上。
一个周期内&#xff0c;一台 Bookie 会存储不连续 的 (EnsembleSize - WriteQuorum)
条 Entry
三个组件
zk / etcd&#xff1a;强一致性元数据存储
元数据存储&#xff1a;ledger 元数据。
服务发现&#xff1a;bookie 的注册中心&#xff0c;bookie 互相发现&#xff0c;client 读取集群全部 bookie 地址。
Bookie&#xff1a;存储节点&#xff0c;只允许 ADD
/ READ
两个操作&#xff0c;不保证一致性&#xff0c;不保证可用性 &#xff0c;功能简单。
Client&#xff1a;实现冗余复制的逻辑&#xff0c;保证数据的一致性&#xff0c;实现复杂且最重要。
Journal WAL
概念&#xff1a;用于持久化存储 bookie 操作 ledger 的事务日志&#xff0c;接收来自多个 Ledger Client 写入的不同 ledger entries&#xff0c;直接 高效地 append 到内存&#xff0c;随后 fsync 顺序写磁盘&#xff0c;延迟低。
清理&#xff1a;当 Write Cache 完成 Flush 落盘后自动删除。
Entry Logs
概念&#xff1a;真正落盘的日志文件&#xff0c;有序保存不同 ledger 的 entries&#xff0c;并维护 Write Cache 加速热日志的查找。
清理&#xff1a;bookie 后台 GC 线程定期检查其关联的 ledgers 是否在 zk 上已删除&#xff0c;若已删除则自动清理。
Index Files
概念&#xff1a;高效顺序写的副作用是&#xff0c;必须在外围维护 (ledger_id, entry_id)
到 Entry_Log
的映射索引&#xff0c;才能实现高效读&#xff0c;故 Flush Cache 时会分离出索引文件。
实现&#xff1a;可选 RocksDB 和文件存储索引。
Clients 混乱地给 Bookie 发来不同 ledger 的日志。
Bookie 往追加写 Journal&#xff0c;同时向 Write Cache 有序写&#xff08;Write Cache 内部使用 SkipList 实现动态有序&#xff0c;同时保证读写都高效&#xff09;
WriteCache 写满后 Flush 分离出 index 文件和落盘的日志文件。
删除旧 Journal&#xff0c;创建新 Journal 继续追加写&#xff0c;如此循环。
broker 内部为每个 ledger 持久化了其存储的 entry logs&#xff0c;并建立索引提高读效率。
Client 发来 (ledger_id, entry_id)
的 KEY
热 KEY&#xff1a;在 Write Cache 中则直接返回。
冷 KEY&#xff1a;读取 ledger_id 对应的 index 文件&#xff0c;根据 index 找出 entry_id 对应的 entry log 再返回。
如同轮询写&#xff0c;Cleint 也会轮询 Ensembles 均摊读取&#xff0c;同样不存在 leader 读瓶颈。
若某个 Bookie 读响应确实很慢&#xff0c;Client 会向其他副本 Bookie 发起读请求&#xff0c;同时等待&#xff0c;从而保证读延时低。
Client 往 bookie 写是轮询无序地写&#xff0c;故从 Ensembles 中读到是消息是无序的&#xff0c;需在 Client 端自行按 entry_id 重新排序&#xff0c;以保证有序响应。
新 Broker 加入集群后&#xff0c;Broker Leader 会将高负载 Broker 的部分 topic ownership 转移给新 Broker&#xff0c;从而分摊读写压力。
新 Bookie 加入集群后&#xff0c;Broker 通过 ZK 感知到&#xff0c;并将 ledger 的新 entry log 写到新 Bookie&#xff0c;提高存储层的读写吞吐、存储容量。
日志的冗余复制、一致性保证均由 Bookkeeper Client 实现。
由如上的 Eensembles 的 QW 和 QA 的多副本写&#xff0c;保证每条日志确实持久化到了 bookie 中。
滑动窗口&#xff1a;[0, ..., READABLE ... LAC], [LAC&#43;1, ... WAIT_QUOROM ..., LAP]
LAP&#xff08;Last Add Pushed&#xff09;&#xff1a;Client 发出的最后一条 entry_id&#xff08;从 0 自增的正整数&#xff09;
LAC&#xff08;Last Add Confirmed&#xff09;&#xff1a;Client 收到的最后一条 ACK 的 entry_id&#xff0c;是一致性的边界。
实现一致性的三个前置条件&#xff1a;
写 ledger 只能以 Append-Only 方式追加写&#xff0c;写满后变为 Read-Only
一个 Ledger 同一时间只会有一个 Client 在写。
LAC 必须按照 LAP 的顺序&#xff0c;依次进行 ACK 确认&#xff1a;保证 LAC 作为一致性边界&#xff0c;前边的日志可读&#xff0c;后边的日志等待多副本复制。
bookie crash 下线后&#xff0c;需恢复副本数量。
存在 Leader Bookie 5 作为 Daemon Auditor&#xff0c;不断向其他 Bookies 发送心跳保活。
Auditor 发现 Bookie 4 超时&#xff0c;读取 zk 发现 ledger x 的 [0, 7)
entry_id 区间需要从 4 转移到新 Bookie
找出负载较小的 Bookie 6&#xff0c;并根据 Ensembles 发现冗余数据分布在 {B1, B2, B3, B5}
按轮询均摊复制读压力的方式&#xff0c;将 entry log 逐一复制到 Bookie 6
复制完毕后修改 ZK 元数据&#xff0c;将 LAC0 的副本 4 替换为 6
写请求快速转移&#xff1a;
Bookie 6 加入 Ensembles 后&#xff0c;直接代替 Bookie 4 继续 Append 日志。因为副本数恢复是各个 Ensembles 内部各节点的 Auditor 线程后台异步复制&#xff0c;不会导致 Client 的写中断&#xff0c;整个 Recovery 过程对 Client 几乎透明。
LAC 分界线记录 Ensemble Change 历史&#xff1a;
在 ZK 的 ledger metadata 中&#xff0c;会记录每次 Recovery 导致的 ensembles 更新&#xff0c;即记录了 ledger 各 entry log 区间的分布情况。如下元数据记录了 ledger16 在 LAC46 处&#xff0c;Bookie 3183 下线&#xff0c;随后 Bookie 3182 上线从 LAC47 处继续处理请求&#xff1a;
> get /ledgers/00/0000/L0016ensembleSize: 3quorumSize: 2ackQuorumSize: 2lastEntryId: -1state: OPENsegment { ensembleMember: "10.13.48.57:3185" ensembleMember: "10.13.48.57:3184" ensembleMember: "10.13.48.57:3183" firstEntryId: 0}segment { ensembleMember: "10.13.48.57:3185" ensembleMember: "10.13.48.57:3184" ensembleMember: "10.13.48.57:3182" firstEntryId: 47}
注意&#xff1a;右上可看出 ZK 中各 ledger 的元数据硬编码了 Bookie 的 IP&#xff0c;容器部署时若 Bookie 重启后 IP 变化&#xff0c;会导致旧 Ledger 的该副本作废&#xff0c;故在 k8s 上部署时应选择 DaemonSet 或 StatefulSet
Broker crash&#xff0c;或 Broker 与 ZK 出现网络分区导致脑裂&#xff0c;需进行 partition ownership 转移。
Broker1 心跳超时后&#xff0c;ZK 将 topic partition 的 ownership 转移到 Broker2
Broker2 向 Ensemble 发起 Fencing ledger_X 请求&#xff0c;Bookies 纷纷将 ledger_X 置为 Fencing 不可写状态。
Broker1 写数据失败收到 FenceException&#xff0c;说明该 partition 已被 Broker 接管&#xff0c;主动放弃 ownership
Client 收到异常后与 Broker1 断开连接&#xff0c;进行 Topic Lookup 与 Broker2 建立长连接。
同时&#xff0c;Broker2 对 ledger_X LAC1 之后的 entry log 依次逐一进行 Forwarding Recovery&#xff08;若 unknow 状态的 entry 副本数实际上已达到 WQ&#xff0c;则认为该 entry 写成功&#xff0c;LAC1 自增为 LAC2&#xff09;
Broker2 更新 ledger_X 的 metadata&#xff0c;将其置为 CLOSE 状态&#xff0c;再创建新 ledger&#xff0c;继续处理 Client 的写请求。
不复用旧 ledger&#xff0c;降低复杂度 若复用旧 ledger_X&#xff0c;必须保证所有 ensemble 的 LAC 一致&#xff0c;同时涉及尾部 entry 的强一致复制&#xff0c;逻辑复杂。直接 CLOSE 能保证旧 ledger 不会再被写入。
Recovery 逻辑简单&#xff0c;耗时短 在 Client 的视角&#xff0c;只需等待两个过程&#xff1a;
等待结束后&#xff0c;直接往新 Broker 的新 ledger 上追加写数据&#xff0c;Broker 不参与任何数据冗余复制的流程&#xff0c;所以是无状态的&#xff0c;可以直接水平扩展提升以提升吞吐。
ZK 进行 partition ownership 的转移。
新 Broker 对 UNKNOWN 状态的尾部 entry 进行 Forwarding Recovery
概念 | Raft | DL |
---|---|---|
role | Leader 与 Followers | Writer (broker) 与 Bookies |
failover | term | ledger_id |
replication | Majority AppendEntries RPC | Quorum Write |
consistency | Last Committed Index | Last Add Confirmed&#xff08;LAC&#xff09; |
brain split | Majority Vote | Broker Fencing |
LAC 与 LAP 的存在&#xff0c;使 entry 能以内嵌顺序元数据的方式&#xff0c;均匀分散存储到各台 bookie 中。
DL 与 Raft 不同之处在于&#xff1a;
各 bookie 节点的数据不是从单个节点异步复制而来&#xff0c;而是由 Client 直接轮询分发。
为保证 bookie 能快速 append 日志&#xff0c;bookkeeper 设计了 Journal Append-only 顺序写日志机制。
为保证 bookie 能快速根据 (lid, eid)
读取消息(entry)
&#xff0c;bookkeeper 设计了 Ledger Store
因此&#xff0c;各 bookie 存储节点的身份是平等的&#xff0c;没有传统一致性算法的 Leader 和 Follower 的概念&#xff0c;完美避开了读写只能走 Leader 导致 Leader 容易成为单点瓶颈的问题。同时&#xff0c;能直接添加新 Bookie 提升读写吞吐&#xff0c;并降低其他旧 Bookie 的负载。
直接解决 Kafka 容器平台现有的手工扩容、故障恢复慢的问题。
稳定性可用性高&#xff1a;秒级 Broker / Bookie 的快速故障恢复。
水平线性扩容&#xff1a;存储与计算分离&#xff0c;可对 Broker 扩容提升读写吞吐&#xff0c;可对 Bookie 扩容降低集群负载并提升存储容量。
扩容负载均衡&#xff1a;Bookie 扩容后新的 ledger 会在新 Bookie 上创建&#xff0c;自动均摊负载。
概念多&#xff0c;系统复杂&#xff0c;隐藏 bug 修复门槛高。
背书少&#xff0c;国内仅腾讯金融和智联招聘在使用。
欢迎加入我的知识星球&#xff0c;一起探讨架构&#xff0c;交流源码。加入方式&#xff0c;长按下方二维码噢&#xff1a;
已在知识星球更新源码解析如下&#xff1a;
最近更新《芋道 SpringBoot 2.X 入门》系列&#xff0c;已经 101 余篇&#xff0c;覆盖了 MyBatis、Redis、MongoDB、ES、分库分表、读写分离、SpringMVC、Webflux、权限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能测试等等内容。
提供近 3W 行代码的 SpringBoot 示例&#xff0c;以及超 4W 行代码的电商微服务项目。
获取方式&#xff1a;点“在看”&#xff0c;关注公众号并回复 666 领取&#xff0c;更多内容陆续奉上。
文章有帮助的话&#xff0c;在看&#xff0c;转发吧。
谢谢支持哟 (*^__^*&#xff09;