val df0 = spark
.read
.format("csv")
.option("header", true)
.option("delimiter", ";")
.option("inferSchema", false)
.load("SomeFile.csv"))
val df = df0.repartition(partitionExprs = col("numerocarte"), numPartitions = 42)
df.write
.mode(SaveMode.Overwrite)
.format("parquet")
.option("inferSchema", false)
.save("SomeFile.parquet")
我正在使用列 numerocarte
创建42个分区。这将把多个numerocarte
分组到同一个分区中。但我不想在write
时使用 partitionBy("numerocarte"),因为我不希望每张卡片都有一个分区,这将产生数百万个分区。
此后,在另一个脚本中,我会读取名为SomeFile.parquet
的parquet文件,并对其进行一些操作。特别是在其中运行窗口函数,其中分区是由与重新分区的parquet文件相同的列完成的。
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val df2 = spark.read
.format("parquet")
.option("header", true)
.option("inferSchema", false)
.load("SomeFile.parquet")
val w = Window.partitionBy(col("numerocarte"))
.orderBy(col("SomeColumn"))
df2.withColumn("NewColumnName",
sum(col("dollars").over(w))
在read
之后,我可以看到repartition
按预期工作,并且DataFrame df2
有42个分区,在每个分区中都有不同的卡。
问题:
- Spark知道数据框
df2
是由numerocarte
列分区的吗? - 如果知道,则窗口函数中不会进行洗牌。是真的吗?
- 如果不知道,则窗口函数中将执行洗牌。是真的吗?
- 如果它不知道,我如何告诉Spark数据已经按正确的列进行了分区?
- 如何检查
DataFrame
的分区键?是否有此命令?我知道如何检查分区数,但如何查看分区键? - 当我在每个步骤之后打印文件的分区数时,我在
read
之后有42个分区,在withColumn
之后有200个分区,这表明Spark重新分区了我的DataFrame
。 - 如果我有两个使用相同列进行重新分区的不同表,那么连接会使用该信息吗?
在read
之后,使用repartition
方法对DataFrame df2
进行分区,其中每个分区包含不同的卡,是按预期工作的。
- Spark知道数据框
df2
是由numerocarte
列分区的吗?答:如果使用repartition("numerocarte")
方法在读取数据后重新分区,则Spark会知道DataFramedf2
是由numerocarte
列分区的。 - 如果它知道,则窗口函数中不会进行洗牌。是真的吗?答:是的,如果Spark已经知道DataFrame是按照正确的列进行分区的,则窗口函数将不需要进行洗牌。
- 如果它不知道,则窗口函数中将执行洗牌。是真的吗?答:是的,如果Spark没有正确识别DataFrame的分区键,则窗口函数将需要执行洗牌操作。
- 如果它不知道,我如何告诉Spark数据已经按正确的列进行了分区?答:可以使用
df2.repartition("numerocarte")
方法来明确指定DataFrame已经按照正确的列进行了分区。 - 如何检查
DataFrame
的分区键?是否有此命令?答:可以通过DataFrame.rdd.partitioner
属性来检查DataFrame的分区键。如果DataFrame已经分区,则该属性将返回一个分区器对象,否则将返回None。 - 当我在每个步骤之后打印文件的分区数时,我在
read
之后有42个分区,在withColumn
之后有200个分区,这表明Spark重新分区了我的DataFrame
。答:是的,由于使用了withColumn
方法,Spark会重新计算数据的分区,因此分区数增加到了200个。 - 如果我有两个使用相同列进行重新分区的不同表,那么连接会使用该信息吗?答:是的,如果两个表都已经按照正确的列进行了重新分区,则连接操作将会利用这一信息来避免洗牌操作。
df.rdd.partitioner
。如果两个dfs具有相同的分区器,则可能不会进行洗牌。您可以通过调用df.explain
来检查是否会进行洗牌。要检查分区数,请调用df.rdd.partitions.length
。有关分区的更完整解释,请参见https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-partitions.html - addmeaning