如何将一个RDD拆分成两个或多个RDD?

41

我正在寻找一种将RDD拆分成两个或多个RDD的方法。 我最接近的是 Scala Spark: Split collection into several RDD? ,但它仍然只是一个RDD。

如果您熟悉SAS,可以尝试类似下面的操作:

data work.split1, work.split2;
    set work.preSplit;

    if (condition1)
        output work.split1
    else if (condition2)
        output work.split2
run;

导致了两个不同的数据集。为了得到我想要的结果,它必须立即持久化...

4个回答

67

不可能从单个转换中产生多个RDD。如果您想要拆分一个RDD,则必须为每个拆分条件应用一个filter。例如:

def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))

rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))

如果您只有一个二进制条件且计算代价很高,您可能更喜欢使用类似于以下方式的方法:
kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()

rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()

这意味着只有一个谓词计算,但需要额外遍历所有数据。

需要注意的是,只要输入RDD被正确缓存且没有其他关于数据分布的假设,重复筛选和嵌套if-else的for循环在时间复杂度上没有显著差异。

对于N个元素和M个条件,您需要执行的操作数量明显与N乘以M成正比。对于for循环,它应该接近(N + MN) / 2,而重复的筛选恰好是NM,但归根结底它无非就是O(NM)。您可以查看我的讨论Jason Lenderman以了解一些优缺点。

在非常高的层面上,您应该考虑两件事:

  1. Spark transformations are lazy, until you execute an action your RDD is not materialized

    Why does it matter? Going back to my example:

     rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
    

    If later I decide that I need only rdd_odd then there is no reason to materialize rdd_even.

    If you take a look at your SAS example to compute work.split2 you need to materialize both input data and work.split1.

  2. RDDs provide a declarative API. When you use filter or map it is completely up to Spark engine how this operation is performed. As long as the functions passed to transformations are side effects free it creates multiple possibilities to optimize a whole pipeline.

在一天结束时,这个案例并不特殊到足以证明它自己的转换。
这个带有过滤器模式的地图实际上是在核心Spark中使用的。请参见我的回答How does Sparks RDD.randomSplit actually split the RDDrandomSplit方法的相关部分
如果唯一的目标是在输入上实现拆分,则可以对DataFrameWriter使用partitionBy子句,其文本输出格式:
def makePairs(row: T): (String, String) = ???

data
  .map(makePairs).toDF("key", "value")
  .write.partitionBy($"key").format("text").save(...)

* Spark中只有3种基本的转换类型:

  • RDD[T] => RDD[T]
  • RDD[T] => RDD[U]
  • (RDD[T], RDD[U]) => RDD[W]

其中T、U、W可以是原子类型或产品/元组(K,V)。任何其他操作都必须使用上述某种组合来表达。您可以查看原始RDD论文以获取更多详细信息。

** https://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman

*** 另请参见Scala Spark:将集合拆分为几个RDD?


1
非常有用:)。我在想为什么Spark中没有一个等效的分区方法。有什么想法吗? - Rakshith
1
@Rakshith 简洁。而且,既然我们看待谱系时会舍弃一支,那么简洁就更为重要了。 - zero323
1
@eje,类似的方法早在一段时间前就被Jason Lenderman提出了,并且已经在这个答案中链接了。我看到的问题是假设数据适合执行器内存,这在一般情况下是无法做出的。 - zero323
@zero323,所有分区数据都必须适合执行器内存,至少在计算时是这样的。多路复用RDD也不例外。存储类别可以指定以控制计算后是否缓存、溢出等。 - eje
据我所知,实际上并没有。但如果你能证明我错了,我很乐意看到一些证据。AFAIK数据被公开为迭代器,可以随时处理。除非您创建本地非惰性集合,否则没有理由将所有数据加载到内存中。您可以通过比较wholeTextFiles(一次性加载数据)和textFile的性能和资源使用情况来轻松观察差异。这基本上是为什么groupByKey不是最好的想法的原因。出于好奇 - 您是否尝试过测量GC的影响? - zero323
显示剩余4条评论

