Apache Spark根据特定大小拆分RDD

4
我将尝试从文本文件中读取字符串,但我想根据特定的大小限制每行。例如;
这是我代表文件的方式。

aaaaa\nbbb\nccccc

当尝试使用sc.textFile读取此文件时,RDD将显示为以下内容。
scala> val rdd = sc.textFile("textFile")
scala> rdd.collect
res1: Array[String] = Array(aaaaa, bbb, ccccc)

但我想限制这个RDD的大小。例如,如果限制为3,则应该得到像这样的结果。

Array[String] = Array(aaa, aab, bbc, ccc, c)

什么是最佳性能的实现方式?

所以你想忽略行边界并分割成n个字符组?在Spark之外预处理为长度为n的行,然后使用textFile读取几乎肯定会更快。 - The Archetypal Paul
2个回答

3

这并不是一个特别高效的解决方案(但也不太糟),你可以像这样做:

val pairs = rdd
  .flatMap(x => x)  // Flatten
  .zipWithIndex  // Add indices
  .keyBy(_._2 / 3)  // Key by index / n

// We'll use a range partitioner to minimize the shuffle 
val partitioner = new RangePartitioner(pairs.partitions.size, pairs)

pairs
  .groupByKey(partitioner)  // group
  // Sort, drop index, concat
  .mapValues(_.toSeq.sortBy(_._2).map(_._1).mkString("")) 
  .sortByKey()
  .values

可以通过显式传递填充分区所需的数据来避免洗牌,但需要编写一些代码。请参阅我的答案:将RDD分区为长度为n的元组
如果您可以接受在分区边界上出现一些不对齐的记录,则可以使用简单的mapPartitions与grouped来以更低的成本解决问题:
rdd.mapPartitions(_.flatMap(x => x).grouped(3).map(_.mkString("")))

也可以使用滑动RDD:

rdd.flatMap(x => x).sliding(3, 3).map(_.mkString(""))

1

无论如何,您都需要阅读所有数据。除了映射每行并修剪它之外,您没有太多可以做的。

rdd.map(line => line.take(3)).collect()

1
我认为这不是正确的。请再看一下预期的输出。 - zero323

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接