Error in Airflow DAG: unsupported operand type(s) for >>: 'list' and 'list'. Sequential and parallel execution of tasks.

7

I am new to Apache Airflow and DAGs. There are 6 tasks in the DAG in total (task1, task2, task3, task4, task5, task6). But when running the DAG, we get the following error:

DAG unsupported operand type(s) for >>: 'list' and 'list'

Below is the code I am using for the DAG; please take a look. I am new to Airflow.

from airflow import DAG
from datetime import datetime
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False
}

dag = DAG('DAG_FOR_TEST',default_args=default_args,schedule_interval=None,max_active_runs=3, start_date=datetime(2020, 7, 14)) 


#################### CREATE TASK #####################################   

task_1 = DatabricksSubmitRunOperator(
    task_id='task_1',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries= [
        {
        'jar': 'dbfs:/task_1/task_1.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_1.driver.TestClass1',
        'parameters' : [
            '{{ dag_run.conf.json }}'       
        ]
    }
)



    
task_2 = DatabricksSubmitRunOperator(
    task_id='task_2',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',   
    libraries= [
        {
        'jar': 'dbfs:/task_2/task_2.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_2.driver.TestClass2',
        'parameters' : [
            '{{ dag_run.conf.json }}'                               
        ]
    }
)
    
task_3 = DatabricksSubmitRunOperator(
    task_id='task_3',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',   
    libraries= [
        {
        'jar': 'dbfs:/task_3/task_3.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_3.driver.TestClass3',
        'parameters' : [
            '{{ dag_run.conf.json }}'   
        ]
    }
) 

task_4 = DatabricksSubmitRunOperator(
    task_id='task_4',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries= [
        {
        'jar': 'dbfs:/task_4/task_4.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_4.driver.TestClass4',
        'parameters' : [
            '{{ dag_run.conf.json }}'   
        ]
    }
) 

task_5 = DatabricksSubmitRunOperator(
    task_id='task_5',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries= [
        {
        'jar': 'dbfs:/task_5/task_5.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_5.driver.TestClass5',
        'parameters' : [
            'json ={{ dag_run.conf.json }}' 
        ]
    }
) 

task_6 = DatabricksSubmitRunOperator(
    task_id='task_6',
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries= [
        {
        'jar': 'dbfs:/task_6/task_6.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_6.driver.TestClass6',
        'parameters' : ['{{ dag_run.conf.json }}'   
        ]
    }
) 

#################### ORDER OF OPERATORS ###########################  
 
    task_1.dag = dag
    task_2.dag = dag
    task_3.dag = dag
    task_4.dag = dag
    task_5.dag = dag
    task_6.dag = dag

task_1 >> [task_2 , task_3] >> [ task_4 , task_5 ] >> task_6 
5 Answers

11

What task dependencies do you actually want? Do you want task_4 to run only after task_2, or after both task_2 and task_3?

Depending on your answer, use one of the following:

(Use this option if task_4 should run only after both task_2 and task_3 have completed)

task_1 >> [task_2 , task_3]
task_2 >> [task_4, task_5] >> task_6
task_3 >> [task_4, task_5]

(Use this option if task_4 should run after task_2 completes and task_5 should run after task_3 completes)

task_1 >> [task_2 , task_3]
task_2 >> task_4
task_3 >> task_5
[task_4, task_5] >> task_6

One tip: you do not need to do the following:

    task_1.dag = dag
    task_2.dag = dag
    task_3.dag = dag
    task_4.dag = dag
    task_5.dag = dag
    task_6.dag = dag
You can pass the dag argument to the task itself, for example:
task_6 = DatabricksSubmitRunOperator(
    task_id='task_6',
    dag=dag,
    databricks_conn_id='connection_id_details',
    existing_cluster_id='{{ dag_run.conf.clusterId }}',
    libraries= [
        {
        'jar': 'dbfs:/task_6/task_6.jar'
        }        
        ],
    spark_jar_task={
        'main_class_name': 'com.task_6.driver.TestClass6',
        'parameters' : ['{{ dag_run.conf.json }}'   
        ]
    }
) 

Or use the DAG as a context manager, as documented at https://airflow.apache.org/docs/stable/concepts.html#context-manager; see also the first point in https://medium.com/datareply/airflow-lesser-known-tips-tricks-and-best-practises-cf4d4a90f8f.
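For reference, a minimal sketch of the context-manager style, reusing the DatabricksSubmitRunOperator arguments from your DAG (only the first two tasks are shown to keep it short):

from datetime import datetime
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False
}

with DAG('DAG_FOR_TEST',
         default_args=default_args,
         schedule_interval=None,
         max_active_runs=3,
         start_date=datetime(2020, 7, 14)) as dag:

    # Tasks created inside the "with" block are attached to the DAG automatically,
    # so no dag=dag argument and no task.dag = dag assignments are needed.
    task_1 = DatabricksSubmitRunOperator(
        task_id='task_1',
        databricks_conn_id='connection_id_details',
        existing_cluster_id='{{ dag_run.conf.clusterId }}',
        libraries=[{'jar': 'dbfs:/task_1/task_1.jar'}],
        spark_jar_task={
            'main_class_name': 'com.task_1.driver.TestClass1',
            'parameters': ['{{ dag_run.conf.json }}']
        }
    )

    task_2 = DatabricksSubmitRunOperator(
        task_id='task_2',
        databricks_conn_id='connection_id_details',
        existing_cluster_id='{{ dag_run.conf.clusterId }}',
        libraries=[{'jar': 'dbfs:/task_2/task_2.jar'}],
        spark_jar_task={
            'main_class_name': 'com.task_2.driver.TestClass2',
            'parameters': ['{{ dag_run.conf.json }}']
        }
    )

    task_1 >> task_2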


8

Airflow task dependencies cannot handle [list] >> [list]. The simplest fix is to spread your dependencies over multiple lines:

task_1 >> [task_2 , task_3]
task_2 >> [task_4, task_5]
task_3 >> [task_4, task_5]
[task_4 , task_5 ] >> task_6
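If you would rather not spell out every pair, Airflow also provides a cross_downstream helper that links each task in one list to each task in another. A minimal sketch, assuming Airflow 2.x (on 1.10.x the helper is imported from airflow.utils.helpers instead):

from airflow.models.baseoperator import cross_downstream

task_1 >> [task_2, task_3]
# Equivalent to: task_2 >> task_4, task_2 >> task_5, task_3 >> task_4, task_3 >> task_5
cross_downstream([task_2, task_3], [task_4, task_5])
[task_4, task_5] >> task_6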

4
My solution is to first funnel all of the upstream tasks into a single DummyOperator task, and then use that same DummyOperator task to kick off the next group of tasks.
from airflow.operators.dummy_operator import DummyOperator  # airflow.operators.dummy.DummyOperator in Airflow 2.x

with dag:

    task_1 = DummyOperator(
        task_id = 'real_operation_1'
    )

    task_2 = DummyOperator(
        task_id = 'real_operation_2'
    )

    task_3 = DummyOperator(
        task_id = 'dummy_operation'
    )

    task_4 = DummyOperator(
        task_id = 'real_operation_3'
    )

    task_5 = DummyOperator(
        task_id = 'real_operation_4'
    )

    [task_1, task_2] >> task_3 >> [task_4, task_5]

3
Same problem here, running Version: 1.10.14+composer on GCP.
Code:
i = 0   # iterator
while i < len(LIST_OF_OPS):   # list of operators which I have aggregated beforehand
    LIST_OF_OPS[i] >> LIST_OF_OPS[i+1:i+8]  # one task queues up the next seven
    i += 7  # advance to the last task just queued so it anchors the next block; this forms a chain instead of starting everything in parallel

This solution lets me generate tasks dynamically and iterate over them. It is currently used in a workflow with 280 tasks.
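For readers who want to reproduce this pattern, here is a minimal self-contained sketch; the DAG name and the DummyOperator placeholders are illustrative assumptions, since the original aggregates its real operators into LIST_OF_OPS beforehand:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('dynamic_chain_example', schedule_interval=None, start_date=datetime(2020, 7, 14))

# Build the list of operators dynamically; any number of tasks works.
LIST_OF_OPS = [DummyOperator(task_id='op_{}'.format(n), dag=dag) for n in range(29)]

i = 0
while i < len(LIST_OF_OPS) - 1:
    LIST_OF_OPS[i] >> LIST_OF_OPS[i+1:i+8]  # one anchor task fans out to up to seven downstream tasks
    i += 7                                  # the last task of each block becomes the anchor for the next block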



0

A TaskGroup can be used to group certain tasks and to add dependencies between the task groups dynamically.
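A minimal sketch of that idea, assuming Airflow 2.x where airflow.utils.task_group.TaskGroup is available; the DAG and task names are placeholders:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

with DAG('taskgroup_example', schedule_interval=None, start_date=datetime(2021, 1, 1)) as dag:

    with TaskGroup('group_1') as group_1:
        t1 = DummyOperator(task_id='task_1')
        t2 = DummyOperator(task_id='task_2')

    with TaskGroup('group_2') as group_2:
        t3 = DummyOperator(task_id='task_3')
        t4 = DummyOperator(task_id='task_4')

    # Every task in group_1 must finish before any task in group_2 starts.
    group_1 >> group_2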


TaskGroups are only used for visual grouping in the UI; they will not help fix any implementation errors. - gbeaven
