如何使用TriggerDagRunOperator来触发多个Airflow DAG?

9

我有一个场景,在其中特定的dag完成后需要触发多个dag。我已经使用TriggerDagRunOperator来触发单个dag,是否可以将多个dag传递给TriggerDagRunOperator以触发多个dag?

并且是否可以仅在当前dag成功完成后才触发?

5个回答

15

我也遇到了同样的问题。并没有现成的解决方案,但是我们可以编写一个自定义运算符来解决它。

这里是一个自定义运算符的代码,它接受python_callabletrigger_dag_id作为参数:

class TriggerMultiDagRunOperator(TriggerDagRunOperator):

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):
        session = settings.Session()
        created = False
        for dro in self.python_callable(context, *self.op_args, **self.op_kwargs):
            if not dro or not isinstance(dro, DagRunOrder):
                break

            if dro.run_id is None:
                dro.run_id = 'trig__' + datetime.utcnow().isoformat()

            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True
            )
            created = True
            self.log.info("Creating DagRun %s", dr)

        if created is True:
            session.commit()
        else:
            self.log.info("No DagRun created")
        session.close()

trigger_dag_id是我们想要多次运行的DAG ID。

python_callable是一个函数,它应该返回一个DagRunOrder对象列表,其中每个对象对应于使用DAG ID trigger_dag_id调度一个DAG实例。

Github上的代码和示例: https://github.com/mastak/airflow_multi_dagrun 有关此代码的更多说明: https://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13


1
即使我自己也没有找到一个现成的解决方案,因此你的答案似乎是这个用例的解决方案。 - Ashwin K
使用此操作符能否按顺序触发DAG? - Dilantha

2
在Airflow 2中,您可以进行动态任务映射。例如:
import uuid
import random
from airflow.decorators import dag, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


dag_args = {
    "start_date": datetime(2022, 9, 9),
    "schedule_interval": None,
    "catchup": False,
}

@task
def define_runs():
    num_runs = random.randint(3, 5)
    runs = [str(uuid.uuid4()) for _ in range(num_runs)]
    return runs


@dag(**dag_args)
def dynamic_tasks():

    runs = define_runs()
    run_dags = TriggerDagRunOperator.partial(
        task_id="run_dags",
        trigger_dag_id="hello_world",
        conf=None,
    ).expand(
        trigger_run_id=runs,
    )

    run_dags

dag = dynamic_tasks()

Docs here.


你运行了这段代码吗?我在我的本地机器上尝试使用Airflow 2.3.0,但是它失败了。 - shubh gupta
我已经更新了答案,请现在尝试。 - Matias Lopez

1
你可以尝试循环它!例如:

for i in list:

trigger_dag =TriggerDagRunOperator(task_id='trigger_'+ i, 
                                trigger_dag_id=i,
                                python_callable=conditionally_trigger_non_indr,
                                dag=dag)

根据所需任务进行设置。我已经为PythonOperator自动化了类似的操作。你可以尝试看看是否适用于你!


好的,我会尝试一下。我们能否修改python_callable - “conditionally_trigger”函数,在DAG中所有任务成功完成后只返回dagrun对象? - Ashwin K
你为什么需要那个?你可以让trigger_dag任务等待所有任务完成,然后在所有任务完成后运行。也就是说,你应该设置依赖关系,使其在所有任务之后运行。 - Chetan J

0

正如API文档所述,该方法接受一个dag_id。然而,如果您想在完成后无条件启动下游DAG,为什么不将这些任务放在一个单独的DAG中,并在那里设置依赖项/工作流程?然后,您将能够在适当的位置设置depends_on_past=True

编辑:如果您绝对需要将它们分开成不同的DAG,则可以轻松解决此问题,只需创建多个TriggerDagRunOperators并将它们的依赖关系设置为相同的任务即可。


只是好奇,我们不能使用BashOperator来执行命令“airflow trigger_dag <<dag_id>>”而不是使用TriggerDagRunOperator吗?这两种方法有什么区别? - Ashwin K
有各种变通方法,但我认为您需要正确评估业务逻辑。如果您的唯一依赖是简单完成,则将这些过程结构化为DAG中的任务而不是DAG可能更合适。您想将它们拆分成DAG的原因是什么? - Anthony Lee
我们需要将其放在一个单独的dag中,因为它需要具有不同的start_date(确切的时间)。 - Ashwin K

0
扩展https://stackoverflow.com/users/14647868/matias-lopez的回复。如果你需要动态负载:
例如:
run_dags = TriggerDagRunOperator.partial(
    task_id='test_07_few_opt_ins_triggered_dag',
    trigger_dag_id='test_07_few_opt_ins_triggered_dag',
).expand(
    conf=[{"line": "1"}, {"line": "2"}, {"line": "3"}]
)

以上我们有3个运行,我们需要设置expand以填充配置中相同数量的"runs"。
然后,在触发的DAG中:
@task
def start(dag_run=None):
    print(f"consuming line {dag_run.conf.get('line')}")

start()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接