8
正如其他帖子中提到的,没有单一的本地RDD转换可以拆分RDD,但是这里有一些“多路复用”操作,可以有效地模拟各种在RDD上的“分裂”,不需要多次读取: http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions 一些特定于随机拆分的方法: http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions 这些方法可从开源silex项目中获得: https://github.com/willb/silex 一篇博客文章解释了它们的工作原理: http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/
def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U],
  persist: StorageLevel): Seq[RDD[U]] = {
  val mux = self.mapPartitionsWithIndex { case (id, itr) =>
    Iterator.single(f(id, itr))
  }.persist(persist)
  Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } }
}

def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]],
  persist: StorageLevel): Seq[RDD[U]] = {
  val mux = self.mapPartitionsWithIndex { case (id, itr) =>
    Iterator.single(f(id, itr))
  }.persist(persist)
  Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } }
}

如其他地方所述,这些方法确实涉及到内存与速度的权衡,因为它们通过“急切地”计算整个分区结果而不是“懒惰地”计算来操作。因此,在大型分区上,这些方法可能会遇到内存问题,而传统的懒惰变换则不会。


3
值得重申另一个回答中的一部分对话:多路复用通过单次计算提高了效率,但它通过将结果存储在“非惰性”容器中来实现这一点,因此(取决于正在计算什么)与传统的多次遍历变体相比,可能会增加驻留内存。换句话说,多路复用通过增加内存使用来获得更高的计算效率。 - eje
2
这个评论作为答案的一部分不是更好吗? - zero323

4

一种方法是使用自定义分区器根据过滤条件对数据进行分区。这可以通过扩展Partitioner并实现类似于RangePartitioner的内容来实现。

然后可以使用map partitions从分区的RDD构建多个RDD,而无需读取所有数据。

val filtered = partitioned.mapPartitions { iter => {

  new Iterator[Int](){
    override def hasNext: Boolean = {
      if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) {
        false
      } else {
        iter.hasNext
      }
    }

    override def next():Int = iter.next()
  }

请注意,过滤后的RDD中分区的数量与分区RDD中的数量相同,因此应使用coalesce来减少分区数量并删除空分区。


在调用mapPartitions时,它会为每个分区运行任务,但是分区内的实际数据只会被读取一次。 - Jem Tucker
好的,但如果我立即持久化它,我只会触及每个观察值一次,并且我将有两个不同输出的RDD,对吗? - Carlos Bribiescas
是的,那将是这种情况。 - Jem Tucker
@JemTucker,你可以使用mapPartitionsWithIndex代替访问TaskContext。另外,不是每个观察值只会被访问一次。由于需要进行洗牌操作,这本身就很糟糕,因此至少部分数据将被读取、序列化、传输、反序列化和可选写入。这意味着不仅数据被多次访问,而且成本更高。 - zero323
这确实有道理,但是我使用这种方法过滤大量RDD时已经取得了良好的性能。我同意洗牌是昂贵的,但通常在之前的步骤中强制进行洗牌,因此可以在这些步骤中使用自定义分区器来有效地对分区进行排序,从而避免一组过滤器。 - Jem Tucker
如果数据已经被分区,那么这样做可能是有意义的。否则,你会用网络通信来替换线性内存扫描,这大约慢了两个数量级。更不用说你还要纠正偏斜的分布,这需要先验知识或采样(在RangePartitioner中的直方图)->另一个数据扫描。无论如何...你可以将代码简化为:mapPartitionsWithIndex((i, iter) => if (rangeOfPartitionsToKeep.contains(i)) iter else Iterator()) - zero323

2
如果您使用 randomSplit API 调用 来分割 RDD,您将获得一个 RDD 数组。
如果您希望返回 5 个 RDD,请传入 5 个权重值。
例如:
val sourceRDD = val sourceRDD = sc.parallelize(1 to 100, 4)
val seedValue = 5
val splitRDD = sourceRDD.randomSplit(Array(1.0,1.0,1.0,1.0,1.0), seedValue)

splitRDD(1).collect()
res7: Array[Int] = Array(1, 6, 11, 12, 20, 29, 40, 62, 64, 75, 77, 83, 94, 96, 100)

2
这不是和 @zero323 的解决方案一样吗?他说它会多次读取,而我正试图避免这种情况。 - Carlos Bribiescas

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接