avatar


1.操作方法

概述

什么是DataX

DataX是阿里巴巴的开源的一款异构数据源离线同步工具。
所谓的异构数据源离线同步,即在关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间进行数据同步。

官网(GitHub项目地址):https://github.com/alibaba/DataX

整体思路

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路。

DataX的设计

DataX作为中间传输载体负责连接各种数据源。
当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能和已有的数据源进行同步。

框架设计

DataX目前已经有了比较全面的 插件体系 ,各类主流的数据源,都已经支持。

具体支持的数据源,可以参考如下地址中的Support Data Channels部分。

如图,是DataX的源码:

支持的数据源

通过源码,我们还会发现很多模块都是"reader"结尾,或者"writer"结尾。
这就是DataX的框架设计。

框架设计

  • Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework
  • Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:用于连接readerwriter,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

运行原理

DataX的整个运行原理,如图所示:

运行原理

  • Job
    • DataX完成单个数据同步的作业,我们称之为Job
    • DataX在收到一个Job之后,将启动一个进程来完成整个作业同步过程。
    • DataX的Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算分为为多个子Task)、TaskGroup管理等功能。
  • Task
    • DataX启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。
    • Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  • Schedule
    • DataX在将Job在切分多个Task之后,会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。
    • 每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  • TaskGroup
    • 每一个Task都由TaskGroup负责启动。
    • Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
    • 等待所有TaskGroup任务完成后,DataX才会退出。

举个例子,假设现在有一个DataX作业,并且配置了20个并发,作业是将一个100张表从MySql同步到Impala中。DataX的调度决策思路是:

  1. 根据分表,将任务切分成了100个Task
  2. 因为有20个并发,所以DataX会创建4个TaskGroup
    单个TaskGroup的并发数量为5;因此20个并发,是4个TaskGroup
  3. 4个TaskGroup公平分配这100个Task

现在,我们问,第一个TaskGroup会负责几个Task
25个?
不一定,视Task任务的工作量,如果该TaskGroup所拿到的Task恰好都是工作量很小的,那么可能负责的Task的数量会多于25个。

(有些资料说,每一个TaskGroup会以5个并发共计运行25个,这是不准确的。)

通过下一章《2.源码概览》的讨论,我们能对该部分有更清晰的理解。

安装

下载地址

一定要通过官网(GitHub项目地址)的如下图所示之处,下载DataX。

下载地址

有些资料会建议从这里下载:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
实际上,从这里下载的,不是最新的版本。
我在上述地址下载的,只有814M,而且缺少了包括elasticsearchwriter在内的诸多内容。而从官网(Github项目地址)提供的链接,下载的,高达1.4G。而且是最新版。

准备工作

想要安装DataX,还需要如下的内容:

  • JDK(1.8及以上)
  • Python(2或3都可以)

安装步骤

  1. 将下载好的datax.tar.gz上传至Linux机器。
    例如,上传至Linux机器的/opt/module目录。
  2. 解压datax.tar.gz
    示例代码:
    1
    tar -zxvf datax.tar.gz -C /opt/module/
    解压之后,我们会看到很多文件夹。部分的解释如下:
    • bin:运行命令和脚本
    • conf:配置文件
    • job:同步脚本
    • lib:依赖,特指公共依赖,不包括数据库连接的依赖。
    • plugin:插件,包括数据库连接的依赖,不含公共依赖。
    • script:脚本

自检

自检方法

安装完成后,我们可以通过如下的命令自检。

1
python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json

如果运行结果如下,说明自检通过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2022-11-30 16:13:19.563 [main] INFO VMInfo - VMInfo# operatingSystem class => com.sun.management.UnixOperatingSystem
2022-11-30 16:13:19.572 [main] INFO Engine - the machine info =>

【部分运行结果略】

2022-11-30 16:13:19.600 [main] INFO Engine -
{
"content":[
{
"reader":{
"name":"streamreader",
"parameter":{
"column":[
{
"type":"string",
"value":"DataX"
},
{
"type":"long",
"value":19890604
},
{
"type":"date",
"value":"1989-06-04 00:00:00"
},
{
"type":"bool",
"value":true
},
{
"type":"bytes",
"value":"test"
}
],
"sliceRecordCount":100000
}
},
"writer":{
"name":"streamwriter",
"parameter":{
"encoding":"UTF-8",
"print":false
}
}
}
],
"setting":{
"errorLimit":{
"percentage":0.02,
"record":0
},
"speed":{
"byte":10485760
}
}
}

【部分运行结果略】

