avatar


2.源码概览

整体流程

整体流程

程序入口

我们从datax.py开始。
脚本的主入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Script entry point: parse the command line, build the JVM start command,
# run it as a child process and exit with the child's return code.
if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()
    options, args = parser.parse_args(sys.argv[1:])
    # When both a reader and a writer option are given, only generate a job
    # config template and exit successfully.
    if options.reader is not None and options.writer is not None:
        generateJobConfigTemplate(options.reader, options.writer)
        sys.exit(RET_STATE['OK'])
    # Exactly one positional argument is required; otherwise print usage.
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    startCommand = buildStartCommand(options, args)
    # print startCommand

    child_process = subprocess.Popen(startCommand, shell=True)
    register_signal()
    (stdout, stderr) = child_process.communicate()

    sys.exit(child_process.returncode)

其中getOptionParser()的作用是解析参数,上一章我们传入的-r、-w以及--jvm,都是通过这里进行解析。该方法的具体实现我们不讨论,重点关注buildStartCommand方法。

1
2
3
4
5
def buildStartCommand(options, args):

【解析参数,代码略】

return Template(ENGINE_COMMAND).substitute(**commandMap)

在该方法的最后有这么一段return Template(ENGINE_COMMAND).substitute(**commandMap)

ENGINE_COMMAND的定义如下:

1
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (DEFAULT_PROPERTY_CONF, CLASS_PATH)

如此,找到了程序的主入口,在com.alibaba.datax.core.Engine这个类中。
找到该类的main方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) throws Exception {
int exitCode = 0;
try {
Engine.entry(args);
} catch (Throwable e) {
exitCode = 1;
LOG.error("\n\n经DataX智能分析,该任务最可能的错误原因是:\n" + ExceptionTracker.trace(e));
if (e instanceof DataXException) {
DataXException tempException = (DataXException) e;
ErrorCode errorCode = tempException.getErrorCode();
if (errorCode instanceof FrameworkErrorCode) {
FrameworkErrorCode tempErrorCode = (FrameworkErrorCode) errorCode;
exitCode = tempErrorCode.toExitValue();
}
}
System.exit(exitCode);
}
System.exit(exitCode);
}

点进Engine.entry(args)

1
2
3
4
5
6
public static void entry(final String[] args) throws Throwable {

【生成配置,代码略】

engine.start(configuration);
}

我们点进engine.start(configuration);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/* check job model (job/task) first */
public void start(Configuration allConf) {

【部分代码略】

if (isJob) {
allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
container = new JobContainer(allConf);
instanceId = allConf.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);
} else {
container = new TaskGroupContainer(allConf);
instanceId = allConf.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
taskGroupId = allConf.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
channelNumber = allConf.getInt(
CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
}

【部分代码略】

container.start();
}

注意container.start();,其中的container可能在两个地方被赋值:

  • container = new JobContainer(allConf);
  • container = new TaskGroupContainer(allConf);

先来看JobContainer,我们关注其start方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* jobContainer主要负责的工作全部在start()里面,包括init、prepare、split、scheduler、
* post以及destroy和statistics
*/
@Override
public void start() {
LOG.info("DataX jobContainer starts job.");
boolean hasException = false;
boolean isDryRun = false;
try {
this.startTimeStamp = System.currentTimeMillis();
isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
if(isDryRun) {
LOG.info("jobContainer starts to do preCheck ...");
this.preCheck();
} else {
userConf = configuration.clone();
LOG.debug("jobContainer starts to do preHandle ...");
this.preHandle();
LOG.debug("jobContainer starts to do init ...");
this.init();
LOG.info("jobContainer starts to do prepare ...");
this.prepare();
LOG.info("jobContainer starts to do split ...");
this.totalStage = this.split();
LOG.info("jobContainer starts to do schedule ...");
this.schedule();
LOG.debug("jobContainer starts to do post ...");
this.post();
LOG.debug("jobContainer starts to do postHandle ...");
this.postHandle();
LOG.info("DataX jobId [{}] completed successfully.", this.jobId);
this.invokeHooks();
}
} catch (Throwable e) {

【部分代码略】

} finally {

【部分代码略】

}
}

