热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

支付宝资深架构师的分布式追踪&APM系统SkyWalking源码分析—DataCarrier异步处理库

1. 概述

本文主要分享 SkyWalking DataCarrier 异步处理库

基于生产者消费者的模式,大体结构如下图:

支付宝资深架构师的分布式追踪 & APM 系统 SkyWalking 源码分析— DataCarrier 异步处理库

实际项目中,没有 Producer 这个类。所以本文提到的 Producer ,更多的是一种角色

下面我们来看看整体的项目结构,如下图所示 :

支付宝资深架构师的分布式追踪 & APM 系统 SkyWalking 源码分析— DataCarrier 异步处理库

2. buffer

org.skywalking.apm.commons.datacarrier.buffer 包,主要包含 Channels 、Buffer 两个类。Channels 是 Buffer 数组的封装。

2.1 Buffer

具有1-5工作经验的,面对目前流行的技术不知从何下手,需要突破技术瓶颈的可以加群。在公司待久了,过得很安逸,
但跳槽时面试碰壁。需要在短时间内进修、跳槽拿高薪的可以加群。如果没有工作经验,但基础非常扎实,对java工作
机制,常用设计思想,常用java开发框架掌握熟练的可以加群。java架构群:582505643一起交流。

org.skywalking.apm.commons.datacarrier.buffer.Buffer ,缓存区。

buffer 属性,缓冲数组。Producer 保存的数据到 buffer 里。

strategy ,缓冲策略( org.skywalking.apm.commons.datacarrier.buffer.BufferStrategy ) 。

index 属性,递增位置( org.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger )。

Buffer 在保存数据时,把 buffer 作为一个 ““,使用 index 记录最后存储的位置,不断向下,循环存储到 buffer 中。通过这样的方式,带来良好的存储性能,避免扩容问题。But ,存储会存在冲突的问题:buffer 写入位置,暂未被消费,已经存在值。此时,根据不同的 BufferStrategy 进行处理。整体流程见 #save(data) 方法。

当 Buffer 被 Consumer 消费时,被调用 #obtain(start, end) 方法,获得数据并清空。为什么会带 start 、end 方法参数呢?下文揭晓答案。

2.2 Channels

org.skywalking.apm.commons.datacarrier.buffer.Channels ,内嵌多个 Buffer 的通道。

bufferChannels 属性,Buffer 数组。

dataPartitioner 属性,数据分区( org.skywalking.apm.commons.datacarrier.partition.IDataPartitioner )。

strategy 属性,缓冲策略( org.skywalking.apm.commons.datacarrier.buffer.BufferStrategy ) 。

Channels 在保存数据时,相比 Buffer ,从 buffer 变成了多 buffer ,因此需要先选一个 buffer 。通过使用不同的 IDataPartitioner 实现类,进行 Buffer 的选择。当缓冲策略为 BufferStrategy.IF_POSSIBLE 时,根据 IDataPartitioner 定义的重试次数,进行多次保存数据直到成功。整体流程见 #save(data) 方法。

3. partition

org.skywalking.apm.commons.datacarrier.partition.IDataPartitioner ,数据分配者接口。定义了如下方法:

#partition(total, data) 接口方法,获得数据被分配的分区位置。

#maxRetryCount() 接口方法,获得最大重试次数。

IDataPartitioner 目前有两个子类实现:

ProducerThreadPartitioner ,基于线程编号分配策略的数据分配者实现类。

SimpleRollingPartitioner ,基于顺序分配策略的数据分配者实现类。

4. consumer

org.skywalking.apm.commons.datacarrier.consumer 包,主要包含 ConsumerPool 、ConsumerThread 、IConsumer 三个类。

ConsumerThread 使用 IConsumer ,消费数据

ConsumerPool 是 ConsumerThread 的线程池封装

4.1 IConsumer

org.skywalking.apm.commons.datacarrier.consumer.IConsumer ,消费者接口。定义了如下方法:

#init() 接口方法,初始化消费者。

#consume(List) 接口方法,批量消费消息。

#onError(List, Throwable) 接口方法,处理当消费发生异常。

#onExit() 接口方法,处理当消费结束。此处的结束时,ConsumerThread 关闭。

我们在使用时,自定义 Consumer 类,实现 IConsumer 接口。例如:RemoteMessageConsumer 。

4.2 ConsumerThread

org.skywalking.apm.commons.datacarrier.consumer.ConsumerThread ,继承 java.lang.Thread ,消费线程。

running 属性,是否运行中。

consumer 属性,消费者对象。

dataSources 属性,消费消息的数据源( DataSource )数组。一个 ConsumerThread ,可以消费多个 Buffer ,并且单个 Buffer 消费的分区范围可配置,即一个 Buffer 可以被多个 ConsumerThread 同时无冲突的消费。在 「4.3 ConsumerPool」详细解析 ConsumerThread 分配 Buffer 的方式。