2022-11-30 16:13:29.726 [job-0] INFO JobContainer - DataX Reader.Job [streamreader] do post work.
2022-11-30 16:13:29.726 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2022-11-30 16:13:29.727 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/module/datax/hook
2022-11-30 16:13:29.729 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu | maxDeltaCpu | minDeltaCpu
3.40% | 3.40% | 3.40%


[total gc info] =>
NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
PS MarkSweep | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
PS Scavenge | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s

2022-11-30 16:13:29.730 [job-0] INFO JobContainer - PerfTrace not enable!
2022-11-30 16:13:29.730 [job-0] INFO StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.050s | All Task WaitReaderTime 0.093s | Percentage 100.00%
2022-11-30 16:13:29.731 [job-0] INFO JobContainer -
任务启动时刻 : 2022-11-30 16:13:19
任务结束时刻 : 2022-11-30 16:13:29
任务总计耗时 : 10s
任务平均流量 : 253.91KB/s
记录写入速度 : 10000rec/s
读出记录总数 : 100000
读写失败总数 : 0

问题修复

如果在自检过程中,出现如下的报错:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
root@ysi-edh-app04:/opt/module/datax/bin #python datax.py /opt/module/datax/job/job.json 

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2022-11-30 16:03:40.913 [main] WARN ConfigParser - 插件[streamreader,streamwriter]加载失败,1s后重试... Exception:Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/opt/module/datax/plugin/reader/._txtfilereader/plugin.json]不存在. 请检查您的配置文件.
2022-11-30 16:03:41.921 [main] ERROR Engine -

经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/opt/module/datax/plugin/reader/._txtfilereader/plugin.json]不存在. 请检查您的配置文件.
at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26)
at com.alibaba.datax.common.util.Configuration.from(Configuration.java:95)
at com.alibaba.datax.core.util.ConfigParser.parseOnePluginConfig(ConfigParser.java:153)
at com.alibaba.datax.core.util.ConfigParser.parsePluginConfig(ConfigParser.java:125)
at com.alibaba.datax.core.util.ConfigParser.parse(ConfigParser.java:63)
at com.alibaba.datax.core.Engine.entry(Engine.java:137)
at com.alibaba.datax.core.Engine.main(Engine.java:204)

root@ysi-edh-app04:/opt/module/datax/bin #timed out waiting for input: auto-logout

可以通过这种方法修复:

1
2
3
4
5
cd /plugin/reader
rm -rf ./._*

cd /plugin/writer
rm -rf ./._*

但,如果我们没有在官网(GitHub项目地址)下载DataX,即我们下载了旧版本的DataX,也可能出现这种错误。此时建议,重新安装最新版。

简单案例

我们从一个最简单的案例开始,直接从Stream流读取数据并打印到控制台。

查看配置模板

我们可以通过如下的命令查看配置模板。

1
python datax.py -r 【READER】 -w 【WRITER】

示例代码:

1
python datax.py -r streamreader -w streamwriter

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the streamreader document:
https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md

Please refer to the streamwriter document:
https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md

Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [],
"sliceRecordCount": ""
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}

根据模板编写配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
{
"job": {
"content": [{
"reader": {
"name": "streamreader",
"parameter": {
"column": [{
"type": "int",
"value": 10
},
{
"type": "string",
"value": "KakaWanYifan"
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}],
"setting": {
"speed": {
"channel": 2
}
}
}
}

运行

示例代码:

1
python bin/datax.py /opt/module/datax/job/stream2stream.json

运行结果:

1
2
3
4
5
6
7
8
9
10
11
经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[StreamReader-02], Description:[不支持的column类型]. - 不支持类型[int]
at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26)
at com.alibaba.datax.plugin.reader.streamreader.StreamReader$Job.dealColumn(StreamReader.java:85)
at com.alibaba.datax.plugin.reader.streamreader.StreamReader$Job.init(StreamReader.java:38)
at com.alibaba.datax.core.job.JobContainer.initJobReader(JobContainer.java:673)
at com.alibaba.datax.core.job.JobContainer.init(JobContainer.java:303)
at com.alibaba.datax.core.job.JobContainer.start(JobContainer.java:113)
at com.alibaba.datax.core.Engine.start(Engine.java:92)
at com.alibaba.datax.core.Engine.entry(Engine.java:171)
at com.alibaba.datax.core.Engine.main(Engine.java:204)

原因是DataX不支持int类型,我们把"type": "int"改成"type": "long",即可执行成功,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

【部分运行结果略】

10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan
10 KakaWanYifan

【部分运行结果略】

任务启动时刻 : 2022-11-30 17:00:57
任务结束时刻 : 2022-11-30 17:01:07
任务总计耗时 : 10s
任务平均流量 : 28B/s
记录写入速度 : 2rec/s
读出记录总数 : 20
读写失败总数 : 0

有没有问题?
为什么打印了20条?
因为"sliceRecordCount": 10,即产生10条数据,又因为"channel": 2,两个并发。
所以,合计产生了20条。

需要注意的是,仅针对Stream是这样的,但是针对数据库,不会因为channel的数字,导致重复,而会因为channel的数字,对任务进行切分。

mysql2mysql

应用场景

从MySQL导入到MySQL,这个的应用场景是在线库的表和历史库的表的同步。

假设,在线库中有一张表如下:

1
2
3
4
5
CREATE TABLE `online`(
`id` int(4) not null auto_increment,
`content` varchar(32) not null,
PRIMARY KEY(id)
)

历史库中的表如下:

1
2
3
4
5
CREATE TABLE `history`(
`id` int(4) not null auto_increment,
`content` varchar(32) not null,
PRIMARY KEY(id)
)

现在,我们要对两张库的表进行同步。

查看模板

同样,我们先查看官方模板。

1
python bin/datax.py -r mysqlreader -w mysqlwriter > mysql2mysql.json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mysqlreader document:
https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

Please refer to the mysqlwriter document:
https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md

Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": [],
"table": []
}
],
"password": "",
"username": "",
"where": ""
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": "",
"table": []
}
],
"password": "",
"preSql": [],
"session": [],
"username": "",
"writeMode": ""
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}

