StackOverflowError when running an iterative program (Java) in Spark


I am trying to implement a hierarchical agglomerative clustering algorithm in Spark.

When the input data is large and the program runs in local mode, it frequently throws java.lang.StackOverflowError.

Some people say (for example, in Cut off the super long serialization chain) that this happens because the serialization chain becomes too long. So I added a checkpoint every few iterations, but it still doesn't work. Has anyone else run into this problem?

My code is as follows:

JavaRDD<LocalCluster> run(JavaRDD<Document> documents)
{
    @SuppressWarnings("resource")
    JavaSparkContext sc = new JavaSparkContext(documents.context());
    Broadcast<Double> mergeCriterionBroadcast = sc.broadcast(mergeCriterion);
    Accumulable<Long, Long> clusterIdAccumulator = sc.accumulable(documents.map(doc -> doc.getId()).max(Comparator.naturalOrder()), new LongAccumulableParam());

    // Clusters.
    JavaRDD<LocalCluster> clusters = documents.map(document -> 
    {
        return LocalCluster.of(document.getId(), Arrays.asList(document));          
    }).cache();

    // Calculate clusters pair-wise similarity, initially, each document forms a cluster.
    JavaPairRDD<Tuple2<LocalCluster, LocalCluster>, Double> similarities = clusters.cartesian(clusters).filter(clusterPair -> (clusterPair._1().getId() < clusterPair._2().getId()))
    .mapToPair(clusterPair ->
    {
        return new Tuple2<Tuple2<LocalCluster, LocalCluster>, Double>(clusterPair, LocalCluster.getSimilarity(clusterPair._1(), clusterPair._2()));
    })
    .filter(tuple -> (tuple._2() >= mergeCriterionBroadcast.value())).cache();

    // Merge the most similar two clusters.
    long count = similarities.count();
    int loops = 0;
    while (count > 0)
    {
        System.out.println("Count: " + count);
        Tuple2<Tuple2<LocalCluster, LocalCluster>, Double> mostSimilar = similarities.max(SerializableComparator.serialize((a, b) -> Double.compare(a._2(), b._2())));

        Broadcast<Tuple2<Long, Long>> MOST_SIMILAR = sc.broadcast(new Tuple2<Long, Long>(mostSimilar._1()._1().getId(), mostSimilar._1()._2().getId()));

        clusterIdAccumulator.add(1L);
        LocalCluster newCluster = LocalCluster.merge(mostSimilar._1()._1(), mostSimilar._1()._2(), clusterIdAccumulator.value());

        JavaRDD<LocalCluster> newClusterRDD = sc.parallelize(Arrays.asList(newCluster));
        JavaRDD<LocalCluster> filteredClusters = clusters.filter(cluster -> (cluster.getId() != MOST_SIMILAR.value()._1() && cluster.getId() != MOST_SIMILAR.value()._2()));

        JavaPairRDD<Tuple2<LocalCluster, LocalCluster>, Double> newSimilarities = filteredClusters.cartesian(newClusterRDD)
        .mapToPair(clusterPair ->
        {
            return new Tuple2<Tuple2<LocalCluster, LocalCluster>, Double>(clusterPair, LocalCluster.getSimilarity(clusterPair._1(), clusterPair._2()));
        })
        .filter(tuple -> (tuple._2() >= mergeCriterionBroadcast.value()));

        clusters = filteredClusters.union(newClusterRDD).coalesce(2).cache();
        similarities = similarities.filter(tuple -> 
                (tuple._1()._1().getId() != MOST_SIMILAR.value()._1()) && 
                (tuple._1()._1().getId() != MOST_SIMILAR.value()._2()) && 
                (tuple._1()._2().getId() != MOST_SIMILAR.value()._1()) && 
                (tuple._1()._2().getId() != MOST_SIMILAR.value()._2()))
                .union(newSimilarities).coalesce(4).cache();
        if ((loops++) >= 2)
        {
            clusters.checkpoint();
            similarities.checkpoint();
            loops = 0;
        }

        count = similarities.count();
    }

    return clusters.filter(cluster -> cluster.getDocuments().size() > 1);
}

The first part of the stack trace:

15/08/05 10:11:19 ERROR TaskSetManager: Failed to serialize task 4078, not attempting to retry it.

java.io.IOException: java.lang.StackOverflowError
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1242)
at org.apache.spark.rdd.CoalescedRDDPartition.writeObject(CoalescedRDD.scala:45)
at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
at org.apache.spark.rdd.UnionPartition$$anonfun$writeObject$1.apply$mcV$sp(UnionRDD.scala:55)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
at org.apache.spark.rdd.UnionPartition.writeObject(UnionRDD.scala:52)
at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
at org.apache.spark.rdd.CoalescedRDDPartition$$anonfun$writeObject$1.apply$mcV$sp(CoalescedRDD.scala:48)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
at org.apache.spark.rdd.CoalescedRDDPartition.writeObject(CoalescedRDD.scala:45)
at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
at org.apache.spark.rdd.UnionPartition$$anonfun$writeObject$1.apply$mcV$sp(UnionRDD.scala:55)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
at org.apache.spark.rdd.UnionPartition.writeObject(UnionRDD.scala:52)
at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
... (the same CoalescedRDDPartition.writeObject / UnionPartition.writeObject cycle repeats verbatim for the rest of the posted trace) ...

Another part of the stack trace:

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Before the error, there were many warnings like this one:
15/08/05 11:19:56 WARN TaskSetManager: Stage 519 contains a task of very large size (226 KB). The maximum recommended task size is 100 KB.

How does adding a checkpoint every few iterations solve the problem of an overly long serialization chain? - user207421
That is exactly what a checkpoint does: it stores the computed RDD to files without the lineage (serialization chain) information. The next time that RDD is needed, it is loaded from those files. So every time a checkpoint is performed, the serialization chain is cut. - xfoolish
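To make the mechanism described in the comment above concrete, here is a minimal sketch of the pattern (my own illustration, not code from the question; the class name, the toy computation, and the checkpoint directory path are all placeholders):

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

class CheckpointSketch
{
    static void iterate(JavaSparkContext sc)
    {
        // checkpoint() needs a directory to write to; it must be set first.
        sc.setCheckpointDir("/tmp/spark-checkpoints");

        JavaRDD<Long> rdd = sc.parallelize(Arrays.asList(1L, 2L, 3L));
        for (int i = 1; i <= 100; i++)
        {
            rdd = rdd.map(x -> x + 1).cache(); // each iteration extends the lineage
            if (i % 10 == 0)
            {
                rdd.checkpoint(); // only marks the RDD; nothing is written yet
                rdd.count();      // an action computes the RDD, writes the checkpoint,
                                  // and truncates the lineage at this point
            }
        }
    }
}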
1 Answer

Finally, I found the problem: it was caused by how checkpoint was used.
In Spark, checkpoint lets you truncate an RDD's lineage, but the checkpoint is only written once the RDD has been computed by an action (such as collect or count).
In my program, no action was ever invoked on the clusters RDD, so its checkpoint was never created; the serialization chain of clusters kept growing and eventually caused the StackOverflowError. To fix this, call count after checkpoint:
        clusters = filteredClusters.union(newClusterRDD).coalesce(2).cache();
        similarities = similarities.filter(tuple -> 
                (tuple._1()._1().getId() != MOST_SIMILAR.value()._1()) && 
                (tuple._1()._1().getId() != MOST_SIMILAR.value()._2()) && 
                (tuple._1()._2().getId() != MOST_SIMILAR.value()._1()) && 
                (tuple._1()._2().getId() != MOST_SIMILAR.value()._2()))
                .union(newSimilarities).coalesce(4).cache();
        if ((loops++) >= 50)
        {
            clusters.checkpoint();
            clusters.count();
            similarities.checkpoint();
            loops = 0;
        }
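Two caveats worth adding (my notes, not part of the original answer): checkpoint() only takes effect if a checkpoint directory has been set beforehand via sc.setCheckpointDir(...); otherwise Spark raises an error when the RDD is marked. And only clusters needs the extra count() above, because similarities is materialized anyway by the count = similarities.count() at the end of each loop iteration, which also writes its pending checkpoint.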
