Spark:如何处理性能密集型命令,如collect()、groupByKey()、reduceByKey()

3
我知道一些Spark操作,如collect()会导致性能问题。
这已经在文档中引用了。

要在驱动程序上打印所有元素,可以使用collect()方法将RDD首先带到驱动节点,从而:rdd.collect().foreach(println)。但是,这可能会导致驱动程序内存不足

因为collect()将整个RDD提取到单个计算机上;如果您只需要打印RDD的几个元素,则更安全的方法是使用take()rdd.take(100).foreach(println)
另一个相关的SE问题:当按键分组时,Spark会耗尽内存 我已经了解到groupByKey(),reduceByKey()可能会导致内存不足,如果并行度未正确设置。

我没有足够的证据来了解其他转换和操作命令,这些命令必须谨慎使用。

这三个命令是唯一需要处理的吗?我对以下命令也有疑问

  1. aggregateByKey()
  2. sortByKey()
  3. persist() / cache()

如果您提供关于强制执行命令(跨分区全局而不是单个分区或低性能命令)的信息,那将是很好的,这些命令需要更好的保护。

1个回答

6

您需要考虑三种操作类型:

  • 仅使用mapPartitions(WithIndex)实现的转换,如filtermapflatMap等。通常这是最安全的组。你可能遇到的最大问题可能是大量溢出到磁盘。
  • 需要洗牌的转换。其中包括明显的嫌疑人,如不同变体的combineByKeygroupByKeyreduceByKeyaggregateByKey)或join,以及不太明显的,如sortBydistinctrepartition。如果没有上下文(数据分布、准确的缩减函数、分区器、资源),很难确定特定的转换是否会有问题。主要有两个因素:
    • 网络流量和磁盘IO - 任何不在内存中执行的操作都将至少慢一个数量级
    • 数据分布倾斜 - 如果分布高度倾斜,则洗牌可能会失败,或者后续操作可能会受到次优资源分配的影响。
  • 需要将数据传递到驱动程序并从驱动程序传递数据的操作。通常涵盖像collecttake这样的操作,以及从本地创建分布式数据结构(parallelize)。

    此类别的其他成员包括broadcasts(包括自动广播连接)和accumulators。总成本当然取决于特定操作和数据量。

虽然其中一些操作可能很昂贵,但没有特别糟糕的(包括 被妖魔化的groupByKey)。显然,避免网络流量或额外的磁盘IO是更好的选择,但在实际应用中你无法避免它们。关于缓存,你可能会发现Spark:为什么我需要明确告诉它要缓存什么?有用。

谢谢提供信息。cache()或persist()/unpersist()对RDD有什么影响? - Ravindra babu
1
每个RDD在不再需要时都会被删除。这种机制不像Java那样只要有引用就会一直保留,因为真正的计算只会在某些函数中触发。因此,如果您想重复使用某个RDD,可以将其分配给某个变量,缓存它,然后进行更多的转换,例如调用collect。然后,您创建的RDD仍将以缓存形式保存在调用cache()之前的转换形式中。 - szefuf
zero323,另外一个问题,你能否检查一下这个问题:https://dev59.com/oZPfa4cB1Zd3GeqPF6W_ - Ravindra babu
@ravindra 已经注意到了。现有的答案看起来还不错,但我会在有空闲时间时尽量添加一些备注。 - zero323

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接