如何在DAG中获取operator之外的execution_date?

7

如何在dag之外获取execution_date参数?

execution_min = "{{execution_date.strftime('%M') }}"

if execution_min == '00':
    logging.info('**** ' + "YES, It's 00")
    final_task = DummyOperator(
        task_id='task_y00',
        ...
        dag=dag
    )
else:
    logging.info('**** ' + "NOPE!!!")
    final_task = DummyOperator(
        task_id='task_n00',
        ...
        dag=dag
    )

我希望能够动态设置带有执行日期(尤其是分钟)的任务流

但是Jinja模板在使用template_fields = ['execution_date']时无法工作

是否有任何解决方案可以从操作者外部获取执行参数(=DAG本身)?

2个回答

7

执行日期是特定于DagRun的。DAG定义文件中没有DagRun信息(因为这些信息只有在运行时通过Jinja解析Operator的模板字段时才可用)。即使dag未运行,调度器、Web服务器和Worker也会频繁地解析DAG定义文件。这就是为什么在实际的DagRun之外无法访问执行日期等内容。

此外,在运行时无法添加/减少DAG的任务。你可以拥有动态DAG,其结构在运行之前已确定(例如将文件解析为DAG结构),但是你无法在运行时添加任务或修改DAG的结构。


1
尝试只使用execution_min = "{{ execution_date }}",然后再使用strftime,确保双括号前后有空格。
更新:如果您在Operator之外使用它,它将无法工作,您可以传递kwargs然后使用它。Airflow:将{{ ds 作为参数传递给PostgresOperator}}。

1
它既不起作用,也不...... 还有......{{execution_date.strftime('%M') }}这种jinja模板在操作符中也可以使用。 - robinhur

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接