假设这是我的DAG:
A >> B >> C
如果任务B引发异常,我想跳过该任务而不是使其失败。但是,我不想跳过任务C。我调查了AirflowSkipException和soft_fail sensor,但它们都会强制跳过下游任务。请问有人知道如何解决这个问题吗?
谢谢!
假设这是我的DAG:
A >> B >> C
如果任务B引发异常,我想跳过该任务而不是使其失败。但是,我不想跳过任务C。我调查了AirflowSkipException和soft_fail sensor,但它们都会强制跳过下游任务。请问有人知道如何解决这个问题吗?
谢谢!
目前发布的答案涉及不同主题或似乎不完全正确。
将触发器规则all_failed
添加到任务C中,对于OP的示例DAG: A >> B >> C
是行不通的,除非任务A以failed
状态结束,这很可能是不希望出现的。
实际上,OP非常接近预期行为,可以通过混合使用AirflowSkipException
和none_failed
触发器规则来实现:
from datetime import datetime
from airflow.exceptions import AirflowSkipException
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
with DAG(
dag_id="mydag",
start_date=datetime(2022, 1, 18),
schedule_interval="@once"
) as dag:
def task_b():
raise AirflowSkipException
A = DummyOperator(task_id="A")
B = PythonOperator(task_id="B", python_callable=task_b)
C = DummyOperator(task_id="C", trigger_rule="none_failed")
A >> B >> C
Airflow 根据以下方式执行:
这个规则是什么意思?
触发规则
none_failed: 所有上游任务都没有失败或 upstream_failed- 也就是说,所有上游任务都成功或已被跳过。
所以,基本上我们可以在代码中捕获实际异常并引发提到的 Airflow 异常,从而将任务状态从 failed
更改为 skipped
。但是,如果没有给 Task-C 提供 trigger_rule
参数,我们最终会将下游的 Task-B 标记为 skipped
。
触发规则
允许您配置任务的执行依赖关系。通常情况下,当所有上游任务成功时,任务才会被执行。您可以将其更改为Airflow提供的其他触发规则。 all_failed
触发规则仅在所有上游任务失败时执行任务,这将实现您所述的内容。from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
with DAG(
dag_id="my_dag",
start_date=datetime(2021, 4, 5),
schedule_interval='@once',
) as dag:
p = PythonOperator(
task_id='fail_task',
python_callable=lambda x: 1,
)
t = PythonOperator(
task_id='run_task',
python_callable=lambda: 1,
trigger_rule=TriggerRule.ALL_FAILED
)
p >> t
您可以更改任务声明中的trigger_rule
。
task = BashOperator(
task_id="task_C",
bash_command="echo hello world",
trigger_rule="all_done",
dag=dag
)