如何在Spark数据框中强制重新分区?

6

我有许多Spark数据框需要执行以下操作:

1) load a single spark dataframe
2) select rows from it
3) merge it with all of the previous spark dataframes

现在,以上每个操作都需要不同数量的分区。选择行需要很多分区,例如100个分区。合并需要很少的分区,例如10个分区。
因此,我真的希望它能像这样工作:
1) load a single spark dataframe
1.5) repartition into 100 partitions
2) select rows from it
2.5) repartition into 10 partitions
3) merge it with all of the previous spark dataframes

那么,我该如何在步骤1和2之间以及步骤2和3之间强制进行重新分区呢?

我知道当我调用 data = data.repartition(7) 时,它被惰性地评估,因此只有在实际保存数据时才进行重新分区。

所以,我一直都是这样做的:

1) load a single spark dataframe
1.5) repartition into 100 partitions
1.75) `df.count()` *just* to force materialization
2) select rows from it
2.5) repartition into 10 partitions
2.75) `df.count()` *just* to force materialization
3) merge it with all of the previous spark dataframes

有没有更好的方法来强制在这之间重新分区?是否有比在数据框上运行count()更好的方法?

1个回答

11

由于Spark中的所有dataframe转换都是惰性评估的,您需要执行操作才能实际执行转换。目前没有其他方法可以强制执行转换。

您可以在文档中找到所有可用的dataframe操作(查看actions)。在您的情况下,您可以使用first()而不是使用count()来强制执行转换,这应该会更加廉价。

在2.5步骤中,您可以将repartition()替换为coalesce(),因为它将避免完全洗牌。当新的分区数小于之前的分区数时,这通常是有利的,因为它将最小化数据移动。

编辑:

回答您关于如果不使用任何操作只是执行以下三个步骤:1) repartition,2) spark dataframe 转换,3) repartition 会发生什么的问题。由于Spark在转换上执行的优化,似乎并不总是遵循这个顺序。我编写了一个小测试程序来测试它:

val df = spark.sparkContext.parallelize(Array((1.0,"a"),(2.0,"b"),(3.0,"c"),(1.0,"d"),(2.0,"e"),(3.0,"f"))).toDF("x", "y")
val df1 = df.repartition(10).filter($"x" =!= 1.0).repartition(5).filter($"y" =!= "b")
df1.explain(true)

这将返回有关数据框如何计算的信息。

== Parsed Logical Plan ==
'Filter NOT ('y = b)
+- Repartition 5, true
   +- Filter NOT (x#5 = 1.0)
      +- Repartition 10, true
         +- Project [_1#2 AS x#5, _2#3 AS y#6]
            +- LogicalRDD [_1#2, _2#3]

== Analyzed Logical Plan ==
x: double, y: string
Filter NOT (y#6 = b)
+- Repartition 5, true
   +- Filter NOT (x#5 = 1.0)
      +- Repartition 10, true
         +- Project [_1#2 AS x#5, _2#3 AS y#6]
            +- LogicalRDD [_1#2, _2#3]

== Optimized Logical Plan ==
Repartition 5, true
+- Project [_1#2 AS x#5, _2#3 AS y#6]
   +- Filter ((NOT (_1#2 = 1.0) && isnotnull(_2#3)) && NOT (_2#3 = b))
      +- LogicalRDD [_1#2, _2#3]

== Physical Plan ==
Exchange RoundRobinPartitioning(5)
+- *Project [_1#2 AS x#5, _2#3 AS y#6]
   +- *Filter ((NOT (_1#2 = 1.0) && isnotnull(_2#3)) && NOT (_2#3 = b))
      +- Scan ExistingRDD[_1#2,_2#3]

可以在这里看到,repartition(10)步骤未被包含,并且似乎在优化过程中被移除了。


1
但是有没有办法避免像 first() 这样的无用操作,当我真的不关心它从中输出什么时?我只想让它重新分区,但我并不在意它实际上输出了什么。有没有什么方法可以避免这种情况? - makansij
@Sother 已更新了答案。 - Shaido
谢谢。看起来似乎没有简单的方法来做到这一点。我想知道是否使用cache会有所帮助,因为我需要两次实现数据材料化(每次调用first()时都会实现数据)。你认为在重新分区之间使用cache是个好主意吗? - makansij
@Sother 是的,你可以通过在数据框上调用 cache() 来减少运行时间,因为它会使后续对数据的操作显著加快。cache() 本身只会将数据标记为持久性,直到执行操作为止,因此在调用 first() 之前添加它。 - Shaido
1
@Sother 看起来并不总是这样。我进行了一项小测试,并将其包含在答案中。 - Shaido
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接