Spark为什么在执行一个操作时会创建多个作业?

9

我注意到当只有一个操作时,这一堆代码会启动三个任务。

from typing import List
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import avg

data: List = [("Diamant_1A", "TopDiamant", "300", "rouge"),
    ("Diamant_2B", "Diamants pour toujours", "45", "jaune"),
    ("Diamant_3C", "Mes diamants préférés", "78", "rouge"),
    ("Diamant_4D", "Diamants que j'aime", "90", "jaune"),
    ("Diamant_5E", "TopDiamant", "89", "bleu")
  ]

schema: StructType = StructType([ \
    StructField("reference", StringType(), True), \
    StructField("marque", StringType(), True), \
    StructField("prix", StringType(), True), \
    StructField("couleur", StringType(), True)
  ])

dataframe: DataFrame = spark.createDataFrame(data=data,schema=schema)

dataframe_filtree:DataFrame = dataframe.filter("prix > 50")

dataframe_filtree.show()

根据我的理解,我应该只得到一个。一个操作对应一个作业。 我正在使用Databricks。这可能是问题所在。我有两个问题:
  • 为什么我有3个作业而不是1个?
  • 我能改变这种行为吗?
这是第一个作业: 第一个dag 这是第二个作业: 第二个作业的Dag 最后一个作业: 第三个作业的dag

你能分享UI中三个工作的信息或狗吗? - koiralo
@koiralo,感谢您的评论。我已经添加了DAG。 - Nastasia
你对结果进行了分区,然后调用了 show(),这可能会导致洗牌。如果删除分区会发生什么?分区的目的是什么? - ekrich
@ekrich 谢谢您注意到了一个错误。我已经修复了它。我已经更新了描述。我仍然有3个工作,不明白为什么。 - Nastasia
https://stackoverflow.com/a/70131751/3741571 - chenzhongpu
2个回答

10

1个操作对应1个作业是正确的。被忽略的细节是,这在RDD API中是成立的

Dataframe和Dataset API是在RDD API之上抽象出来的一层,以使您的生活更轻松。有时,在调用一个操作时,它会在内部触发多个操作,您会看到多个作业

例如,使用header=True读取csv时。当您在下游调用一个操作时,它会触发另一个内部操作,该操作读取csv的第一行以推断标题,并且您将看到其显示为作业

另一个原因是自适应查询执行。将spark.sql.adaptive.enabled设置为True会导致Spark使用阶段统计信息,并根据这些统计信息决定后续的物理计划。这在如Spark连接中无需担心倾斜的最终用户等情况下很有用。但是,这会导致Spark将作业分解成许多作业。您会在作业DAG中看到以前作业的跳过阶段。如果将spark.sql.adaptive.enabled设置为False,则会看到所有这些作业都消失。但是,您几乎总是想使用自适应查询执行


1

@Nastasia 虽然我找不到上述问题的答案,但我想分享一下在我的8核CPU系统中发现的一些东西:

  1. Above program without any change :

    No. of Jobs - 3  , 
    No. Tasks  in each Job - 3 , 4 , 1 (because default no. of partitions is 8 )
    
  2. In your example add a few lines as shown below which gives some understanding

     print(dataframe.rdd.getNumPartitions())
     dataframe2 = dataframe.coalesce(1)
     print(dataframe2.rdd.getNumPartitions())
     dataframe_filtree:DataFrame = dataframe2.filter("prix > 50")
     dataframe_filtree.show()
    
    No. of Jobs - 1 ,   
    No. Tasks  in each Job - 1 (because  no. of partitions is 1 )
    
  3. Changing the parameter in coalesce has given the following results:

    coalesce(2): 
    No. of Jobs - 2 (why not 1 ?) , 
    No. Tasks  in each Job - 1,1 
    
    coalesce(6):
    No. of Jobs - 3 (why not 1 or 2) , 
    No. Tasks  in each Job - 1,4,1
    
显然,“分区数”在这里是一个因素。但还有其他决定作业数量的因素。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接