解释说明:

  • name:名称
  • column:需要同步的列名集合,使用JSON数组描述自带信息。*代表所有列。
  • jdbcUrl:数据库的JDBC连接信息。
  • table:需要同步的表,支持多个。
  • password:数据库用户名对应的密码。
  • username:数据库用户名。

关于更多参数的含义,可以参考官方给出的地址。

实现脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["id","content"],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://【IP】:【端口】/【库】"],
"table": ["online"]
}
],
"password": "【密码】",
"username": "【用户名】",
"where": ""
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["id","content"],
"connection": [
{
"jdbcUrl": "jdbc:mysql://【IP】:【端口】/【库】",
"table": ["history"]
}
],
"password": "【密码】",
"preSql": [],
"session": [],
"username": "【用户名】",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": 2
}
}
}
}

pg2es

应用场景

假设在postgresql,存在一张表如下:

1
2
3
4
5
6
7
create table from_postgresql
(
id varchar(64),
office_park varchar(200),
area numeric(10,2),
location point
)

在ElasticSearch中,存在一个index如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
PUT /to_elasticsearch
{
"mappings": {
"_doc": {
"properties": {
"id": {
"type": "keyword"
},
"office_park": {
"type": "keyword"
},
"area": {
"type": "double"
},
"location": {
"type": "geo_point"
}
}
}
}
}

现在,我们要从postgresql中,导入到elasticsearch。

查看模板

我们通过如下的命令,查看模板。
示例代码:

1
python datax.py -r postgresqlreader -w elasticsearchwriter

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the postgresqlreader document:
https://github.com/alibaba/DataX/blob/master/postgresqlreader/doc/postgresqlreader.md

Please refer to the postgresqlreader document:
https://github.com/alibaba/DataX/blob/master/elasticsearchwriter/doc/elasticsearchwriter.md

Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

Read writer[elasticsearchwriter] template error: : can't find file /opt/module/datax/plugin/writer/elasticsearchwriter/plugin_job_template.json
Traceback (most recent call last):
File "datax.py", line 228, in <module>
generateJobConfigTemplate(options.reader, options.writer)
File "datax.py", line 156, in generateJobConfigTemplate
jobTemplate['job']['content'][0]['writer'] = writerPar
UnboundLocalError: local variable 'writerPar' referenced before assignment

为什么报错,因为确实缺少相关模板文件。

但这并不影响我们通过查阅官方文档。

实现脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
{
"job": {
"setting": {
"speed": {
"channel": 2
}
},
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"connection": [
{
"table": [
"from_postgresql"
],
"jdbcUrl": [
"jdbc:postgresql://【IP】:【端口】/【库】"
]
}
],
"username": "【用户名】",
"password": "【密码】",
"column": [
"id",
"office_park",
"area",
"concat(location[1],',',location[0])"
]
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://【IP】:9200",
"index": "to_elasticsearch",
"type":"_doc",
"column": [
{ "name": "id", "type": "keyword"},
{ "name": "office_park","type": "keyword" },
{ "name": "area","type": "double" },
{ "name": "location","type": "geo_point" }
]
}
}
}
]
}
}

hdfs2mysql

应用场景

