如何获取Airflow DAG中正在运行的任务列表

4

如何获取具有dag-id的airflow dag中正在运行的任务列表?任务之间的连接无关紧要。谢谢。

比如,我从"airflow list_dags"中得到了dag-id。


你需要获取Airflow DAG中的任务列表吗?Airflow中的task_instance表存储了这些信息。根据配置项core__sqlalchemy_conn的不同,答案可能会有所不同。 - nightgaunt
您也可以使用REST API获取它。在Airflow中运行任务 - hkravitz
2个回答

4

如果你想获取DAG内部任务列表,可以使用以下代码:

from airflow import settings
from airflow.models import TaskInstance
from airflow.utils.state import State
import logging


def print_running_tasks():
    session = settings.Session()
    for task in session.query(TaskInstance) \
            .filter(TaskInstance.state == State.RUNNING) \
            .all():
        logging.info(f'task_id: {task.task_id}, dag_id: {task.dag_id}, start_date: {task.start_date}, '
                     f'hostname: {task.hostname}, unixname: {task.unixname}, job_id: {task.job_id}, pid: {task.pid}')

这段代码打印正在运行任务的信息。如果要获取所有任务,而不考虑它们的状态,请删除.filter()

0

有没有一种方法可以在DAG内执行相同的操作? - tristobal
“在DAG内”是什么意思? DAG是一个Python对象。其中之一的属性是任务。 我想像your_dag.tasks这样的东西会给你任务/操作符列表。https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/models/dag.html#DAG.tasks https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html#airflow.models.dag.DAG.tasks - Murilo Cunha
我的意思是:创建一个DAG,可以获取所有任务的列表,即使用Python获取任务列表(这就是我所说的:在DAG内)。就像我下面发布的那样。 - tristobal

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接