热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Datax数据统计

DataX所有的统计信息都会保存到Communication类里面。Communication支持下列数据的统计计数器,比如读取的字节速度,写入成功的数据条数

DataX所有的统计信息都会保存到Communication类里面。Communication支持下列数据的统计

  • 计数器,比如读取的字节速度,写入成功的数据条数
  • 统计的时间点
  • 字符串类型的消息
  • 执行时的异常
  • 执行的状态, 比如成功或失败

Communication有下列属性保存统计数据

// 计数器
private Map counter;

// 执行状态
private State state;

// 异常记录 
private Throwable throwable;

//在哪个时间点统计数据
private long timestamp;

// 消息集合
Map> message;

如果需要汇总多个Communication的数据,Communication提供了mergeFrom方法。根据不同的数据类型,对应着不同的操作

  • 计数器类型,相同的key的数值累加
  • 合并异常,当自身的异常为null,才合并别的异常
  • 合并状态,如果有任意一个的状态失败了,那么返回失败的状态。如果* * 有任意一个的状态正在运行,那么返回正在运行的状态
  • 合并消息, 相同的key的消息添加到同一个列表
Communication管理

对于每个task组都有一个单独的Communication,用来存储这个组的统计数据。对于这些Communication,LocalTGCommunicationManager类实现了集中管理。接下来看看LocalTGCommunicationManager的原理。

LocalTGCommunicationManager有个重要的属性 taskGroupCommunicationMap, 它是一个Map,保存了每个task组的统计数据。

public final class LocalTGCommunicationManager {
    // Key为task group id, Value为对应的Communication
    private static Map taskGroupCommunicatiOnMap=
        new ConcurrentHashMap();
}

当task组在初始化的时候,都会向LocalTGCommunicationManager这里注册。

// 这里只是简单保存到taskGroupCommunicationMap变量里
public static void registerTaskGroupCommunication(
        int taskGroupId, Communication communication) {
    taskGroupCommunicationMap.put(taskGroupId, communication);
}

当需要统计所有的数据时,getJobCommunication实现了这个功能

public static Communication getJobCommunication() {
    // 初始一个新的Communication,然后更新它的数据
    Communication communication = new Communication();
    communication.setState(State.SUCCEEDED);
    // 遍历所有任务的Communication, 调用mergeFrom合并统计结果
    for (Communication taskGroupCommunication :
            taskGroupCommunicationMap.values()) {
        communication.mergeFrom(taskGroupCommunication);
    }

    return communication;
}
注册Communication

AbstractScheduler会根据切分后的任务,为每个task组注册一个Communication。registerCommunication接收task配置列表,里面每个配置都包含了task group id。

Datax数据统计
image.png

task group里的每个task也会向其对应的task group进行注册:
this.containerCommunicator.registerCommunication(taskConfigs);

    public void registerTaskCommunication(List taskConfigurationList) {
        for (Configuration taskConfig : taskConfigurationList) {
            int taskId = taskConfig.getInt(CoreConstant.TASK_ID);
            this.taskCommunicationMap.put(taskId, new Communication());
        }
    }

更新统计数据

每个任务执行都会对应着Channel,Channel当每处理一条数据时,都会更新对应Communication的统计信息。

比如下面的pull方法是Writer从Channel拉取数据,每次pull的时候,都会调用statPull函数,会更新写入数据条数和字节数的信息。

public abstract class Channel {

    private Communication currentCommunication;
    
    public Record pull() {
        Record record = this.doPull();
        this.statPull(1L, record.getByteSize());
        return record;
    }

    private void statPull(long recordSize, long byteSize) {
        currentCommunication.increaseCounter(
                CommunicationTool.WRITE_RECEIVED_RECORDS, recordSize);
        currentCommunication.increaseCounter(
                CommunicationTool.WRITE_RECEIVED_BYTES, byteSize);
    }

收集数据

  1. AbstractScheduler想统计汇总后的数据,需要调用AbstractContainerCommunicator的collect方法

