Spark - GraphX - 扩展连通组件

7

我正在尝试使用连通组件,但在缩放方面遇到了问题。这是我的代码 -

// get vertices
val vertices = stage_2.flatMap(x => GraphUtil.getVertices(x)).cache

// get edges
val edges = stage_2.map(x => GraphUtil.getEdges(x)).filter(_ != null).flatMap(x => x).cache

// create graph  
val identityGraph = Graph(vertices, edges)

// get connected components
val cc = identityGraph.connectedComponents.vertices

在此处,GraphUtil有助于返回顶点和边的帮助函数。此时,我的图有大约100万个节点和200万条边(顺便说一句,预计将增长到大约1亿个节点)。我的图相当稀疏连接 - 所以我预计会有很多小图。

当我运行上述操作时,我不断收到"java.lang.OutOfMemoryError: Java heap space"错误。我已经尝试使用"executor-memory 32g"并运行一个由15个节点组成的集群,其中每个yarn容器大小为45g。

以下是异常详细信息:

16/10/26 10:32:26 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.lang.StringBuilder.toString(StringBuilder.java:405)
    at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:360)
    at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:98)
    at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2216)
    at org.json4s.jackson.JsonMethods$class.compact(JsonMethods.scala:32)
    at org.json4s.jackson.JsonMethods$.compact(JsonMethods.scala:44)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:146)
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:146)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:146)
    at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:173)
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)

此外,我正在收到大量以下日志:
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 320 is 263 bytes
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 321 is 268 bytes
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 322 is 264 bytes

我的问题是有人在这个规模上尝试过ConnectedComponents吗?如果是,那我做错了什么?

2个回答

6

与本地Spark连接组件相比,这在性能/可扩展性方面如何? - WestCoastProjects
我认为您所说的本地实现是指GraphX实现。上一次使用GraphX(大约一年前),它对我们来说无法扩展。到目前为止,在我们的测试中,我们的实现运行得非常好。我已将其记录在README中。 - Shirish Kumar

4
连接组件算法不太适合大规模应用,其性能在很大程度上取决于图的拓扑结构。边的稀疏性并不意味着你有小的组件。一长串的边非常稀疏(边数=顶点数-1),但在GraphX中实现的暴力算法效率不高(参见ccpregel的源代码)。
以下是您可以尝试的方法(按排序,仅代码):
  1. 将顶点和边以parquet格式(存储在磁盘上)进行检查点处理,然后重新加载它们以构建图形。当执行计划变得太大时,缓存有时无法胜任。
  2. 以不改变算法结果的方式转换图形。例如,您可以在code中看到算法正在双向传播信息(默认情况下应该如此)。因此,如果您有多个连接相同两个顶点的边,请从应用算法的图形中过滤掉它们。
  3. 使用通用优化(例如,在每次迭代时在磁盘上进行检查点处理以避免OOM)或基于领域的优化(类似于第2点),自行优化GraphX代码(这实际上非常简单)。
如果您可以放弃GraphX(已经有点过时) ,您可以考虑使用GraphFrames(package, blog)。我从未尝试过,所以我不知道它是否具有CC。 我相信您可以在Spark包中找到其他可能性,但也许您甚至想使用Spark之外的东西。 但这超出了问题的范围。祝你好运!

1
GraphFrames使用DataFrames和GraphX作为其基础,因此我不明白这将如何帮助OP。 - eliasah
@eliasah,我希望GraphFrames比GraphX更加优化。它们运行在DataFrames上的事实是一个好迹象,因为这样它们可以利用catalyst优化器和tungsten。正如我所说,我没有尝试过,我只是抱有一种明智的希望。 - Wilmerton
1
GraphX是一个项目,由于底层的图论难以扩展,没有人愿意参与。不幸的是,它几乎已经是一个死亡项目了。我认为GraphFrames不会比GraphX走得更远。 - eliasah
不幸的是,你可能是对的。但是越来越多的工作正在优化Spark计算(在项目内部和外部,例如flare),同时利用GPU。因此,也许一个集成在Spark中的解决方案可能会变得更具竞争力,利用其美观且集成的UI(GraphFrames)。 - Wilmerton
1
然后,你需要阅读这篇文章:http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-Improvement-Proposals-td19268.html。围绕Spark发生了很多事情,我对其中的许多事情并不满意。虽然我认为自己是一个Spark狂热者。 - eliasah
1
感谢您的回复。我已决定从GraphX ConnectedComponent转换到我自己使用map/reduce构建的ConnectedComponent版本,目前表现良好。 - Shirish Kumar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接