Spark是否知道DataFrame的分区键?

22
我想知道Spark是否知道parquet文件的分区键,并使用此信息来避免洗牌。 上下文: 运行本地SparkSession的Spark 2.0.1。我有一个csv数据集,将其保存为parquet文件在我的磁盘上:
val df0 = spark
  .read
  .format("csv")
  .option("header", true)
  .option("delimiter", ";")
  .option("inferSchema", false)
  .load("SomeFile.csv"))


val df = df0.repartition(partitionExprs = col("numerocarte"), numPartitions = 42)

df.write
  .mode(SaveMode.Overwrite)
  .format("parquet")
  .option("inferSchema", false)
  .save("SomeFile.parquet")

我正在使用列 numerocarte 创建42个分区。这将把多个numerocarte分组到同一个分区中。但我不想在write时使用 partitionBy("numerocarte"),因为我不希望每张卡片都有一个分区,这将产生数百万个分区。

此后,在另一个脚本中,我会读取名为SomeFile.parquet的parquet文件,并对其进行一些操作。特别是在其中运行窗口函数,其中分区是由与重新分区的parquet文件相同的列完成的。

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df2 = spark.read
  .format("parquet")
  .option("header", true)
  .option("inferSchema", false)
  .load("SomeFile.parquet")

val w = Window.partitionBy(col("numerocarte"))
.orderBy(col("SomeColumn"))

