我有一个场景,在其中特定的dag完成后需要触发多个dag。我已经使用TriggerDagRunOperator来触发单个dag,是否可以将多个dag传递给TriggerDagRunOperator以触发多个dag?
并且是否可以仅在当前dag成功完成后才触发?
我有一个场景,在其中特定的dag完成后需要触发多个dag。我已经使用TriggerDagRunOperator来触发单个dag,是否可以将多个dag传递给TriggerDagRunOperator以触发多个dag?
并且是否可以仅在当前dag成功完成后才触发?
我也遇到了同样的问题。并没有现成的解决方案,但是我们可以编写一个自定义运算符来解决它。
这里是一个自定义运算符的代码,它接受python_callable
和trigger_dag_id
作为参数:
class TriggerMultiDagRunOperator(TriggerDagRunOperator):
@apply_defaults
def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs):
super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
self.op_args = op_args or []
self.op_kwargs = op_kwargs or {}
def execute(self, context):
session = settings.Session()
created = False
for dro in self.python_callable(context, *self.op_args, **self.op_kwargs):
if not dro or not isinstance(dro, DagRunOrder):
break
if dro.run_id is None:
dro.run_id = 'trig__' + datetime.utcnow().isoformat()
dbag = DagBag(settings.DAGS_FOLDER)
trigger_dag = dbag.get_dag(self.trigger_dag_id)
dr = trigger_dag.create_dagrun(
run_id=dro.run_id,
state=State.RUNNING,
conf=dro.payload,
external_trigger=True
)
created = True
self.log.info("Creating DagRun %s", dr)
if created is True:
session.commit()
else:
self.log.info("No DagRun created")
session.close()
trigger_dag_id
是我们想要多次运行的DAG ID。
python_callable
是一个函数,它应该返回一个DagRunOrder
对象列表,其中每个对象对应于使用DAG ID trigger_dag_id
调度一个DAG实例。
Github上的代码和示例: https://github.com/mastak/airflow_multi_dagrun 有关此代码的更多说明: https://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13
import uuid
import random
from airflow.decorators import dag, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
dag_args = {
"start_date": datetime(2022, 9, 9),
"schedule_interval": None,
"catchup": False,
}
@task
def define_runs():
num_runs = random.randint(3, 5)
runs = [str(uuid.uuid4()) for _ in range(num_runs)]
return runs
@dag(**dag_args)
def dynamic_tasks():
runs = define_runs()
run_dags = TriggerDagRunOperator.partial(
task_id="run_dags",
trigger_dag_id="hello_world",
conf=None,
).expand(
trigger_run_id=runs,
)
run_dags
dag = dynamic_tasks()
Docs here.
for i in list:
trigger_dag =TriggerDagRunOperator(task_id='trigger_'+ i,
trigger_dag_id=i,
python_callable=conditionally_trigger_non_indr,
dag=dag)
根据所需任务进行设置。我已经为PythonOperator自动化了类似的操作。你可以尝试看看是否适用于你!
正如API文档所述,该方法接受一个dag_id。然而,如果您想在完成后无条件启动下游DAG,为什么不将这些任务放在一个单独的DAG中,并在那里设置依赖项/工作流程?然后,您将能够在适当的位置设置depends_on_past=True
。
编辑:如果您绝对需要将它们分开成不同的DAG,则可以轻松解决此问题,只需创建多个TriggerDagRunOperators并将它们的依赖关系设置为相同的任务即可。
run_dags = TriggerDagRunOperator.partial(
task_id='test_07_few_opt_ins_triggered_dag',
trigger_dag_id='test_07_few_opt_ins_triggered_dag',
).expand(
conf=[{"line": "1"}, {"line": "2"}, {"line": "3"}]
)
expand
以填充配置中相同数量的"runs"。@task
def start(dag_run=None):
print(f"consuming line {dag_run.conf.get('line')}")
start()