按列动态设置Spark重分区数量

21
如何根据列中项目数量的计数对DataFrame进行分区。 假设我们有一个包含100个人的DataFrame(列是first_namecountry),我们想为每个国家的10个人创建一个分区。
如果我们的数据集包含80个来自中国的人,15个来自法国,5个来自古巴,那么我们将希望为中国创建8个分区,为法国创建2个分区,为古巴创建1个分区。
以下代码将无法正常工作:
  • df.repartition($"country"):这将为中国创建1个分区,为法国创建1个分区,为古巴创建1个分区
  • df.repartition(8,$ "country",rand):这将为每个国家创建多达8个分区,因此它应该为中国创建8个分区,但France&Cuba分区未知。 France可能在8个分区中,而Cuba可能在多达5个分区中。 请参见此答案以了解更多详细信息。
这里是repartition()文档:

repartition documentation

当我查看repartition()方法时,我甚至没有看到一个带有三个参数的方法,因此看起来其中一些行为没有记录。
是否有任何方法可以动态设置每列的分区数?这将使创建分区数据集变得更加容易。

2
关于3个参数,$"country", rand 在第二次调用中一起作为 partitionExprs - Kombajn zbożowy
2个回答

16
由于Spark对数据进行分区的方式,您无法完全实现这一点。Spark会将您在repartition中指定的列的值哈希为一个64位长整数,然后将该值对分区数取模。这样可以确定分区的数量。之所以采用这种方式,是因为连接操作需要在连接的左右两侧具有相同数量的分区,并确保哈希值在两侧相同。
"我们希望为每个国家的10个人创建一个分区。"
您在这里到底想要实现什么?每个分区只有10行可能会严重影响性能。您是想创建一个分区表,其中每个分区中的文件都保证只有x行吗?
"df.repartition($"country"):这将为中国创建一个分区,为法国创建一个分区,为古巴创建一个分区"
实际上,这将创建一个具有默认数量的按国家哈希的洗牌分区的数据框。
  def repartition(partitionExprs: Column*): Dataset[T] = {
    repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*)
  }

df.repartition(8, $"country", rand):这将为每个国家创建最多8个分区,因此应该为中国创建8个分区,但法国和古巴的分区是未知的。法国可能在8个分区中,古巴可能在最多5个分区中。有关更多详细信息,请参阅此答案。
同样,这个方法有微妙的错误。这里只有8个分区,而国家基本上是在这8个分区中随机洗牌。
编辑:最后一个澄清的要点。数据框重新分区的工作方式与写入时使用partitionBy(...)方法不同。partitionBy操作首先获取所有的Spark分区,然后对于每个Spark分区,将其切分为一个按partitionBy分区的表,然后将每个分区写入与partitionBy列对应的文件夹中。

感谢您指出我的细微错误。对于10行,这段代码是不需要的,但在创建针对倾斜大型数据集的分区数据湖时非常重要。 - Powers
@Andrew Long,sparkSession 中没有“sessionState”,我们在哪里可以找到“sessionState”? - Shasu
@Shasu,您是否在使用旧版本的Spark 1.x? - Andrew Long
@AndrewLong 不是,我正在使用 Spark 2.4.5 和现在的 3.3.1。 - Shasu
Spark在内部隐藏了与会话相关的状态,这些状态对于Spark来说是私有的。您可以在以下链接中找到更多关于此主题的信息:https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala - Andrew Long

4
以下是创建每个数据文件中十行数据的代码(示例数据集在此):
val outputPath = new java.io.File("./tmp/partitioned_lake5/").getCanonicalPath
df
  .repartition(col("person_country"))
  .write
  .option("maxRecordsPerFile", 10)
  .partitionBy("person_country")
  .csv(outputPath)

以下是创建每个数据文件大约十行的 Spark 2.2 之前的代码:
val desiredRowsPerPartition = 10

val joinedDF = df
  .join(countDF, Seq("person_country"))
  .withColumn(
    "my_secret_partition_key",
    (rand(10) * col("count") / desiredRowsPerPartition).cast(IntegerType)
  )

val outputPath = new java.io.File("./tmp/partitioned_lake6/").getCanonicalPath
joinedDF
  .repartition(col("person_country"), col("my_secret_partition_key"))
  .drop("count", "my_secret_partition_key")
  .write
  .partitionBy("person_country")
  .csv(outputPath)

你从哪里获取这个名为 col("count") 的列? - Shasu
有没有办法使用哈希函数处理偏斜数据? - Shasu
1
是的,就像@Shasu所说的那样,数据倾斜可能会导致执行器方面的OOM问题。 - linehrr

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接