热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

消息系统_使用Flink与Pulsar打造实时消息系统

篇首语:本文由编程笔记#小编为大家整理,主要介绍了使用Flink 与 Pulsar 打造实时消息系统相关的知识,希望对你有一定的参考价值。 最初,BIGO 的消息流平台主要采用开源 Kafka 作为数

篇首语:本文由编程笔记#小编为大家整理,主要介绍了使用Flink 与 Pulsar 打造实时消息系统相关的知识,希望对你有一定的参考价值。


最初,BIGO 的消息流平台主要采用开源 Kafka 作为数据支撑。随着数据规模日益增长,产品不断迭代,BIGO 消息流平台承载的数据规模出现了成倍增长,下游的在线模型训练、在线推荐、实时数据分析、实时数仓等业务对消息流平台的实时性和稳定性提出了更高的要求。开源的 Kafka 集群难以支撑海量数据处理场景,我们需要投入更多的人力去维护多个 Kafka 集群,这样成本会越来越高,主要体现在以下几个方面:


1、据存储和消息队列服务绑定,集群扩缩容/分区均衡需要大量拷贝数据,造成集群性能下降

2、当分区副本不处于 ISR(同步)状态时,一旦有 broker 发生故障,可能会造成数据丢失或该分区无法提供读写服务。

3当 Kafka broker 磁盘故障/空间占用率过高时,需要进行人工干预。

4、群跨区域同步使用 KMM(Kafka Mirror Maker),性能和稳定性难以达到预期。

5在 catch-up 读场景下,容易出现 PageCache 污染,造成读写性能下降。

6Kafka broker 上存储的 topic 分区数量有限,分区数越多,磁盘读写顺序性越差,读写性能越低。

7Kafka 集群规模增长导致运维成本急剧增长,需要投入大量的人力进行日常运维;在 BIGO,扩容一台机器到 Kafka 集群并进行分区均衡,需要 0.5 人/天;缩容一台机器需要 1 人/天。

 

如果继续使用 Kafka,成本会不断上升:扩缩容机器、增加运维人力。同时,随着业务规模增长,我们对消息系统有了更高的要求:系统要更稳定可靠、便于水平扩展、延迟低。为了提高消息队列的实时性、稳定性和可靠性,降低运维成本,我们开始考虑是否要基于开源 Kafka 做本地化二次开发,或者看看社区中有没有更好的解决方案,来解决我们在维护 Kafka 集群时遇到的问题。


为什么选择 Pulsar

2019 年 11 月,我们开始调研消息队列,对比当前主流消息流平台的优缺点,并跟我们的需求对接。在调研过程中,我们发现 Apache Pulsar 是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体。Pulsar 能够无缝扩容、延迟低、吞吐高,支持多租户和跨地域复制。最重要的是,Pulsar 存储、计算分离的架构能够完美解决 Kafka 扩缩容的问题。Pulsar producer 把消息发送给 broker,broker 通过 bookie client 写到第二层的存储 BookKeeper 上。

 


Pulsar 采用存储、计算分离的分层架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐以及低延时的高可扩展流数据存储特性。

 1、水平扩容:能够无缝扩容到成百上千个节点。

2高吞吐:已经在 Yahoo! 的生产环境中经受了考验,支持每秒数百万条消息的发布-订阅(Pub-Sub)。

3低延迟:在大规模的消息量下依然能够保持低延迟(小于 5 ms)。

4持久化机制:Pulsar 的持久化机制构建在 Apache BookKeeper 上,实现了读写分离。

5、读写分离:BookKeeper 的读写分离 IO 模型极大发挥了磁盘顺序写性能,对机械硬盘相对比较友好,单台 bookie 节点支撑的 topic 数不受限制。

 

为了进一步加深对 Apache Pulsar 的理解,衡量 Pulsar 能否真正满足我们生产环境大规模消息 Pub-Sub 的需求,我们从 2019 年 12 月开始进行了一系列压测工作。由于我们使用的是机械硬盘,没有 SSD,在压测过程中遇到了一些性能问题,在 StreamNative 的协助下,我们分别和  进行了一系列的,Pulsar 的吞吐和稳定性均有所提高。

 

经过 3~4 个月的压测和调优,我们认为 Pulsar 完全能够解决我们使用 Kafka 时遇到的各种问题,并于 2020 年 4 月在测试环境上线 Pulsar。


Apache Pulsar at BIGO:Pub-Sub 消费模式

