avatar


数据调度工具Airflow

简介

Airflow,一个用于调度和监控的工作流平台。

在Airflow中,用户定义一组基于有向无环图(DAG)的任务,Airflow将会按照依赖依次执行,并进行调度。

为什么不用Linux-crontab?
Linux-crontab的不足:

  • 在多任务调度执行的情况下,难以理清任务之间的依赖关系。
  • 不便于查看当前执行到哪一个任务。
  • 任务执行失败时没有自动的重试和报警机制。
  • 不便于查看执行日志,也即不方便定位报错的任务和错误原因。
  • 不便于查看调度流下每个任务执行的起止消耗时间,这对于优化task作业是非常重要的。
  • 没有记录历史调度任务的执行情况,这对于优化作业和错误排查是很重要的。

安装

Miniconda(Python)

本文,我们通过Miniconda搭建一个虚拟的Python环境。基于Miniconda的好处是,方便对虚拟环境进行管理。

什么是Miniconda

Conda是一个开源的包、环境管理器,可以用于在同一个机器上安装不同Python版本的软件包及其依赖,并能够在不同的Python环境之间切换。

Conda又可以分为:

  • Anaconda
    Anaconda包括Conda、Python以及一大堆安装好的工具包,比如:numpy、pandas等。
  • Miniconda
    Miniconda包括Conda、Python,以及部分必要的工具。

在本文,我们不需要如此多的工具包,选择MiniConda。

安装Miniconda

下载并执行脚本

Miniconda的下载地址:https://www.anaconda.com/download/

下载完成后,直接执行Miniconda3-latest-Linux-x86_64.sh即可安装。示例代码:

1
sh Miniconda3-latest-Linux-x86_64.sh

指定安装路径

在安装过程中,出现以下的提示,可以指定安装路径。

1
2
3
4
5
6
7
8
Miniconda3 will now be installed into this location:
/root/miniconda3

- Press ENTER to confirm the location
- Press CTRL-C to abort the installation
- Or specify a different location below

[/root/miniconda3] >>>

自动初始化conda

还会出现如下提示,我们选择yes

1
2
3
4
5
6
7
8
9
Do you wish to update your shell profile to automatically initialize conda?
This will activate conda on startup and change the command prompt when activated.
If you'd prefer that conda's base environment not be activated on startup,
run the following command when conda is activated:

conda config --set auto_activate_base false

You can undo this by running `conda init --reverse $SHELL`? [yes|no]
[no] >>>

配置

取消自动激活base环境

Miniconda安装完成后,默认每次打开Linux终端都会激活base环境,我们可通过如下命令,取消自动激活base环境。

1
conda config --set auto_activate_base false

配置conda国内镜像

1
2
3
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
conda config --set show_channel_urls yes

创建虚拟环境

1
conda create --name airflow python=3.12

创建完成后,执行conda activate airflow,激活环境。

激活后,会看到(airflow)

airflow

其他常用命令:

  • 退出环境:conda deactivate
  • 创建环境:conda create -n env_name
  • 查看所有环境:conda info --envs
  • 删除一个环境:conda remove -n env_name --all

更改pip的源

步骤如下:

  1. 创建.pip目录:
    1
    mkdir ~/.pip
  2. 新建pip.conf文件:
    1
    vim  ~/.pip/pip.conf
  3. 添加以下内容
    1
    2
    3
    4
    [global]
    index-url = https://pypi.tuna.tsinghua.edu.cn/simple
    [install]
    trusted-host = https://pypi.tuna.tsinghua.edu.cn

安装Airflow

安装

1
pip install apache-airflow

初始化

1
airflow db init

启动

需要启动两个服务:

  • Airflow Web:
    1
    airflow webserver -p 8080 -D
  • Airflow 调度:
    1
    airflow scheduler -D

创建账号

1
2
3
4
5
6
airflow users create \
--username kaka \
--firstname k \
--lastname k \
--role Admin \
--email i@m.kakawanyifan.com

之后会提示输入密码。

配置Airflow

