热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

DataX

1.什么是DataX?DataX是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FT

1.什么是DataX

?       DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

2. DataX的设计

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

                                       技术图片

3.  框架设计

                                 技术图片

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework。

Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

4.  运行原理

                                     技术图片

1)  DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

2)  DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

3)  切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

4)  每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

5)  DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

 5.快速入门

  前置要求
    - Linux
    - JDK(1.8以上,推荐1.8)
    - Python(推荐Python2.6.X)
   安装
    1)将下载好的datax.tar.gz上传到hadoop101的/opt/software
      [kris@hadoop101 software]$ ls
        datax.tar.gz
    2)解压datax.tar.gz到/opt/module
      [kris@hadoop101 software]$ tar -zxvf datax.tar.gz -C /opt/module/
    3)运行自检脚本
      [kris@hadoop101 bin]$ cd /opt/module/datax/bin/
      [kris@hadoop101 bin]$ python datax.py /opt/module/datax/job/job.json

[kris@hadoop101 bin]$ python datax.py /opt/module/datax/job/job.json 

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2019-07-14 07:51:25.661 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2019-07-14 07:51:25.676 [main] INFO  Engine - the machine info  => 

        osInfo: Oracle Corporation 1.8 25.144-b01
        jvmInfo:        Linux amd64 2.6.32-642.el6.x86_64
        cpu num:        8

        totalPhysicalMemory:    -0.00G
        freePhysicalMemory:     -0.00G
        maxFileDescriptorCount: -1
        currentOpenFileDescriptorCount: -1

        GC Names        [PS MarkSweep, PS Scavenge]

        MEMORY_NAME                    | allocation_size                | init_size                      
        PS Eden Space                  | 256.00MB                       | 256.00MB                       
        Code Cache                     | 240.00MB                       | 2.44MB                         
        Compressed Class Space         | 1,024.00MB                     | 0.00MB                         
        PS Survivor Space              | 42.50MB                        | 42.50MB                        
        PS Old Gen                     | 683.00MB                       | 683.00MB                       
        Metaspace                      | -0.00MB                        | 0.00MB                         


2019-07-14 07:51:25.708 [main] INFO  Engine - 
{
        "content":[
                {
                        "reader":{
                                "name":"streamreader",
                                "parameter":{
                                        "column":[
                                                {
                                                        "type":"string",
                                                        "value":"DataX"
                                                },
                                                {
                                                        "type":"long",
                                                        "value":19890604
                                                },
                                                {
                                                        "type":"date",
                                                        "value":"1989-06-04 00:00:00"
                                                },
                                                {
                                                        "type":"bool",
                                                        "value":true
                                                },
                                                {
                                                        "type":"bytes",
                                                        "value":"test"
                                                }
                                        ],
                                        "sliceRecordCount":100000
                                }
                        },
                        "writer":{
                                "name":"streamwriter",
                                "parameter":{
                                        "encoding":"UTF-8",
                                        "print":false
                                }
                        }
                }
        ],
        "setting":{
                "errorLimit":{
                        "percentage":0.02,
                        "record":0
                },
                "speed":{
                        "byte":10485760
                }
        }
}


2019-07-14 07:51:35.892 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/module/datax/hook
2019-07-14 07:51:35.896 [job-0] INFO  JobContainer - 
         [total cpu info] => 
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
                -1.00%                         | -1.00%                         | -1.00%
                        

         [total gc info] => 
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
                 PS MarkSweep         | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             
                 PS Scavenge          | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             

2019-07-14 07:51:35.897 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-07-14 07:51:35.898 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.018s |  All Task WaitReaderTime 0.047s | Percentage 100.00%
2019-07-14 07:51:35.899 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2019-07-14 07:51:25
任务结束时刻                    : 2019-07-14 07:51:35
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

6. 使用案例

①从stream流读取数据并打印到控制台

1)查看配置模板

[kris@hadoop101 bin]$ python datax.py -r streamreader -w streamwriter
技术图片技术图片
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

Please refer to the streamreader document:
     https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md 

Please refer to the streamwriter document:
     https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [], 
                        "sliceRecordCount": ""
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
View Code

②从MySQL的导入和导出

读取MySQL中的数据存放到HDFS

    查看官方模板

技术图片技术图片
[kris@hadoop101 job]$ python /opt/module/datax/bin/datax.py -r mysqlreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mysqlreader document:
     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md 