注意this.init();、this.prepare();、this.totalStage = this.split();、this.schedule();、this.post();这几步,和本章开篇的图对上了。

Task切分逻辑

我们点进this.totalStage = this.split();,查看Task切分逻辑。

切分方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,
* 达到切分后数目相等,才能满足1:1的通道模型,所以这里可以将reader和writer的配置整合到一起,
* 然后,为避免顺序给读写端带来长尾影响,将整合的结果shuffler掉
*/
private int split() {
this.adjustChannelNumber();
if (this.needChannelNumber <= 0) {
this.needChannelNumber = 1;
}
List<Configuration> readerTaskConfigs = this
.doReaderSplit(this.needChannelNumber);
int taskNumber = readerTaskConfigs.size();
List<Configuration> writerTaskConfigs = this
.doWriterSplit(taskNumber);
List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);
LOG.debug("transformer configuration: "+ JSON.toJSONString(transformerList));
/**
* 输入是reader和writer的parameter list,输出是content下面元素的list
*/
List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
readerTaskConfigs, writerTaskConfigs, transformerList);
LOG.debug("contentConfig configuration: "+ JSON.toJSONString(contentConfig));
this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);
return contentConfig.size();
}

先来看this.adjustChannelNumber();,点进去。

并发数的确定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
private void adjustChannelNumber() {
int needChannelNumberByByte = Integer.MAX_VALUE;
int needChannelNumberByRecord = Integer.MAX_VALUE;
boolean isByteLimit = (this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 0) > 0);
if (isByteLimit) {
long globalLimitedByteSpeed = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_BYTE, 10 * 1024 * 1024);
// 在byte流控情况下,单个Channel流量最大值必须设置,否则报错!
Long channelLimitedByteSpeed = this.configuration
.getLong(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE);
if (channelLimitedByteSpeed == null || channelLimitedByteSpeed <= 0) {
throw DataXException.asDataXException(
FrameworkErrorCode.CONFIG_ERROR,
"在有总bps限速条件下,单个channel的bps值不能为空,也不能为非正数");
}
needChannelNumberByByte =
(int) (globalLimitedByteSpeed / channelLimitedByteSpeed);
needChannelNumberByByte =
needChannelNumberByByte > 0 ? needChannelNumberByByte : 1;
LOG.info("Job set Max-Byte-Speed to " + globalLimitedByteSpeed + " bytes.");
}
boolean isRecordLimit = (this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 0)) > 0;
if (isRecordLimit) {
long globalLimitedRecordSpeed = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_RECORD, 100000);
Long channelLimitedRecordSpeed = this.configuration.getLong(
CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD);
if (channelLimitedRecordSpeed == null || channelLimitedRecordSpeed <= 0) {
throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
"在有总tps限速条件下,单个channel的tps值不能为空,也不能为非正数");
}
needChannelNumberByRecord =
(int) (globalLimitedRecordSpeed / channelLimitedRecordSpeed);
needChannelNumberByRecord =
needChannelNumberByRecord > 0 ? needChannelNumberByRecord : 1;
LOG.info("Job set Max-Record-Speed to " + globalLimitedRecordSpeed + " records.");
}
// 取较小值
this.needChannelNumber = needChannelNumberByByte < needChannelNumberByRecord ?
needChannelNumberByByte : needChannelNumberByRecord;
// 如果从byte或record上设置了needChannelNumber则退出
if (this.needChannelNumber < Integer.MAX_VALUE) {
return;
}
boolean isChannelLimit = (this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL, 0) > 0);
if (isChannelLimit) {
this.needChannelNumber = this.configuration.getInt(
CoreConstant.DATAX_JOB_SETTING_SPEED_CHANNEL);
LOG.info("Job set Channel-Number to " + this.needChannelNumber
+ " channels.");
return;
}
throw DataXException.asDataXException(
FrameworkErrorCode.CONFIG_ERROR,
"Job运行速度必须设置");
}

其中:

1
public static final String DATAX_JOB_SETTING_SPEED_BYTE = "job.setting.speed.byte";
  • 对应我们JSON的setting.speed.byte
1
public static final String DATAX_JOB_SETTING_SPEED_RECORD = "job.setting.speed.record";
  • 对应我们JSON的setting.speed.record

