我有一个RDD [Row]
,需要持久化到第三方存储库中。但是这个第三方存储库在单次调用中最多接受5 MB的数据。
因此,我想根据RDD
中存在数据的大小而不是行数来创建分区。
如何找到RDD
的大小并根据其创建分区?
我有一个RDD [Row]
,需要持久化到第三方存储库中。但是这个第三方存储库在单次调用中最多接受5 MB的数据。
因此,我想根据RDD
中存在数据的大小而不是行数来创建分区。
如何找到RDD
的大小并根据其创建分区?
正如Justin和Wang所提到的,获取RDD的大小并不是直接的。我们只能做一个估计。
我们可以对一个RDD进行采样,然后使用SizeEstimator来获取样本的大小。 正如Wang和Justin所提到的, 基于离线采样的大小数据,例如,在离线时使用X行占用Y GB,运行时Z行可能需要Z*Y/X GB
下面是获取RDD大小/估算的Scala示例代码。
我对Scala和Spark还很陌生。下面的示例代码可能有更好的写法。
def getTotalSize(rdd: RDD[Row]): Long = {
// This can be a parameter
val NO_OF_SAMPLE_ROWS = 10l;
val totalRows = rdd.count();
var totalSize = 0l
if (totalRows > NO_OF_SAMPLE_ROWS) {
val sampleRDD = rdd.sample(true, NO_OF_SAMPLE_ROWS)
val sampleRDDSize = getRDDSize(sampleRDD)
totalSize = sampleRDDSize.*(totalRows)./(NO_OF_SAMPLE_ROWS)
} else {
// As the RDD is smaller than sample rows count, we can just calculate the total RDD size
totalSize = getRDDSize(rdd)
}
totalSize
}
def getRDDSize(rdd: RDD[Row]) : Long = {
var rddSize = 0l
val rows = rdd.collect()
for (i <- 0 until rows.length) {
rddSize += SizeEstimator.estimate(rows.apply(i).toSeq.map { value => value.asInstanceOf[AnyRef] })
}
rddSize
}
一种直接的方法是根据需要是否以序列化形式存储数据,调用以下函数,然后转到Spark UI的“存储”页面,您应该能够确定RDD的总大小(内存+磁盘):
rdd.persist(StorageLevel.MEMORY_AND_DISK)
or
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
在运行时准确计算内存大小并不容易。但你可以试着在运行时进行估算:以离线采样的数据大小为基础,比如 X 行使用了 Y GB 离线,那么运行时的 Z 行可能需要 Z*Y/X GB;这与 Justin 早先提出的类似。
希望这能有所帮助。
count
也是一种大小的度量方式——这个答案并没有真正回答问题,但确实为理想答案增加了信息。 - ribamar这将取决于如序列化等因素,因此并不是一成不变的。但是,您可以拿出一组样本数据,并对该样本数据进行一些实验,从而推断出结果。
如果你正在集群上处理大数据,那么这就是要使用的版本——也就是它会消除 collect 操作。
def calcRDDSize(rdd: RDD[Row]): Long = {
rdd.map(_.mkString(",").getBytes("UTF-8").length.toLong)
.reduce(_+_) //add the sizes together
}
def estimateRDDSize( rdd: RDD[Row], fraction: Double ) : Long = {
val sampleRDD = rdd.sample(true,fraction)
val sampleRDDsize = calcRDDSize(sampleRDD)
println(s"sampleRDDsize is ${sampleRDDsize/(1024*1024)} MB")
val sampleAvgRowSize = sampleRDDsize / sampleRDD.count()
println(s"sampleAvgRowSize is $sampleAvgRowSize")
val totalRows = rdd.count()
println(s"totalRows is $totalRows")
val estimatedTotalSize = totalRows * sampleAvgRowSize
val formatter = java.text.NumberFormat.getIntegerInstance
val estimateInMB = formatter.format(estimatedTotalSize/(1024*1024))
println(s"estimatedTotalSize is ${estimateInMB} MB")
return estimatedTotalSize
}
// estimate using 15% of data
val size = estimateRDDSize(df.rdd,0.15)
sag
上面的答案中使用的collect
,但仍然使用Spark的SizeEstimator.estimate
,这可能比在行上运行mkString
并查看字符串长度更准确。假定这个答案只能在它们被存储为字符串时才能工作,并且取决于RDD如何持久化(序列化为字符串、序列化为Java对象等)。 - Marcus
rdd.sample(true, NO_OF_SAMPLE_ROWS)
会返回完整的RDD,第二个参数应该是介于0和1之间的数字。 - Victor P.