当RDD项目很大时,为什么rdd.map(identity).cache会变慢?

4
我发现当在RDD上使用.map( identity ).cache时,如果项目很大,它会变得非常缓慢。而否则几乎是瞬间完成的。
注意:这可能与这个问题有关,但在这里我提供了一个非常精确的例子(可以直接在spark-shell中执行)。
// simple function to profile execution time (in ms)
def profile[R](code: => R): R = {
  val t = System.nanoTime
  val out = code
  println(s"time = ${(System.nanoTime - t)/1000000}ms")
  out
}

// create some big size item
def bigContent() = (1 to 1000).map( i => (1 to 1000).map( j => (i,j) ).toMap )

// create rdd
val n = 1000 // size of the rdd

val rdd = sc.parallelize(1 to n).map( k => bigContent() ).cache
rdd.count // to trigger caching

// profiling
profile( rdd.count )                 // around 12 ms
profile( rdd.map(identity).count )   // same
profile( rdd.cache.count )           // same
profile( rdd.map(identity).cache.count ) // 5700 ms !!!

我最初认为是创建一个新的RDD(容器)的时间到了。但是,如果我使用一个大小相同但内容较少的RDD,则执行时间只有微小的差异:

val rdd = parallelize(1 to n).cache
rdd.count

profile( rdd.count )                 // around 9 ms
profile( rdd.map(identity).count )   // same
profile( rdd.cache.count )           // same
profile( rdd.map(identity).cache.count ) // 15 ms

所以,似乎缓存实际上是复制数据。我曾以为缓存还会浪费时间来序列化它,但我检查了默认的MEMORY_ONLY持久性使用了缓存:

rdd.getStorageLevel == StorageLevel.MEMORY_ONLY // true

=> 那么,缓存是复制数据还是其他什么东西?
这对我的应用程序来说确实是一项重大限制,因为我从采用类似于rdd = rdd.map(f: Item => Item).cache的设计开始,可以与许多这样的函数f以任意顺序(我不能事先确定的顺序)应用。
我正在使用 Spark 1.6.0。
编辑
当我查看 Spark UI -> 阶段标签页 -> 最后一个阶段(即第4个),所有任务都几乎具有相同的数据:
- 持续时间=3s(虽然已达到3s,但仍然比预期的2.9太长了 :-\) - 调度程序10ms - 任务反序列化20ms - GC 0.1s(所有任务都有这个,但为什么会触发GC?) - 结果序列化0ms - 获取结果0ms - 峰值执行内存0.0B - 输入大小为7.0MB / 125 - 无错误

缓存意味着Spark将把您的数据复制到列式数据的Spark缓存中。在缓存之前进行映射操作可能会导致洗牌操作。您可以通过Spark UI来检查映射和缓存时发生了什么。 - giaosudau
什么是列式数据?在这里,它只是一些A的RDD[A]。此外,没有洗牌。这是最简单的映射操作。如果映射很复杂,它将花费第二个配置文件中的时间:rdd.map(identity).count - Juh_
我从Spark UI中添加了信息。请注意,无论以何种格式,它都没有序列化的原因。还是我漏掉了什么? - Juh_
1
@Juh_ 这是正常的,因为你要求缓存一个新的rdd(它已经通过.map(identity)进行了转换)。因此,如果你附加缓存并对其计数,第一次计数将触发缓存机制,但在这个特定的计数操作中,你不会从中受益。不过,对于缓存的rdd的后续操作将利用它被缓存的优势-前提是你在变量中存储了对它的引用。 - Jonathan Taws
基于这份文档MEMORY_ONLY将RDD作为反序列化的Java对象存储在JVM中。你确定你的数据适合内存吗?否则它将会被动态重新计算。你可以在应用程序详细信息UI的存储选项卡中查看。在内部,persist使用一个分区迭代器的BlockManager,如此处所解释的那样。 - Jonathan Taws
显示剩余6条评论
1个回答

12

在缓存较慢的情况下运行org.apache.spark.executor.CoarseGrainedExecutorBackend的进程的jstack显示如下:

"Executor task launch worker-4" #76 daemon prio=5 os_prio=0 tid=0x00000000030a4800 nid=0xdfb runnable [0x00007fa5f28dd000]
   java.lang.Thread.State: RUNNABLE
  at java.util.IdentityHashMap.resize(IdentityHashMap.java:481)
  at java.util.IdentityHashMap.put(IdentityHashMap.java:440)
  at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:176)
  at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:251)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:211)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
  at org.apache.spark.util.SizeEstimator$$anonfun$sampleArray$1.apply$mcVI$sp(SizeEstimator.scala:284)
  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
  at org.apache.spark.util.SizeEstimator$.sampleArray(SizeEstimator.scala:276)
  at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:260)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:211)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:70)
  at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
  at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
  at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
  at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
  at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)