然后,我们看到,在确定needChannelNumberByByte和needChannelNumberByRecord后,会取两者中的较小值作为needChannelNumber。
如果this.needChannelNumber < Integer.MAX_VALUE,就return;而needChannelNumberByByte和needChannelNumberByRecord的初始值为:

1
2
int needChannelNumberByByte = Integer.MAX_VALUE;
int needChannelNumberByRecord = Integer.MAX_VALUE;

也就是说,只有record和byte都没设置,channel才可能生效,即channel的优先级最低。
这样印证了我们在上一章《1.操作方法》,调优部分的讨论。

reader和writer任务数对等

this.adjustChannelNumber();后,还有两行

1
2
3
4
5
List<Configuration> readerTaskConfigs = this
.doReaderSplit(this.needChannelNumber);
int taskNumber = readerTaskConfigs.size();
List<Configuration> writerTaskConfigs = this
.doWriterSplit(taskNumber);

该部分是reader的切分和writer的切分,注意int taskNumber = readerTaskConfigs.size();,也就是说writer的切分参数来自reader。
我们还可以点进List<Configuration> writerTaskConfigs = this.doWriterSplit(taskNumber);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private List<Configuration> doWriterSplit(int readerTaskNumber) {
classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
PluginType.WRITER, this.writerPluginName));
List<Configuration> writerSlicesConfigs = this.jobWriter
.split(readerTaskNumber);
if (writerSlicesConfigs == null || writerSlicesConfigs.size() <= 0) {
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
"writer切分的task不能小于等于0");
}
LOG.info("DataX Writer.Job [{}] splits to [{}] tasks.",
this.writerPluginName, writerSlicesConfigs.size());
classLoaderSwapper.restoreCurrentThreadClassLoader();
return writerSlicesConfigs;
}

再点进List<Configuration> writerSlicesConfigs = this.jobWriter.split(readerTaskNumber);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.alibaba.datax.common.spi;

【部分代码略】

/**
* 每个Writer插件需要实现Writer类,并在其内部实现Job、Task两个内部类。
*
*
* */
public abstract class Writer extends BaseObject {
/**
* 每个Writer插件必须实现Job内部类
*/
public abstract static class Job extends AbstractJobPlugin {
/**
* 切分任务。<br>
*
* @param mandatoryNumber
* 为了做到Reader、Writer任务数对等,这里要求Writer插件必须按照源端的切分数进行切分。否则框架报错!
*
* */
public abstract List<Configuration> split(int mandatoryNumber);
}

【部分代码略】

}

注意split(int mandatoryNumber);方法的注释。即reader和writer的任务数对等,一比一。

调度

看完了this.totalStage = this.split();,我们再点进this.schedule();

调度方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* schedule首先完成的工作是把上一步reader和writer split的结果整合到具体taskGroupContainer中,
* 同时不同的执行模式调用不同的调度策略,将所有任务调度起来
*/
private void schedule() {

【部分代码略】

List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
this.needChannelNumber, channelsPerTaskGroup);
LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());
ExecuteMode executeMode = null;
AbstractScheduler scheduler;
try {
executeMode = ExecuteMode.STANDALONE;
scheduler = initStandaloneScheduler(this.configuration);
//设置 executeMode
for (Configuration taskGroupConfig : taskGroupConfigs) {
taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
}
if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) {
if (this.jobId <= 0) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
"在[ local | distribute ]模式下必须设置jobId,并且其值 > 0 .");
}
}
LOG.info("Running by {} Mode.", executeMode);
this.startTransferTimeStamp = System.currentTimeMillis();
scheduler.schedule(taskGroupConfigs);
this.endTransferTimeStamp = System.currentTimeMillis();
} catch (Exception e) {
LOG.error("运行scheduler 模式[{}]出错.", executeMode);
this.endTransferTimeStamp = System.currentTimeMillis();
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
}
/**
* 检查任务执行情况
*/
this.checkLimit();
}

点进

1
List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,this.needChannelNumber, channelsPerTaskGroup);

