以下是我们团队的管理方法。
首先,在命名规范方面,我们每个DAG文件名与DAG本身内容中的DAG Id匹配(包括DAG版本)。这很有用,因为最终在Airflow UI中看到的是DAG Id,所以您将确切地知道每个DAG背后使用了哪个文件。
例如,对于像这样的DAG:
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017,12,05,23,59),
'email': ['me@mail.com'],
'email_on_failure': True
}
dag = DAG(
'my_nice_dag-v1.0.9',
default_args=default_args,
schedule_interval="0,15,30,45 * * * *",
dagrun_timeout=timedelta(hours=24),
max_active_runs=1)
[...]
DAG文件的名称将为:my_nice_dag-v1.0.9.py
- 我们所有的DAG文件都存储在Git仓库中(以及其他东西)
- 每当一个合并请求在我们的主分支中完成时,我们的持续集成管道会开始新构建,并将我们的DAG文件打包成zip文件(我们使用Atlassian Bamboo,但也有其他解决方案,如Jenkins、Circle CI、Travis等)
- 在Bamboo中,我们配置了一个部署脚本(shell),它会解压缩包并将DAG文件放置在Airflow服务器的/dags文件夹中。
- 我们通常会在DEV环境中部署DAG进行测试,然后是UAT和最终的PROD。点击Bamboo UI上的按钮即可完成部署,感谢上面提到的shell脚本。
优势:
- 由于您在文件名中包含了DAG版本,因此以前的DAG文件不会被覆盖在DAG文件夹中,因此您可以轻松返回它
- 当您的新DAG文件在Airflow中加载时,您可以通过版本号在UI中识别它。
- 因为你的DAG文件名=DAG ID,你甚至可以通过添加一些Airflow命令行来改进部署脚本,以便在部署后自动打开新的DAG。
- 因为每个DAG版本都在Git中被历史记录,所以我们可以随时返回到之前的版本。
from airflow.models import Variable
调用这些变量,然后使用Variable.get('my_variable_name')
。 - Alexis.Rolland