Spark的zipWithIndex方法在并行实现时是否安全?

9
如果我有一个文件,并且对于每一行,我执行了一个RDD zipWithIndex操作,
([row1, id1001, name, address], 0)
([row2, id1001, name, address], 1)
...
([row100000, id1001, name, address], 100000)

如果重新加载文件,我是否能获得相同的索引顺序?由于它是并行运行的,其他行可能会被分区不同?

1个回答

8

RDD可以被排序,也有顺序。这个顺序用于使用.zipWithIndex()创建索引。

要获得相同的顺序,取决于程序中先前调用的内容。文档提到.groupBy()可能会破坏顺序或生成不同的排序。还可能有其他调用也会这样做。

如果需要保证特定的排序,您可以在调用.zipWithIndex()之前始终调用.sortBy()

这在.zipWithIndex() scala API docs中有解释。

public RDD<scala.Tuple2<T,Object>> zipWithIndex()将该RDD与其元素索引一起压缩。排序首先基于分区索引,然后是每个分区内项目的排序。因此,第一个分区中的第一个项目获得索引0,最后一个分区中的最后一个项目获得最大的索引。这类似于Scala的zipWithIndex,但它使用Long而不是Int作为索引类型。当此RDD包含多个分区时,此方法需要触发一个Spark作业。

请注意,某些RDD(例如由groupBy()返回的RDD)不保证分区中元素的顺序。因此,分配给每个元素的索引不被保证,并且如果重新评估RDD,则甚至可能会更改。如果需要固定的排序以确保相同的索引分配,则应使用sortByKey()对RDD进行排序或将其保存到文件中。


使用 RDD 上的 sortBy 操作会将其收集到驱动程序,是吗?我担心这可能会导致 OOME(Out of Memory Error)。我想要的排序顺序只是文件中行的默认顺序。 - sophie
1
@sophie 排序是在工作节点中完成的,而不是驱动程序中。如果阅读了API文档后仍不确定会发生什么,那么您应该通过运行它几次并在特定索引号处检查元素来进行测试。您可以使用带有匿名函数的.filter(),当行号与某个特定行(如第43行)匹配时,该函数返回true,然后使用.take(1)将该数据片段带到驱动程序中,而无需将所有数据加载到驱动程序中。 - Paul

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接