如何在 Airflow 中跳过一个任务而不跳过其下游任务?

6

假设这是我的DAG:

A >> B >> C

如果任务B引发异常,我想跳过该任务而不是使其失败。但是,我不想跳过任务C。我调查了AirflowSkipException和soft_fail sensor,但它们都会强制跳过下游任务。请问有人知道如何解决这个问题吗?

谢谢!

3个回答

7

目前发布的答案涉及不同主题或似乎不完全正确。

将触发器规则all_failed添加到任务C中,对于OP的示例DAG: A >> B >> C 是行不通的,除非任务A以failed状态结束,这很可能是不希望出现的。

实际上,OP非常接近预期行为,可以通过混合使用AirflowSkipExceptionnone_failed触发器规则来实现:

from datetime import datetime

from airflow.exceptions import AirflowSkipException
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="mydag",
    start_date=datetime(2022, 1, 18),
    schedule_interval="@once"
) as dag:

    def task_b():
        raise AirflowSkipException

    A = DummyOperator(task_id="A")
    B = PythonOperator(task_id="B", python_callable=task_b)
    C = DummyOperator(task_id="C", trigger_rule="none_failed")

    A >> B >> C

Airflow 根据以下方式执行:

enter image description here

这个规则是什么意思?

触发规则

none_failed: 所有上游任务都没有失败或 upstream_failed- 也就是说,所有上游任务都成功或已被跳过。

所以,基本上我们可以在代码中捕获实际异常并引发提到的 Airflow 异常,从而将任务状态从 failed 更改为 skipped。但是,如果没有给 Task-C 提供 trigger_rule 参数,我们最终会将下游的 Task-B 标记为 skipped


1
你可以参考Airflow文档中的触发规则触发规则允许您配置任务的执行依赖关系。通常情况下,当所有上游任务成功时,任务才会被执行。您可以将其更改为Airflow提供的其他触发规则all_failed 触发规则仅在所有上游任务失败时执行任务,这将实现您所述的内容。
from datetime import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
        dag_id="my_dag",
        start_date=datetime(2021, 4, 5),
        schedule_interval='@once',
) as dag:
    p = PythonOperator(
        task_id='fail_task',
        python_callable=lambda x: 1,
    ) 
    
    t = PythonOperator(
        task_id='run_task',
        python_callable=lambda: 1,
        trigger_rule=TriggerRule.ALL_FAILED
    )
    
    p >> t

0

您可以更改任务声明中的trigger_rule

task = BashOperator(
    task_id="task_C",
    bash_command="echo hello world",
    trigger_rule="all_done",
    dag=dag
)

我认为trigger_rule应该设置为"all_done",如果B失败,则使用"none_failed"不会触发C。A >> B(无论异常或失败都继续使用"all_done"触发)>> C。不清楚ops是否希望在上游任务B引发异常或失败的情况下运行下游任务C。 - camel_case

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接