Apache Airflow切换分支。

4

我在使用Apache Airflow时遇到了一些将分支移动到另一个位置的问题。我有一个DAG,该DAG依赖于三个分支操作器。

all_empty_branch_task >> generate_round_task >> load_tasks
all_empty_branch_task >> resolving_branch_task
resolving_branch_task >> [
        export_final_annotation_task, annotation_branch_task, cleansing_branch_task]

Airflow UI中的DAG

我确认了 resolving_branch_task(check-resolving-branch) 这个 Python 函数返回的 annotation_branch_task(check-annotation-branch) 任务ID也是一个 Python 分支,但是在 resolving_branch_task 执行结束后,它什么也没做。 我不确定有什么问题。 值得注意的是,当我返回普通的任务ID时(而不是分支),它可以成功执行任务。 请问有人能帮忙吗?非常感谢。

1个回答

4
BranchPythonOperator 任务将跳过其 python_callable 未返回的整个“分支”中的所有任务。这意味着当“check-resolving-branch”不选择“export-final-annotation-task”时,它将被跳过,其下游任务包括“check-annotation-branch”任务和DAG中的所有其他任务。
为了解决这个问题,您可以在Airflow中使用触发规则。默认情况下,所有任务的触发规则是“all_success”。在此用例中,您可以将“check-annotation-branch”任务的触发规则设置为“all_done”,以便该任务在所有上游任务完成(成功、失败或跳过)后执行。
以下示例应该能够给您一个实现DAG所需的想法:
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="branch_test",
    start_date=datetime(2021, 9, 10),
    schedule_interval=None,
) as dag:

    @task
    def func1():
        ...

    @task
    def func2():
        ...

    @task
    def func3():
        ...

    @task
    def func4():
        ...

    branch_1 = BranchPythonOperator(task_id="branch_1", python_callable=lambda: "branch_2")
    branch_2 = BranchPythonOperator(
        task_id="branch_2", python_callable=lambda: "func3", trigger_rule=TriggerRule.ALL_DONE
    )

    func1() >> branch_1 >> func2() >> branch_2
    branch_1 >> branch_2 >> [func3(), func4()]

enter image description here


1
非常感谢您的回答,这是正确的答案,尽管我阅读了整个文档的那一部分,但我忽略了它,谢谢。 - samyouaret

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接