#addDataSource(sourceBuffer, start, end) 方法,添加 Buffer 部分范围。

#addDataSource(sourceBuffer) 方法,添加 Buffer 全部范围。

#run() 实现方法,不断批量的消费数据。代码如下:

第 78 至 88 行:不断消费,直到线程关闭( #shutdown() )。

第 80 行:调用 #consume() 方法,批量消费数据。

第 82 至 87 行:当未消费到数据,说明 dataSources 为空,等待 20 ms ,避免 CPU 空跑。

第 93 行:当线程关闭,调用 #consume() 方法,消费完 dataSources 剩余的数据。

第 95 行:调用 IConsumer#onExit() 方法,处理当消费结束。

#consume() 方法,批量消费数据。代码如下:

第 107 至 117 行:从 dataSources 中,获取要消费的数据。

第 120 至 126 行:当有数据可消费时,调用 IConsumer#consume(List) 方法。当消费发生异常时,调用IConsumer#onError(List, Throwable) 方法。

第 127 行:返回是否有消费数据。

4.3 ConsumerPool

org.skywalking.apm.commons.datacarrier.consumer.ConsumerPool ,消费者池,提供了对 Channels 启动指定数量的 ConsumerThread 进行消费。

running 属性,是否运行中。

consumerThreads 属性,ConsumerThread 数组,通过构造方法的 num 参数进行指定。

channels 属性,数据通道。

lock 属性,锁。保证 ConsumerPool 启动或关闭时的线程安全。

#begin() 方法,启动 ConsumerPool ,进行数据消费。代码如下:

第 97 至 99 行:正在运行中,直接返回。

第 101 行:获得锁。

第 104 行:调用 #allocateBuffer2Thread() 方法,将 channels 的多个 Buffer ,分配给 consumerThreads 的多个ConsumerThread。

第 107 至 109 行:启动每个 ConsumerThread ,开始消费。

第 112 行:标记正在运行中。

第 114 行:释放锁。

close() 方法,关闭 ConsumerPool 。代码如下:

第 168 行:获得锁。

第 169 行:标记不在运行中。

第 170 至 172 行:关闭每个 ConsumerThread ,结束消费。

第 174 行:释放锁。

#allocateBuffer2Thread() 方法,将 channels 的多个 Buffer ,分配给 consumerThreads 的多个 ConsumerThread。一共会有三种情况:

Buffer 数量等于 ConsumerThread 数量,这个十分好分配,一比一。

Buffer 数量大于 ConsumerThread 数量,那么按照 Buffer 数量 % ConsumerThread 数量进行分组,分配给 ConsumerThread ,如下图所示:

支付宝资深架构师的分布式追踪 & APM 系统 SkyWalking 源码分析— DataCarrier 异步处理库

Buffer 数量大于 ConsumerThread 数量,那么按照 ConsumerThread 数量 % Buffer 数量进行分组,分配给 Buffer 。其中,一个 Buffer 会被均分给多个 ConsumerThread ,如下图所示:

支付宝资深架构师的分布式追踪 & APM 系统 SkyWalking 源码分析— DataCarrier 异步处理库

这个就是为什么 Buffer 里面,提供了 Buffer#obtain(start, end) 方法的原因。

4. DataCarrier

具有1-5工作经验的,面对目前流行的技术不知从何下手,需要突破技术瓶颈的可以加群。在公司待久了,过得很安逸,
但跳槽时面试碰壁。需要在短时间内进修、跳槽拿高薪的可以加群。如果没有工作经验,但基础非常扎实,对java工作
机制,常用设计思想,常用java开发框架掌握熟练的可以加群。java架构群:582505643一起交流。

org.skywalking.apm.commons.datacarrier.DataCarrier ,DataCarrier 异步处理库的入口程序。通过创建 DataCarrier 对象,使用生产者消费者的模式,执行异步执行逻辑。

构造方法 ,代码如下:

channels 属性,数据通道。在构造方法中,我们可以看到默认使用 SimpleRollingPartitioner 作为数据分区分配者,使用BufferStrategy.BLOCKING 作为缓冲策略。

#setPartitioner(IDataPartitioner) 方法,设置数据分区分配者。

#setBufferStrategy(BufferStrategy) 方法,设置缓冲策略。

channelSize 方法参数,通道大小。

bufferSize 方法参数,缓冲区大小。

设置消费者和消费线程数量

#consume(Class>, num)

#consume(IConsumer, num)

生产消息

#produce(data)

关闭消费

#shutdownConsumers()


推荐阅读
  • 2018年人工智能大数据的爆发,学Java还是Python?
    本文介绍了2018年人工智能大数据的爆发以及学习Java和Python的相关知识。在人工智能和大数据时代,Java和Python这两门编程语言都很优秀且火爆。选择学习哪门语言要根据个人兴趣爱好来决定。Python是一门拥有简洁语法的高级编程语言,容易上手。其特色之一是强制使用空白符作为语句缩进,使得新手可以快速上手。目前,Python在人工智能领域有着广泛的应用。如果对Java、Python或大数据感兴趣,欢迎加入qq群458345782。 ... [详细]
  • Oracle分析函数first_value()和last_value()的用法及原理
    本文介绍了Oracle分析函数first_value()和last_value()的用法和原理,以及在查询销售记录日期和部门中的应用。通过示例和解释,详细说明了first_value()和last_value()的功能和不同之处。同时,对于last_value()的结果出现不一样的情况进行了解释,并提供了理解last_value()默认统计范围的方法。该文对于使用Oracle分析函数的开发人员和数据库管理员具有参考价值。 ... [详细]
  • 本文介绍了贝叶斯垃圾邮件分类的机器学习代码,代码来源于https://www.cnblogs.com/huangyc/p/10327209.html,并对代码进行了简介。朴素贝叶斯分类器训练函数包括求p(Ci)和基于词汇表的p(w|Ci)。 ... [详细]
  • HDU 2372 El Dorado(DP)的最长上升子序列长度求解方法
    本文介绍了解决HDU 2372 El Dorado问题的一种动态规划方法,通过循环k的方式求解最长上升子序列的长度。具体实现过程包括初始化dp数组、读取数列、计算最长上升子序列长度等步骤。 ... [详细]
  • 本文介绍了C#中数据集DataSet对象的使用及相关方法详解,包括DataSet对象的概述、与数据关系对象的互联、Rows集合和Columns集合的组成,以及DataSet对象常用的方法之一——Merge方法的使用。通过本文的阅读,读者可以了解到DataSet对象在C#中的重要性和使用方法。 ... [详细]
  • 也就是|小窗_卷积的特征提取与参数计算
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了卷积的特征提取与参数计算相关的知识,希望对你有一定的参考价值。Dense和Conv2D根本区别在于,Den ... [详细]
  • CentOS 7部署KVM虚拟化环境之一架构介绍
    本文介绍了CentOS 7部署KVM虚拟化环境的架构,详细解释了虚拟化技术的概念和原理,包括全虚拟化和半虚拟化。同时介绍了虚拟机的概念和虚拟化软件的作用。 ... [详细]
  • Day2列表、字典、集合操作详解
    本文详细介绍了列表、字典、集合的操作方法,包括定义列表、访问列表元素、字符串操作、字典操作、集合操作、文件操作、字符编码与转码等内容。内容详实,适合初学者参考。 ... [详细]
  • 海马s5近光灯能否直接更换为H7?
    本文主要介绍了海马s5车型的近光灯是否可以直接更换为H7灯泡,并提供了完整的教程下载地址。此外,还详细讲解了DSP功能函数中的数据拷贝、数据填充和浮点数转换为定点数的相关内容。 ... [详细]
  • 本文介绍了OpenStack的逻辑概念以及其构成简介,包括了软件开源项目、基础设施资源管理平台、三大核心组件等内容。同时还介绍了Horizon(UI模块)等相关信息。 ... [详细]
  • Java和JavaScript是什么关系?java跟javaScript都是编程语言,只是java跟javaScript没有什么太大关系,一个是脚本语言(前端语言),一个是面向对象 ... [详细]
  • 上图是InnoDB存储引擎的结构。1、缓冲池InnoDB存储引擎是基于磁盘存储的,并将其中的记录按照页的方式进行管理。因此可以看作是基于磁盘的数据库系统。在数据库系统中,由于CPU速度 ... [详细]
  • EPPlus绘制刻度线的方法及示例代码
    本文介绍了使用EPPlus绘制刻度线的方法,并提供了示例代码。通过ExcelPackage类和List对象,可以实现在Excel中绘制刻度线的功能。具体的方法和示例代码在文章中进行了详细的介绍和演示。 ... [详细]
  • Python使用Pillow包生成验证码图片的方法
    本文介绍了使用Python中的Pillow包生成验证码图片的方法。通过随机生成数字和符号,并添加干扰象素,生成一幅验证码图片。需要配置好Python环境,并安装Pillow库。代码实现包括导入Pillow包和随机模块,定义随机生成字母、数字和字体颜色的函数。 ... [详细]
  • 本文讨论了如何使用GStreamer来删除H264格式视频文件中的中间部分,而不需要进行重编码。作者提出了使用gst_element_seek(...)函数来实现这个目标的思路,并提到遇到了一个解决不了的BUG。文章还列举了8个解决方案,希望能够得到更好的思路。 ... [详细]
author-avatar
迷路的小孩w
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有