将顶层DAGs连接在一起

16

我需要几个相同的(仅参数不同)顶级DAG,它们也可以同时触发,具有以下约束/假设:

  • 单个顶级DAG将具有schedule_interval=None,因为它们只需要偶尔进行手动触发
  • 然而,DAG系列需要每天运行
  • 顺序数量在系列中的DAG是固定的(编写代码之前已知),并且很少更改(每隔几个月一次)
  • 无论DAG是成功还是失败,触发链都不能中断
  • 目前它们必须按系列一起运行;将来可能需要并行触发

所以我为dags目录中的每个DAG创建了一个文件,现在我必须将它们连接起来以进行连续执行。我已经确定了两种方法可以实现这一点:

  1. SubDagOperator

  2. TriggerDagRunOperator

    • 在我的演示中可以工作,但以并行方式运行(不是顺序),因为它在移动到下一个之前不会等待触发的DAG完成
    • ExternalTaskSensor可能有助于克服上述限制,但会使事情非常混乱

我的问题是

  • 如何克服SubDagdag_idparent_id前缀的限制?
  • 如何强制TriggerDagRunOperator等待DAG完成?
  • 有没有其他/更好的方法将独立(顶级)DAG连接在一起?
  • 是否有解决方案可以为每个顶级DAG创建单独的文件(对于仅在输入方面不同的DAG)?

我正在使用puckel/docker-airflow,其中包括:
  • Airflow 1.9.0-4
  • Python 3.6-slim
  • CeleryExecutorredis:3.2.7配合使用

编辑-1

澄清@Viraj Parekh查询

您能否详细说明一下在触发之前等待DAG完成的含义?

当我触发import_parent_v1 DAG时,它应该使用TriggerDagRunOperator启动的所有3个外部DAG都会并行运行,即使我将它们顺序链接。实际上,日志表明,虽然它们是一个接一个地启动的,但在前一个结束之前执行已经转移到下一个DAG(TriggerDagRunOperator)。 enter image description here enter image description here 注意:在此示例中,顶级DAG的名称为importer_child_v1_db_X,它们对应的task_id(针对TriggerDagRunOperator)的名称为importer_v1_db_X

TriggerDagRunOperator是否可以成为DAG中的最后一个任务?

我必须将几个类似的DAG链接在一起,形成一个工作流,逐个触发它们。因此,不只是一个可以放在最后的TriggerDagRunOperator,而是许多(这里有3个,但在生产环境中可能会达到15个)。