确定组数和分组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 公平的分配 task 到对应的 taskGroup 中。
* 公平体现在:会考虑 task 中对资源负载作的 load 标识进行更均衡的作业分配操作。
* TODO 具体文档举例说明
*/
public static List<Configuration> assignFairly(Configuration configuration, int channelNumber, int channelsPerTaskGroup) {
Validate.isTrue(configuration != null, "框架获得的 Job 不能为 null.");
List<Configuration> contentConfig = configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
Validate.isTrue(contentConfig.size() > 0, "框架获得的切分后的 Job 无内容.");
Validate.isTrue(channelNumber > 0 && channelsPerTaskGroup > 0,
"每个channel的平均task数[averTaskPerChannel],channel数目[channelNumber],每个taskGroup的平均channel数[channelsPerTaskGroup]都应该为正数");
int taskGroupNumber = (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);
Configuration aTaskConfig = contentConfig.get(0);
String readerResourceMark = aTaskConfig.getString(CoreConstant.JOB_READER_PARAMETER + "." +
CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
String writerResourceMark = aTaskConfig.getString(CoreConstant.JOB_WRITER_PARAMETER + "." +
CommonConstant.LOAD_BALANCE_RESOURCE_MARK);
boolean hasLoadBalanceResourceMark = StringUtils.isNotBlank(readerResourceMark) ||
StringUtils.isNotBlank(writerResourceMark);
if (!hasLoadBalanceResourceMark) {
// fake 一个固定的 key 作为资源标识(在 reader 或者 writer 上均可,此处选择在 reader 上进行 fake)
for (Configuration conf : contentConfig) {
conf.set(CoreConstant.JOB_READER_PARAMETER + "." +
CommonConstant.LOAD_BALANCE_RESOURCE_MARK, "aFakeResourceMarkForLoadBalance");
}
// 是为了避免某些插件没有设置 资源标识 而进行了一次随机打乱操作
Collections.shuffle(contentConfig, new Random(System.currentTimeMillis()));
}
LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap = parseAndGetResourceMarkAndTaskIdMap(contentConfig);
List<Configuration> taskGroupConfig = doAssign(resourceMarkAndTaskIdMap, configuration, taskGroupNumber);
// 调整 每个 taskGroup 对应的 Channel 个数(属于优化范畴)
adjustChannelNumPerTaskGroup(taskGroupConfig, channelNumber);
return taskGroupConfig;
}

注意这段代码:

1
int taskGroupNumber = (int) Math.ceil(1.0 * channelNumber / channelsPerTaskGroup);

这就印证了我们在上一章《1.操作方法》说的:100个Task,channel数是20,问taskGroupNumber是多少?channelsPerTaskGroup默认是5,
所以taskGroupNumber = ceil(20 / 5) = 4。

点进List<Configuration> taskGroupConfig = doAssign(resourceMarkAndTaskIdMap, configuration, taskGroupNumber);

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* /**
* 需要实现的效果通过例子来说是:
* <pre>
* a 库上有表:0, 1, 2
* a 库上有表:3, 4
* c 库上有表:5, 6, 7
*
* 如果有 4个 taskGroup
* 则 assign 后的结果为:
* taskGroup-0: 0, 4,
* taskGroup-1: 3, 6,
* taskGroup-2: 5, 2,
* taskGroup-3: 1, 7
*
* </pre>
*/
private static List<Configuration> doAssign(LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap, Configuration jobConfiguration, int taskGroupNumber) {

【部分代码略】

}

该方法的代码略,我们主要看注释。
现在有a、b、c三个库:

  • a 库上有表:0, 1, 2
  • b 库上有表:3, 4
  • c 库上有表:5, 6, 7

然后我们有4个taskGroup:taskGroup-0处理a库的0,taskGroup-1处理b库的3,taskGroup-2处理c库的5,taskGroup-3处理a库的1,以此类推。

这也是,我在上一章《1.操作方法》说的,100个Task,4个TaskGroup公平分配,不一定第一个TaskGroup就负责25个Task

此处,源代码存在一个注释错误。
应该是b 库上有表:3, 4,而不是a 库上有表:3, 4
关于该错误,我提交了一个pull request,于2022-12-30通过了。

pull request

特别的,我们可以再回到调度方法,关注一下这段代码

1
2
3
4
5
6
7
8
9
10
11
12
executeMode = ExecuteMode.STANDALONE;
scheduler = initStandaloneScheduler(this.configuration);
//设置 executeMode
for (Configuration taskGroupConfig : taskGroupConfigs) {
taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
}
if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) {
if (this.jobId <= 0) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
"在[ local | distribute ]模式下必须设置jobId,并且其值 > 0 .");
}
}