2020 年 5 月,我们正式在生产环境中使用 Pulsar 集群。Pulsar 在 BIGO 的场景主要是 Pub-Sub 的经典生产消费模式,前端有 Baina 服务(用 C++ 实现的数据接收服务),Kafka 的 Mirror Maker 和 Flink,以及其他语言如 Java、Python、C++ 等客户端的 producer 向 topic 写入数据。后端由 Flink 和 Flink SQL,以及其他语言的客户端的 consumer 消费数据。

 

使用Flink 与 Pulsar 打造实时消息系统


在下游,我们对接的业务场景有实时数仓、实时 ETL(Extract-Transform-Load,将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程)、实时数据分析和实时推荐。大部分业务场景使用 Flink 消费 Pulsar topic 中的数据,并进行业务逻辑处理;其他业务场景消费使用的客户端语言主要分布在 C++、Go、Python 等。数据经过各自业务逻辑处理后,最终会写入 Hive、Pulsar topic 以及 ClickHouse、HDFS、Redis 等第三方存储服务。


使用Flink 与 Pulsar 打造实时消息系统



Pulsar + Flink 实时流平台

 

在 BIGO,我们借助 Flink 和 Pulsar 打造了实时流平台。在介绍这个平台之前,我们先了解下 Pulsar Flink Connector 的内部运行机理。在 Pulsar Flink Source/Sink API 中,上游有一个 Pulsar topic,中间是 Flink job,下游有一个 Pulsar topic。我们怎么消费这个 topic,又怎样处理数据并写入 Pulsar topic 呢?

 

使用Flink 与 Pulsar 打造实时消息系统



Pulsar topic 和 Flink 的消费如何联动呢?如下图所示,新建 FlinkPulsarSource 时,会为 topic 的每一个分区新创建一个 reader 对象。要注意的是 Pulsar Flink Connector 底层使用 reader API 消费,会先创建一个 reader,这个 reader 使用 Pulsar Non-Durable Cursor。Reader 消费的特点是读取一条数据后马上提交(commit),所以在监控上可能会看到 reader 对应的 subscription 没有 backlog 信息。

 

在 Pulsar 2.4.2 版本中,由 Non-Durable Cursor 订阅的 topic,在接收到 producer 写入的数据时,不会将数据保存在 broker 的 cache 中,导致大量数据读取请求落到 BookKeeper 中,降低数据读取效率。BIGO 在 Pulsar 2.5.1 版本中修正了这个问题。

 

使用Flink 与 Pulsar 打造实时消息系统


Reader 订阅 Pulsar topic 后,消费 Pulsar topic 中的数据,Flink 如何保证 exactly-once 呢?Pulsar Flink Connector 使用另外一个独立的 subscription,这个 subscription 使用的是 Durable Cursor。当 Flink 触发 checkpoint,Pulsar Flink Connector 会把 reader 的状态(包括每个 Pulsar Topic Partition 的消费位置) checkpoint 到文件、内存或 RocksDB 中,当 checkpoint 完成后,会发布一次 Notify Checkpoint Complete 通知。Pulsar Flink Connector 收到 checkpoint 完成通知后,把当前所有 reader 的消费 Offset,即 message id 以独立的 SubscriptionName 提交给 Pulsar broker,此时才会把消费 Offset 信息真正记录下来。

 

Offset Commit 完成后,Pulsar broker 会将 Offset 信息(在 Pulsar 中以 Cursor 表示)存储到底层的分布式存储系统 BookKeeper 中,这样做的好处是当 Flink 任务重启后,会有两层恢复保障。第一种情况是从 checkpoint 恢复:可以直接从 checkpoint 里获得上一次消费的 message id,通过这个 message id 获取数据,这个数据流就能继续消费。如果没有从 checkpoint 恢复,Flink 任务重启后,会根据 SubscriptionName 从 Pulsar 中获取上一次 Commit 对应的 Offset 位置开始消费。这样就能有效防止 checkpoint 损坏导致整个 Flink 任务无法成功启动的问题。

 

Checkpoint 流程如下图所示。

 

使用Flink 与 Pulsar 打造实时消息系统


先做 checkpoint N,完成后发布一次 notify Checkpoint Complete,等待一定时间间隔后,接下来做 checkpoint N+1,完成后也会进行一次 notify Checkpoint Complete 操作,此时把 Durable Cursor 进行一次 Commit,最终 Commit 到 Pulsar topic 的服务端上,这样能确保 checkpoint 的 exactly-once,也能根据自己设定的 subscription 保证 message “keep alive”。


