Airflow,在dag运行前标记任务成功或跳过

3

我们有一个庞大的DAG,其中包含许多小而快速的任务和少量耗时较长的大型任务。

我们想只运行DAG的一部分,而我们找到的最简单的方法是不添加我们不想运行的任务。问题在于,我们的DAG有许多相互依赖关系,因此在跳过某些任务时保持DAG完整性变成了一个真正的挑战。

有没有一种默认情况下为任务添加状态的方法?(对于每次运行)例如:

# get the skip list from a env variable    
task_list = models.Variable.get('list_of_tasks_to_skip')

dag.skip(task_list)

或者。
for task in task_list:
    task.status = 'success'

如果你将所有不关心的任务标记为成功,而不在每次运行时执行,那么是什么阻止你从DAG中完全删除这些任务呢? - Ben Gregory
@BenGregory,我们通常作为“完整运行”来运行DAG,但有时会有一些任务失败,我们只想运行DAG的某些部分。相互依赖树非常庞大,简单地删除一个任务通常会破坏DAG。因此,我们希望将所有DAG标记为“成功”,除了需要运行和其依赖项的任务。 - Pablo
1
好的 - 我不确定在任务被评估之前如何默认设置任务状态。如果您正在运行一个大型DAG,而且大部分时间只需要运行某些部分,我建议在各个节点使用BranchPythonOperator(https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_branch_operator.py)来确定执行哪些下游任务和跳过哪些任务。 - Ben Gregory
@Pablo,你能解决这个问题吗?我能想到的另一种方法是将所有需要重新运行的部分移出主DAG,并将它们建模为单独的顶级DAG。现在,在您的主DAG中,您可以使用ShortCircuitOperatorBranchPythonOperatorTriggerDagRunOperator相结合来链接它们。这种方法仍然存在的问题是,如果您仍有更多部分需要在这些小型顶级DAG之后运行,那么您将需要像ExternalTaskSensor这样的东西来等待完成这些小型DAG以触发它们。难看的。 - y2k-shubham
@y2k-shubham 是的,我们使用了一个有点复杂但对我们的问题很有用的解决方法。正如您在主要问题中所看到的,我们正在寻找一种使用 env-var(动态地)修改dag的方法,我们没有找到跳过airflow任务的方法,但我们意识到可以基于 env-var 创建一个dag。我们所有的任务基本上都是相同的,因此我们根据保存在 env-var 中的任务列表创建了一个循环来创建它们。然后,当我们想要跳过某些任务时,我们使用图形算法修改该变量。希望这有所帮助。 - Pablo
考虑添加一个答案,描述您的解决方案/解决方法。 - y2k-shubham
2个回答

2
正如评论中所提到的,您应该使用BranchPythonOperator(或ShortCircuitOperator)来防止耗时任务执行。如果您需要运行这些耗时任务的下游操作,则可以使用TriggerRule.ALL_DONE使这些操作运行,但请注意,即使上游操作失败,这也将运行。
您可以使用Airflow变量影响这些BranchPythonOperators,而无需更新DAG,例如:
from airflow.models import Variable

def branch_python_operator_callable()
  return Variable.get('time_consuming_operator_var')

使用branch_python_operator_callable作为您的BranchPythonOperator的Python可调用函数。


有一个开关BranchPythonOperatorShortCircuitOperator用于每个任务(或DAG执行的每个组合),如果可能的话,我们希望避免这种情况。就像为每种类型的执行(考虑任务间的代码依赖关系)都有一个DAG一样。 是否有一种方法可以将任务保留在DAG中但跳过它们?或者仅给它们分配一个属性标记为成功? - Pablo
我正在寻找类似的解决方案。我必须创建任务,以便它仍然可以在用户界面中可见,但同时我想将其标记为跳过。 - alltej

0
你有没有考虑在可调用函数周围使用装饰器/高阶函数?
我正在考虑使用类似以下的东西:
def conf_task_id_skip(python_callable):
    def skip_if_configured(*args, **context):
        task_id = context["task_id"]
        dag_run = context["dag_run"]
        skip_task_ids = dag_run.conf.get("skip_task_ids", [])

        if skip_task_ids and task_id in skip_task_ids:
            return None
        else:
            return python_callable(*args, **context)
    
    return skip_if_configured

PythonOperator(
    task_id="task_id", 
    python_callable=conf_task_id_skip(task_callable)
)

然后,如果我想要的话,我可以手动跳过我想要跳过的任务(并仍然成功)。

如果您希望如此,您还可以通过添加检查来增加鲁棒性,以确保是否禁止跳过(例如在生产环境中):

def conf_task_id_skip(python_callable):
    def skip_if_configured(*args, **context):
        if Variable.get("disallow_conf_task_id_skip"):
            return python_callable(*args, **context)

        task_id = context["task_id"]
        dag_run = context["dag_run"]
        skip_task_ids = dag_run.conf.get("skip_task_ids", [])

        if skip_task_ids and task_id in skip_task_ids:
            return None
        else:
            return python_callable(*args, **context)
    
    return skip_if_configured

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接