根据 @Freedom 的报告(https://dev59.com/XZ_ha4cB1Zd3GeqPui5T#nurpnYgBc1ULPQZFs3_u),在 Airflow v2.0 之前不应扩展 SubDagOperator - y2k-shubham
以下是与该问题某种程度上相关的一些链接 link1 link2 link3 link4 - y2k-shubham
link5 - y2k-shubham
这是我在 Airflowdev 邮件列表上提出的问题的 链接。该邮件列表的链接为:(https://lists.apache.org/list.html?dev@airflow.apache.org)。 - y2k-shubham
4个回答

11

参考@Viraj Parekhanswer,我成功地让TriggerDagRunOperator按预期工作。现在我发布我的(部分)答案;随着事情变得更加清晰,我会进行更新。


如何克服SubDag中dag_id的parent_id前缀限制?
正如@Viraj所说,没有直接实现这一点的方法。扩展SubDagOperator以移除此检查可能有效,但我决定避开它。
如何强制 TriggerDagRunOperator 等待 DAG 完成?
  • 看到实现,很明显TriggerDagRunOperator的工作只是触发外部DAG,仅此而已。默认情况下,它不应该等待DAG完成。因此我观察到的行为是可以理解的。

  • ExternalTaskSensor是明显的出路。然而,在学习Airflow基础知识时,我依赖于DAG的手动触发schedule_interval=None)。在这种情况下,ExternalTaskSensor使得准确指定外部任务(等待其完成)的execution_date变得困难,否则传感器会被卡住

  • 因此,从实现中得到启示,我通过等待相关任务的所有task_instances完成来对ExternalTaskSensor的行为进行了小调整

    execution_date[external_task] >= execution_date[TriggerDagRunOperator] + execution_delta

    这实现了期望的结果:外部DAG按顺序一个接一个地运行。


这里有个解决方法可以避免为每个顶层DAG创建单独的文件(这些DAG仅在输入方面有所不同):将DAG分配给全局范围,使用globals()[dag_id] = DAG(..)

编辑-1

也许我参考的资源不正确(上面的link已经失效),但是ExternalTaskSensor已经包含了参数execution_deltaexecution_date_fn,以便轻松限制感知的任务的execution_date


虽然我在任何真正的DAG中都没有真正使用过ExternalTaskSensor,但它似乎仍然很棘手,难以实现。 - y2k-shubham
还要阅读一下这个想法,并注意报告任何陷阱 /原因,为什么它不能工作。 - y2k-shubham
嗨@y2k-shubham。在你的最后一个例子中,为什么你选择了10分钟:execution_delta=timedelta(minutes=10),?你怎么知道是10分钟?为什么不是5或15分钟?谢谢。 - arcee123
1
在这种情况下,ExternalTaskSensor 很难准确指定外部任务的 execution_date。对于 TriggerDagRunOperator()execution_date 字段是 模板化 的;使用与父 DAG 相同的 execution_date 值:TriggerDagRunOperator(task_id="...", trigger_dag_id="some_dag_id", execution_date="{{ts}}", ...),此时 ExternalTaskSensor() 将完全匹配,因为它始终使用相同的 execution_date,不需要 delta。 - Martijn Pieters
1
两年晚了。关于您的问题“如何强制TriggerDagRunOperator等待DAG完成?”,有一个名为wait_for_completion的参数,只有在触发的DAG完成时才会将操作器标记为“成功”。文档在这里 - Jialer Chew

2
  • 您能否详细说明在触发之前等待DAG完成的含义?是否可以将TriggerDagRunOperator作为DAG中的最后一个任务?

  • 要创建类似的DAG,您可以从一个Python文件动态生成DAG。您可以像这样操作:

from airflow import DAG

from airflow.operators.python_operator import PythonOperator


def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

def hello_world_py(*args):
    print('Hello World')
    print('This is DAG: {}'.format(str(dag_number)))

dag = DAG(dag_id,
          schedule_interval=schedule,
          default_args=default_args)

with dag:
    t1 = PythonOperator(
        task_id='hello_world',
        python_callable=hello_world_py,
        dag_number=dag_number)

return dag


# build a dag for each number in range(10)
for n in range(1, 10):
dag_id = 'hello_world_{}'.format(str(n))

default_args = {'owner': 'airflow',
                'start_date': datetime(2018, 1, 1)
                }

schedule = '@daily'

dag_number = n

globals()[dag_id] = create_dag(dag_id,
                              schedule,
                              dag_number,
                              default_args)

您可以在这里了解更多关于该方法的信息。如果您大部分生成的DAG都相似,您可能需要考虑将配置存储在Airflow变量中(点击此处进入链接)
您可能无法克服SubDag操作员的前缀限制-建议您完全从工作流程中删除SubDags,并将它们作为单独的DAG运行-如果您发现自己必须这样做以重新运行旧的DagRuns,则这将使其更加容易。

我不理解参数dag_number。据我所知,在PythonOperator或者BaseOperator中都没有这样的参数。 - y2k-shubham

1
这对我很有帮助,当我使用计划无时。

trigger_dag = TriggerDagRunOperator(
    task_id=f'dag_id-trigger',
    trigger_dag_id='dag_id',
    python_callable=set_args,
    dag=dag,
)


def get_most_recent_dag_run(execution_date, **kwargs):
    return DagRun.find(dag_id='dag_id').pop().execution_date


sensor = ExternalTaskSensor(
    task_id=f'dag_id-sensor',
    external_dag_id='dag_id',
    execution_date_fn=get_most_recent_dag_run,
    dag=dag,
    poke_interval=5,
    external_task_id='last_task_id' # This is task need to be in your external dag
)


1
如果你正在寻找一种等待触发的DAG完成的方法,在Airflow 2.0中,比以前更容易实现了。有一个新版本的TriggerDagRunOperator可以让你这样做。不再需要使用ExternalTaskSensor了。我制作了一个10分钟的教程https://youtu.be/8uKW0mPWmCk。祝愉快!

值得一提的是(非关联)Marc的视频非常棒!我在来到这里之前就看过了。但为了回答问题更完整,您需要查找新的参数(已在Airflow 2.0中添加) TriggerDagRunOperator(...,wait_for_completion=True, poke_interval=30) - muon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接