如何查找Spark RDD/Dataframe的大小?

51

我知道如何在Scala中查找文件大小。但是如何在Spark中查找RDD / dataframe的大小?

Scala:

object Main extends App {
  val file = new java.io.File("hdfs://localhost:9000/samplefile.txt").toString()
  println(file.length)
}
Spark:
val distFile = sc.textFile(file)
println(distFile.length)

但如果我处理它,无法获取文件大小。如何找到RDD的大小?


2
你指的是 rdd 中的行数还是实际大小以 MB(或 GB)为单位? - Glennie Helles Sindholt
让我们来看一个例子,假设有一个50MB的文件作为输入,我想将它分割成5个部分。为了实现这个目标,首先需要输入RDD并找到其大小,但是这一步并没有成功。 - Venu A Positive
3个回答

78

谢谢,它正在工作。当我导入import org.apache.spark.util.SizeEstimator时就可以了,但不能得到准确的值,总是大约43MB左右。 - Venu A Positive
2
顺便提一下,如果一切都正常工作,您可以将问题标记为已解决吗 :) - Glennie Helles Sindholt
请在这里找到我的问题。http://sparkdeveloper.blogspot.in/2016/01/spark-solution-please.html - Venu A Positive
2
哦,我明白了 - 我错过了“始终约为43MB”的部分。但是,如果您不关心dataframe在内存中占用的大小,只想知道磁盘上文件的大小,为什么不使用常规的文件工具呢? - Glennie Helles Sindholt
1
常规文件工具可以告诉您任何给定文件在磁盘上的物理大小 - 无论它是parquet、gzipped还是以任何其他方式打包。您使用的是哪个文件工具,不能给您正确的大小? - Glennie Helles Sindholt
显示剩余2条评论

14

是的,终于我找到了解决方案。 包括这些库。

import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd

如何查找RDD的大小:

def calcRDDSize(rdd: RDD[String]): Long = {
  rdd.map(_.getBytes("UTF-8").length.toLong)
     .reduce(_+_) //add the sizes together
}

查找DataFrame大小的函数:(此函数在内部仅将DataFrame转换为RDD)

val dataFrame = sc.textFile(args(1)).toDF() // you can replace args(1) with any path

val rddOfDataframe = dataFrame.rdd.map(_.toString())

val size = calcRDDSize(rddOfDataframe)

15
如果将一个数据框转换为RDD,它的大小会显著增加。 数据框使用项目钨来实现更高效的内存表示。 如果您只想了解大小印象,可以缓存RDD和数据框(确保通过例如进行计数来实现缓存的材料化),然后查看UI的存储选项卡下的大小。 请注意,在任一情况下,您都可以获得内存中的大小而不是文件大小。 - Assaf Mendelson
14
这个答案是错误的。将对象转换为字符串计算大小毫无意义。另外,import org.apache.spark.util.SizeEstimator 没有被使用。 - mathieu
1
这将实际上为您提供平面文本文件的大小,如果您将数据框存储在其中。所以基本上这就是我要找的。 - Amit Kumar
@Venu A Positive 我正在使用 spark-sql 2.4.1v,即使导入了此处显示的所有导入,我仍然无法获得 _.getBytes 方法,我还需要导入什么?是否需要更改 pom.xml?请建议。 - BdEngineer
我已经在本地的Spark安装中尝试过了,它的大小并不完全与我的操作系统相同,但这是迄今为止我找到的最接近的解决方案。谢谢! - mjbsgll

7
以下是一种除了SizeEstimator之外我经常使用的方式: 通过代码来判断一个RDD是否被缓存,更精确地说,有多少个分区在内存中被缓存,有多少个分区在磁盘上被缓存?想要获取存储级别,并且想要知道当前实际的缓存状态,以及知道内存消耗。 Spark Context拥有开发者API方法getRDDStorageInfo(),有时候可以用它。

Return information about what RDDs are cached, if they are in mem or on disk, how much space they take, etc.

For Example :

scala> sc.getRDDStorageInfo
       res3: Array[org.apache.spark.storage.RDDInfo] = 
       Array(RDD "HiveTableScan [name#0], (MetastoreRelation sparkdb, 
       firsttable, None), None " (3) StorageLevel: StorageLevel(false, true, false, true, 1);  CachedPartitions: 1;

TotalPartitions: 1; MemorySize: 256.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B)

看起来Spark UI也使用了与此相同的代码

  • 请参见此源问题SPARK-17019,其中描述了...

描述
随着SPARK-13992,Spark支持将数据持久化到非堆内存中,但目前不暴露堆外内存的使用情况,这对于用户的监控和分析不太方便,因此在此提议在各个地方公开堆内存和堆外内存的使用情况:

  1. Spark UI的执行器页面将显示堆内存和堆外内存的使用情况。
  2. REST请求返回堆内存和堆外内存的使用情况。
  3. 还可以通过SparkListener以编程方式获取这两种内存使用情况。

2
我在这里看不到你是如何从sc.getRDDStorageInfo中得到MemorySize: 256.0 B的。 - user2739472

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接