executeMode = ExecuteMode.STANDALONE;,已经写死了。
可是在if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) {这里,又做了一次判断?
这一部分的判断是没有意义的。
为什么呢?
因为开源的DataX是"阉割版",所以在这里直接写死了。

数据传输

调用逻辑

我们可以继续点进JobContainer的schedule()方法中的scheduler.schedule(taskGroupConfigs);

1
2
3
4
5
6
7
8
9
10
public void schedule(List<Configuration> configurations) {

【部分代码略】

int totalTasks = calculateTaskCount(configurations);
startAllTaskGroup(configurations);

【部分代码略】

}

我们点进startAllTaskGroup(configurations);,是一个抽象方法。找到实现类的方法,是这个

1
2
3
4
5
6
7
8
9
10
@Override
public void startAllTaskGroup(List<Configuration> configurations) {
this.taskGroupContainerExecutorService = Executors
.newFixedThreadPool(configurations.size());
for (Configuration taskGroupConfiguration : configurations) {
TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
}
this.taskGroupContainerExecutorService.shutdown();
}

this.taskGroupContainerExecutorService = Executors.newFixedThreadPool(configurations.size());是在申请线程池,那么执行的任务是什么呢?

点进TaskGroupContainerRunner,找到run()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.alibaba.datax.core.taskgroup.runner;

【部分代码略】

public class TaskGroupContainerRunner implements Runnable {

【部分代码略】

@Override
public void run() {
try {
Thread.currentThread().setName(
String.format("taskGroup-%d", this.taskGroupContainer.getTaskGroupId()));
this.taskGroupContainer.start();
this.state = State.SUCCEEDED;
} catch (Throwable e) {
this.state = State.FAILED;
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
}
}

【部分代码略】

}

点进this.taskGroupContainer.start();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public void start() {
try {

【部分代码略】

Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
taskStartTimeMap.put(taskId, System.currentTimeMillis());
taskExecutor.doStart();

【部分代码略】

} catch (Throwable e) {

【部分代码略】

}finally {

【部分代码略】

}
}

点进taskExecutor.doStart()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void doStart() {
this.writerThread.start();
// reader没有起来,writer不可能结束
if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
this.taskCommunication.getThrowable());
}
this.readerThread.start();
// 这里reader可能很快结束
if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
// 这里有可能出现Reader线上启动即挂情况 对于这类情况 需要立刻抛出异常
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR,
this.taskCommunication.getThrowable());
}
}

以readerThread为例,点击this.readerThread.start();中的readerThread,发现其是一个成员变量,由构造方法进行赋值。
找到构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public TaskExecutor(Configuration taskConf, int attemptCount) {

【部分代码略】

/**
* 生成writerThread
*/
writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
this.writerThread = new Thread(writerRunner,
String.format("%d-%d-%d-writer",
jobId, taskGroupId, this.taskId));

【部分代码略】

/**
* 生成readerThread
*/
readerRunner = (ReaderRunner) generateRunner(PluginType.READER,transformerInfoExecs);
this.readerThread = new Thread(readerRunner,
String.format("%d-%d-%d-reader",
jobId, taskGroupId, this.taskId));

}

发现writerRunner和readerRunner均来自generateRunner,并进行了强制类型转换。点进ReaderRunner:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.alibaba.datax.core.taskgroup.runner;

【部分代码略】

/**
* Created by jingxing on 14-9-1.
* <p/>
* 单个slice的reader执行调用
*/
public class ReaderRunner extends AbstractRunner implements Runnable {

【部分代码略】

/**
 * Drives one reader task through its lifecycle phases in order:
 * init, prepare, startRead, post, and (always, in finally) destroy.
 * Each phase is timed with its own PerfRecord. Any Throwable raised by a
 * phase is logged and recorded via markFail instead of being rethrown.
 */
@Override
public void run() {
    assert null != this.recordSender;

    Reader.Task taskReader = (Reader.Task) this.getPlugin();

    // Measures waitWriterTime; it is started here and only ended in finally.
    PerfRecord channelWaitWrite = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WAIT_WRITE_TIME);
    try {
        channelWaitWrite.start();

        LOG.debug("task reader starts to do init ...");
        PerfRecord initPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_INIT);
        initPerfRecord.start();
        taskReader.init();
        initPerfRecord.end();