账号创建完成后,浏览器访问:http://【HOSTNAME】:8080
登录,会看到如下提示:
airflow

这就是我们需要配置的内容。

修改数据库为MySQL

在MySQL中建库

1
CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

安装依赖

可以安装pymysqlmysqlclient,本文以pymysql为例。

示例代码:

1
pip install pymysql

注意:不要安装mysql-connector-python,这个包在Airflow会引起各种问题。

修改配置文件

打开airflow.cfg,找到如下内容:

1
sql_alchemy_conn = sqlite:////root/airflow/airflow.db

修改为:

1
sql_alchemy_conn = mysql+pymysql://root:MySQL%%4001@127.0.0.1:3306/airflow_db
  • 在本文MySQL的密码是MySQL@01,在Airflow中需要替换为MySQL%%4001,因为@的URL编码是%40,而在Airflow的配置文件解析器中,%是变量插值的一部分,所以需要更替换为MySQL%%4001
  • 通过airflow info命令,可以查看airflow的安装路径,找到airflow.cfg

重新初始化

依次执行如下操作:

  1. Kill掉和Airflow相关的进程。
    可以先通过ps -ef|grep airflow确认,然后通过pkill -f "airflow",kill掉所有"airflow"相关的进程。
  2. 初始化Airflow
    启动Airflow
    设置用户名密码

修改执行器

官网不推荐使用顺序执行器,会造成任务调度阻塞。

修改airflow.cfg,找到如下部分:

1
2
3
4
5
6
7
8
# The executor class that airflow should use. Choices include
# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``,
# ``KubernetesExecutor``, ``CeleryKubernetesExecutor``, ``LocalKubernetesExecutor`` or the
# full import path to the class when using a custom executor.
#
# Variable: AIRFLOW__CORE__EXECUTOR
#
executor = SequentialExecutor

修改为:

1
executor = LocalExecutor

再重启Airflow服务。

组件

一些概念

DAG

DAG,有向无环图。DAG将Task任务,集合在一起,并通过依赖关系组织起来。

DAGRun,DAG的运行对象。(类似于class和object的概念)

Task

Task,任务。Task是Airflow中执行的基本单位,任务被排列成DAG,并且在它们之间设置了上游和下游依赖关系,以表达它们应该运行的顺序。

Scheduler

Scheduler,调度器。Scheduler监控所有任务和DAG,并在任务的依赖关系完成后触发任务实例。

大致实现逻辑是,在后台,调度器启动一个子进程,该子进程监控并保持与指定DAG目录中所有DAG的同步。默认情况下,调度器每分钟收集一次DAG解析结果,并检查是否有任何活动任务可以被触发。
步骤如下:

  1. 检查需要新DagRun的DAG,并创建DagRun。
    调度器定期检查DAG目录,查看是否有DAG需要新的DagRun;如果需要,创建新的DagRun实例。
  2. 检查一批DagRun,确定可调度的TaskInstances或已完成的DagRun。
    调度器会检查已经存在的DagRun,确定哪些TaskInstances已经准备好可以执行,或者哪些DagRun已经完成。
  3. 选择可调度的TaskInstances,基于资源限制,将它们排队等待执行。
    调度器会选择满足所有依赖条件且可以执行的TaskInstances,并基于资源限制将TaskInstances排队等待执行。

WebServer

WebServer,提供图形界面,监控DAG运行状态,也可以对DAG操作。

Metadata Database

Metadata Database,元数据库。默认为SQLite,可以支持MySQL、PostgreSQL等多种数据库。Metadata Database存储所有的DAG、任务定义、运行历史、用户权限等。

Worker

Worker,用来执行Executor接收的任务。这些是实际执行任务逻辑的进程,由正在使用的执行器确定。

Executor

Executor,执行器。常见的执行器有四种选择:

  • SequentialExecutor:单进程顺序执行任务,默认执行器。
    不是说在一个DAG中是Sequential的,在多个DAG之间,还是Sequential的。所以,很慢。通常只用于测试。
  • LocalExecutor:多进程本地执行任务。
  • CeleryExecutor:分布式调度,生产常用。
  • DaskExecutor:动态任务调度,主要用于数据分析。

