我想要构建一个项目,在其中需要捕获所有叶子任务,并向它们添加下游依赖项,以使作业在我们的数据库中完成。在Airflow中是否有一种简单的方法来查找DAG的所有叶子节点?
我想要构建一个项目,在其中需要捕获所有叶子任务,并向它们添加下游依赖项,以使作业在我们的数据库中完成。在Airflow中是否有一种简单的方法来查找DAG的所有叶子节点?
使用BaseOperator
中的@property
upstream_task_ids
和downstream_task_ids
def get_start_tasks(dag: DAG) -> List[BaseOperator]:
# returns list of "head" / "root" tasks of DAG
return [task for task in dag.tasks if not task.upstream_task_ids]
def get_end_tasks(dag: DAG) -> List[BaseOperator]:
# returns list of "leaf" tasks of DAG
return [task for task in dag.tasks if not task.downstream_task_ids]
Python 3.6+
的Type-Annotations
(类型注解)
更新-1
现在,Airflow DAG
模型具有强大的@property
函数,例如: