如何在Spark中将单个RDD分区为多个RDD

3

我有一个RDD,其中每个条目都属于一个类。我想将单个RDD分成几个RDD,以便同一类别的所有条目都进入一个RDD。假设我在输入RDD中有100个这样的类别,我希望每个类别都有自己的RDD。我可以为每个类别使用过滤器(如下所示)来完成此操作,但这将启动多个作业。有没有更好的方法在单个作业中完成它?

def method(val input:RDD[LabeledPoint], val classes:List[Double]):List[RDD] = 
      classes.map{lbl=>input.filter(_.label==lbl)}

这类似于另一个问题,但我有超过2个类(大约10个)


"我希望每个类都有自己的RDD。"为什么?接下来你会怎么做? - The Archetypal Paul
嗯... Spark RDD 模型并没有考虑到这样的操作。但是如果你想要类似的东西... 你总是可以使用最明显的方法(就像你所做的那样)。现在... 关于能够在“单个作业”中执行此操作的重点(大多数单个RDD上的操作实际上涉及多个作业,因此我不确定您所说的“单个作业”是什么意思,但是让我们假设您的意思是O(n)操作而不依赖于类的数量)...根据当前RDD的哲学,我认为不应该可能。 - sarveshseri
@Paul 另一种方法(StatisticsSummary)需要RDD作为输入。我想获取每个类别的摘要统计信息。 - Arun
我不认为这是一个重复的问题,因为您想要将其分成多个RDD。我点赞了,因为我真的很喜欢您的解决方案! - Kevin Pauli
2个回答

2
我也遇到了同样的问题,根据我找到的不同资源,很遗憾没有其他办法。
关键是你需要从RDD转换为实际结果中的列表,如果你看这里,答案也表示这是不可能的。
你所做的应该没问题,如果想要优化,可以尝试缓存数据。

有没有办法修改Spark代码来支持这一点。RDD是一组分区。一个分区可以被拆分成List[List[]]。如何划分分区以创建List[RDD]。 - Arun
在RRDs上进行的操作会返回其他RDD。这就是API的定义方式。我不会反对这个。你可能能够更改某些内容,但我认为它会破坏其他所有内容,而且即使它起作用,我也不确定它是否会被接受为pull请求。 缓存数据集是您能做的最好的事情,我会说这是您应该做的。您有避免这样做的原因吗? - Ivan Nikolov
接受缓存似乎有助于降低任何运行时成本。谢谢。 - Arun

0

据我所知,这是不可能的,但你可能有一个概念上的问题。

根据您的评论,您可能想使用 aggregateByKey()。无需创建一百个RDD,只需创建一个按类分组的RDD,并构建自定义聚合方法来聚合您的统计数据。Spark会根据类将您的实例分布到各个节点上,以便您可以独立地操作它们。如果逻辑因类而异,您始终可以使用if/else、switch、多态等等...


我想为每个类创建一个RDD,而不是使用aggregateByKey,因为这会将一个类的值聚合到单个分区中。假设我只有5个类,那么会有很多数据移动。我还需要这个作为另一种方法(Statistics.colStats)需要一个RDD。因此,再次聚合和创建RDD将会很昂贵。 - Arun
1
那我猜你就很麻烦了,你必须通过过滤旧的RDD来创建每个新的RDD :-S 但请注意,agregateByKey首先在多个分区上分别聚合,然后再聚合中间结果(类似于hadoop combiners)。这就是为什么它比groupByKey()更受推荐。根据您的版本,您还可以查看combineByKey()和reduceByKey()。也许您可以以这种方式重新编写您的统计数据...或者不行...试试看! - Daniel Langdon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接