        LOG.debug("task reader starts to do prepare ...");
        PerfRecord preparePerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_PREPARE);
        preparePerfRecord.start();
        // The prepare phase must call prepare(), not init() a second time.
        taskReader.prepare();
        preparePerfRecord.end();

        LOG.debug("task reader starts to read ...");
        PerfRecord dataPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DATA);
        dataPerfRecord.start();
        taskReader.startRead(recordSender);
        recordSender.terminate();

        dataPerfRecord.addCount(CommunicationTool.getTotalReadRecords(super.getRunnerCommunication()));
        dataPerfRecord.addSize(CommunicationTool.getTotalReadBytes(super.getRunnerCommunication()));
        dataPerfRecord.end();

        LOG.debug("task reader starts to do post ...");
        PerfRecord postPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_POST);
        postPerfRecord.start();
        taskReader.post();
        postPerfRecord.end();
        // automatic flush
        // super.markSuccess(); Do not mark success here: success is marked by writerRunner
        // (otherwise the reader could finish first while the writer has not finished yet, a serious bug).
    } catch (Throwable e) {
        LOG.error("Reader runner Received Exceptions:", e);
        super.markFail(e);
    } finally {
        LOG.debug("task reader starts to do destroy ...");
        PerfRecord desPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DESTROY);
        desPerfRecord.start();
        super.destroy();
        desPerfRecord.end();

【部分代码略】

    }
}

public void shutdown(){
recordSender.shutdown();
}
}

注意看其中的run方法,依次调用了taskReader.init();、taskReader.prepare();、taskReader.startRead(recordSender);、taskReader.post();和super.destroy();,和本章开篇的图"Schedule"部分对上了。

我们重点关注startRead,这是一个抽象方法,实现类有很多。我们以MySQL的为例。

1
2
3
4
5
6
@Override
public void startRead(RecordSender recordSender) {
int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE);
this.commonRdbmsReaderTask.startRead(this.readerSliceConfig, recordSender,
super.getTaskPluginCollector(), fetchSize);
}

点进commonRdbmsReaderTask.startRead

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Iterates over a JDBC ResultSet and passes every row to transportOneRecord,
 * accumulating the time spent in rs.next() into rsNextUsedTime.
 * Any Exception is rethrown via RdbmsException.asQueryException, and the
 * connection is released in finally through DBUtil.closeDBResources.
 *
 * NOTE(review): this excerpt appears to be abridged without an elision marker.
 * rs is never assigned before rs.next(), and conn, querySql, table, username,
 * metaData, columnNumber, mandatoryEncoding, lastTime and rsNextUsedTime are
 * not declared in the visible code. Presumably the query execution and these
 * declarations were omitted; confirm against the full method.
 */
public void startRead(Configuration readerSliceConfig,
RecordSender recordSender,
TaskPluginCollector taskPluginCollector, int fetchSize) {
ResultSet rs = null;
try {
// NOTE(review): as quoted, rs is still null here, so rs.next() would throw
// a NullPointerException; the statement that assigns rs seems to be elided.
while (rs.next()) {
// Count only the time spent waiting on rs.next(), not on record transport.
rsNextUsedTime += (System.nanoTime() - lastTime);
this.transportOneRecord(recordSender, rs,
metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
lastTime = System.nanoTime();
}
}catch (Exception e) {
throw RdbmsException.asQueryException(this.dataBaseType, e, querySql, table, username);
} finally {
DBUtil.closeDBResources(null, conn);
}
}

点进transportOneRecord

示例代码:

1
2
3
4
5
6
7
protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, 
ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding,
TaskPluginCollector taskPluginCollector) {
Record record = buildRecord(recordSender,rs,metaData,columnNumber,mandatoryEncoding,taskPluginCollector);
recordSender.sendToWriter(record);
return record;
}

限速方法

