Airflow: 使用分支跳过任务

3
在我的DAG中,我希望根据一个标志跳过一个任务(oracle_merge_hist_orig)。
我的逻辑是:
当oracle_branch=True时,执行[merge_op,update_table_op,table_count_op]
当oracle_branch=False时,执行[update_table_op, table_count_op]
我尝试使用BranchPythonOperator,如下所示:
args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}
oracle_branch = True
def branch_func():
    if oracle_branch:
        return "oracle_branch"
    else:
        return "normal_branch"

dag = DAG(
    dag_id='example_branch_operator',
    default_args=args,
    schedule_interval="@daily",
)

branching_op = BranchPythonOperator(
    task_id='branch_shall_run_oracle_merge_original_hist',
    python_callable=branch_func,
    dag= dag)

oracle_branch = DummyOperator(
    task_id='oracle_branch',
    dag=dag)

normal_branch = DummyOperator(
    task_id='normal_branch',
    dag=dag)

merge_op = DummyOperator(
    task_id='oracle_merge_hist_orig',
    dag=dag,
)

update_table_op = DummyOperator(
    task_id='update_table_job',
    dag=dag,
)

table_count_op = DummyOperator(
    task_id='table_count',
    dag=dag,
)

branching_op >> [oracle_branch,normal_branch] 
normal_branch >> update_table_op >> table_count_op
oracle_branch >> merge_op >> update_table_op >> table_count_op

然而,它跳过的不是任务,而是整个路径。
如何修复此问题,以便我只跳过“racle_merge_hist_orig”任务?
当oracle_branch=False时 enter image description here 当oracle_branch=True时 enter image description here

你使用的是哪个Airflow版本?我在Airflow 1.8中测试了相同的代码,它可以正常工作。 - Sai Neelakantam
@SaiKiranNeelakantam 我使用的是 1.10.3 版本。 - Ashika Umanga Umagiliya
我能够在Airflow 1.10.4上重现您的错误,并在下面发布了一个答案,可以解决这个问题。 - Sai Neelakantam
1个回答

10

每个任务都将有一个 trigger_rule,默认设置为all_success。我们可以覆盖它为不同的值,这些值在这里列出。

在您的DAG中,update_table_job任务有两个上游任务。由于其上游任务之一处于skipped状态,因此它也进入了skipped状态。我们可以通过将trigger_rule的默认值覆盖为one_success来避免这种情况,代码示例如下。

update_table_op = DummyOperator(
    task_id='update_table_job',
    trigger_rule='one_success',
    dag=dag
)

在此输入图片描述 注意: 我是在 Airflow 1.10.4 版本上测试过的。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接