我在主DAG的一个节点中使用了一个子DAG,工作流程正常。
我试图通过在子DAG中包含另一个子DAG来增加层次结构的级别。但Airflow似乎变得混乱了。在这方面有几个问题:
1)Airflow是否支持在子DAG中再次嵌套子DAG?如果支持,那么层次结构有限制吗?
2)在子DAG中使用另一个子DAG的最佳实践是什么?
我在主DAG的一个节点中使用了一个子DAG,工作流程正常。
我试图通过在子DAG中包含另一个子DAG来增加层次结构的级别。但Airflow似乎变得混乱了。在这方面有几个问题:
1)Airflow是否支持在子DAG中再次嵌套子DAG?如果支持,那么层次结构有限制吗?
2)在子DAG中使用另一个子DAG的最佳实践是什么?
最近我开始使用Airflow,而且Airflow确实支持在子DAG中嵌套子DAG。 我能够深入到4个级别,但我不确定层级的确切限制。 希望这有所帮助!
我最近开始尝试使用子DAG。我认为它们可以无限嵌套。很多网页建议避免使用子DAG,因为连接池的问题。
我做了一个这里的例子,有两个级别。它可以进一步重构,但它证明了这一点。
1)创建一个帮助方法来创建以parent.child格式命名的DAG子图。
def create_sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
''''Returns a DAG which has the dag_id formatted as parent.child '''
return DAG(
dag_id='{}.{}'.format(parent_dag_name, child_dag_name),
schedule_interval=schedule_interval,
start_date=start_date,
max_active_runs=15
)
2) 然后递归地创建并分配子DAG到父DAG。请记住,子DAG仍然是DAG。SubDagOperator只将其捆绑为父DAG的任务。
level1_list = ['AWS', 'AZURE']
level2_list = ['eu', 'us', 'ap', 'jp']
tasks = ['task_{}'.format(str(i)) for i in range(0, 10)]
for level1_item in level1_list:
level1_dag = create_sub_dag(dag_id, level1_item, datetime(2020, 3, 10), '0 6 * * *')
level1_subdag_operator = SubDagOperator(
subdag=level1_dag,
task_id=level1_item,
dag=dag,
)
level1_dag_id = '{}.{}'.format(dag_id, level1_item)
for level2_item in level2_list:
level2_dag = create_sub_dag(level1_dag_id, level2_item, datetime(2020, 3, 10), '0 6 * * *')
level2_subdag_operator = SubDagOperator(
subdag=level2_dag,
task_id=level2_item,
dag=level1_dag,
)
level2_dag_id = '{}.{}.{}'.format(dag_id, level1_item, level2_item)
create_tasks(level2_dag, tasks)
简短的回答是可以。 我使用subdag创建函数实现了这一点,遵循https://github.com/geosolutions-it/evo-odas/wiki/Airflow---about-subDAGs,-branching-and-xcom中的说明。 我实现了3级subdag层次结构。 关键是要非常小心地遵循您任务中的maindag.subdag符号。
task = SubDagOperator(
subdag=create_subdag(dag_name, task_id, datetime(2019, 1, 29) , None),
task_id=task_id,
dag=dag
)