定义Airflow DAG任务顺序的正确方法

3

我有一个相当长的DAG任务集,每个任务都有相当长的task_id,详细信息都是相关的,而且命名不能缩短。
目前我已经写成了这样:

a_very_long_long_named_task_1 >> a_very_long_long_named_task_2 >> a_very_long_long_named_task_3 >> a_very_long_long_named_task_4 >> a_very_long_long_named_task_5

在其他有向无环图中,我看到这被拆分为多行,尽管存在重复:
a_very_long_long_named_task_1 >> a_very_long_long_named_task_2
a_very_long_long_named_task_2 >> a_very_long_long_named_task_3
a_very_long_long_named_task_3 >> a_very_long_long_named_task_4
a_very_long_long_named_task_4 >> a_very_long_long_named_task_5

哪种方式被推荐?是否有最佳实践或者其他更好的定义任务排序的方法?

1个回答

4
  • 你可以随时在创建(实例化)任务时将其添加到Python list(或dict/ 类似的数据结构)中
  • 然后在结束时,您可以通过编程方式将它们连接起来

请注意,此代码片段未经测试

from typing import List
from airflow.models.baseoperator import BaseOperator

my_tasks: List[BaseOperator] = [
    a_very_long_long_named_task_1,
    a_very_long_long_named_task_2,
    a_very_long_long_named_task_3, 
    a_very_long_long_named_task_4, 
    a_very_long_long_named_task_5
]

..

# define a utility method to set dependencies b/w tasks
def wire_tasks(my_tasks: List[BaseOperator]) -> None:
    """
    A utility method that accepts a list of tasks and links them up
    :param my_tasks: List of tasks (operator instances)
    :type my_tasks: List[BaseOperator]
    :return None
    """
    for i in range(1, len(my_tasks)):
        # this is equivalent to my_tasks[i - 1].set_upstream(my_tasks[i])
        my_tasks[i - 1] >> my_tasks[i]

# call the utility method to wire the tasks
wire_tasks(my_tasks=my_tasks)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接