只运行最新的Airflow DAG

4
假设我想运行一个相当简单的Airflow ETL DAG:它检查DB2中的最后插入时间,如果有新行,则将其加载到DB1中。
有一些可理解的要求:
  1. 它每小时调度一次,前几次运行时间超过1小时
    • 例如,第一次运行应处理一个月的数据,并持续72小时,
    • 因此第二次运行应处理过去的72小时,持续7.2小时,
    • 第三次处理7.2小时并在一小时内完成,
    • 从那时起每小时运行。
  2. 当DAG正在运行时,请不要启动下一个DAG,而是跳过它。
  3. 如果时间超过触发事件,并且DAG没有启动,请勿随后启动它。
  4. 还有其他DAG,这些DAG应独立执行。
我发现这些参数和操作有点混乱,请问它们之间的区别是什么?
  • depends_on_past:如果设置为True,则表示任务开始时间必须在先前任务的结束时间之后。默认为False。
  • catchup:如果设置为True,则表示可以运行过去未执行的任务。默认为True。
  • backfill:如果设置为True,则表示可以运行过去未执行的任务。默认为False。
  • LatestOnlyOperator:一个空操作符,只检查最新的任务是否已经完成。如果没有完成,则跳过所有任务。

我应该使用哪个,以及哪个LocalExecutor?

附:已经有一个非常相似的线程,但它并不详尽。

3个回答

5

将DAG的max_active_runs设置为1,并且catchup设置为False可以解决这个问题。


1
这个满足我的要求。DAG每分钟运行一次,我的“主”任务持续90秒,因此应该跳过每隔一次运行。 我使用了一个ShortCircuitOperator来检查当前运行是否是目前唯一的(在airflow数据库的dag_run表中查询),并使用catchup=False禁用回填。 但我无法正确地利用LatestOnlyOperator,它应该做类似的事情。

DAG文件

import os
import sys
from datetime import datetime
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator

import foo
import util

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2018, 2, 13), # or any date in the past
    'email': ['services@mydomain.com'],
    'email_on_failure': True}

dag = DAG(
    'test90_dag',
    default_args=default_args,
    schedule_interval='* * * * *',
    catchup=False)

condition_task = ShortCircuitOperator(
    task_id='skip_check',
    python_callable=util.is_latest_active_dagrun,
    provide_context=True,
    dag=dag)

py_task = PythonOperator(
    task_id="test90_task",
    python_callable=foo.bar,
    provide_context=True,
    dag=dag)

airflow.utils.helpers.chain(condition_task, py_task)

util.py

import logging
from datetime import datetime
from airflow.hooks.postgres_hook import PostgresHook

def get_num_active_dagruns(dag_id, conn_id='airflow_db'):
    # for this you have to set this value in the airflow db
    airflow_db = PostgresHook(postgres_conn_id=conn_id)
    conn = airflow_db.get_conn()
    cursor = conn.cursor()
    sql = "select count(*) from public.dag_run where dag_id = '{dag_id}' and state in ('running', 'queued', 'up_for_retry')".format(dag_id=dag_id)
    cursor.execute(sql)
    num_active_dagruns = cursor.fetchone()[0]
    return num_active_dagruns

def is_latest_active_dagrun(**kwargs):
    num_active_dagruns = get_num_active_dagruns(dag_id=kwargs['dag'].dag_id)
    return (num_active_dagruns == 1)

foo.py

import datetime
import time

def bar(*args, **kwargs):
    t = datetime.datetime.now()
    execution_date = str(kwargs['execution_date'])
    with open("/home/airflow/test.log", "a") as myfile:
        myfile.write(execution_date + ' - ' + str(t) + '\n')
    time.sleep(90)
    with open("/home/airflow/test.log", "a") as myfile:
        myfile.write(execution_date + ' - ' + str(t) + ' +90\n')
    return 'bar: ok'

致谢:本答案基于这篇博客文章

0

DAG max_active_runs = 1 结合 catchup = False,并在开头添加一个 DUMMY 任务(类似于 START 任务),wait_for_downstream=True。 对于 LatestOnlyOperator - 如果前一个执行尚未完成,则可以避免重新运行任务。 或者将“START”任务创建为 LatestOnlyOperator,并确保所有第一处理层的任务都连接到它。但要注意 - 根据文档,“请注意,如果给定的 DAG_Run 标记为外部触发,则不会跳过下游任务。”


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接