Why is my Spark job failing to write its output?


Setup:

I have a Spark job running on a distributed Spark cluster with 10 nodes. I am doing some processing on text files on HDFS. The job runs fine until the last step: saving the output as a text file.

Problem:

I am getting the following stack trace:

15/04/07 11:32:11 INFO spark.SparkContext: Job finished: saveAsTextFile at Main.java:235, took 1.377335791 s
Exception in thread "main" java.io.IOException: Failed to rename RawLocalFileStatus{path=file:/home/ds_myuser/tmp/myapp/out/_temporary/0/task_201504071132_0016_m_000003/part-00003; isDirectory=false; length=2494; replication=1; blocksize=33554432; modification_time=1428427931000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to file:/home/ds_myuser/tmp/myapp/out/part-00003
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
    at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
    at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:878)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:792)
    at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1162)
    at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:440)
    at org.apache.spark.api.java.JavaPairRDD.saveAsTextFile(JavaPairRDD.scala:45)
    at com.somecompany.analysis.myapp.control.Main.calculateRow(Main.java:235)
    at com.somecompany.analysis.myapp.control.Main.calculatemyapp(Main.java:127)
    at com.somecompany.analysis.myapp.control.Main.main(Main.java:103)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Also, here is how I save to file in my Java code:

result.saveAsTextFile("/home/myuser/tmp/myapp/out");
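
For reference, an unqualified path like the one above is resolved against fs.defaultFS from the Hadoop configuration, so depending on the cluster setup it may point at HDFS or at each executor's local disk. A minimal sketch of making the target explicit (the host name and port below are placeholders, not my actual cluster):

    // Unqualified paths resolve against fs.defaultFS, so spell out the target
    // file system explicitly. Host and port are placeholders for illustration.
    result.saveAsTextFile("hdfs://namenode.mydomain.com:8020/user/myuser/myapp/out"); // shared HDFS
    // or, to write to the local disk of whichever nodes run the executors:
    result.saveAsTextFile("file:///home/myuser/tmp/myapp/out");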

Also, sometimes the output directory contains only one part file, and sometimes none at all. Is this because I am trying to save to the local file system, and there is a race condition with all the executors trying to write to the same location? But the part file names are different, so I suppose that should not be the problem.
Any help is greatly appreciated.
Edit 1:

Noticed another thing. Strangely, some of the temporary files are owned by "root", and I cannot delete them:
[myuser@myserver ~]$ rm -rf tmp/myapp/
rm: cannot remove `tmp/myapp/out/_temporary/0/task_201504061658_0016_m_000001/.part-00001.crc': Permission denied
rm: cannot remove `tmp/myapp/out/_temporary/0/task_201504061658_0016_m_000001/part-00001': Permission denied

Edit 2:

As suggested in the comments, I tried coalesce / repartition. With these changes the job succeeded, but in the output directory I only see the _SUCCESS file, no part-xxxxx files. Also, I am doing a result.count() right before the coalesce / repartition, and it prints 260, so there is some final output; it just never makes it into the part files.

Edit 3:

Here is the code in my driver class that writes the file:

    System.out.println("Final No. of Output Lines: " + result.count());
    result.coalesce(1, true).saveAsTextFile("file:///home/myuser/tmp3");
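
As a side note on the API: coalesce(1, true) shuffles everything down to a single partition, so only one task writes the output; as far as I understand it is equivalent to the following one-liner:

    // repartition(n) is defined as coalesce(n, shuffle = true) in the RDD API.
    result.repartition(1).saveAsTextFile("file:///home/myuser/tmp3");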

After the count is printed, here are the logs:

Final No. of Output Lines: 260
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/04/09 11:30:07 INFO spark.SparkContext: Starting job: saveAsTextFile at Main.java:284
15/04/09 11:30:07 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 164 bytes
15/04/09 11:30:07 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 164 bytes
15/04/09 11:30:07 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 174 bytes
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Registering RDD 23 (coalesce at Main.java:284)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Got job 9 (saveAsTextFile at Main.java:284) with 1 output partitions (allowLocal=false)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Final stage: Stage 21(saveAsTextFile at Main.java:284)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 26)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Missing parents: List(Stage 26)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Submitting Stage 26 (MapPartitionsRDD[23] at coalesce at Main.java:284), which has no missing parents
15/04/09 11:30:07 INFO storage.MemoryStore: ensureFreeSpace(22392) called with curMem=132730821, maxMem=5556637532
15/04/09 11:30:07 INFO storage.MemoryStore: Block broadcast_17 stored as values in memory (estimated size 21.9 KB, free 5.1 GB)
15/04/09 11:30:07 INFO storage.MemoryStore: ensureFreeSpace(11900) called with curMem=132753213, maxMem=5556637532
15/04/09 11:30:07 INFO storage.MemoryStore: Block broadcast_17_piece0 stored as bytes in memory (estimated size 11.6 KB, free 5.1 GB)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode111.mydomain.com:34468 (size: 11.6 KB, free: 5.2 GB)
15/04/09 11:30:07 INFO storage.BlockManagerMaster: Updated info of block broadcast_17_piece0
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Submitting 4 missing tasks from Stage 26 (MapPartitionsRDD[23] at coalesce at Main.java:284)
15/04/09 11:30:07 INFO scheduler.TaskSchedulerImpl: Adding task set 26.0 with 4 tasks
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 26.0 (TID 36, mynode117.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 26.0 (TID 37, mynode112.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 26.0 (TID 38, mynode115.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 26.0 (TID 39, mynode119.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode115.mydomain.com:51126 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode117.mydomain.com:33052 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sparkExecutor@mynode115.mydomain.com:34724
15/04/09 11:30:07 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sparkExecutor@mynode117.mydomain.com:35651
15/04/09 11:30:07 INFO network.ConnectionManager: Accepted connection from [mynode112.mydomain.com/10.211.26.212:52476]
15/04/09 11:30:07 INFO network.SendingConnection: Initiating connection to [mynode112.mydomain.com/10.211.26.212:56453]
15/04/09 11:30:07 INFO network.SendingConnection: Connected to [mynode112.mydomain.com/10.211.26.212:56453], 1 messages pending
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode119.mydomain.com:39126 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode112.mydomain.com:56453 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 26.0 (TID 36) in 356 ms on mynode117.mydomain.com (1/4)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 26.0 (TID 38) in 362 ms on mynode115.mydomain.com (2/4)
15/04/09 11:30:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sparkExecutor@mynode119.mydomain.com:42604
15/04/09 11:30:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sparkExecutor@mynode112.mydomain.com:46239
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 26.0 (TID 37) in 796 ms on mynode112.mydomain.com (3/4)
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 26.0 (TID 39) in 829 ms on mynode119.mydomain.com (4/4)
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Stage 26 (coalesce at Main.java:284) finished in 0.835 s
15/04/09 11:30:08 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/04/09 11:30:08 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 26.0, whose tasks have all completed, from pool 
15/04/09 11:30:08 INFO scheduler.DAGScheduler: running: Set()
15/04/09 11:30:08 INFO scheduler.DAGScheduler: waiting: Set(Stage 21)
15/04/09 11:30:08 INFO scheduler.DAGScheduler: failed: Set()
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Missing parents for Stage 21: List()
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Submitting Stage 21 (MappedRDD[27] at saveAsTextFile at Main.java:284), which is now runnable
15/04/09 11:30:08 INFO storage.MemoryStore: ensureFreeSpace(53664) called with curMem=132765113, maxMem=5556637532
15/04/09 11:30:08 INFO storage.MemoryStore: Block broadcast_18 stored as values in memory (estimated size 52.4 KB, free 5.1 GB)
15/04/09 11:30:08 INFO storage.MemoryStore: ensureFreeSpace(19192) called with curMem=132818777, maxMem=5556637532
15/04/09 11:30:08 INFO storage.MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 18.7 KB, free 5.1 GB)
15/04/09 11:30:08 INFO storage.BlockManagerInfo: Added broadcast_18_piece0 in memory on mynode111.mydomain.com:34468 (size: 18.7 KB, free: 5.2 GB)
15/04/09 11:30:08 INFO storage.BlockManagerMaster: Updated info of block broadcast_18_piece0
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 21 (MappedRDD[27] at saveAsTextFile at Main.java:284)
15/04/09 11:30:08 INFO scheduler.TaskSchedulerImpl: Adding task set 21.0 with 1 tasks
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 21.0 (TID 40, mynode112.mydomain.com, ANY, 1353 bytes)
15/04/09 11:30:08 INFO storage.BlockManagerInfo: Added broadcast_18_piece0 in memory on mynode112.mydomain.com:56453 (size: 18.7 KB, free: 2.1 GB)
15/04/09 11:30:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 4 to sparkExecutor@mynode112.mydomain.com:46239
15/04/09 11:30:08 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 4 is 199 bytes
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 21.0 (TID 40) in 441 ms on mynode112.mydomain.com (1/1)
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Stage 21 (saveAsTextFile at Main.java:284) finished in 0.447 s
15/04/09 11:30:08 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 21.0, whose tasks have all completed, from pool 
15/04/09 11:30:08 INFO spark.SparkContext: Job finished: saveAsTextFile at Main.java:284, took 1.381897276 s
[myuser@mynode111 ~]$ ls tmp3/
_SUCCESS
[myuser@mynode111 ~]$ 

By the way, I am doing my processing against HDFS, and after saveAsTextFile() I expect the final output file on the local file system of the machine from which I launched the application. I hope Spark is not writing it somewhere else (on some other node's local file system).
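
If Spark does write the part file on whichever node runs the single output task, then one possible workaround, assuming the result stays small (260 lines here), would be to collect it on the driver and write it out there directly, something like:

    // Workaround sketch: collect the (small) result on the driver and write it
    // to the driver's local file system, so the file ends up on the machine
    // that launched the job. Assumes `result` is a JavaRDD<String>; for a pair
    // RDD, map the tuples to strings first. Exceptions omitted for brevity.
    import java.io.PrintWriter;
    import java.util.List;

    List<String> lines = result.collect();
    PrintWriter writer = new PrintWriter("/home/myuser/tmp3/part-00000", "UTF-8");
    try {
        for (String line : lines) {
            writer.println(line);
        }
    } finally {
        writer.close();
    }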

Quick update:

I tried writing to HDFS instead of the local file system, and it works fine:

result.coalesce(1, true).saveAsTextFile("hdfs://mynode20.mydomain.com:8020/user/myuser/tmp");

Output:

[myuser@mynode111 ~]$ hadoop fs -ls /user/myuser/tmp
Found 2 items
-rw-r--r--   3 myuser myuser          0 2015-04-09 11:53 /user/myuser/tmp/_SUCCESS
-rw-r--r--   3 myuser myuser      12470 2015-04-09 11:53 /user/myuser/tmp/part-00000
[myuser@mynode111 ~]$ 
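
If the file is then needed on the local machine, it can be pulled down with hadoop fs -getmerge /user/myuser/tmp /home/myuser/tmp/out.txt.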

Which user is the job being executed as? - Marius Soutier
@MariusSoutier: Tried using coalesce and the job succeeded, but I only see the _SUCCESS file in the output directory, no part-xxxxx files. - Bhushan
That means your job isn't producing any data output. Try printing the number of elements in the RDD (count()) before writing the text file. - Marius Soutier
@MariusSoutier: Added the code and logs. Let me know if you need more details. Thanks a lot for looking into this, I really appreciate it. - Bhushan
OK, so it works fine on HDFS but not locally. Then I guess it must be some permission/user configuration problem. Could you try running as the user spark? And also try writing to the /tmp directory? - Marius Soutier
2 Answers


I had the same problem, and it turned out that my Spark workers were running as root while my job was running as another user. So when saveAsTextFile is called, the Spark workers first save the data to a temporary location on disk, owned by root; then the Spark job, running as a different user, tries to move the root-owned temporary data to its final location and runs into permission problems.
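
One way to confirm this on a cluster is to list the owners of the files left under _temporary before the commit step, using the standard Hadoop FileSystem API; the path below matches the question's output directory and is otherwise just an illustration:

    // Diagnostic sketch: print owner and permissions of what Spark left under
    // _temporary. Note that the local file system may report empty owner
    // strings (as in the stack trace above); `ls -l` on the node is then the
    // ground truth.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    Path tmp = new Path("file:///home/ds_myuser/tmp/myapp/out/_temporary");
    FileSystem fs = tmp.getFileSystem(new Configuration());
    for (FileStatus status : fs.listStatus(tmp)) {
        System.out.println(status.getPath() + "  owner=" + status.getOwner()
                + "  perms=" + status.getPermission());
    }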


Hi @elgoog, it looks like I've hit the same problem. Would you mind sharing how you solved it? - Tarun Parmar
The fix was to stop all the slaves and the master, then restart them using sudo. - ziad.rida

I ran into the same problem too, and later found out that if you choose to run in standalone mode, the driver is run by your user while the executor processes are run by root. The only changes you need are:
First, build the jar file with sbt package; note that it is better to run sbt package as your user rather than as root. When I tried running sbt package via root (sudo), the assembled jar file ended up being created somewhere else.
Once you have the assembled jar file, do the spark-submit via sudo, so that the driver also runs as root:
sudo /opt/spark-2.0/bin/spark-submit \
    --class ... \
    --master ... \
    ...

This answer would be better if you showed an example. You might even get a +1 for it. - DᴀʀᴛʜVᴀᴅᴇʀ
This was my problem too. I solved it by saving into the c:/ folder. - Matthew Son
