如何在Airflow中跳过任务?

13

我想了解Airflow是否支持在DAG中跳过任务进行adhoc执行?

假设我的DAG图如下: task1 > task2 > task3 > task4

我想手动从task3开始启动DAG,最好的方法是什么?

我已经阅读了ShortCircuitOperator,但我正在寻找更多的adhoc解决方案,一旦触发执行就能应用。

谢谢!


分支运算符 + 变量,比如 skip_task_1=True,是否足够? - judoole
运行后,您可以通过在该任务上使用“清除”来“重新运行”该任务。这个方法可行吗? - judoole
如果可以的话,您可以尝试将Dag分成两个部分,并在task2> task3中使用TriggerDagRunOperator,就像这样https://github.com/apache/incubator-airflow/blob/272952a9dce932cb2c648f82c9f9f2cafd572ff1/airflow/example_dags/example_trigger_controller_dag.py - judoole
什么是BranchOperator + Variables?你能分享一个例子吗? - Maayan
使用 https://airflow.apache.org/concepts.html#variables,然后使用 BranchOperator 进行检查,如果变量为真,则跳过任务1和任务2,直接执行任务3。 - judoole
5个回答

19

您可以使用SkipMixin来跳过下游任务,这是ShortCircuitOperator 在底层使用的

from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults


class mySkippingOperator(BaseOperator, SkipMixin)
    
    @apply_defaults
    def __init__(self,
                 condition,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.condition = condition
    
    def execute(self, context):

        if self.condition:
           self.log.info('Proceeding with downstream tasks...')
           return

        self.log.info('Skipping downstream tasks...')

        downstream_tasks = context['task'].get_flat_relatives(upstream=False)
       
        self.log.debug("Downstream task_ids %s", downstream_tasks)

        if downstream_tasks:
            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

        self.log.info("Done.")

谢谢!但我正在寻找更加临时的东西——即能够从任何DAG上的任何任务开始。无论上游依赖关系如何以及是否满足这些依赖关系。 - Maayan
不确定如何做到这一点 - 您可以将task3设置为无论task1或task2的结果如何都运行,或者添加一个branchOperator来确定要运行哪个任务,但默认情况下,所有任务都将按照图表指示的顺序在执行中运行。 - Ben Gregory

13

是的,您可以通过另一种特殊方式来实现。

已经找到了!!

您需要引发AirflowSkipException异常。

from airflow.exceptions import AirflowSkipException
def execute():
    if condition:
        raise AirflowSkipException

task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)

2
是的,只需单击任务3。切换到运行按钮右侧的复选框以忽略依赖项,然后单击运行。 enter image description here

0

从Apache Airflow的构建方式来看,您可以编写逻辑/分支来确定要运行哪些任务。

但是

您不能从中间的任何任务开始执行任务。排序完全由依赖关系管理(上游/下游)定义。

然而,如果您使用celery operator,可以在运行时忽略所有依赖项,并要求airflow按您的意愿执行该任务。不过,这将无法防止上游任务的调度。


谢谢!假设我只是在谈论手动触发,没有任何调度。 - Maayan
那么为什么一开始要安排时间呢? - Tameem
Airflow 提供了良好的流程管理,不仅仅是调度。我们主要关注的是依赖图、并行性等方面。 - Maayan
我是一个大型组织的一部分,Airflow已经存在并提供了我们需要的大部分功能,除了我在问题中提到的内容。 - Maayan
你的陈述自相矛盾。当下游任务依赖于上游(依赖项)时,只要上游有更新,就不能启动下游。目前,最好的方法是使用由@Ben Gregory描述的自定义跳过运算符。正如我之前提到的,“你不能从中间的任何任务开始任务”,这就是依赖关系的含义。对于一个任务来说,只有在不是第一个任务的情况下,所有上游任务的状态更新才能启动。 - Tameem
1
我们现在有LatestOnlyOperator,可以(在某种程度上)绕过限制:“..您不能从中间任何任务开始任务执行..” - y2k-shubham

-2

Maayan, 有一个非常简单但非常明显的解决方案,实际上只需要30秒。但是,只有在您可以轻松更新PROD中的代码并且能够暂时防止其他人运行DAG的情况下才可能实现。 只需注释要跳过的任务

'#task1 > task2 >

task3 > task4

更严肃的解决方案,但需要更多的努力,可能是基于start_from_task参数动态创建DAG,在这种情况下,依赖关系将使用此参数构建。可以使用Admin==>Variables菜单在UI中更改参数。您还可以使用先前变量的导出时间的另一个变量。例如 - DAG将忽略任务1和任务2直到14:05:30,之后将运行整个DAG。


1
我尝试过这个。但它并没有跳过任务,只是直接执行了任务3和任务2而已,没有等待。你需要将这些任务也放到注释中。 - Thomas R

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接