我有许多Spark数据框需要执行以下操作:
1) load a single spark dataframe
2) select rows from it
3) merge it with all of the previous spark dataframes
现在,以上每个操作都需要不同数量的分区。选择行需要很多分区,例如100个分区。合并需要很少的分区,例如10个分区。
因此,我真的希望它能像这样工作:
1) load a single spark dataframe
1.5) repartition into 100 partitions
2) select rows from it
2.5) repartition into 10 partitions
3) merge it with all of the previous spark dataframes
那么,我该如何在步骤1和2之间以及步骤2和3之间强制进行重新分区呢?
我知道当我调用 data = data.repartition(7)
时,它被惰性地评估,因此只有在实际保存数据时才进行重新分区。
所以,我一直都是这样做的:
1) load a single spark dataframe
1.5) repartition into 100 partitions
1.75) `df.count()` *just* to force materialization
2) select rows from it
2.5) repartition into 10 partitions
2.75) `df.count()` *just* to force materialization
3) merge it with all of the previous spark dataframes
有没有更好的方法来强制在这之间重新分区?是否有比在数据框上运行count()
更好的方法?
first()
这样的无用操作,当我真的不关心它从中输出什么时?我只想让它重新分区,但我并不在意它实际上输出了什么。有没有什么方法可以避免这种情况? - makansijcache
会有所帮助,因为我需要两次实现数据材料化(每次调用first()
时都会实现数据)。你认为在重新分区之间使用cache
是个好主意吗? - makansijcache()
来减少运行时间,因为它会使后续对数据的操作显著加快。cache()
本身只会将数据标记为持久性,直到执行操作为止,因此在调用first()
之前添加它。 - Shaido