Java.lang.OutOfMemoryError: 无法获取100字节的内存,实际获取为0。

25

我正在使用以下命令在本地模式下调用Spark 2.0中的Pyspark:

pyspark --executor-memory 4g --driver-memory 4g

输入的数据框是从一个 tsv 文件中读取的,具有580 K x 28列。我对数据框进行了一些操作,然后尝试将其导出到 tsv 文件时遇到了此错误。

df.coalesce(1).write.save("sample.tsv",format = "csv",header = 'true', delimiter = '\t')

请问如何消除这个错误。我可以轻松地显示df或计算行数。

输出数据框有23列,共3100行。

错误信息:

Job aborted due to stage failure: Task 0 in stage 70.0 failed 1 times, most recent failure: Lost task 0.0 in stage 70.0 (TID 1073, localhost): org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Unable to acquire 100 bytes of memory, got 0
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:129)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.WindowExec$$anonfun$15$$anon$1.fetchNextRow(WindowExec.scala:300)
    at org.apache.spark.sql.execution.WindowExec$$anonfun$15$$anon$1.<init>(WindowExec.scala:309)
    at org.apache.spark.sql.execution.WindowExec$$anonfun$15.apply(WindowExec.scala:289)
    at org.apache.spark.sql.execution.WindowExec$$anonfun$15.apply(WindowExec.scala:288)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
    ... 8 more

Driver stacktrace:

1
你尝试过不用 coalesce() 吗?显然你的内存不足了。你的配置是什么? - gsamaras
我已经尝试过不使用coalesce,它可以正常运行。我的配置是Intel i-7处理器,16 GB内存和Windows 7专业版。我以前使用相同的方法导出了其他包含0.5M行和15-20列的文件,并且它也可以正常工作。 - ML_Passion
你是如何解决这个问题的? - zonna
5个回答

22
我认为这个问题的原因是 coalesce()方法,尽管它避免了完整的Shuffle(就像repartition一样),但它必须缩小请求的分区中的数据。

在这里,您要求所有数据适合一个分区,因此一个任务(仅一个任务)必须处理所有数据,这可能会导致其容器受到内存限制的影响。

因此,要么请求多个分区而不是1个,要么在这种情况下避免使用coalesce()方法。


否则,您可以尝试以下链接提供的解决方案来增加内存配置:

  1. Spark java.lang.OutOfMemoryError: Java heap space
  2. Spark runs out of memory when grouping by key

我遇到了相同的问题,我没有使用coalesce并且已经增加了executor-memory,但仍然得到相同的错误信息“org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 16384 bytes of memory, got 0”。不仅是相同的问题,甚至是相同的值16384...你有什么想法吗? - zonna

21
在我的情况下,将coalesce(1)替换为repartition(1)起作用了。

2
请考虑在您的答案中添加更多信息 - Inder
它对我有效!如果您能解释为什么会发生这种情况,那将非常有帮助。 - Akash Tantri
它对我起作用了!它之所以有效是因为操作是并行的。 - Pedro Henrique

20

我的问题确实出在 coalesce() 上。我所做的是不使用 coalesce(),而是使用 parquet 导出文件,使用如下代码 df.write.parquet("testP")。然后重新读取文件,并使用 coalesce(1) 导出。

希望这对您也有效。


我遇到了同样的问题,我没有使用coalesce并且已经增加了executor-memory,但仍然遇到了相同的问题“org.apache.spark.memory.SparkOutOfMemoryError:无法获取16384字节的内存,得到0”。不仅是相同的问题,甚至是相同的值16384..你有任何想法吗? - zonna

17
如其他答案所述,使用 repartition(1) 而不是 coalesce(1)。原因是 repartition(1) 会确保上游处理在并行(多个任务/分区)中完成,而不仅仅在一个执行器上完成。
引用Spark文档中的Dataset.coalesce()

但是,如果您要进行大规模收缩,例如 numPartitions = 1,则可能导致计算在少于您所需的节点上进行(例如,在 numPartitions = 1 的情况下为一个节点)。为避免此情况,可以调用 instead。这将添加一个洗牌步骤,但意味着当前上游分区将并行执行(根据当前分区设置)。


3

在我的情况下,驱动程序比工作人员小。通过将驱动程序变大来解决了问题。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接