使用Spark集合累加器时出现ConcurrentModificationException

10
我正在尝试在Azure HDInsight按需集群上运行基于Spark的应用程序,但是看到了很多SparkExceptions(由ConcurrentModificationExceptions引起)被记录。当我启动本地Spark实例时,应用程序可以运行而没有这些错误。 我看到有类似的报告在使用累加器时出现错误,我的代码确实使用了CollectionAccumulator,但是我已经在每次使用它的地方放置了同步块,并且没有任何区别。与累加器相关的代码如下:
class MySparkClass(sc : SparkContext) {
    val myAccumulator = sc.collectionAccumulator[MyRecord]

    override def add(record: MyRecord) = {
        synchronized {
            myAccumulator.add(record)
        }
    }

    override def endOfBatch() = {
        synchronized {
            myAccumulator.value.asScala.foreach((record: MyRecord) => {
                processIt(record)
            })
        }
    }
}
异常不会导致应用程序失败,但是当调用endOfBatch并且代码尝试从累加器中读取值时,它是空的,并且processIt永远不会被调用。 我们正在使用HDInsight版本3.6和Spark版本2.3.0。
18/11/26 11:04:37 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
    at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:814)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList.writeObject(ArrayList.java:770)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:565)
    at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:231)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
    ... 13 more
以下代码是一个更加自包含的示例,可以重现问题。{{MyRecord}}是一个简单的case类,仅包含数值。该代码在本地运行时没有错误,但在HDInsight集群上会产生上述错误。
object MainDemo {
    def main(args: Array[String]) {
        val sparkContext = SparkSession.builder.master("local[4]").getOrCreate().sparkContext
        val myAccumulator = sparkContext.collectionAccumulator[MyRecord]

        sparkContext.binaryFiles("/my/files/here").foreach(_ => {
            for(i <- 1 to 100000) {
                val record = MyRecord(i, 0, 0)
                myAccumulator.add(record)
            }
        })

        myAccumulator.value.asScala.foreach((record: MyRecord) => {
            // we expect this to be called once for each record that we 'add' above,
            // but it is never called
            println(record)
        })
    }
}

这可能听起来像是一个愚蠢的建议,但我并不完全相信这个错误来自你的累加器。因此,我想提出两个测试建议:1)使用修改过的MySparkClass进行一次虚拟运行,以这样的方式将所有(虚假的)数据预添加到构造函数中,然后永远不会被add修改。2)使用写时复制逻辑进行测试运行:将myAccumulator **var**而不是val,并使用copy-add-赋值循环而不仅仅是add。这是100%线程安全的,但非常慢。我敢打赌错误仍然存在。 - SergGr
在Apache网站上提出的错误报告链接:https://issues.apache.org/jira/browse/SPARK-26183 - codebox
2个回答

阿里云服务器只需要99元/年,新老用户同享,点击查看详情
1
我怀疑使用同步块是否真的有所帮助。CustomeAccumulators或所有其他累加器都不是线程安全的。它们实际上没有必要,因为DAGScheduler.updateAccumulators方法只在运行调度循环的单个线程上执行,该方法由Spark驱动程序用于在任务完成(成功或失败)后更新累加器的值。此外,它们是供具有自己本地累加器引用的工作程序编写的数据结构,而访问累加器的值仅由驱动程序允许。 当你说它在本地模式下工作是因为它是单个JVM,但在集群模式下,它们是不同的JVM和Java实例,PRC调用被触发以启用通信。 你的MyRecord对象长什么样子,如果你只用.value结束你的行而不是迭代它会有所帮助。尝试一下。
myAccumulator.value

add 方法是从 Executor 代码调用的,而 endOfBatch 是从驱动程序代码调用的。我不确定您在询问本地模式方面的内容,但当我在本地 PC 上运行应用程序时,它是可以工作的,使用主 URL 为 local[4] - 如果您需要进一步的信息,请随时提问。 - codebox
我需要迭代已添加到累加器中的值,如果我进行您建议的编辑,那么processIt将如何被调用?MyRecord类是一个简单的case类,具有各种数字和布尔字段。 - codebox
你能展示一下使用你的类的代码吗?或者至少提供一个最小化的例子来重现这个错误吗? - Oli
@Oli 我已经在问题中添加了一个示例。 - codebox
很奇怪...我也在本地成功运行了你的代码,并得到了预期的结果。出于好奇,你为什么要使用累加器来收集那么多数据到驱动程序呢? - Oli

-1

在调用RDD的某些操作(如collectcount)之后才读取累加器是有意义的。

此外,您不需要在累加器上进行同步,因为每个分区都会分配一个独立的副本。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,