Topic/Partition Discovery 要解决什么问题呢?当 Flink 任务消费 topic 时,如果 Topic 增加分区,Flink 任务需要能够自动发现分区。Pulsar Flink Connector 如何实现这一点呢?订阅 topic 分区的 reader 之间相互独立,每个 task manager 包含多个 reader thread,根据哈希函数把单个 task manager 中包含的 topic 分区映射过来,topic 中新增分区时,新加入的分区会映射到某个 task manager 上,task manager 发现新增分区后,会创建一个 reader,消费掉新数据。用户可以通过设置 `partition.discovery.interval-millis` 参数,调配检测频率。

 

使用Flink 与 Pulsar 打造实时消息系统


为了降低 Flink 消费 Pulsar topic 的门槛,让 Pulsar Flink Connector 支持更加丰富的 Flink 新特性,BIGO 消息队列团队为 Pulsar Flink Connector 增加了 Pulsar Flink SQL DDL(Data Definition Language,数据定义语言) 和 Flink 1.11 支持。此前官方提供的 Pulsar Flink SQL 只支持 Catalog,要想通过 DDL 形式消费、处理 Pulsar topic 中的数据不太方便。在 BIGO 场景中,大部分 topic 数据都以 JSON 格式存储,而 JSON 的 schema 没有提前注册,所以只能在 Flink SQL 中指定 topic 的 DDL 后才可以消费。针对这种场景,BIGO 基于 Pulsar Flink Connector 做了二次开发,提供了通过 Pulsar Flink SQL DDL 形式消费、解析、处理 Pulsar topic 数据的代码框架(如下图所示)。

 

使用Flink 与 Pulsar 打造实时消息系统


左边的代码中,第一步是配置 Pulsar topic 的消费,首先指定 topic 的 DDL 形式,比如 rip、rtime、uid 等,下面是消费 Pulsar topic 的基础配置,比如 topic 名称、service-url、admin-url 等。底层 reader 读到消息后,会根据 DDL 解出消息,将数据存储在 test_flink_sql 表中。第二步是常规逻辑处理(如对表进行字段抽取、做 join 等),得出相关统计信息或其他相关结果后,返回这些结果,写到 HDFS 或其他系统上等。第三步,提取相应字段,将其插入一张 hive 表。由于 Flink 1.11 对 hive 的写入支持比 1.9.1 更加优秀,所以 BIGO 又做了一次 API 兼容和版本升级,使 Pulsar Flink Connector 支持 Flink 1.11。


BIGO 基于 Pulsar 和 Flink 构建的实时流平台主要用于实时 ETL 处理场景和 AB-test 场景。


实时 ETL 处理场景

实时 ETL 处理场景主要运用 Pulsar Flink Source 及 Pulsar Flink Sink。这个场景中,Pulsar topic 实现几百甚至上千个 topic,每个 topic 都有独立的 schema。我们需要对成百上千个 topic 进行常规处理,如字段转换、容错处理、写入 HDFS 等。每个 topic 都对应 HDFS 上的一张表,成百上千个 topic 会在 HDFS 上映射成百上千张表,每张表的字段都不一样,这就是我们遇到的实时 ETL 场景。

 

这种场景的难点在于 topic 数量多。如果每个 topic 维护一个 Flink 任务,维护成本太高。之前我们想通过 HDFS Sink Connector 把 Pulsar topic 中的数据直接 sink 到 HDFS 上,但处理里面的逻辑却很麻烦。最终我们决定使用一个或多个 Flink 任务去消费成百上千个 topic,每个 topic 配自己的 schema,直接用 reader 来订阅所有 topic,进行 schema 解析后处理,将处理后的数据写到 HDFS 上。

 

随着程序运行,我们发现这种方案也存在问题:算子之间压力不均衡。因为有些 topic 流量大,有些流量小,如果完全通过随机哈希的方式映射到对应的 task manager 上去,有些 task manager 处理的流量会很高,而有些 task manager 处理的流量很低,导致有些 task 机器上积塞非常严重,拖慢 Flink 流的处理。所以我们引入了 slot group 概念,根据每个 topic 的流量情况进行分组,流量会映射到 topic 的分区数,在创建 topic 分区时也以流量为依据,如果流量很高,就多为 topic 创建分区,反之少一些。分组时,把流量小的 topic 分到一个 group 中,把流量大的 topic 单独放在一个 group 中,很好地隔离了资源,保证 task manager 总体上流量均衡。

 

使用Flink 与 Pulsar 打造实时消息系统



AB-test 场景 

