简介
DolphinScheduler,数据调度工具。
官网:https://dolphinscheduler.apache.org
官方文档:https://dolphinscheduler.apache.org/zh-cn/docs
Github地址:https://github.com/apache/dolphinscheduler
部署
四种安装部署方式
按照DolphinScheduler官方的说法,有4种部署方式:
- 单机部署(Standalone)
- 伪集群部署(Pseudo-Cluster)
伪集群部署目的是在单台机器上部署DolphinScheduler服务,该模式下master、worker、api server 都在同一台机器上。 - 集群部署(Cluster)
在多台机器上部署DolphinScheduler服务,用于运行大量任务情况。 - Kubernetes部署
在Kubernetes集群中部署DolphinScheduler服务。
准备环境
需要准备:
- JDK8及以上的JAVA环境
可以参考《ElasticSearch实战入门(6.X):1.工具、概念、集群和倒排索引》中关于JDK的讨论。 - MySQL或PostgreSQL
可以参考《MySQL从入门到实践:1.概述和工具准备》。 - Zookeeper
可以参考《基于Java的后端开发入门:20.Dubbo和Zookeeper》。
单机部署
下载并解压
通过DolphinScheduler官网下载DolphinScheduler的二进制包。
截至2024年12月,最新版本是3.2.2,但是3.2.X版本的DolphinScheduler有很多BUG,官方也推荐使用3.1.9。
下载完成后,解压:
1 | tar -zxvf apache-dolphinscheduler-3.1.9-bin.tar.gz |
本文解压至/opt/dolphinscheduler/
。
修改配置文件
dolphinscheduler_env.sh
修改/opt/dolphinscheduler/apache-dolphinscheduler-3.1.9-bin/bin/env/dolphinscheduler_env.sh
。
通常需要修改如下部分:
JAVA_HOME
:1
2JAVA_HOME, will use it to start DolphinScheduler server
export JAVA_HOME=${JAVA_HOME:-/usr/java/jdk1.8.0-x64}- 数据库连接相关:注意,对于8版本的MySQL,连接URL需要有
1
2
3
4
5
6Database related configuration, set database type, username and password
export DATABASE=${DATABASE:-mysql}
export SPRING_PROFILES_ACTIVE=${DATABASE}
export SPRING_DATASOURCE_URL="jdbc:mysql://localhost:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true"
export SPRING_DATASOURCE_USERNAME="root"
export SPRING_DATASOURCE_PASSWORD="【密码】"allowPublicKeyRetrieval=true
。 - Zookeeper地址:
1
2
3Registry center configuration, determines the type and link of the registry center
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
export REGISTRY_ZOOKEEPER_CONNECT_STRING=${REGISTRY_ZOOKEEPER_CONNECT_STRING:-localhost:2181}
其他内容按需修改。
application.yaml
修改/opt/dolphinscheduler/apache-dolphinscheduler-3.1.9-bin/standalone-server/conf/application.yaml
的数据库连接信息,修改为MySQL的。
需要修改两处:
- 配置文件前部的
1
2
3
4
5
6
7
8sql:
init:
schema-locations: classpath:sql/dolphinscheduler_h2.sql
datasource:
driver-class-name: org.h2.Driver
url: jdbc:h2:mem:dolphinscheduler;MODE=MySQL;DB_CLOSE_DELAY=-1;DATABASE_TO_LOWER=true
username: sa
password: "" - 配置文件结尾的
1
2
3
4
5
6
7
8sql:
init:
schema-locations: classpath:sql/dolphinscheduler_mysql.sql
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=true
username: root
password: 【密码】
注意,对于8版本的MySQL,连接URL需要有allowPublicKeyRetrieval=true
。
配置MySQL
创建dolphinscheduler数据库
1 | CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci; |
配置MySQL驱动
通过https://mvnrepository.com
下载MySQL的驱动,本文下载的是:
1 | <!-- https://mvnrepository.com/artifact/com.mysql/mysql-connector-j --> |
具体下载方式,可以参考《基于Java的后端开发入门:10.JDBC》的"用法-快速入门-例子-准备工作-导入驱动包"部分的讨论。
下载完成后,将驱动包复制到如下的地址:
1 | cp mysql-connector-j-8.3.0.jar /opt/dolphinscheduler/apache-dolphinscheduler-3.1.9-bin/worker-server/libs/ |
初始化数据库
执行/opt/dolphinscheduler/apache-dolphinscheduler-3.1.9-bin/tools/bin
目录的upgrade-schema.sh
。
启停命令
dolphinscheduler-daemon.sh
位于/opt/dolphinscheduler/apache-dolphinscheduler-3.1.9-bin/bin/
目录下。
- 启动:
./dolphinscheduler-daemon.sh start standalone-server
- 停止:
./dolphinscheduler-daemon.sh stop standalone-server
登录地址为:http://【HOSTNAME】:12345/dolphinscheduler/ui/login
,用户名:admin
,默认密码:dolphinscheduler123
。
项目操作
创建项目
创建项目
点击顶部的项目管理
,再点击创建项目
。
创建完成后,可以在项目列表看到我们创建的项目。
- 右侧是
修改
和删除
。 - 左侧点击项目名称,可以查看项目的详情。
项目首页
- 任务状态统计:在指定时间范围内,统计任务实例中状态为提交成功、正在运行、准备暂停、暂停、准备停止、停止、失败、成功、需要容错、kill、等待线程的个数。
- 流程状态统计:在指定时间范围内,统计工作流实例中状态为提交成功、正在运行、准备暂停、暂停、准备停止、停止、失败、成功、需要容错、kill、等待线程的个数。
- 工作流定义统计:统计用户创建的工作流定义及管理员授予该用户的工作流定义。
工作流定义
创建租户(准备工作)
点击顶部的安全中心
,再依次点击租户管理
、创建租户
。
关于什么是租户,在本文的"安全中心-租户管理"处会有讨论。
创建工作流
点击顶部的项目管理
,再依次点击项目的名称
(在本文是demo)、工作流定义
、创建工作流
。
会看到一个图形化的,可以拖拽编辑的页面。
这就是工作流DAG编辑页面。
创建任务
在本文,我们拖拽一个SHELL
任务在右侧画板上。
然后,会弹出编辑框如下:
解释说明:
运行标志
正常
:正常执行。
禁止执行
:工作流不会执行该任务。任务优先级
当worker线程数不足时,级别高的任务在执行队列中会优先执行,相同优先级的任务按照先进先出的顺序执行。超时告警
勾选超时告警
后,会看到超时策略有:超时告警
和超时失败
,可以两个都勾选。资源
资源
,是"资源中心->文件管理"页面创建或上传的文件。
配置依赖关系
点击任务节点的右侧加号连接其他任务,配置依赖关系。
也可以在创建节点的时候,选择其前置任务。
保存工作流定义
点击画布右上角的保存
,弹出基本信息
弹框,输入工作流定义名称等信息,点击确定
按钮,工作流定义创建成功。
执行策略有四种:
- 并行:如果对于同一个工作流定义,同时有多个工作流实例,则并行执行工作流实例。
- 串行等待:如果对于同一个工作流定义,同时有多个工作流实例,则串行执行工作流实例。
- 串行抛弃:如果对于同一个工作流定义,同时有多个工作流实例,则抛弃后生成的工作流实例并杀掉正在跑的实例。
- 串行优先:如果对于同一个工作流定义,同时有多个工作流实例,则按照优先级串行执行工作流实例。
工作流操作
操作概述
点击顶部的项目管理
,再依次点击项目的名称
(在本文是demo)、工作流定义
,能看到我们创建的工作流。
在右侧有各种按钮,鼠标移过去可以看到按钮的含义。
- 编辑:只能编辑
下线
的工作流定义。工作流DAG编辑同创建工作流定义。 - 上线:工作流状态为
下线
时,可以上线工作流,只有上线
状态的工作流能运行,但上线
状态的工作流不能编辑。 - 下线:工作流状态为
上线
时,下线工作流,下线
状态的工作流可以编辑,但不能运行。 - 运行:只有
上线
状态的工作流能运行。 - 定时:只有
上线
状态的工作流能设置定时,创建定时后的状态为"下线",需在定时管理页面上线定时才生效。 - 定时管理:编辑定时、上线定时、下线定时、删除定时。
- 删除:删除工作流。
- 下载:下载工作流定义到本地。
- 树形图:以树形结构查看任务节点的类型及任务状态。
运行工作流
步骤
步骤如下:
- 点击顶部的
项目管理
,在依次点击项目的名称
(在本文是demo),再点击工作流定义
。 - 点击右侧的
上线
,我们上线一个工作流。 - 点击右侧的
运行
,即运行工作流额。
参数说明
- 失败策略
当某一个任务节点执行失败时,其他并行的任务节点需要执行的策略。
继续
表示:某一任务失败后,其他任务节点正常执行。
结束
表示:终止所有正在执行的任务,并终止整个流程。 - 通知策略
当流程结束,根据流程状态发送流程执行信息通知邮件,包含任何状态都不发,成功发,失败发,成功或失败都发。 - 流程优先级
当master线程数不足时,级别高的流程在执行队列中会优先执行,相同优先级的流程按照先进先出的顺序执行。 - Worker分组
该流程只能在指定的worker机器组里执行。
默认是default
,可以在任一worker上执行。 - 告警组
需要通知的成员组。 - 启动参数
在启动新的流程实例时,设置或覆盖全局参数的值。 - 补数
指运行指定日期范围内的工作流定义,根据补数策略生成对应的工作流实例。
我们可以点击确定
,然后点击工作流实例
中看到历史运行;在任务实例
中会看到历史上的任务运行;在任务实例
的右侧,点击按钮,可以查看运行日志。
补数策略
配置
调度日期的范围是左关右关区间。
串行执行
指定时间范围内,从开始日期至结束日期依次执行补数,依次生成多条流程实例。
例如,我们选择2024-12-01 00:00:00
至2024-12-13 00:00:00
,点击确定
,再点击工作流实例
,会看到我们补数的历史运行,并且是逐个运行的。
并行执行
并行执行,即同时执行。
在并行执行中,有一个参数并行度,是指在并行补数的模式下,最多并行执行的实例数。
依赖模式
如果配置了依赖模式。当一个工作流(我们称之为工作流A)成功完成了一个实例的运行,并且这个实例的状态被标记为"已上线",那么所有直接依赖于工作流A的其他工作流(我们称为工作流B、C等)中,那些设置了依赖于工作流A的任务节点将会被触发执行。
例如:
我们有两个工作流:WorkFlow_A
和WorkFlow_B
。WorkFlow_B
中有一个任务Task_B1
配置为依赖于WorkFlow_A
的成功完成,WorkFlow_A
是每日运行的工作流,负责收集并处理前一天的数据。
某一天,因为数据延迟,WorkFlow_A
没有按计划运行,导致当天的数据没有被处理。
后来,运维人员手动触发了WorkFlow_A
来处理遗漏的数据,这次运行被称为"补数"。
当WorkFlow_A
的补数实例成功完成并且状态被设置为"已上线"后,根据上述依赖模式,WorkFlow_B
中的Task_B1
将会被触发,因为它直接依赖于WorkFlow_A
的完成。但是,WorkFlow_B
中其他不依赖于WorkFlow_A
的任务将不会受到影响。
补数与定时配置的关系
有两种情况:
未配置定时
或已配置定时并定时状态下线
根据所选的时间范围结合定时默认配置(每天0点)进行补数。已配置定时并定时状态上线
根据所选的时间范围结合定时配置进行补数。
例如配置了定时(每日18时开始运行),流程实例为:
关于如何配置定时,会在下文讨论。
空跑
任务直接成功,不执行。
空跑的应用场景有:
- 验证工作流逻辑
空跑可以用来验证工作流内部的任务依赖关系、顺序和条件是否按照预期配置,确保所有任务都能按正确的顺序启动。 - 培训和演示
对于新用户或团队成员来说,空跑提供了一种安全的方式去熟悉系统和特定工作流的行为,而无需担心会对真实环境造成影响。
单独运行任务
右键选中任务,点击"运行"按钮(只有已上线的任务才能点击运行)。
弹出启动参数设置弹框,绝大部分参数与上文的"运行工作流"一致。
需要说明的是,节点执行类型:
- 单独执行,Single Execution。
仅执行指定的单个任务节点,而不影响该工作流中的其他任务。 - 向前执行,Execute Forward。
从选定的任务节点开始,按照任务之间的依赖关系,依次执行所有前置任务直到当前任务为止。 - 向后执行,Execute Backward。
从选定的任务节点开始,按照任务之间的依赖关系,依次执行所有后续依赖任务,直至工作流结束或遇到不再符合条件的任务为止。
工作流定时
绝大部分参数与上文的"运行工作流"一致。
需要说明的是,起止时间:在起止时间范围内,定时运行工作流;不在起止时间范围内,不再产生定时工作流实例。
配置好"定时"后,此时定时状态为下线
,定时需上线
才生效。
点击定时管理
按钮,进入定时管理页面,点击上线
按钮,定时状态变为上线
。
工作流实例
查看
关于查看工作流实例
、查看任务日志
、查看任务历史记录
,上文都有展示,本文不赘述。
这里我们讨论"查看运行参数",点击顶部的项目管理
,再依次点击项目的名称
(在本文是demo)、工作流实例
、点击某一个具体的实例。
操作
在工作流实例右侧有一批按钮,提供编辑、重跑、重跑失败任务等功能。
其中编辑功能,只能编辑结束状态(成功/失败/停止)的流程,点击保存
按钮,在弹框有"是否更新工作流定义"。若不勾选,则不更新工作流定义。
任务定义
- 在任务定义的批量任务中,可以查看所有的工作流的任务,并提供了搜索等功能。以及右侧的操作按钮,方便我们快速的定位到某一个任务,进行各种操作。
- 也可以通过该部分创建新的任务。
注意:编辑任务和创建任务,都只能针对"下线"状态的工作流进行操作。
数据源中心
点击顶部的数据源中心
,再点击创建数据源
,可以添加多个数据源的管理。包括MySQL、PostgreSQL、HIVE、Spark、ClickHouse等。
具体不赘述。
资源中心
简介
资源中心的功能包括:文件管理、UDF管理和任务组管理。
配置
对于Standalone环境,我们修改standalone-server/conf/common.properties
。
对于集群模式,我们修改api-server/conf/common.properties
和worker-server/conf/common.properties
。
可能需要修改内容如下:
resource.storage.upload.base.path
resource.storage.type
可选值有:NONE
,DolphinScheduler不参与资源文件的管理,所有的资源文件管理和维护都由用户自行负责。HDFS
,使用Hadoop分布式文件系统(HDFS)来存储资源文件。S3
,利用AmazonS3或兼容S3API的对象存储服务来存储资源文件。OSS
,使用阿里云对象存储服务(Object Storage Service, OSS)。
准备工作(设置租户)
如果我们以admin
身份进入并操作资源中心,需要先给admin
设置租户。
案例
我们在demo
目录下创建文件名sh_demo.sh
,先点进demo
目录,然后点击创建文件,文件名是sh_demo
,文件内容是:
1 | echo "demo" |
之后,我们创建工作流的任务,选择资源demo/sh_demo.sh
,编写脚本sh demo/sh_demo.sh
,调用sh_demo.sh
。
注意:脚本中的资源文件路径名称需要和所选择资源的路径名称一致。
例如,在上例中,脚本不能是sh demo.sh
。
任务类型
在DolphinsSheduler中,有很多种类型的任务。
在这里,我们再讨论一些,SQL、Python、DataX、SubProcess、Dependent、Conditions、Switch。
SQL
参数说明:
- 数据源:选择对应的数据源。
- SQL类型:支持查询和非查询两种。
对于非查询语句,需要设置分段执行符号,例如MySQL中是;
。 - SQL语句:需要执行的SQL语句
- 前置SQL:在SQL语句之前执行的SQL。
- 后置SQL:在SQL语句之后执行的SQL。
Python
准备工作
需要在部署DolphinsSheduler的机器上,配置Python环境。
本文继续考虑Miniconda的虚拟环境,关于Miniconda虚拟环境的搭建可以参考《数据调度工具Airflow》。
创建虚拟环境ds
,示例代码:
1 | conda create --name ds python=3.12 |
修改/opt/dolphinscheduler/apache-dolphinscheduler-3.1.9-bin/bin/env/dolphinscheduler_env.sh
,配置PYTHON_HOME
:
1 | export PYTHON_HOME=${PYTHON_HOME:-/opt/miniconda3/envs/DolphinsSheduler/bin/python} |
/opt/miniconda3/envs/ds/bin/python
,即miniconda3安装路径/envs/虚拟环境的名称/bin/python
。
例子
在DolphinsSheduler配置脚本如下:
1 | import pandas as pd |
- 注意:
pandas
包需要在机器的虚拟环境中安装。
运行该工作流,通过任务实例查看日志,内容如下:
1 |
|
DataX
准备工作
需要在部署DolphinsSheduler的机器上,配置DataX环境。
关于DataX的安装,可以参考《离线异构数据同步工具DataX:1.操作方法》。
修改/opt/dolphinscheduler/apache-dolphinscheduler-3.1.9-bin/bin/env/dolphinscheduler_env.sh
,配置DATAX_HOME
:
1 | export DATAX_HOME=${DATAX_HOME:-/opt/datax/datax} |
解释说明:
- 在本文,
/opt/datax/datax
的内容如下。示例代码:运行结果:1
ll /opt/datax/datax
1
2
3
4
5
6
7
8
9
10total 4
drwxr-xr-x. 2 root root 59 Dec 13 22:00 bin
drwxr-xr-x. 2 root root 68 Dec 13 22:00 conf
drwxr-xr-x. 2 root root 22 Dec 13 22:00 job
drwxr-xr-x. 2 root root 4096 Dec 13 22:00 lib
drwxr-xr-x. 3 root root 24 Dec 13 22:01 log
drwxr-xr-x. 3 root root 24 Dec 13 22:01 log_perf
drwxr-xr-x. 4 root root 34 Dec 13 22:01 plugin
drwxr-xr-x. 2 root root 23 Dec 13 22:00 script
drwxr-xr-x. 2 root root 24 Dec 13 22:00 tmp - 可能会报错
/opt/miniconda3/envs/ds/bin/python2.7: No such file or directory
解决办法有:- 利用DataX支持Python3和Python2的特点,在本文的解决办法是建立软连接,
ln -s /usr/bin/python3 /usr/bin/python2.7
。 - 利用DolphinScheduler的"环境管理",可以参考本文"安全中心-环境管理"部分的讨论。
- 利用DataX支持Python3和Python2的特点,在本文的解决办法是建立软连接,
使用
有两种使用方式:
- DolphinsSheduler方式,对于DataX中的参数,提供了图形化界面。
- 自定义模板(DataX原生)
SUB_PROCESS
简介
SUB_PROCESS,子流程节点,把外部的某个工作流定义
当做一个节点去执行。
使用
子节点:选择外部的工作流定义
。
注意:在使用SUB_PROCESS
调用子结点任务的时候,需要保证定义的子结点为上线状态,否则SUB_PROCESS的工作流无法正常运行。
DEPENDENT
DEPENDENT,依赖检查节点。
例如,A流程依赖昨天的B流程执行成功,可以通过依赖节点会去检查B流程在昨天是否有执行成功的实例。
CONDITIONS
说明
CONDITIONS,条件节点,根据上游任务运行状态,判断应该运行哪个下游任务。
当上游任务数超过一个时,可以通过且
以及或
操作符实现复杂上游依赖。
使用
需要先连完线
需要先连完线,才能编辑选择条件和分支。
否则,无法选择分支。
复杂条件的配置
对于复杂条件的配置,可以通过定义前置检查条件进行配置。
Switch
与CONDITIONS非常类似,不赘述。
参数操作
分类
根据参数来源,可以分为:
- 内置参数
- 自定义参数
根据参数作用域,可以分为:
- 全局参数
- 本地参数
内置参数
内置参数,DolphinScheduler系统自带的参数。
变量名 | 声明方式 | 含义 |
---|---|---|
system.biz.date |
${system.biz.date} |
日常调度实例定时的定时时间前一天,格式为yyyyMMdd 。 |
system.biz.curdate |
${system.biz.curdate} |
日常调度实例定时的定时时间,格式为yyyyMMdd 。 |
system.datetime |
${system.datetime} |
日常调度实例定时的定时时间,格式为yyyyMMddHHmmss 。 |
使用:
- 定义
$[...]
时间日期格式,例如$[yyyyMMddHHmmss]
、$[yyyyMMdd]
、$[HHmmss]
、$[yyyy-MM-dd]
等。 - 也可以通过以下两种方式,选择前后的日期。
- 使用
add_months()
函数,该函数用于加减月份。
第一个入口参数为[yyyyMMdd]
,表示返回时间的格式;
第二个入口参数为月份偏移量,表示加减多少个月。- 后N年:
$[add_months(yyyyMMdd,12*N)]
- 前N年:
$[add_months(yyyyMMdd,-12*N)]
- 后N月:
$[add_months(yyyyMMdd,N)]
- 前N月:
$[add_months(yyyyMMdd,-N)]
- 后N年:
- 直接加减数字(天)
在自定义格式后直接"+/-"数字- 后N周:
$[yyyyMMdd+7*N]
- 前N周:
$[yyyyMMdd-7*N]
- 后N天:
$[yyyyMMdd+N]
- 前N天:
$[yyyyMMdd-N]
- 后N小时:
$[HHmmss+N/24]
- 前N小时:
$[HHmmss-N/24]
- 后N分钟:
$[HHmmss+N/24/60]
- 前N分钟:
$[HHmmss-N/24/60]
- 后N周:
- 使用
全局参数
作用域
全局参数,针对整个工作流的所有任务节点都有效的参数。
在工作流定义页面配置。
例子
我们在工作流定义页面定义页面配置全局参数,参数名是val
,参数值是vvvvvv
。
定义一个Shell类型的任务,内容是:
1 | echo ${val} |
本质(字符串拼接)
我们运行上述工作流,会在任务实例的日志看到:
1 |
|
raw script : echo vvvvvv
,即,${}
的本质是字符串拼接。
本地参数
作用域
仅限该任务;或通过参数传递,将该参数作用到下游任务。
在任务定义页面配置。
例子
任务一(上游任务)
脚本:
1 | echo ${v1} |
自定义参数:
v1
、IN
、DATE
、$[yyyyMMdd]
v2
、OUT
、VARCHAR
注意:
$[yyyyMMdd]
,是内置参数。echo '${setValue(key=value)}'
为固定写法
任务二(下游任务)
脚本:
1 | echo ${v2} |
注意,此时不要定义v2
,否则会覆盖上游传递的参数;因为参数优先级。
参数优先级
本地参数
> 上游任务传递的参数
> 全局参数
告警管理
告警实例管理
点击顶部的安全中心
,再点击告警实例管理
。
DolphinsSheduler支持多种告警方式,本文以邮件为例。
在具体配置参数方面,可以参考《数据调度工具Airflow》,本文不赘述。
告警组管理
一个告警组可以包含多个告警实例。这样方便将告警信息发送给多个告警组。
点击顶部的安全中心
,再点击告警组管理
,可以进行告警组的管理。
应用
然后我们可以在工作流运行的弹框,选择通知策略和告警组信息。
对于定时工作,在定时管理中进行设置。
安全中心
租户管理
租户对应的是Linux的用户
,用于Worker提交作业所使用的用户。
如果Linux没有这个用户,则会任务运行失败。
用户管理
用户分类
用户分为:
- 管理员用户
管理员用户有授权和用户管理等权限。 - 普通用户
普通用户可以创建项目和对工作流定义的创建,编辑,执行等操作。
用户授权
可授予的权限包括项目权限、资源权限、数据源权限、UDF函数权限等。
环境管理
环境管理,在线配置worker运行环境。
等价于上文的dolphinscheduler_env.sh
文件。
一个环境可以关联多个worker,一个worker可以关联多个环境。
在任务执行时,可以将任务分配给指定的worker分组,根据worker分组选择对应的环境。最终由该组中的worker节点执行环境后执行该任务。