Please refer to the hdfswriter document:
     https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [], 
                        "connection": [
                            {
                                "jdbcUrl": [], 
                                "table": []
                            }
                        ], 
                        "password": "", 
                        "username": "", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [], 
                        "compress": "", 
                        "defaultFS": "", 
                        "fieldDelimiter": "", 
                        "fileName": "", 
                        "fileType": "", 
                        "path": "", 
                        "writeMode": ""
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
View Code

技术图片

技术图片

[kris@hadoop101 job]$ vim mysql2hdfs 
技术图片技术图片
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://hadoop101:3306/test"
                                ], 
                                "table": [
                                    "stu1"
                                ]
                            }
                        ], 
                        "username": "root", 
                        "password": "123456"
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "INT"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            }
                        ],  
                        "defaultFS": "hdfs://hadoop101:9000", 
                        "fieldDelimiter": "\t", 
                        "fileName": "student.txt", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "2"
            }
        }
    }
}
View Code
2019-07-14 09:02:55.924 [job-0] INFO  JobContainer - 
         [total cpu info] => 
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
                -1.00%                         | -1.00%                         | -1.00%
                        

         [total gc info] => 
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
                 PS MarkSweep         | 1                  | 1                  | 1                  | 0.071s             | 0.071s             | 0.071s             
                 PS Scavenge          | 1                  | 1                  | 1                  | 0.072s             | 0.072s             | 0.072s             

2019-07-14 09:02:55.924 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-07-14 09:02:55.925 [job-0] INFO  StandAloneJobContainerCommunicator - Total 3 records, 30 bytes | Speed 3B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2019-07-14 09:02:55.927 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2019-07-14 09:02:43
任务结束时刻                    : 2019-07-14 09:02:55
任务总计耗时                    :                 12s
任务平均流量                    :                3B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   3
读写失败总数                    :                   0

技术图片

注意:HdfsWriter实际执行时会在该文件名后添加随机的后缀作为每个线程写入实际文件名。

[kris@hadoop101 job]$ python /opt/module/datax/bin/datax.py -r mongodbreader -w hdfswriter  
技术图片技术图片
[kris@hadoop101 job]$ python /opt/module/datax/bin/datax.py -r mongodbreader -w hdfswriter     

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mongodbreader document:
     https://github.com/alibaba/DataX/blob/master/mongodbreader/doc/mongodbreader.md 

Please refer to the hdfswriter document:
     https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md 
 
Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader", 
                    "parameter": {
                        "address": [], 
                        "collectionName": "", 
                        "column": [], 
                        "dbName": "", 
                        "userName": "", 
                        "userPassword": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [], 
                        "compress": "", 
                        "defaultFS": "", 
                        "fieldDelimiter": "", 
                        "fileName": "", 
                        "fileType": "", 
                        "path": "", 
                        "writeMode": ""
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
View Code

读取HDFS数据写入MySQL

将上个案例上传的文件改名

  [kris@hadoop101 datax]$ hadoop fs -mv /student.txt* /student.txt

查看官方模板

  [kris@hadoop101 datax]$ python bin/datax.py -r hdfsreader -w mysqlwriter

创建配置文件

技术图片技术图片
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader", 
                    "parameter": {
                        "column": ["*"], 
                        "defaultFS": "hdfs://hadoop102:9000", 
                        "encoding": "UTF-8", 
                        "fieldDelimiter": "\t", 
                        "fileType": "text", 
                        "path": "/student.txt"
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                            "id",
                            "name"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://hadoop102:3306/datax", 
                                "table": ["student2"]
                            }
                        ], 
                        "password": "000000", 
                        "username": "root", 
                        "writeMode": "insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
View Code

在MySQL的datax数据库中创建student2

  mysql> use test;

  mysql> create table stu1(id int,name varchar(20));

执行任务

  [kris@hadoop101 datax]$ bin/datax.py job/hdfs2mysql.json

③DataX导入导出案例

读取MongoDB的数据导入到HDFS

编写配置文件

[kris@hadoop101 datax]$ vim job/mongdb2hdfs.json
技术图片技术图片
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mongodbreader", 
                    "parameter": {
                        "address": ["127.0.0.1:27017"], 
                        "collectionName": "atguigu", 
                        "column": [
                            {
                                "name":"name",
                                "type":"string"
                            },
                            {
                                "name":"url",
                                "type":"string"
                            }
                        ], 
                        "dbName": "test", 
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name":"name",
                                "type":"string"
                            },
                            {
                                "name":"url",
                                "type":"string"
                            }
                        ], 
                        "defaultFS": "hdfs://hadoop102:9000", 
                        "fieldDelimiter": "\t", 
                        "fileName": "mongo.txt", 
                        "fileType": "text", 
                        "path": "/", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