实时数仓需要提供小时表或天表为数据分析师及推荐算法工程师提供数据查询服务,简单来讲就是 app 应用中会有很多打点,各种类型的打点会上报到服务端。如果直接暴露原始打点给业务方,不同的业务使用方就需要访问各种不同的原始表从不同维度进行数据抽取,并在表之间进行关联计算。频繁对底层基础表进行数据抽取和关联操作会严重浪费计算资源,所以我们提前从基础表中抽取用户关心的维度,将多个打点合并在一起,构成一张或多张宽表,覆盖上面推荐相关的或数据分析相关的 80% ~ 90% 场景任务。

 

在实时数仓场景下还需实时中间表,我们的解决方案是,针对 topic A 到 topic K ,我们使用 Pulsar Flink SQL 将消费到的数据解析成相应的表。通常情况下,将多张表聚合成一张表的常用做法是使用 join,如把表 A 到 K 按照 uid 进行 join 操作,形成非常宽的宽表;但在 Flink SQL 中 join 多张宽表效率较低。所以 BIGO 使用 union 来替代 join,做成很宽的视图,以小时为单位返回视图,写入 ClickHouse,提供给下游的业务方实时查询。使用 union 来替代 join 加速表的聚合,能够把小时级别的中间表产出控制在分钟级别。

 


输出天表可能还需要 join 存放在 hive 上的表或其他存储介质上的离线表,即流表和离线表之间 join 的问题。如果直接 join,checkpoint 中需要存储的中间状态会比较大,所以我们在另外一个维度上做了优化。

 

左侧部分类似于小时表,每个 topic 使用 Pulsar Flink SQL 消费并转换成对应的表,表之间进行 union 操作,将 union 得到的表以天为单位输入到 HBase(此处引入 HBase 是为了做替代它的 join)。

 

右侧需要 join 离线数据,使用 Spark 聚合离线的 Hive 表(如表 a1、a2、a3),聚合后的数据会通过精心设计的 row-key 写入 HBase 中。数据聚合后状态如下:假设左边数据的 key 填了宽表的前 80 列,后面 Spark 任务算出的数据对应同样一个 key,填上宽表的后 20 列,在 HBase 中组成一张很大的宽表,把最终数据再次从 HBase 抽出,写入 ClickHouse,供上层用户查询,这就是 AB-test 的主体架构。

 


业务收益

从 2020 年 5 月上线至今,Pulsar 运行稳定,日均处理消息数百亿,字节入流量为 2~3 GB/s。Apache Pulsar 提供的高吞吐、低延迟、高可靠性等特性极大提高了 BIGO 消息处理能力,降低了消息队列运维成本,节约了近 50% 的硬件成本。目前,我们在几十台物理主机上部署了上百个 Pulsar broker 和 bookie 进程,采用 bookie 和 broker 在同一个节点的混部模式,已经把 ETL 从 Kafka 迁移到 Pulsar,并逐步将生产环境中消费 Kafka 集群的业务(比如 Flink、Flink SQL、ClickHouse 等)迁移到 Pulsar 上。随着更多业务的迁移,Pulsar 上的流量会持续上涨。

 

我们的 ETL 任务有一万多个 topic,每个 topic 平均有 3 个分区,使用 3 副本的存储策略。之前使用 Kafka,随着分区数增加,磁盘由顺序读写逐渐退化为随机读写,读写性能退化严重。Apache Pulsar 的存储分层设计能够轻松支持百万 topic,为我们的 ETL 场景提供了优雅支持。

 


未来展望

 

BIGO 在 Pulsar broker 负载均衡、broker cache 命中率优化、broker 相关监控、BookKeeper 读写性能优、BookKeeper 磁盘 IO 性能优化、Pulsar 与 Flink、Pulsar 与 Flink SQL 结合等方面做了大量工作,提升了 Pulsar 的稳定性和吞吐,也降低了 Flink 与 Pulsar 结合的门槛,为 Pulsar 的推广打下了坚实基础。

 

未来,我们会增加 Pulsar 在 BIGO 的场景应用,帮助社区进一步优化、完善 Pulsar 功能,具体如下:

1、为 Apache Pulsar 研发新特性,比如支持 topic policy 相关特性。

2迁移更多任务到 Pulsar。这项工作涉及两方面,一是迁移之前使用 Kafka 的任务到 Pulsar。二是新业务直接接入 Pulsar。

3BIGO 准备使用 KoP 来保证数据迁移平滑过渡。因为 BIGO 有大量消费 Kafka 集群的 Flink 任务,我们希望能够直接在 Pulsar 中做一层 KoP,简化迁移流程。

4对 Pulsar 及 BookKeeper 持续进行性能优化。由于生产环境中流量较高,BIGO 对系统的可靠性和稳定性要求较高。

5持续优化 BookKeeper 的 IO 协议栈。Pulsar 的底层存储本身是 IO 密集型系统,保证底层 IO 高吞吐,才能够提升上层吞吐,保证性能稳定。