结构

结构

  1. UserInterface访问WebServer
  2. WebServer、Scheduler和Workers都会访问Metadata-Database
  3. WebServer、Scheduler和Workers也都会访问DAG-Directory(存放DAG的文件夹)。
    1. Scheduler中需要有单独的进程,定期扫描DAG-Directory。
    2. WebServer也会读取DAG-Directory,在WebUI中展示。
    3. Wordks也需要读取DAG-Directory,获取DAG的逻辑。
  4. Executor驱动Worker。

UI操作

DAGs

DAGs-主页

DAGs

  • AllActivePaused:所有、活动、暂停的DAG。
  • 每一个DAG左侧,有一个滑动开关,用于修改当前DAG的状态是活动或暂停。
  • Owner:当前DAG所属的用户。
  • Runs:运行状态。每个圆圈数字有不同的含义,鼠标移过去能看到。
  • Schedule:调度模式。
  • Last Run:最近一次运行时间。
  • Next Run:下一次运行时间。
  • Recent Tasks:标识Tasks的最近状态,每个圆圈数字有不同的含义,鼠标移过去能看到。
  • Actions:三个按钮,分别是执行、刷新、删除。

DAG

点进某一个DAG,会看到如下内容:

DAG

  • Graph:该DAG的结构,多个Task之间的关系。
  • Calendar:以日历形式DAG历史的运行情况。
  • Task Duration:显示历次运行不同Task的耗时情况。
  • Gantt:甘特图。我们点击Gantt,可能会看到提示如下:
    1
    Please select a dag run in order to see a gantt chart
    此时,需要在左侧再点击某一个具体的DAG-Run。
  • Details:DAG的详细信息。

TASK

点击右侧,某一个DAG-Run的具体Task,会看到如下内容:

TASK

  • Logs:执行日志。
  • Details:Task的详情。
  • XCom:cross communication,Task和Task之间通信用。
  • Mark state as…:可以设置某个Task为successfailed

Browse

  • DAG Runs:展示DAG-Run实例。
  • Task Instances:展示Task实例。
  • SLA Misses:Service Level Agreement Misses,指的是未能满足服务级别协议中的时间要求。是指一个或多个任务实例未能在其预定的SLA时间内完成。
  • DAG Dependencies:DAG之间的依赖关系。

使用

版本控制

airflow.cfg中,我们会看到dags_folder的配置项,在本文:

1
dags_folder = /root/airflow/dags

对于DAG的版本控制,建议如下:

  1. 代码管理
    Airflow没有内部的版本控制机制,我们通常会结合Git等工具来实现版本控制。可以将所有的DAG文件放在某个代码仓库中,当有变更时,通过提交代码来追踪变更历史。
  2. 环境隔离
    在进行DAG开发和测试时,可以使用功能分支来隔离不同版本的DAG。在实际部署之前,先在开发环境或测试环境中进行充分测试,确保不影响生产环境。
  3. 回滚机制
    利用版本控制系统的回滚特性,一旦发现某个版本的 DAG 有问题,可以快速回滚到之前稳定的版本,确保系统的稳定性和可靠性。

入门案例

在"dags_folder"添加文件download_stock_price.py,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from datetime import timedelta
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

import akshare as ak


def download_price():
symbol = "000001"
stock_zh_a_hist_df = ak.stock_zh_a_hist(symbol=symbol, period="daily")
print(type(stock_zh_a_hist_df))
print(stock_zh_a_hist_df.shape)
stock_zh_a_hist_df.to_csv(f"/data/csv/{symbol}.csv", index=False)


