Apache Airflow宏获取最后一次DAG运行的执行时间

9

我认为宏prev_execution_date列在这里会给我上一个DAG运行的执行日期,但查看源代码后发现只是根据DAG计划获取最后日期。

prev_execution_date = task.dag.previous_schedule(self.execution_date)

当DAG没有按计划运行时,是否有通过宏获取执行日期的方法?


1
对于这个问题的读者,值得检查 [1] 获取最近成功DAG执行日期[2] 如何获取Airflow作业的最后两次成功执行日期? 或者这个 搜索结果 - y2k-shubham
2个回答

12

是的,您可以定义自己的自定义宏来实现这一点,如下所示:

# custom macro function
def get_last_dag_run(dag):
    last_dag_run = dag.get_last_dagrun()
    if last_dag_run is None:
        return "no prev run"
    else:
        return last_dag_run.execution_date.strftime("%Y-%m-%d")

# add macro in user_defined_macros in dag definition
dag = DAG(dag_id="my_test_dag",
      schedule_interval='@daily',
      user_defined_macros={
          'last_dag_run_execution_date': get_last_dag_run
      }
)

# example of using it in practice
print_vals = BashOperator(
    task_id='print_vals',
    bash_command='echo {{ last_dag_run_execution_date(dag) }}',
    dag=dag
)

请注意,dag.get_last_run()只是Dag对象上可用的众多函数之一。这是我找到它的地方: https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/models.py#L3396 您还可以调整日期格式字符串的格式和在没有先前运行时输出的内容。

2
dag.get_last_dagrun(include_externally_triggered=True) 用于外部触发 DAG。 - Hasitha

0
您可以创建自己的用户自定义宏函数,使用Airflow模型搜索元数据库。
def get_last_dag_run(dag_id):
  //TODO search DB
  return xxx

dag = DAG(
    'example',
    schedule_interval='0 1 * * *',
    user_defined_macros={
        'last_dag_run_execution_date': get_last_dag_run,
    }
)

然后在你的模板中使用 KEY。


2
这个答案似乎只回答了一半,省略了数据库搜索。 - tobi6

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接