df2.withColumn("NewColumnName",
      sum(col("dollars").over(w))

read之后,我可以看到repartition按预期工作,并且DataFrame df2有42个分区,在每个分区中都有不同的卡。

问题:

  1. Spark知道数据框df2是由numerocarte列分区的吗?
  2. 如果知道,则窗口函数中不会进行洗牌。是真的吗?
  3. 如果不知道,则窗口函数中将执行洗牌。是真的吗?
  4. 如果它不知道,我如何告诉Spark数据已经按正确的列进行了分区?
  5. 如何检查DataFrame的分区键?是否有此命令?我知道如何检查分区数,但如何查看分区键?
  6. 当我在每个步骤之后打印文件的分区数时,我在read之后有42个分区,在withColumn之后有200个分区,这表明Spark重新分区了我的DataFrame
  7. 如果我有两个使用相同列进行重新分区的不同表,那么连接会使用该信息吗?
回答:

read之后,使用repartition方法对DataFrame df2进行分区,其中每个分区包含不同的卡,是按预期工作的。

  1. Spark知道数据框df2是由numerocarte列分区的吗?答:如果使用repartition("numerocarte")方法在读取数据后重新分区,则Spark会知道DataFrame df2是由numerocarte列分区的。
  2. 如果它知道,则窗口函数中不会进行洗牌。是真的吗?答:是的,如果Spark已经知道DataFrame是按照正确的列进行分区的,则窗口函数将不需要进行洗牌。
  3. 如果它不知道,则窗口函数中将执行洗牌。是真的吗?答:是的,如果Spark没有正确识别DataFrame的分区键,则窗口函数将需要执行洗牌操作。
  4. 如果它不知道,我如何告诉Spark数据已经按正确的列进行了分区?答:可以使用df2.repartition("numerocarte")方法来明确指定DataFrame已经按照正确的列进行了分区。
  5. 如何检查DataFrame的分区键?是否有此命令?答:可以通过DataFrame.rdd.partitioner属性来检查DataFrame的分区键。如果DataFrame已经分区,则该属性将返回一个分区器对象,否则将返回None。
  6. 当我在每个步骤之后打印文件的分区数时,我在read之后有42个分区,在withColumn之后有200个分区,这表明Spark重新分区了我的DataFrame。答:是的,由于使用了withColumn方法,Spark会重新计算数据的分区,因此分区数增加到了200个。
  7. 如果我有两个使用相同列进行重新分区的不同表,那么连接会使用该信息吗?答:是的,如果两个表都已经按照正确的列进行了重新分区,则连接操作将会利用这一信息来避免洗牌操作。

1
要检查dataframe使用的分区器,您应该查看底层RDD。df.rdd.partitioner。如果两个dfs具有相同的分区器,则可能不会进行洗牌。您可以通过调用df.explain来检查是否会进行洗牌。要检查分区数,请调用df.rdd.partitions.length。有关分区的更完整解释,请参见https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-partitions.html - addmeaning
2个回答

18

为了方便以后参考,我自己回答一下我的问题并分享解决办法。

根据@user8371915的建议,使用bucketBy函数解决了问题。

我正在保存我的DataFrame df

df.write
  .bucketBy(250, "userid")
  .saveAsTable("myNewTable")

然后当我需要加载这个表格时:

val df2 = spark.sql("SELECT * FROM myNewTable")

val w = Window.partitionBy("userid")

val df3 = df2.withColumn("newColumnName", sum(col("someColumn")).over(w)
df3.explain

我确认当我对以userid为分区的df2执行窗口函数时,不会出现洗牌!感谢@user8371915!

在调查过程中我学到的一些东西

  • myNewTable看起来像普通的parquet文件,但实际上并不是。您可以使用spark.read.format("parquet").load("path/to/myNewTable")正常读取它,但这种方式创建的DataFrame将无法保留原始分区!您必须使用spark.sql select获取正确分区的DataFrame
  • 您可以使用spark.sql("describe formatted myNewTable").collect.foreach(println)查看表格内部。这将告诉您用于分桶的列以及有多少个桶。
  • 利用分区的窗口函数和连接通常还需要排序。您可以使用.sortBy()在写入时对存储桶中的数据进行排序,并且排序也将保留在Hive表中。df.write.bucketBy(250, "userid").sortBy("somColumnName").saveAsTable("myNewTable")
  • 在本地模式下工作时,表格myNewTable保存在我的本地Scala SBT项目的spark-warehouse文件夹中。通过 spark-submit 使用mesos在集群模式下保存时,它将保存到hive warehouse中。对于我来说,它位于/user/hive/warehouse
  • 当执行spark-submit时,您需要向您的SparkSession添加两个选项:.config("hive.metastore.uris", "thrift://addres-to-your-master:9083").enableHiveSupport()。否则,您创建的Hive表将不可见。
  • 如果要将表格保存到特定数据库,请在分桶之前执行spark.sql("USE your database")

更新 05-02-2018

我遇到了一些关于Spark分桶和创建Hive表格的问题。请参考问题、回复和评论:为什么Spark saveAsTable with bucketBy会创建数千个文件?


优秀的文章。我理解了,但是如果使用嵌套相关子查询,我想知道会进行哪些优化。 - thebluephantom

17

Spark是否知道数据框df2是按列numerocarte进行分区的?

它不知道。

如果它不知道,我如何告诉Spark数据已经按正确的列进行了分区?

你不能。仅仅因为你保存了被混洗过的数据,并不意味着它将以相同的切片加载。

如何检查DataFrame的分区键?

一旦加载了数据,就没有分区键,但可以检查queryExecution中的Partitioner


实际上:

  • 如果要支持对键进行高效的下推操作,请使用DataFrameWriterpartitionBy方法。
  • 如果要有限支持联接优化,请在元数据存储和持久表中使用bucketBy

详细示例请参见如何定义DataFrame的分区?


@T.Gawęda 但是之前的操作中没有元数据,是吗?请注意 OP 使用了 repartition - Alper t. Turker
只是为了确保我正确理解用户8371915的答案,根据您的回答和引用的文章,如果我无法在写入时使用partitionBy,因为它会创建数百万个分区,那么就无能为力了。数据将以未指定的分区方式加载,并且窗口函数将在内部处理重新分区。对吗? - astro_asz
好的,谢谢。只有一个相关的问题:如果我有两个窗口函数链接在一起,像这样 df.withColumn("x", sum(col("y")).over(w1)).withColumn("z", sum(col("t")).over(w2)),其中 w1 和 w2 都是相同 partitionBy 列但可能不同 rangeBetween 的,那么 w2 会知道数据已经在 w1 处分区吗?还是它会每次为 w1 和 w2 重新分区? - astro_asz
1
从执行计划来看,在分区列相同且排序列相同时,具有不同rangeBetween/rowsBetween的两个窗口函数仅进行一次重新分区。 - astro_asz
1
这篇文章和回答是我在Spark论坛上读过的最好的帖子。 - thebluephantom
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接