default_args_val = {
"owner": "kaka",
"depends_on_past": False,
"email": ["i@m.kakawanyifan.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}

with DAG(
dag_id="Download_Stock_Price",
default_args=default_args_val,
description="Download stock price and save to local csv files.",
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=["kaka"]
) as dag:
dag.doc_md = "Download stock price and save to local csv files."
download_task = PythonOperator(
task_id="download_price",
python_callable=download_price
)

我们可以WebUI上手动执行,查看日志,检查机器的文件。

关于该DAG的解释:

  • default_args_val
    • owner:指定该DAG的所有者,用于标识谁负责这个工作流。
    • depends_on_past:如果设置为True,则任务实例会依赖于上一次调度周期的任务成功完成;如果设置为False,那么即使上一个任务失败了,当前任务也会尝试运行。
    • email:当某些条件满足时(如任务失败),将发送通知到这些电子邮件地址。
    • email_on_failure:设置为True时,如果任务失败,Airflow将发送一封电子邮件通知。
    • email_on_retry:设置为True时,当任务重试时,Airflow也会发送电子邮件通知。
    • retries:定义在任务失败后可以重试的次数。
    • retry_delay:在每次重试之间等待的时间,这里设置为5分钟。
  • 创建DAG对象
    • dag_id:是DAG的唯一标识符,用来在Airflow中识别和管理不同的DAG。
    • default_args:使用之前定义的默认参数字典来配置此DAG和其任务的一些属性。
    • description:提供关于DAG功能的简短描述。
    • schedule_interval:设定了DAG的调度间隔,这里是每天一次。
    • start_date:指定DAG第一次运行的时间点,days_ago(2)表示从两天前开始计算。
    • tags:为DAG添加标签,便于分类管理和搜索。
  • 设置文档字符串
    1
    dag.doc_md = "Download stock price and save to local csv files."
    为DAG设置了一个Markdown格式的文档字符串(doc_md),可以用来提供更详细的说明或文档。
  • 定义任务
    • PythonOperator:创建了一个新的任务,它会在DAG调度时执行。
    • task_id:任务的唯一标识符,在DAG内必须是唯一的。
    • python_callable:指向要执行的Python函数,这里是download_price函数,是实际的任务逻辑。

Variable

配置Variable

配置方法

在WebUI,依次点击AdminVariables+,新建Variable。

非JSON格式

非JSON格式:symbol_list000001,000002,600000

JSON格式

JSON格式:symbol_list_json["000001","000002","600000"]

获取Variable

非JSON格式

1
2
3
4
5
6
7
8
9
def download_price():
symbol_list_str = Variable.get("symbol_list")
symbol_list = symbol_list_str.split(",")
for symbol in symbol_list:
stock_zh_a_hist_df = ak.stock_zh_a_hist(symbol=symbol, period="daily")
print(symbol)
print(type(stock_zh_a_hist_df))
print(stock_zh_a_hist_df.shape)
stock_zh_a_hist_df.to_csv(f"/data/csv/{symbol}.csv", index=False)

JSON格式

1
2
3
4
5
6
7
8
def download_price():
symbol_list_ja = Variable.get("symbol_list_json", deserialize_json=True)
for symbol in symbol_list_ja:
stock_zh_a_hist_df = ak.stock_zh_a_hist(symbol=symbol, period="daily")
print(symbol)
print(type(stock_zh_a_hist_df))
print(stock_zh_a_hist_df.shape)
stock_zh_a_hist_df.to_csv(f"/data/csv/{symbol}.csv", index=False)

解释说明

本文只是为了讲解Variable的用法。
在实际业务中,涉及到多个资产的数据处理,基于Variable配置,绝不是一个好操作。
我以为,将股票列表存储在数据库的某张表,是一个更好的操作。

注意

不要在Variable保存"password"、"API_KEY"等,Airflow会自动帮我们加密。

加密

这样,尤其是时间久了之后,我们很难判断,自己填写的对不对。

依赖关系

在Airflow中使用>>操作符来设置依赖关系。
例如:

  • A完成,才能执行B。B依赖A
    1
    task_a >> task_b
  • A和B都完成,才能执行C。
    1
    [task_a, task_b] >> task_c
  • A完成才,能执行B和C。
    1
    task_a >> [task_b, task_c]
  • A和B完成,才能执行C和D。
    1
    [task_a, task_b] >> [task_c, task_d]

对于复杂的依赖关系,可以分多行书写:

1
2
3
4
5
6
7
8
# A完成才能执行B
task_a >> task_b

# B和C完成才能执行D
[task_b, task_c] >> task_d

# ABCD都完成,才能执行E
[task_a, task_b, task_c, task_d] >> task_e

综合案例

将股票价格数据存入数据库(原始方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from datetime import timedelta

import akshare as ak
import pandas as pd
from airflow.models import Variable
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from sqlalchemy import create_engine


def get_symbol_list():
return Variable.get("symbol_list_json", deserialize_json=True)


def download_price():
symbol_list = get_symbol_list()
for symbol in symbol_list:
stock_zh_a_hist_df = ak.stock_zh_a_hist(symbol=symbol, period="daily")
print(symbol)
print(type(stock_zh_a_hist_df))
print(stock_zh_a_hist_df.shape)
stock_zh_a_hist_df.to_csv(f"/data/csv/{symbol}.csv", index=False)


def upload_mysql():
column_mapping = {
'日期': 'date',
'股票代码': 'stock_code',
'开盘': 'open_price',
'收盘': 'close_price',
'最高': 'highest_price',
'最低': 'lowest_price',
'成交量': 'volume',
'成交额': 'turnover',
'振幅': 'amplitude',
'涨跌幅': 'change_pct',
'涨跌额': 'change_amount',
'换手率': 'turnover_rate'
}

engine = create_engine('mysql+pymysql://root:MySQL%4001@127.0.0.1:3306/demodb')

symbol_list = get_symbol_list()
for symbol in symbol_list:
df = pd.read_csv(f"/data/csv/{symbol}.csv")
df.rename(columns=column_mapping, inplace=True)
print(symbol)
print(type(df))
print(df.shape)
df.to_sql(name='stock_data', con=engine, if_exists='append', index=False)


default_args_val = {
"owner": "kaka",
"depends_on_past": False,
"email": ["i@m.kakawanyifan.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}

with DAG(
dag_id="Download_Stock_Price",
default_args=default_args_val,
description="Download stock price and save to local csv files.",
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=["kaka"]
) as dag:
dag.doc_md = "Download stock price and save to local csv files."
download_task = PythonOperator(
task_id="download_price",
python_callable=download_price
)

upload_task = PythonOperator(
task_id="upload_price",
python_callable=upload_mysql
)

download_task >> upload_task
  • apache-airflow-2.10.3sqlalchemy-1.4.54是配套的。
  • 但是pandas-2.1.4以上的版本,要求sqlalchemy是2.X版本的。
  • 所以,我们需要指定Pandas的版本,示例代码:
    1
    pip install pandas==2.1.4

使用AirflowConnection管理数据库连接信息

安装

我们需要安装providers插件apache-airflow-providers-mysql

1
pip install apache-airflow-providers-mysql

对于Linux(Rocky),在安装过程中可能会报错:

1
2
3
4
5
6
7
8
  note: This error originates from a subprocess, and is likely not a problem with pip.
error: subprocess-exited-with-error

× Getting requirements to build wheel did not run successfully.
│ exit code: 1
╰─> See above for output.

note: This error originates from a subprocess, and is likely not a problem with pip.

因为需要先安装MySQL服务:

  1. 在如下地址找到MySQL8.0版本对应的mysql84-community-release-el9-1.noarch.rpm
    https://dev.mysql.com/downloads/repo/yum
  2. mysql84-community-release-el9-1.noarch.rpm上传到服务器,进行安装:
    1
    rpm -i mysql84-community-release-el9-1.noarch.rpm
  3. 安装python3-develmysql-devel
    1
    dnf install -y python3-devel mysql-devel
  4. 安装mysqlclient
    1
    pip install mysqlclient

配置

依次点击AdminConnection,新增连接,配置数据库连接信息。

配置连接

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from datetime import timedelta

import akshare as ak
import pandas as pd
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from sqlalchemy import create_engine
from urllib.parse import quote_plus


def get_symbol_list():
【部分代码略】


def download_price():
【部分代码略】


def upload_mysql():
column_mapping = {
'日期': 'date',
'股票代码': 'stock_code',
'开盘': 'open_price',
'收盘': 'close_price',
'最高': 'highest_price',
'最低': 'lowest_price',
'成交量': 'volume',
'成交额': 'turnover',
'振幅': 'amplitude',
'涨跌幅': 'change_pct',
'涨跌额': 'change_amount',
'换手率': 'turnover_rate'
}

conn_info = BaseHook.get_connection('demodb')

username = conn_info.login
password = conn_info.password
host = conn_info.host
port = conn_info.port
database = conn_info.schema

# 使用 quote_plus 对密码进行 URL 编码
encoded_password = quote_plus(password)

connection_url = f"mysql+pymysql://{username}:{encoded_password}@{host}:{port}/{database}"
engine = create_engine(connection_url)

symbol_list = get_symbol_list()
for symbol in symbol_list:
df = pd.read_csv(f"/data/csv/{symbol}.csv")
df.rename(columns=column_mapping, inplace=True)
print(symbol)
print(type(df))
print(df.shape)
df.to_sql(name='stock_data', con=engine, if_exists='append', index=False)


default_args_val = {
【部分代码略】
}

with DAG(
dag_id="Download_Stock_Price",
default_args=default_args_val,
description="Download stock price and save to local csv files.",
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=["kaka"]
) as dag:
dag.doc_md = "Download stock price and save to local csv files."
download_task = PythonOperator(
task_id="download_price",
python_callable=download_price
)

upload_task = PythonOperator(
task_id="upload_price",
python_callable=upload_mysql
)

download_task >> upload_task

重点关注如下部分:

1
from airflow.hooks.base import BaseHook
1
conn_info = BaseHook.get_connection('demodb')
1
2
3
4
5
username = conn_info.login
password = conn_info.password
host = conn_info.host
port = conn_info.port
database = conn_info.schema

使用MySqlOperator执行数据库操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from datetime import timedelta

import akshare as ak
import pandas as pd
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.utils.dates import days_ago
from sqlalchemy import create_engine
from urllib.parse import quote_plus


def get_symbol_list():
【部分代码略】


def download_price():
【部分代码略】


def upload_mysql():
【部分代码略】

default_args_val = {
【部分代码略】
}

with DAG(
dag_id="Download_Stock_Price",
default_args=default_args_val,
description="Download stock price and save to local csv files.",
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=["kaka"]
) as dag:
【部分代码略】

mysql_task = MySqlOperator(
task_id='max_min_price',
mysql_conn_id='demodb',
sql='max_min.sql',
dag=dag,
)

download_task >> upload_task >> mysql_task

重点关注如下部分:

1
2
3
4
5
6
mysql_task = MySqlOperator(
task_id='max_min_price',
mysql_conn_id='demodb',
sql='max_min.sql',
dag=dag,
)

max_min.sql位于"dags_folder",在本文内容如下:

1
2
alter table stock_data add max_min decimal(10, 2) null after lowest_price;
update stock_data set max_min = (highest_price - lowest_price)

进阶

XCOM

XCom:cross communication,Task和Task之间通信用。

本文提供一个简单的DEMO,关于"XCOM"的更多用法,可以参考:

  • example_xcom
  • example_xcom_args
  • example_xcom_args_with_operators
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from datetime import timedelta

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def func_1():
return 'func'


def func_2(*args, **context):
val = context['ti'].xcom_pull(task_ids='task_1')
print(val)


with DAG(
dag_id="xcom_demo",
description="XCOM DEMO",
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=["kaka"]
) as dag:
dag.doc_md = "XCOM DEMO"
task_1 = PythonOperator(
task_id="task_1",
python_callable=func_1
)

task_2 = PythonOperator(
task_id="task_2",
python_callable=func_2
)

task_1 >> task_2

发邮件

配置

1
2
3
4
5
6
7
8
9
10
[smtp]
smtp_host = smtp.qq.com
smtp_starttls = False
smtp_ssl = True
smtp_user = assistantmail@qq.com
smtp_password = 【password】
smtp_port = 465
smtp_mail_from = assistantmail@qq.com
smtp_timeout = 30
smtp_retry_limit = 5

EmailOperator 手动发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from datetime import timedelta

from airflow.models.dag import DAG
from airflow.operators.email import EmailOperator
from airflow.utils.dates import days_ago


with DAG(
dag_id="email_demo",
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=["kaka"]
) as dag:
email_task = EmailOperator(
task_id='send_email',
to='i@m.kakawanyifan.com',
subject='email',
html_content="email",
dag=dag
)

自动发送告警邮件

该部分,我们上文讨论过。

1
2
3
4
5
6
7
8
9
default_args_val = {
"owner": "kaka",
"depends_on_past": False,
"email": ["i@m.kakawanyifan.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
  • email:当某些条件满足时(如任务失败),将发送通知到这些电子邮件地址。
  • email_on_failure:设置为True时,如果任务失败,Airflow将发送一封电子邮件通知。
  • email_on_retry:设置为True时,当任务重试时,Airflow也会发送电子邮件通知。

catchup

什么是catchup

catchup,追赶。
在实现DAG具体逻辑后,如果将catchup设置为True,Airflow将回填所有过去的DAGRun;如果将catchup设置为False,Airflow将从最新的DAGRun时刻前一时刻开始执行DAGRun,忽略之前所有的记录。

配置方法

在定义DAG的时候,传入catchup的参数,默认是True

该默认值可以通过airflow.cfgcatchup_by_defaul修改。

在下文,我们定义某个DAG每隔1分钟执行一次,调度开始时间为2001-01-01,并设置catchup为True,那么DAG将从2001-01-01 00:00:00开始每分钟都会运行当前DAG。如果catchup设置为False,那么DAG将从当前时间的前一刻开始执行DAG-Run。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from airflow import DAG

from airflow.operators.bash import BashOperator

from datetime import datetime, timedelta

default_args = {
# 拥有者名称
'owner': 'kaka',
# 第一次开始执行的时间,为 UTC 时间
'start_date': datetime(2001, 1, 1),
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5), # 失败重试间隔
}

dag = DAG(
dag_id='catchup_demo',
default_args=default_args,
schedule_interval=timedelta(minutes=1),
catchup=True
)

first = BashOperator(
task_id='first',
bash_command='echo "run first task"',
dag=dag
)
middle = BashOperator(
task_id='second',
bash_command='echo "run second task"',
dag=dag
)
last = BashOperator(
task_id='third',
bash_command='echo "run third task"',
dag=dag,
retries=3
)
first >> middle >> last

经验

  1. 部署Airflow的机器,内存应较大,或者考虑分布式的Airflow。
  2. 任务的幂等性。
    在设计每一个任务的时候,尽量做到可以重复多次运行,即使在运行过程中被打断了,下次运行依然不会出问题。
  3. 在DAG的定义中,尽量不要有资源密集型操作。
    Airflow会定期扫描所有DAG文件以查找新的或更新的DAG,以便将它们加载到数据库并使之可用。
    如果我们在DAG定义中包含了资源密集型操作,比如复杂的计算、大型数据集的处理或者耗时的API调用等,这些操作会在每次扫描时被执行,这不仅浪费了系统资源,还可能显著增加DAG解析的时间,进而影响整个Airflow系统的性能和响应速度。
    所以,在DAG文件中只定义任务的工作流逻辑,即如何安排和链接各个任务,而不应该包含任何实际的任务执行逻辑或资源密集型操作。任务的具体执行逻辑应当放在由DAG调度触发的单独的operator中,这样可以确保只有当DAG被真正执行的时候,这些操作才会被执行,而不是每次扫描DAG文件的时候。
文章作者: Kaka Wan Yifan
文章链接: https://kakawanyifan.com/19914
版权声明: 本博客所有文章版权为文章作者所有,未经书面许可,任何机构和个人不得以任何形式转载、摘编或复制。

留言板