如何查找RDD的大小

28

我有一个RDD [Row],需要持久化到第三方存储库中。但是这个第三方存储库在单次调用中最多接受5 MB的数据。

因此,我想根据RDD中存在数据的大小而不是行数来创建分区。

如何找到RDD的大小并根据其创建分区?

5个回答

14

正如Justin和Wang所提到的,获取RDD的大小并不是直接的。我们只能做一个估计。

我们可以对一个RDD进行采样,然后使用SizeEstimator来获取样本的大小。 正如Wang和Justin所提到的, 基于离线采样的大小数据,例如,在离线时使用X行占用Y GB,运行时Z行可能需要Z*Y/X GB

下面是获取RDD大小/估算的Scala示例代码。

我对Scala和Spark还很陌生。下面的示例代码可能有更好的写法。

def getTotalSize(rdd: RDD[Row]): Long = {
  // This can be a parameter
  val NO_OF_SAMPLE_ROWS = 10l;
  val totalRows = rdd.count();
  var totalSize = 0l
  if (totalRows > NO_OF_SAMPLE_ROWS) {
    val sampleRDD = rdd.sample(true, NO_OF_SAMPLE_ROWS)
    val sampleRDDSize = getRDDSize(sampleRDD)
    totalSize = sampleRDDSize.*(totalRows)./(NO_OF_SAMPLE_ROWS)
  } else {
    // As the RDD is smaller than sample rows count, we can just calculate the total RDD size
    totalSize = getRDDSize(rdd)
  }

  totalSize
}

def getRDDSize(rdd: RDD[Row]) : Long = {
    var rddSize = 0l
    val rows = rdd.collect()
    for (i <- 0 until rows.length) {
       rddSize += SizeEstimator.estimate(rows.apply(i).toSeq.map { value => value.asInstanceOf[AnyRef] })
    }

    rddSize
}

YARN 如何获取 RDD 的大小?我正在运行作业并估计我的 RDD 大小为 GB,但无法在 Spark 代码内访问此信息。 - Glenn Strycker
1
我发现value.asInstanceOf[AnyRef]比toString更好用,因为value.toString可能在value为空时抛出空指针异常,而强制转换则不会有这个问题,因此更加安全。 - lockwobr
2
@SamuelAlexander rdd.sample(true, NO_OF_SAMPLE_ROWS) 会返回完整的RDD,第二个参数应该是介于0和1之间的数字。 - Victor P.
1
@TheProletariat - 是的,你也可以用你的代码编辑答案。 - sag
1
这个编程示例是错误的。正如@VictorP.指出的那样,NO_OF_SAMPLE_ROWS必须是分数形式。 - WamBamBoozle
显示剩余6条评论

7

一种直接的方法是根据需要是否以序列化形式存储数据,调用以下函数,然后转到Spark UI的“存储”页面,您应该能够确定RDD的总大小(内存+磁盘):

rdd.persist(StorageLevel.MEMORY_AND_DISK)

or

rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

在运行时准确计算内存大小并不容易。但你可以试着在运行时进行估算:以离线采样的数据大小为基础,比如 X 行使用了 Y GB 离线,那么运行时的 Z 行可能需要 Z*Y/X GB;这与 Justin 早先提出的类似。

希望这能有所帮助。


1
感谢您的答复。是的,这对查找大小很有帮助。但是我想在管道/代码执行过程中检查此内容。因此,在Spark UI中手动检查不适合我。 - sag
1
我认为在运行时计算准确的内存大小并不容易。但你可以尝试在运行时进行估算:基于离线采样的数据大小,比如说,X行使用了Y GB的离线数据,在运行时Z行可能需要Z*Y/X GB;这与Justin之前提出的建议类似。 - Haiying Wang
随机问题,当我执行rdd.cache()时,我在用户界面中看不到它。仅存储在内存中的数据不会显示吗? - zengr

6
我认为RDD.count()将给出RDD中元素的数量。

6
你好 @Yiying,欢迎来到StackOverflow。发帖者正在询问RDD的大小,而不仅仅是行数。也许你可以扩展一下你的回答,这样发帖者就不需要进一步澄清了。一旦你有足够的声望,如果你愿意,你就可以留下评论。 - buruzaemon
2
问题要求以信息单位(字节)表示大小。但是count也是一种大小的度量方式——这个答案并没有真正回答问题,但确实为理想答案增加了信息。 - ribamar

3

这将取决于如序列化等因素,因此并不是一成不变的。但是,您可以拿出一组样本数据,并对该样本数据进行一些实验,从而推断出结果。


考虑我有一个包含字符串的RDD。需要遍历整个RDD并使用String.size()获取大小吗? - sag
@sag 这是一种方法,但会增加执行时间。如果你的RDD不是非常大,你可以这样做。 - BJC

0

如果你正在集群上处理大数据,那么这就是要使用的版本——也就是它会消除 collect 操作。

def calcRDDSize(rdd: RDD[Row]): Long = {
  rdd.map(_.mkString(",").getBytes("UTF-8").length.toLong)
     .reduce(_+_) //add the sizes together
}

def estimateRDDSize( rdd: RDD[Row], fraction: Double ) : Long = {
  val sampleRDD = rdd.sample(true,fraction)
  val sampleRDDsize = calcRDDSize(sampleRDD)
  println(s"sampleRDDsize is ${sampleRDDsize/(1024*1024)} MB")

  val sampleAvgRowSize = sampleRDDsize / sampleRDD.count()
  println(s"sampleAvgRowSize is $sampleAvgRowSize")

  val totalRows = rdd.count()
  println(s"totalRows is $totalRows")

  val estimatedTotalSize = totalRows * sampleAvgRowSize
  val formatter = java.text.NumberFormat.getIntegerInstance
  val estimateInMB = formatter.format(estimatedTotalSize/(1024*1024))
  println(s"estimatedTotalSize is ${estimateInMB} MB")

  return estimatedTotalSize
}

// estimate using 15% of data
val size = estimateRDDSize(df.rdd,0.15)

我认为可能有一种解决方案可以避免使用sag上面的答案中使用的collect,但仍然使用Spark的SizeEstimator.estimate,这可能比在行上运行mkString并查看字符串长度更准确。假定这个答案只能在它们被存储为字符串时才能工作,并且取决于RDD如何持久化(序列化为字符串、序列化为Java对象等)。 - Marcus

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接