"Executor task launch worker-5" #77 daemon prio=5 os_prio=0 tid=0x00007fa6218a9800 nid=0xdfc runnable [0x00007fa5f34e7000]
   java.lang.Thread.State: RUNNABLE
  at java.util.IdentityHashMap.put(IdentityHashMap.java:428)
  at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:176)
  at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:224)
  at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:223)
  at scala.collection.immutable.List.foreach(List.scala:318)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:223)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:70)
  at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
  at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
  at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
  at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
  at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

SizeEstimator是缓存 ostensibly 已在内存中的东西成本的主要之一,因为对于未知对象的适当大小估计可以相当困难;如果您查看visitSingleObject方法,就会发现它严重依赖于反射,调用getClassInfo来访问运行时类型信息;不仅遍历完整个对象层次结构,而且还会检查每个嵌套成员是否与IdentityHashMap中的某个具体对象实例相同,因此堆栈跟踪显示大量时间花费在这些IdentityHashMap操作上。

在您的示例对象的情况下,您基本上将每个项作为从包装整数到包装整数的映射列表;假定Scala的内部映射实现也持有一个数组,这就解释了visitSingleObject -> List.foreach -> visitSingleObject -> visitSingleObject的调用层次结构。无论如何,在这种情况下,有很多内部对象需要访问,因此SizeEstimators为每个采样对象设置了一个新的IdentityHashMap。

在您测量的情况下:

profile( rdd.cache.count )

由于RDD已经成功缓存,因此这不被视为执行缓存逻辑的一部分,所以Spark足够智能,不会重新运行缓存逻辑。您实际上可以通过直接对新的RDD创建和缓存进行分析,将缓存逻辑的确切成本单独隔离出来,而不需要使用额外的"map(identity)"转换;以下是我从您最后几行继续的Spark会话:

scala> profile( rdd.count )
time = 91ms
res1: Long = 1000

scala> profile( rdd.map(identity).count )
time = 112ms
res2: Long = 1000

scala> profile( rdd.cache.count )
time = 59ms
res3: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 6564ms                                                                   
res4: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).count )
time = 14990ms                                                                  
res5: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).cache.count )
time = 22229ms                                                                  
res6: Long = 1000

scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).map(identity).cache.count )
time = 21922ms                                                                  
res7: Long = 1000

你可以看到,慢的原因并不是因为你运行了一个 map 转换,而是在这种情况下,当每个对象有大约100万到1000万个内部对象时(取决于Map实现方式;例如,堆栈跟踪中额外的visitArray嵌套提示HashMap impl具有嵌套数组,这对于每个哈希表条目内的典型密集线性探测数据结构是有道理的),计算缓存逻辑的基本成本约为6秒。

针对你的具体用例,如果可能,应该偏向于惰性缓存,因为缓存中间结果会有开销,如果你不会真正重复使用中间结果进行大量单独的下游转换,则这种折衷方法不划算。但正如你在问题中提到的,如果你确实使用一个RDD分支扩展到多个不同的下游转换,那么如果原始转换比较昂贵,你确实需要缓存步骤。

解决方法是尝试使用更适合常数时间计算的内部数据结构(例如基本类型的数组),在这些数据结构中,您可以节省大量的成本,避免遍历大量包装器对象并依赖于SizeEstimator中的反射。

我尝试了像Array [Array [Int]] 这样的东西,尽管仍然有一些开销,但相似数据量的情况下要好10倍:

scala> def bigContent2() = (1 to 1000).map( i => (1 to 1000).toArray ).toArray
bigContent2: ()Array[Array[Int]]

scala> val rdd = sc.parallelize(1 to n).map( k => bigContent2() ).cache
rdd: org.apache.spark.rdd.RDD[Array[Array[Int]]] = MapPartitionsRDD[23] at map at <console>:28

scala> rdd.count // to trigger caching
res16: Long = 1000                                                              

scala> 

scala> // profiling

scala> profile( rdd.count )
time = 29ms
res17: Long = 1000

scala> profile( rdd.map(identity).count )
time = 42ms
res18: Long = 1000