  2. StandAloneJobContainerCommunicator继承AbstractContainerCommunicator,实现了collect方法,它会调用AbstractCollector的collectFromTaskGroup方法获取数据

  3. ProcessInnerCollector实现了AbstractCollector的collectFromTaskGroup方法,它会调用LocalTGCommunicationManager的getJobCommunication方法

  4. getJobCommunication方法会统计所有task的数据,然后返回。

Datax数据统计
image.png

可以简单理解为: 每个task先汇总到taskgroup维度, taskgroup汇总到job维度。


推荐阅读
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • Redis底层数据结构之压缩列表的介绍及实现原理
    本文介绍了Redis底层数据结构之压缩列表的概念、实现原理以及使用场景。压缩列表是Redis为了节约内存而开发的一种顺序数据结构,由特殊编码的连续内存块组成。文章详细解释了压缩列表的构成和各个属性的含义,以及如何通过指针来计算表尾节点的地址。压缩列表适用于列表键和哈希键中只包含少量小整数值和短字符串的情况。通过使用压缩列表,可以有效减少内存占用,提升Redis的性能。 ... [详细]
  • 本文讨论了在手机移动端如何使用HTML5和JavaScript实现视频上传并压缩视频质量,或者降低手机摄像头拍摄质量的问题。作者指出HTML5和JavaScript无法直接压缩视频,只能通过将视频传送到服务器端由后端进行压缩。对于控制相机拍摄质量,只有使用JAVA编写Android客户端才能实现压缩。此外,作者还解释了在交作业时使用zip格式压缩包导致CSS文件和图片音乐丢失的原因,并提供了解决方法。最后,作者还介绍了一个用于处理图片的类,可以实现图片剪裁处理和生成缩略图的功能。 ... [详细]
  • 海马s5近光灯能否直接更换为H7?
    本文主要介绍了海马s5车型的近光灯是否可以直接更换为H7灯泡,并提供了完整的教程下载地址。此外,还详细讲解了DSP功能函数中的数据拷贝、数据填充和浮点数转换为定点数的相关内容。 ... [详细]
  • Spring常用注解(绝对经典),全靠这份Java知识点PDF大全
    本文介绍了Spring常用注解和注入bean的注解,包括@Bean、@Autowired、@Inject等,同时提供了一个Java知识点PDF大全的资源链接。其中详细介绍了ColorFactoryBean的使用,以及@Autowired和@Inject的区别和用法。此外,还提到了@Required属性的配置和使用。 ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • PHP图片截取方法及应用实例
    本文介绍了使用PHP动态切割JPEG图片的方法,并提供了应用实例,包括截取视频图、提取文章内容中的图片地址、裁切图片等问题。详细介绍了相关的PHP函数和参数的使用,以及图片切割的具体步骤。同时,还提供了一些注意事项和优化建议。通过本文的学习,读者可以掌握PHP图片截取的技巧,实现自己的需求。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 本文介绍了在CentOS上安装Python2.7.2的详细步骤,包括下载、解压、编译和安装等操作。同时提供了一些注意事项,以及测试安装是否成功的方法。 ... [详细]
  • 本文介绍了如何使用PHP代码将表格导出为UTF8格式的Excel文件。首先,需要连接到数据库并获取表格的列名。然后,设置文件名和文件指针,并将内容写入文件。最后,设置响应头部,将文件作为附件下载。 ... [详细]
  • 本文讨论了如何使用GStreamer来删除H264格式视频文件中的中间部分,而不需要进行重编码。作者提出了使用gst_element_seek(...)函数来实现这个目标的思路,并提到遇到了一个解决不了的BUG。文章还列举了8个解决方案,希望能够得到更好的思路。 ... [详细]
  • Summarize function is doing alignment without timezone ?
    Hi.Imtryingtogetsummarizefrom00:00otfirstdayofthismonthametric, ... [详细]
  • JVM:33 如何查看JVM的Full GC日志
    1.示例代码packagecom.webcode;publicclassDemo4{publicstaticvoidmain(String[]args){byte[]arr ... [详细]
author-avatar
我可不是文章
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有