Airflow中的执行顺序

3
我正在查看示例代码这里
有两个“operation”函数:
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

并且:

def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

对于每次运行my_sleeping_function,我们都会运行print_context吗?

我不明白的是顺序。 这是图和树...执行顺序不同:

enter image description here enter image description here

先发生什么?之后又会发生什么?为什么?

我认为根据这个:

for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag)

    task.set_upstream(run_this)

run_this执行,然后task执行,但循环让我感到困惑。

3个回答

5

我认为您的困惑在于您期望图形视图和树形视图是同一事物的两个不同可视化方式。然而,它们用于可视化不同的内容。

图形视图显示了工作流中任务运行的顺序。在您的情况下,print_the_context将会运行,一旦完成,sleep_for_0sleep_for_1sleep_for_2sleep_for_3sleep_for_4将会并行运行(或者至少尽可能并行,取决于您的airflow配置)。

树形视图表示DAG的深度优先可视化(以及右侧方块中每个任务随时间的状态)。也就是说,树的第一层节点是DAG中的最终任务(叶节点),其中DAG将被视为成功完成。它会分支出每个必须运行的依赖任务。

换句话说,在两个视图中执行顺序相同,只是从不同的方向进行可视化。


2
这里的循环只是展示如何动态构建DAG。执行顺序取决于您设置的“上游”或“下游”任务。您链接的示例也可以像以下示例一样完成。但是,如果您想添加另外10个任务呢?你需要大量的复制/粘贴来实现相同的事情,最好像链接的示例那样将其放入循环中:
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'


run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

task_0 = PythonOperator(
    task_id='sleep_for_' + 0,
    python_callable=my_sleeping_function,
    op_kwargs={'random_base': float(0) / 10},
    dag=dag)

task_1 = PythonOperator(
    task_id='sleep_for_' + 1,
    python_callable=my_sleeping_function,
    op_kwargs={'random_base': float(1) / 10},
    dag=dag)

task_2 = PythonOperator(
    task_id='sleep_for_' + 2,
    python_callable=my_sleeping_function,
    op_kwargs={'random_base': float(2) / 10},
    dag=dag)


task_3 = PythonOperator(
    task_id='sleep_for_' + 3,
    python_callable=my_sleeping_function,
    op_kwargs={'random_base': float(3) / 10},
    dag=dag)

task_4 = PythonOperator(
    task_id='sleep_for_' + 4,
    python_callable=my_sleeping_function,
    op_kwargs={'random_base': float(4) / 10},
    dag=dag)


task_0.set_upstream(run_this)
task_1.set_upstream(run_this)
task_2.set_upstream(run_this)
task_3.set_upstream(run_this)
task_4.set_upstream(run_this)

1
我在这里寻找与问题无关的东西,但是你的解决方案恰好对我有用 :) - Akhil

2

@cwurtz是正确的!

要设置任务顺序,可以使用priority_weight任务参数。

池参数可与priority_weight一起使用,定义队列中的优先级以及随着池中可用插槽而开启的哪些任务先执行。(阅读更多


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接