scala> profile( rdd.cache.count )
time = 34ms
res19: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 763ms                                                                    
res20: Long = 1000
为了说明在任何更复杂的对象上反射成本有多么糟糕,如果我在那里删除最后一个 toArray 并且最终每个 bigContent 都是一个 scala.collection.immutable.IndexedSeq [Array [Int]],则性能回到原始 IndexSeq [Map [Int,Int]] 案例的 ~2倍慢的范围内:
scala> def bigContent3() = (1 to 1000).map( i => (1 to 1000).toArray )
bigContent3: ()scala.collection.immutable.IndexedSeq[Array[Int]]

scala> val rdd = sc.parallelize(1 to n).map( k => bigContent3() ).cache
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.IndexedSeq[Array[Int]]] = MapPartitionsRDD[27] at map at <console>:28

scala> rdd.count // to trigger caching
res21: Long = 1000                                                              

scala> 

scala> // profiling

scala> profile( rdd.count )
time = 27ms
res22: Long = 1000

scala> profile( rdd.map(identity).count )
time = 39ms
res23: Long = 1000

scala> profile( rdd.cache.count )
time = 37ms
res24: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 2781ms                                                                   
res25: Long = 1000

如评论部分所讨论的,您还可以考虑使用MEMORY_ONLY_SER StorageLevel。只要有高效的序列化器,它很可能比SizeEstimator中使用的递归反射更便宜;为此,您只需用persist(StorageLevel.MEMORY_ONLY_SER)替换cache(),正如另一个问题中提到的那样,cache()在概念上与persist(StorageLevel.MEMORY_ONLY)是相同的。

import org.apache.spark.storage.StorageLevel
profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )

我曾经在Spark 1.6.1和Spark 2.0.0-preview上尝试过,其他集群配置都完全相同(分别使用Google Cloud Dataproc的"1.0"和"preview"镜像版本)。不幸的是,在Spark 1.6.1中,MEMORY_ONLY_SER技巧似乎并没有起到帮助的作用:

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
time = 6709ms                                                                   
res19: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 6126ms                                                                   
res20: Long = 1000

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY).count )
time = 6214ms                                                                   
res21: Long = 1000

但是在 Spark 2.0.0-preview 中,性能似乎提高了 10 倍:

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
time = 500ms
res18: Long = 1000

scala> profile( rdd.map(identity).cache.count )
time = 5353ms                                                                   
res19: Long = 1000

scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY).count )
time = 5927ms                                                                   
res20: Long = 1000

但这可能因您的对象而异;如果序列化本身并没有使用大量反射,那么只有在使用Kryo序列化时才会期望加速; 如果您能够有效地使用MEMORY_ONLY_SER来处理这些大型对象,则很可能会看到改进。


但是结论是令人误解的:-\ 我旨在实现的设计是运行许多rdd = rdd.map(f).cache,然后收集一些统计信息(例如rdd.map(_.map(_.sum)).collect)。然后会任意选择以下f,部分与这些收集到的值有关。有些f很简单,有些则很长......好在你展示了使用数组可以提高效率。但我不知道能否使用它。我的实际结构更加复杂。我需要进行一些测试。 - Juh_
我还需要查看Spark的SizeEstimator系统。乍一看,似乎我可以制作一个自制类来实现它,以绕过完整解析。对此有什么了解吗? - Juh_
不确定是否可以动态地完成,但我以前肯定已经构建了Spark汇编的自定义版本,如果您只想要git clone并构建自己的Spark,则说明书非常好:https://github.com/apache/spark。 - Dennis Huo
1
但在深入研究Spark层之前,您可能还应该考虑将存储级别更改为MEMORY_ONLY_SER而不是MEMORY_ONLY;我还没有时间测试它,但理论上将缓存层以序列化形式存储至少可以绕过繁重的大小估算逻辑,有效地与手动将所有内容保留在字节数组格式中以在任务逻辑内重新构建相同。在我的过去经验中,反序列化成本通常比进行过多反射要便宜。 - Dennis Huo
1
有趣的是,我在Spark 1.6.1和Spark 2.0.0-preview中都尝试了MEMORY_ONLY_SER,使用Google Cloud Dataproc,在Spark 1.6.1中,MEMORY_ONLY_SER与cache或persist(MEMORY_ONLY)的性能相同,而在Spark 2.0.0-preview中,MEMORY_ONLY_SER快了10倍。 - Dennis Huo
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接