推荐阅读
  • 本文详细介绍了Java代码分层的基本概念和常见分层模式,特别是MVC模式。同时探讨了不同项目需求下的分层策略,帮助读者更好地理解和应用Java分层思想。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • Web开发框架概览:Java与JavaScript技术及框架综述
    Web开发涉及服务器端和客户端的协同工作。在服务器端,Java是一种优秀的编程语言,适用于构建各种功能模块,如通过Servlet实现特定服务。客户端则主要依赖HTML进行内容展示,同时借助JavaScript增强交互性和动态效果。此外,现代Web开发还广泛使用各种框架和库,如Spring Boot、React和Vue.js,以提高开发效率和应用性能。 ... [详细]
  • 在当今的软件开发领域,分布式技术已成为程序员不可或缺的核心技能之一,尤其在面试中更是考察的重点。无论是小微企业还是大型企业,掌握分布式技术对于提升工作效率和解决实际问题都至关重要。本周的Java架构师实战训练营中,我们深入探讨了Kafka这一高效的分布式消息系统,它不仅支持发布订阅模式,还能在高并发场景下保持高性能和高可靠性。通过实际案例和代码演练,学员们对Kafka的应用有了更加深刻的理解。 ... [详细]
  • 以Flink为例,消除流处理常见的六大谬见
    以Flink为例,消除流处理常见的六大谬见 ... [详细]
  • RocketMQ在秒杀时的应用
    目录一、RocketMQ是什么二、broker和nameserver2.1Broker2.2NameServer三、MQ在秒杀场景下的应用3.1利用MQ进行异步操作3. ... [详细]
  • Java高并发与多线程(二):线程的实现方式详解
    本文将深入探讨Java中线程的三种主要实现方式,包括继承Thread类、实现Runnable接口和实现Callable接口,并分析它们之间的异同及其应用场景。 ... [详细]
  • 秒建一个后台管理系统?用这5个开源免费的Java项目就够了
    秒建一个后台管理系统?用这5个开源免费的Java项目就够了 ... [详细]
  • B站服务器故障影响豆瓣评分?别担心,阿里巴巴架构师分享预防策略与技术方案
    13日晚上,在视频观看高峰时段,B站出现了服务器故障,引发网友在各大平台上的广泛吐槽。这一事件导致了连锁反应,大量用户纷纷涌入A站、豆瓣和晋江等平台,给这些网站带来了突如其来的流量压力。为了防止类似问题的发生,阿里巴巴架构师分享了一系列预防策略和技术方案,包括负载均衡、弹性伸缩和容灾备份等措施,以确保系统的稳定性和可靠性。 ... [详细]
  • 本文深入探讨了NoSQL数据库的四大主要类型:键值对存储、文档存储、列式存储和图数据库。NoSQL(Not Only SQL)是指一系列非关系型数据库系统,它们不依赖于固定模式的数据存储方式,能够灵活处理大规模、高并发的数据需求。键值对存储适用于简单的数据结构;文档存储支持复杂的数据对象;列式存储优化了大数据量的读写性能;而图数据库则擅长处理复杂的关系网络。每种类型的NoSQL数据库都有其独特的优势和应用场景,本文将详细分析它们的特点及应用实例。 ... [详细]
  • 2021年Java开发实战:当前时间戳转换方法详解与实用网址推荐
    在当前的就业市场中,金九银十过后,金三银四也即将到来。本文将分享一些实用的面试技巧和题目,特别是针对正在寻找新工作机会的Java开发者。作者在准备字节跳动的面试过程中积累了丰富的经验,并成功获得了Offer。文中详细介绍了如何将当前时间戳进行转换的方法,并推荐了一些实用的在线资源,帮助读者更好地应对技术面试。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 美团优选推荐系统架构师 L7/L8:算法与工程深度融合 ... [详细]
  • 2020年9月15日,Oracle正式发布了最新的JDK 15版本。本次更新带来了许多新特性,包括隐藏类、EdDSA签名算法、模式匹配、记录类、封闭类和文本块等。 ... [详细]
  • MongoDB核心概念与基础知识解析
    MongoDB 是一种基于分布式文件存储的非关系型数据库系统,主要采用 C++ 语言开发。本文将详细介绍 MongoDB 的核心概念和基础知识,包括其与传统 SQL 数据库的区别,数据库及集合的基本操作,如数据的插入、更新、删除和查询等。通过本文,读者可以全面了解 MongoDB 的基本功能及其应用场景。 ... [详细]
author-avatar
手机用户2502928867
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有