点进sendToWriter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void sendToWriter(Record record) {
if(shutdown){
throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
}
Validate.notNull(record, "record不能为空.");
if (record.getMemorySize() > this.byteCapacity) {
this.pluginCollector.collectDirtyRecord(record, new Exception(String.format("单条记录超过大小限制,当前限制为:%s", this.byteCapacity)));
return;
}
boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
if (isFull) {
flush();
}
this.buffer.add(record);
this.bufferIndex++;
memoryBytes.addAndGet(record.getMemorySize());
}

点进flush()方法:

1
2
3
4
5
6
7
8
9
10
@Override
public void flush() {
if(shutdown){
throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
}
this.channel.pushAll(this.buffer);
this.buffer.clear();
this.bufferIndex = 0;
this.memoryBytes.set(0);
}

点进this.channel.pushAll(this.buffer);

1
2
3
4
5
6
public void pushAll(final Collection<Record> rs) {
Validate.notNull(rs);
Validate.noNullElements(rs);
this.doPushAll(rs);
this.statPush(rs.size(), this.getByteSize(rs));
}

点进this.statPush(rs.size(), this.getByteSize(rs));

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private void statPush(long recordSize, long byteSize) {
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_RECORDS,
recordSize);
currentCommunication.increaseCounter(CommunicationTool.READ_SUCCEED_BYTES,
byteSize);
//在读的时候进行统计waitCounter即可,因为写(pull)的时候可能正在阻塞,但读的时候已经能读到这个阻塞的counter数
currentCommunication.setLongCounter(CommunicationTool.WAIT_READER_TIME, waitReaderTime);
currentCommunication.setLongCounter(CommunicationTool.WAIT_WRITER_TIME, waitWriterTime);
boolean isChannelByteSpeedLimit = (this.byteSpeed > 0);
boolean isChannelRecordSpeedLimit = (this.recordSpeed > 0);
if (!isChannelByteSpeedLimit && !isChannelRecordSpeedLimit) {
return;
}
long lastTimestamp = lastCommunication.getTimestamp();
long nowTimestamp = System.currentTimeMillis();
long interval = nowTimestamp - lastTimestamp;
if (interval - this.flowControlInterval >= 0) {
long byteLimitSleepTime = 0;
long recordLimitSleepTime = 0;
if (isChannelByteSpeedLimit) {
long currentByteSpeed = (CommunicationTool.getTotalReadBytes(currentCommunication) -
CommunicationTool.getTotalReadBytes(lastCommunication)) * 1000 / interval;
if (currentByteSpeed > this.byteSpeed) {
// 计算根据byteLimit得到的休眠时间
byteLimitSleepTime = currentByteSpeed * interval / this.byteSpeed
- interval;
}
}
if (isChannelRecordSpeedLimit) {
long currentRecordSpeed = (CommunicationTool.getTotalReadRecords(currentCommunication) -
CommunicationTool.getTotalReadRecords(lastCommunication)) * 1000 / interval;
if (currentRecordSpeed > this.recordSpeed) {
// 计算根据recordLimit得到的休眠时间
recordLimitSleepTime = currentRecordSpeed * interval / this.recordSpeed
- interval;
}
}
// 休眠时间取较大值
long sleepTime = byteLimitSleepTime < recordLimitSleepTime ?
recordLimitSleepTime : byteLimitSleepTime;
if (sleepTime > 0) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_BYTES,
currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES));
lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_BYTES,
currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_BYTES));
lastCommunication.setLongCounter(CommunicationTool.READ_SUCCEED_RECORDS,
currentCommunication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS));
lastCommunication.setLongCounter(CommunicationTool.READ_FAILED_RECORDS,
currentCommunication.getLongCounter(CommunicationTool.READ_FAILED_RECORDS));
lastCommunication.setTimestamp(nowTimestamp);
}
}

找到了!
会分别根据byteLimit和recordLimit计算出各自需要的休眠时间,然后取两者中较大的一个进行Sleep,以此限速。

这种通过"Sleep"进行速度控制的方法,还有一个地方也在用:Kafka。
具体可以参考《分布式事件流平台Kafka:5.监控和调优》的"Broker调优"的"动态配置"。

文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/11402
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

评论区