我认为宏prev_execution_date
列在这里会给我上一个DAG运行的执行日期,但查看源代码后发现只是根据DAG计划获取最后日期。
prev_execution_date = task.dag.previous_schedule(self.execution_date)
当DAG没有按计划运行时,是否有通过宏获取执行日期的方法?
我认为宏prev_execution_date
列在这里会给我上一个DAG运行的执行日期,但查看源代码后发现只是根据DAG计划获取最后日期。
prev_execution_date = task.dag.previous_schedule(self.execution_date)
当DAG没有按计划运行时,是否有通过宏获取执行日期的方法?
是的,您可以定义自己的自定义宏来实现这一点,如下所示:
# custom macro function
def get_last_dag_run(dag):
last_dag_run = dag.get_last_dagrun()
if last_dag_run is None:
return "no prev run"
else:
return last_dag_run.execution_date.strftime("%Y-%m-%d")
# add macro in user_defined_macros in dag definition
dag = DAG(dag_id="my_test_dag",
schedule_interval='@daily',
user_defined_macros={
'last_dag_run_execution_date': get_last_dag_run
}
)
# example of using it in practice
print_vals = BashOperator(
task_id='print_vals',
bash_command='echo {{ last_dag_run_execution_date(dag) }}',
dag=dag
)
def get_last_dag_run(dag_id):
//TODO search DB
return xxx
dag = DAG(
'example',
schedule_interval='0 1 * * *',
user_defined_macros={
'last_dag_run_execution_date': get_last_dag_run,
}
)
然后在你的模板中使用 KEY。