67得票3回答
已用、已提交和最大堆内存的差异

我正在监控一个 Spark Executor JVM 的 OutOfMemoryException。我使用 Jconsole 连接到 Executor JVM。以下是 Jconsole 快照: 在图像中,已使用的内存显示为 3.8G,已提交的内存为 8.6G,最大内存也为 8.6G。有人能...

57得票2回答
如何优化Apache Spark应用程序中的Shuffle Spill

我正在运行一个带有两个工作节点的Spark流式应用程序。该应用包含联接和合并操作。 所有批次都成功完成,但是注意到Shuffle溢出指标与输入数据大小或输出数据大小不一致(溢出内存超过20倍)。 请在下面的图片中查找Spark阶段详细信息: 经过调查,发现: 当Shuffle数据没有...

54得票4回答
从缓存中删除Spark DataFrame

我正在使用带Python API的Spark 1.3.0。在转换巨大数据框时,我会缓存许多DF以实现更快的执行;df1.cache() df2.cache() 当某个特定的数据框使用完成并且不再需要时,我该如何将DF从内存中删除(或取消缓存)? 例如,df1在整个代码中都被使用,而df2仅用...

46得票3回答
使用Python的Spark:如何解决阶段x包含一个非常大(xxx KB)的任务。最大推荐任务大小为100 KB。

我刚刚创建了一个Python列表,其中包含range(1,100000)。 使用SparkContext执行了以下步骤:a = sc.parallelize([i for i in range(1, 100000)]) b = sc.parallelize([i for i in range...

42得票2回答
build.sbt: 如何添加Spark依赖

您好,我正在尝试从以下 build.sbt 文件中下载 spark-core、spark-streaming、twitter4j 和 spark-streaming-twitter:name := "hello" version := "1.0" scalaVersion := "2.11...

39得票6回答
如何在Spark Streaming中更新广播变量?

我认为我有一个相对常见的Spark Streaming用例: 我有一个对象流,我希望根据一些参考数据进行过滤 最初,我认为可以使用广播变量来轻松实现这一点:public void startSparkEngine { Broadcast<ReferenceData> r...

37得票6回答
Spark DataFrame:orderBy后groupBy是否保持了排序?

我有一个Spark 2.0的dataframe example,其结构如下:id, hour, count id1, 0, 12 id1, 1, 55 .. id1, 23, 44 id2, 0, 12 id2, 1, 89 .. id2, 23, 34 etc. 每个id都有24个条目(代表...

37得票8回答
如何将Spark Streaming数据框写入Kafka主题

我正在使用Spark Streaming在两个Kafka队列之间处理数据,但似乎找不到一个好的方法从Spark向Kafka写入数据。我尝试过以下内容:input.foreachRDD(rdd => rdd.foreachPartition(partition => pa...

37得票4回答
如何在YARN客户端模式下使用spark-shell查明ClosedChannelExceptions的原因?

我一直在尝试以YARN 客户端模式运行spark-shell,但是我遇到了很多ClosedChannelException错误。我正在使用为Hadoop 2.6构建的Spark 2.0.0。 以下是异常:$ spark-2.0.0-bin-hadoop2.6/bin/spark-shell ...

35得票1回答
“spark.yarn.executor.memoryOverhead”设置的价值是什么?

spark.yarn.executor.memoryOverhead 在使用 YARN 的 Spark 作业中应该分配给 App,还是只是分配给最大值?