在缓存较慢的情况下运行org.apache.spark.executor.CoarseGrainedExecutorBackend
的进程的jstack
显示如下:
"Executor task launch worker-4" #76 daemon prio=5 os_prio=0 tid=0x00000000030a4800 nid=0xdfb runnable [0x00007fa5f28dd000]
java.lang.Thread.State: RUNNABLE
at java.util.IdentityHashMap.resize(IdentityHashMap.java:481)
at java.util.IdentityHashMap.put(IdentityHashMap.java:440)
at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:176)
at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:251)
at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:211)
at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
at org.apache.spark.util.SizeEstimator$$anonfun$sampleArray$1.apply$mcVI$sp(SizeEstimator.scala:284)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.SizeEstimator$.sampleArray(SizeEstimator.scala:276)
at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:260)
at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:211)
at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:70)
at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"Executor task launch worker-5" #77 daemon prio=5 os_prio=0 tid=0x00007fa6218a9800 nid=0xdfc runnable [0x00007fa5f34e7000]
java.lang.Thread.State: RUNNABLE
at java.util.IdentityHashMap.put(IdentityHashMap.java:428)
at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:176)
at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:224)
at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:223)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:223)
at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:203)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:70)
at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
SizeEstimator是缓存 ostensibly 已在内存中的东西成本的主要之一,因为对于未知对象的适当大小估计可以相当困难;如果您查看visitSingleObject方法,就会发现它严重依赖于反射,调用getClassInfo
来访问运行时类型信息;不仅遍历完整个对象层次结构,而且还会检查每个嵌套成员是否与IdentityHashMap
中的某个具体对象实例相同,因此堆栈跟踪显示大量时间花费在这些IdentityHashMap操作上。
在您的示例对象的情况下,您基本上将每个项作为从包装整数到包装整数的映射列表;假定Scala的内部映射实现也持有一个数组,这就解释了visitSingleObject -> List.foreach -> visitSingleObject -> visitSingleObject的调用层次结构。无论如何,在这种情况下,有很多内部对象需要访问,因此SizeEstimators为每个采样对象设置了一个新的IdentityHashMap。
在您测量的情况下:
profile( rdd.cache.count )
由于RDD已经成功缓存,因此这不被视为执行缓存逻辑的一部分,所以Spark足够智能,不会重新运行缓存逻辑。您实际上可以通过直接对新的RDD创建和缓存进行分析,将缓存逻辑的确切成本单独隔离出来,而不需要使用额外的"map(identity)"转换;以下是我从您最后几行继续的Spark会话:
scala> profile( rdd.count )
time = 91ms
res1: Long = 1000
scala> profile( rdd.map(identity).count )
time = 112ms
res2: Long = 1000
scala> profile( rdd.cache.count )
time = 59ms
res3: Long = 1000
scala> profile( rdd.map(identity).cache.count )
time = 6564ms
res4: Long = 1000
scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).count )
time = 14990ms
res5: Long = 1000
scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).cache.count )
time = 22229ms
res6: Long = 1000
scala> profile( sc.parallelize(1 to n).map( k => bigContent() ).map(identity).cache.count )
time = 21922ms
res7: Long = 1000
你可以看到,慢的原因并不是因为你运行了一个 map
转换,而是在这种情况下,当每个对象有大约100万到1000万个内部对象时(取决于Map实现方式;例如,堆栈跟踪中额外的visitArray
嵌套提示HashMap impl具有嵌套数组,这对于每个哈希表条目内的典型密集线性探测数据结构是有道理的),计算缓存逻辑的基本成本约为6秒。
针对你的具体用例,如果可能,应该偏向于惰性缓存,因为缓存中间结果会有开销,如果你不会真正重复使用中间结果进行大量单独的下游转换,则这种折衷方法不划算。但正如你在问题中提到的,如果你确实使用一个RDD分支扩展到多个不同的下游转换,那么如果原始转换比较昂贵,你确实需要缓存步骤。
解决方法是尝试使用更适合常数时间计算的内部数据结构(例如基本类型的数组),在这些数据结构中,您可以节省大量的成本,避免遍历大量包装器对象并依赖于SizeEstimator中的反射。
我尝试了像Array [Array [Int]] 这样的东西,尽管仍然有一些开销,但相似数据量的情况下要好10倍:
scala> def bigContent2() = (1 to 1000).map( i => (1 to 1000).toArray ).toArray
bigContent2: ()Array[Array[Int]]
scala> val rdd = sc.parallelize(1 to n).map( k => bigContent2() ).cache
rdd: org.apache.spark.rdd.RDD[Array[Array[Int]]] = MapPartitionsRDD[23] at map at <console>:28
scala> rdd.count
res16: Long = 1000
scala>
scala>
scala> profile( rdd.count )
time = 29ms
res17: Long = 1000
scala> profile( rdd.map(identity).count )
time = 42ms
res18: Long = 1000
scala> profile( rdd.cache.count )
time = 34ms
res19: Long = 1000
scala> profile( rdd.map(identity).cache.count )
time = 763ms
res20: Long = 1000
为了说明在任何更复杂的对象上反射成本有多么糟糕,如果我在那里删除最后一个
toArray
并且最终每个
bigContent
都是一个
scala.collection.immutable.IndexedSeq [Array [Int]]
,则性能回到原始
IndexSeq [Map [Int,Int]]
案例的 ~2倍慢的范围内:
scala> def bigContent3() = (1 to 1000).map( i => (1 to 1000).toArray )
bigContent3: ()scala.collection.immutable.IndexedSeq[Array[Int]]
scala> val rdd = sc.parallelize(1 to n).map( k => bigContent3() ).cache
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.IndexedSeq[Array[Int]]] = MapPartitionsRDD[27] at map at <console>:28
scala> rdd.count
res21: Long = 1000
scala>
scala>
scala> profile( rdd.count )
time = 27ms
res22: Long = 1000
scala> profile( rdd.map(identity).count )
time = 39ms
res23: Long = 1000
scala> profile( rdd.cache.count )
time = 37ms
res24: Long = 1000
scala> profile( rdd.map(identity).cache.count )
time = 2781ms
res25: Long = 1000
如评论部分所讨论的,您还可以考虑使用MEMORY_ONLY_SER StorageLevel。只要有高效的序列化器,它很可能比SizeEstimator中使用的递归反射更便宜;为此,您只需用persist(StorageLevel.MEMORY_ONLY_SER)替换cache(),正如另一个问题中提到的那样,cache()在概念上与persist(StorageLevel.MEMORY_ONLY)是相同的。
import org.apache.spark.storage.StorageLevel
profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
我曾经在Spark 1.6.1和Spark 2.0.0-preview上尝试过,其他集群配置都完全相同(分别使用Google Cloud Dataproc的"1.0"和"preview"镜像版本)。不幸的是,在Spark 1.6.1中,MEMORY_ONLY_SER技巧似乎并没有起到帮助的作用:
scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
time = 6709ms
res19: Long = 1000
scala> profile( rdd.map(identity).cache.count )
time = 6126ms
res20: Long = 1000
scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY).count )
time = 6214ms
res21: Long = 1000
但是在 Spark 2.0.0-preview 中,性能似乎提高了 10 倍:
scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY_SER).count )
time = 500ms
res18: Long = 1000
scala> profile( rdd.map(identity).cache.count )
time = 5353ms
res19: Long = 1000
scala> profile( rdd.map(identity).persist(StorageLevel.MEMORY_ONLY).count )
time = 5927ms
res20: Long = 1000
但这可能因您的对象而异;如果序列化本身并没有使用大量反射,那么只有在使用Kryo序列化时才会期望加速; 如果您能够有效地使用MEMORY_ONLY_SER
来处理这些大型对象,则很可能会看到改进。
RDD[A]
。此外,没有洗牌。这是最简单的映射操作。如果映射很复杂,它将花费第二个配置文件中的时间:rdd.map(identity).count
。 - Juh_.map(identity)
进行了转换)。因此,如果你附加缓存并对其计数,第一次计数将触发缓存机制,但在这个特定的计数操作中,你不会从中受益。不过,对于缓存的rdd的后续操作将利用它被缓存的优势-前提是你在变量中存储了对它的引用。 - Jonathan TawsMEMORY_ONLY
将RDD作为反序列化的Java对象存储在JVM中。你确定你的数据适合内存吗?否则它将会被动态重新计算。你可以在应用程序详细信息UI的存储选项卡中查看。在内部,persist
使用一个分区迭代器的BlockManager
,如此处所解释的那样。 - Jonathan Taws