Airflow:运行子DAG一次的模式

14

从Airflow文档中:

SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once, the SubDAG will succeed without having done anything

我知道子DAG操作符实际上是通过BackfillJob实现的,因此我们必须为操作符提供一个schedule_interval。但是,是否有一种方法可以获得与子DAG的schedule_interval="@once"语义等效的方式?我担心如果我将schedule_interval="@daily"设置给子DAG,如果子DAG运行时间超过一天,那么子DAG可能会运行多次。

def subdag_factory(parent_dag_name, child_dag_name, args):
    subdag = DAG(
        dag_id="{parent_dag_name}.{child_dag_name}".format(
            parent_dag_name=parent_dag_name, child_dag_name=child_dag_name
        ),
        schedule_interval="@daily", # <--- this bit here
        default_args=args
    )

    ... do more stuff to the subdag here
    return subdag
如何伪造“仅在父DAG触发一次运行此子DAG”的功能。
2个回答

7
我发现对于我的subdags,schedule=@once命令完全可行。也许我的版本过时了,但是即使所有任务都成功(或被跳过),我在处理subdag时遇到的问题比反面情况要多。

以下是实际运行的示例代码:

subdag_name = ".".join((parent_name,child_name))
logging.info(parent_name)
logging.info(subdag_name)
dag_subdag = DAG(
    dag_id=subdag_name,
    default_args=dargs,
    schedule_interval="@once",
)

事实上,我最初将几乎所有的DAG构建为我的子DAG的较高级的cfg文件。经过一些试验后,我不确定这是否是一个好主意,但是计划间隔对我从来没有造成过阻碍。

我正在运行一个相对较新的1.8版本,并进行了少量自定义。我一直按照示例DAG建议中的方法,将我的子DAG保存在DAG文件夹内部的一个文件夹中,以便它们不会出现在DagBag中。


我正在使用Airflow 1.7.1.3版本,目前不考虑升级到1.8版本,因为该版本会意外破坏自定义执行程序插件。我将看一下1.8版本是否支持以“@once”为计划的子任务运行,但是如果文档中明确说明不支持,那么我会感到惊讶。 - gnicholas
有运气了吗?我的代码仍然在愉快地运行着。我试图为您在1.7中查找规范的方法。我能找到的最接近的方法(假设@once不可行)是将实际子DAG任务的execution_timeout设置得比您在子DAG本身中设置的执行频率短。这样,您将在可能启动更多任务之前超时。我知道这只是猜测,但我很难找到我们分支中与您使用的旧版本Airflow相同的构建版本。 - apathyman
1
很想听听作者们为什么能够让这个工作,尽管文档明确表示它不应该。 - qwwqwwq

3

尝试使用 schedule=None 的外部触发器模式来运行子 DAG。在这种情况下,它只会在父 DAG 触发时才被运行。


3
为了澄清,您建议使用TriggerDagRunOperator来触发一个没有调度的dag吗?子dag的关键是我们想要阻塞语义,而触发dagrun操作符只会触发一个dagrun然后继续执行,并不会等待dagrun完成。此外,在airflow UI中没有透明度表明已运行子dag,您只知道触发了一些随机的dagrun。 - gnicholas

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接