View Code

mongodbreader参数解析

  address: MongoDB的数据地址信息,因为MonogDB可能是个集群,则ip端口信息需要以Json数组的形式给出。【必填】

  userName:MongoDB的用户名。【选填】

  userPassword: MongoDB的密码。【选填】

  collectionName: MonogoDB的集合名。【必填】

  column:MongoDB的文档列名。【必填】

  name:Column的名字。【必填】

  type:Column的类型。【选填】

  splitter:因为MongoDB支持数组类型,但是Datax框架本身不支持数组类型,所以mongoDB读出来的数组类型要通过这个分隔符合并成字符串。【选填】

执行
[kris@hadoop101 datax]$ bin/datax.py job/mongdb2hdfs.json

读取MongoDB的数据导入MySQL

  在MySQL中创建表 mysql> create table kris(name varchar(20),url varchar(20));

编写DataX配置文件

  [kris@hadoop101 datax]$ vim job/mongodb2mysql.json

DataX


推荐阅读
  • 本文介绍了一种支付平台异步风控系统的架构模型,旨在为开发类似系统的工程师提供参考。 ... [详细]
  • 本文介绍了多种开源数据库及其核心数据结构和算法,包括MySQL的B+树、MVCC和WAL,MongoDB的tokuDB和cola,boltDB的追加仅树和mmap,levelDB的LSM树,以及内存缓存中的一致性哈希。 ... [详细]
  • MySQL 数据库连接方法
    本文介绍了如何使用 MySQL 命令行工具连接到指定的数据库。 ... [详细]
  • importpymysql#一、直接连接mysql数据库'''coonpymysql.connect(host'192.168.*.*',u ... [详细]
  • 本文介绍了Java编程语言的基础知识,包括其历史背景、主要特性以及如何安装和配置JDK。此外,还详细讲解了如何编写和运行第一个Java程序,并简要介绍了Eclipse集成开发环境的安装和使用。 ... [详细]
  • Bootstrap 缩略图展示示例
    本文将展示如何使用 Bootstrap 实现缩略图效果,并提供详细的代码示例。 ... [详细]
  • 网络爬虫的规范与限制
    本文探讨了网络爬虫引发的问题及其解决方案,重点介绍了Robots协议的作用和使用方法,旨在为网络爬虫的合理使用提供指导。 ... [详细]
  • [c++基础]STL
    cppfig15_10.cppincludeincludeusingnamespacestd;templatevoidprintVector(constvector&integer ... [详细]
  • ZooKeeper 入门指南
    本文将详细介绍ZooKeeper的工作机制、特点、数据结构以及常见的应用场景,包括统一命名服务、统一配置管理、统一集群管理、服务器动态上下线和软负载均衡。 ... [详细]
  • 自动验证时页面显示问题的解决方法
    在使用自动验证功能时,页面未能正确显示错误信息。通过使用 `dump($info->getError())` 可以帮助诊断和解决问题。 ... [详细]
  • DAO(Data Access Object)模式是一种用于抽象和封装所有对数据库或其他持久化机制访问的方法,它通过提供一个统一的接口来隐藏底层数据访问的复杂性。 ... [详细]
  • 本文详细介绍了如何解决DNS服务器配置转发无法解析的问题,包括编辑主配置文件和重启域名服务的具体步骤。 ... [详细]
  • 数字资产量化交易通过大数据分析,以客观的方式制定交易决策,有效减少人为的主观判断和情绪影响。本文介绍了几种常见的数字资产量化交易策略,包括搬砖套利和趋势交易,并探讨了量化交易软件的开发前景。 ... [详细]
  • 网站访问全流程解析
    本文详细介绍了从用户在浏览器中输入一个域名(如www.yy.com)到页面完全展示的整个过程,包括DNS解析、TCP连接、请求响应等多个步骤。 ... [详细]
  • 自定义滚动条美化页面内容
    当页面内容超出显示范围时,为了提升用户体验和页面美观,通常会添加滚动条。如果默认的浏览器滚动条无法满足设计需求,我们可以自定义一个符合要求的滚动条。本文将详细介绍自定义滚动条的实现过程。 ... [详细]
author-avatar
手机用户2502936521
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有