假设现在存在一张表在Hive中,且该表每天更新,以textfile类型存储,以\u0001进行分隔。我们需要把该表中每天更新的数据,同步到MySQL中,MySQL的表结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE `manager` (
`id` VARCHAR(64) NULL DEFAULT NULL,
`ent_id` VARCHAR(64) NULL DEFAULT NULL,
`ent_name` VARCHAR(64) NULL DEFAULT NULL,
`per_id` VARCHAR(64) NULL DEFAULT NULL,
`per_name` VARCHAR(200) NULL DEFAULT NULL,
`per_position` VARCHAR(50) NULL DEFAULT NULL,
`is_corporation` VARCHAR(10) NULL DEFAULT NULL,
`delete_flg` CHAR(1) NULL DEFAULT NULL,
INDEX `ent_id` (`ent_id`),
INDEX `per_id` (`per_id`),
PRIMARY KEY (`id`)
);

实现脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
{
"job": {
"content": [{
"reader": {
"name": "hdfsreader",
"parameter": {
"column": [{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
},
{
"index": 6,
"type": "string"
},
{
"index": 8,
"type": "string"
}
],
"defaultFS": "hdfs://【HIVE地址】:8020",
"encoding": "UTF-8",
"fieldDelimiter": "\u0001",
"fileType": "text",
"path": "/user/hive/warehouse/【hive层】.db/【表名】/dt=${dt}"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["id", "ent_id", "ent_name", "per_id", "per_name", "per_position", "is_corporation", "delete_flg"],
"connection": [{
"jdbcUrl": "jdbc:mysql://【地址】:【端口】/【库名】",
"table": ["【表名】"]
}],
"password": "【用户名】",
"username": "【密码】",
"writeMode": "replace",
"postSql": [
"delete from 【表名】 where delete_flg = '1'"
]
}
}
}],
"setting": {
"speed": {
"channel": 10
}
}
}
}

关于配置的含义及其作用,可以参考:

因为每天一个文件,所以在hdfsreaderpath,配置如下:/user/hive/warehouse/【hive层】.db/【表名】/dt=${dt}
需要以如下的方式传入参数:

1
python /opt/datax/bin/datax.py -p "-Ddt=$(date '+%Y%m%d')" /opt/datax/job/hdfs2mysql-manager.json

(关于$(date '+%Y%m%d'),可以参考我们在《Linux操作系统使用入门:2.命令》,关于时间日期的讨论。)

优化

关键参数

  • job.setting.speed.channelchannel并发数
  • job.setting.speed.record:所有的channelrecord限速
  • job.setting.speed.byte:所有的channelbyte限速
  • core.transport.channel.speed.record:单个channelrecord限速
  • core.transport.channel.speed.byte:单个channelbyte限速

注意:

  • 如果recordbyte都设置了,以最小的为准。
    只有recordbyte都没设置,channel才可能有效,即优先级最低。
  • 单个channel的配置在core.json

提升每个channel的速度

我们可以提升每个channel的速度,在DataX内部对每个Channel会有严格的速度控制,分两种:

  • 控制每秒同步的记录数
  • 控制每秒同步的字节数,默认的速度限制是1MB/S。

我们可以根据具体硬件情况进行修改。

提升Channel并发数

我们还可以提高Channel的并发数,有三种配置方式:

  1. 配置"全局Byte的限速"以及"单Channel的Byte的限速"
  2. 配置"全局Record的限速"以及"单Channel的Record的限速"
  3. 直接配置Channel个数

Channel个数=全局Byte限速单Channel的Byte限速\text{Channel个数} = \frac{\text{全局Byte限速}}{\text{单Channel的Byte限速}}

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
{
"core": {
"transport": {
"channel": {
"speed": {
"byte": 1048576
}
}
}
},
"job": {
"setting": {
"speed": {
"byte": 5242880
}
},
}
}

52428801048576=5\frac{5242880}{1048576} = 5

Channel个数=全局Record限速单个Channel的Record限速\text{Channel个数} = \frac{\text{全局Record限速}}{\text{单个Channel的Record限速}}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
{
"core": {
"transport": {
"channel": {
"speed": {
"record": 100
}
}
}
},
"job": {
"setting": {
"speed": {
"record": 500
}
},
}
}

500100=5\frac{500}{100} = 5

直接配置Channel个数

只有在上面两种未设置才生效,上面两个同时设置是取值小的作为最终的channel数。

例如:

1
2
3
4
5
6
7
8
9
{
"job": {
"setting": {
"speed": {
"channel": 5
}
},
}
}

提高JVM堆内存

当提升Channel并发数时,内存的占用会显著增加。
为了防止OOM等错误,建议调大JVM的堆内存,可以调整为4G或者8G(具体根据实际情况来定)。

调整JVM的xmsxmx参数的方式,为在在启动的时候,加上对应的参数。
如下:

1
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json
文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/11401
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

评论区