Airflow 2.0分支操作后任务被跳过。

3

我在Airflow的新版本中尝试使用分支,无论我尝试什么方法,在BranchOperator之后的所有任务都被跳过了。

这是我尝试完成的最小示例:

from airflow.decorators import dag, task
from datetime import timedelta, datetime

from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

import logging
logger = logging.getLogger("airflow.task")

@dag(
    schedule_interval="0 0 * * *",
    start_date=datetime.today() - timedelta(days=2),
    dagrun_timeout=timedelta(minutes=60),
)
def StackOverflowExample():

    @task
    def task_A():

        logging.info("TASK A")
        

    @task
    def task_B():

        logging.info("TASK B")

    @task
    def task_C():

        logging.info("TASK C")

    @task
    def task_D():
        
        logging.info("TASK D")

        return {"parameter":0.5}

    
    def _choose_task(task_parameters,**kwargs):

        logging.info(task_parameters["parameter"])
        if task_parameters["parameter"]<0.5:
            logging.info("SUCCESSS ")
            return ['branch_1', 'task_final']
        else:
            logging.info("RIP")
            return ['branch_2', 'task_final']

    @task(task_id="branch_1")
    def branch_1():
        logging.info("branch_1...")

    @task(task_id="branch_2")
    def branch_2():
        logging.info("branch_2")

    @task(task_id="task_final")
    def task_final():
        logging.info("task_final")


    parameter = task_A() >> task_B() >> task_C() >> task_D()   

    choose_task = BranchPythonOperator(
                                            task_id='choose_best_model',
                                            op_kwargs={"task_parameters":parameter},
                                            python_callable=_choose_task,
                                            trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
                                            )



    choose_task >> [branch_1(), branch_2()] >> task_final()


dag = StackOverflowExample  ()

Airflow DAG graph

有什么线索吗?我对触发规则持怀疑态度。作为一个初学者,我不会排除其他我忽略的问题。

1个回答

5

您需要在task_final上设置触发规则。 您希望在branch_1branch_2执行完成后(无论哪一个被执行/跳过),都要执行task_final,因此您需要设置所有已完成的触发规则:

@task(task_id="task_final", trigger_rule=TriggerRule.ALL_DONE)
def task_final():
    logging.info("task_final")

enter image description here


1
非常感谢!我能问一下为什么需要为任务“task_final”设置触发规则,而不是整个工作流程的其余部分吗? - ABaron
3
@ABaron,因为trigger_rule的默认值为ALL_SUCCESS,这对其他任务是可以的。task_final是您的工作流中仅依赖于2个任务的任务之一,其中一个任务保证会被跳过。 - Elad Kalif

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接