我想了解Airflow是否支持在DAG中跳过任务进行adhoc执行?
假设我的DAG图如下: task1 > task2 > task3 > task4
我想手动从task3开始启动DAG,最好的方法是什么?
我已经阅读了ShortCircuitOperator
,但我正在寻找更多的adhoc解决方案,一旦触发执行就能应用。
谢谢!
我想了解Airflow是否支持在DAG中跳过任务进行adhoc执行?
假设我的DAG图如下: task1 > task2 > task3 > task4
我想手动从task3开始启动DAG,最好的方法是什么?
我已经阅读了ShortCircuitOperator
,但我正在寻找更多的adhoc解决方案,一旦触发执行就能应用。
谢谢!
您可以使用SkipMixin来跳过下游任务,这是ShortCircuitOperator 在底层使用的。
from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults
class mySkippingOperator(BaseOperator, SkipMixin)
@apply_defaults
def __init__(self,
condition,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.condition = condition
def execute(self, context):
if self.condition:
self.log.info('Proceeding with downstream tasks...')
return
self.log.info('Skipping downstream tasks...')
downstream_tasks = context['task'].get_flat_relatives(upstream=False)
self.log.debug("Downstream task_ids %s", downstream_tasks)
if downstream_tasks:
self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
self.log.info("Done.")
是的,您可以通过另一种特殊方式来实现。
已经找到了!!
您需要引发AirflowSkipException异常。
from airflow.exceptions import AirflowSkipException
def execute():
if condition:
raise AirflowSkipException
task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)
从Apache Airflow的构建方式来看,您可以编写逻辑/分支来确定要运行哪些任务。
但是
您不能从中间的任何任务开始执行任务。排序完全由依赖关系管理(上游/下游)定义。
然而,如果您使用celery operator,可以在运行时忽略所有依赖项,并要求airflow按您的意愿执行该任务。不过,这将无法防止上游任务的调度。
Maayan, 有一个非常简单但非常明显的解决方案,实际上只需要30秒。但是,只有在您可以轻松更新PROD中的代码并且能够暂时防止其他人运行DAG的情况下才可能实现。 只需注释要跳过的任务
'#task1 > task2 >
task3 > task4
更严肃的解决方案,但需要更多的努力,可能是基于start_from_task参数动态创建DAG,在这种情况下,依赖关系将使用此参数构建。可以使用Admin==>Variables菜单在UI中更改参数。您还可以使用先前变量的导出时间的另一个变量。例如 - DAG将忽略任务1和任务2直到14:05:30,之后将运行整个DAG。
skip_task_1=True
,是否足够? - judoole