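Setup:
I have a Spark job running on a distributed Spark cluster with 10 nodes. I am doing some processing on text files in HDFS. The job runs fine until the last step: saving the output as a text file.
Problem:
I get the following stack trace: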
15/04/07 11:32:11 INFO spark.SparkContext: Job finished: saveAsTextFile at Main.java:235, took 1.377335791 s
Exception in thread "main" java.io.IOException: Failed to rename RawLocalFileStatus{path=file:/home/ds_myuser/tmp/myapp/out/_temporary/0/task_201504071132_0016_m_000003/part-00003; isDirectory=false; length=2494; replication=1; blocksize=33554432; modification_time=1428427931000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to file:/home/ds_myuser/tmp/myapp/out/part-00003
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:878)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:792)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1162)
at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:440)
at org.apache.spark.api.java.JavaPairRDD.saveAsTextFile(JavaPairRDD.scala:45)
at com.somecompany.analysis.myapp.control.Main.calculateRow(Main.java:235)
at com.somecompany.analysis.myapp.control.Main.calculatemyapp(Main.java:127)
at com.somecompany.analysis.myapp.control.Main.main(Main.java:103)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Also, this is how I save to a file in my Java code:
result.saveAsTextFile("/home/myuser/tmp/myapp/out");
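Also, sometimes I get only one part file in the output directory and sometimes none. Is this because I am trying to save to the local file system and there is a race condition, since all the executors try to write to the same location? The part file names are different, though, so I would guess that should not be the problem.
Any help would be much appreciated.
EDIT 1:
I noticed one more thing. Strangely, some of the temporary files are owned by "root", and I cannot delete them: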
[myuser@myserver ~]$ rm -rf tmp/myapp/
rm: cannot remove `tmp/myapp/out/_temporary/0/task_201504061658_0016_m_000001/.part-00001.crc': Permission denied
rm: cannot remove `tmp/myapp/out/_temporary/0/task_201504061658_0016_m_000001/part-00001': Permission denied
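To see how many leftover files are affected before asking an admin to clean them up, the tree can be scanned for entries owned by someone else. This is only a minimal sketch using the JDK's `java.nio.file` API; the class and method names (`OwnerScan`, `notOwnedBy`) are made up for illustration, and it assumes the scanning user can at least read the directories involved (otherwise `Files.walk` throws).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.UserPrincipal;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical helper, not part of Spark or Hadoop.
final class OwnerScan {

    /** Returns every path under root (root included) whose owner differs from expectedOwner. */
    static List<Path> notOwnedBy(Path root, UserPrincipal expectedOwner) throws IOException {
        List<Path> foreign = new ArrayList<>();
        try (Stream<Path> paths = Files.walk(root)) {
            for (Path p : (Iterable<Path>) paths::iterator) {
                if (!Files.getOwner(p).equals(expectedOwner)) {
                    foreign.add(p);
                }
            }
        }
        return foreign;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: directory to scan, e.g. a previous output directory.
        Path root = Paths.get(args[0]);
        // Assumption: the owner of the home directory is the user running the scan.
        UserPrincipal me = Files.getOwner(Paths.get(System.getProperty("user.home")));
        for (Path p : notOwnedBy(root, me)) {
            System.out.println(Files.getOwner(p).getName() + "  " + p);
        }
    }
}
```

Files listed here were presumably written by a process running as a different user on that node, which may also be why a later rename into the same directory fails.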
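EDIT 2:
As suggested, I tried using coalesce and repartition. With these changes the job succeeded, but in the output directory I see only the _SUCCESS file and no part-xxxxx files. Also, I call result.count() just before the coalesce or repartition, and it prints 260, so there is some final output. However, it is not being converted into part files.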
EDIT 3:
Here is the code in my driver class that writes the file:
System.out.println("Final No. of Output Lines: " + result.count());
result.coalesce(1, true).saveAsTextFile("file:///home/myuser/tmp3");
Here are the logs that follow the printed count:
Final No. of Output Lines: 260
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/04/09 11:30:07 INFO spark.SparkContext: Starting job: saveAsTextFile at Main.java:284
15/04/09 11:30:07 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 164 bytes
15/04/09 11:30:07 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 164 bytes
15/04/09 11:30:07 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 174 bytes
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Registering RDD 23 (coalesce at Main.java:284)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Got job 9 (saveAsTextFile at Main.java:284) with 1 output partitions (allowLocal=false)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Final stage: Stage 21(saveAsTextFile at Main.java:284)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 26)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Missing parents: List(Stage 26)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Submitting Stage 26 (MapPartitionsRDD[23] at coalesce at Main.java:284), which has no missing parents
15/04/09 11:30:07 INFO storage.MemoryStore: ensureFreeSpace(22392) called with curMem=132730821, maxMem=5556637532
15/04/09 11:30:07 INFO storage.MemoryStore: Block broadcast_17 stored as values in memory (estimated size 21.9 KB, free 5.1 GB)
15/04/09 11:30:07 INFO storage.MemoryStore: ensureFreeSpace(11900) called with curMem=132753213, maxMem=5556637532
15/04/09 11:30:07 INFO storage.MemoryStore: Block broadcast_17_piece0 stored as bytes in memory (estimated size 11.6 KB, free 5.1 GB)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode111.mydomain.com:34468 (size: 11.6 KB, free: 5.2 GB)
15/04/09 11:30:07 INFO storage.BlockManagerMaster: Updated info of block broadcast_17_piece0
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Submitting 4 missing tasks from Stage 26 (MapPartitionsRDD[23] at coalesce at Main.java:284)
15/04/09 11:30:07 INFO scheduler.TaskSchedulerImpl: Adding task set 26.0 with 4 tasks
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 26.0 (TID 36, mynode117.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 26.0 (TID 37, mynode112.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 26.0 (TID 38, mynode115.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 26.0 (TID 39, mynode119.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode115.mydomain.com:51126 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode117.mydomain.com:33052 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sparkExecutor@mynode115.mydomain.com:34724
15/04/09 11:30:07 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sparkExecutor@mynode117.mydomain.com:35651
15/04/09 11:30:07 INFO network.ConnectionManager: Accepted connection from [mynode112.mydomain.com/10.211.26.212:52476]
15/04/09 11:30:07 INFO network.SendingConnection: Initiating connection to [mynode112.mydomain.com/10.211.26.212:56453]
15/04/09 11:30:07 INFO network.SendingConnection: Connected to [mynode112.mydomain.com/10.211.26.212:56453], 1 messages pending
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode119.mydomain.com:39126 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode112.mydomain.com:56453 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 26.0 (TID 36) in 356 ms on mynode117.mydomain.com (1/4)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 26.0 (TID 38) in 362 ms on mynode115.mydomain.com (2/4)
15/04/09 11:30:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sparkExecutor@mynode119.mydomain.com:42604
15/04/09 11:30:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sparkExecutor@mynode112.mydomain.com:46239
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 26.0 (TID 37) in 796 ms on mynode112.mydomain.com (3/4)
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 26.0 (TID 39) in 829 ms on mynode119.mydomain.com (4/4)
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Stage 26 (coalesce at Main.java:284) finished in 0.835 s
15/04/09 11:30:08 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/04/09 11:30:08 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 26.0, whose tasks have all completed, from pool
15/04/09 11:30:08 INFO scheduler.DAGScheduler: running: Set()
15/04/09 11:30:08 INFO scheduler.DAGScheduler: waiting: Set(Stage 21)
15/04/09 11:30:08 INFO scheduler.DAGScheduler: failed: Set()
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Missing parents for Stage 21: List()
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Submitting Stage 21 (MappedRDD[27] at saveAsTextFile at Main.java:284), which is now runnable
15/04/09 11:30:08 INFO storage.MemoryStore: ensureFreeSpace(53664) called with curMem=132765113, maxMem=5556637532
15/04/09 11:30:08 INFO storage.MemoryStore: Block broadcast_18 stored as values in memory (estimated size 52.4 KB, free 5.1 GB)
15/04/09 11:30:08 INFO storage.MemoryStore: ensureFreeSpace(19192) called with curMem=132818777, maxMem=5556637532
15/04/09 11:30:08 INFO storage.MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 18.7 KB, free 5.1 GB)
15/04/09 11:30:08 INFO storage.BlockManagerInfo: Added broadcast_18_piece0 in memory on mynode111.mydomain.com:34468 (size: 18.7 KB, free: 5.2 GB)
15/04/09 11:30:08 INFO storage.BlockManagerMaster: Updated info of block broadcast_18_piece0
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 21 (MappedRDD[27] at saveAsTextFile at Main.java:284)
15/04/09 11:30:08 INFO scheduler.TaskSchedulerImpl: Adding task set 21.0 with 1 tasks
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 21.0 (TID 40, mynode112.mydomain.com, ANY, 1353 bytes)
15/04/09 11:30:08 INFO storage.BlockManagerInfo: Added broadcast_18_piece0 in memory on mynode112.mydomain.com:56453 (size: 18.7 KB, free: 2.1 GB)
15/04/09 11:30:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 4 to sparkExecutor@mynode112.mydomain.com:46239
15/04/09 11:30:08 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 4 is 199 bytes
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 21.0 (TID 40) in 441 ms on mynode112.mydomain.com (1/1)
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Stage 21 (saveAsTextFile at Main.java:284) finished in 0.447 s
15/04/09 11:30:08 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 21.0, whose tasks have all completed, from pool
15/04/09 11:30:08 INFO spark.SparkContext: Job finished: saveAsTextFile at Main.java:284, took 1.381897276 s
[myuser@mynode111 ~]$ ls tmp3/
_SUCCESS
[myuser@mynode111 ~]$
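One reading of the log above that would explain the lone _SUCCESS file: the single save task (TID 40) ran on mynode112, while `ls` was run on mynode111. With a `file://` path and no shared mount, each executor would write its part file to its own local disk, and the job commit on the driver would only see the driver's disk. The toy model below is not Hadoop's FileOutputCommitter, only a simplified stand-in under that assumption: temporary directories play the role of each machine's local disk, and all class and method names are invented for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Toy model only: each "node" is a separate directory standing in for that machine's local disk.
final class LocalCommitSketch {

    /** A task writes its part file under the output directory on the disk of the node it runs on. */
    static Path taskWrites(Path nodeDisk, String outDir, int partition, List<String> lines) throws IOException {
        Path taskDir = nodeDisk.resolve(outDir).resolve("_temporary").resolve("0").resolve("task_" + partition);
        Files.createDirectories(taskDir);
        return Files.write(taskDir.resolve(String.format("part-%05d", partition)), lines);
    }

    /** The driver moves whatever task output it finds on ITS disk, then marks the job as done. */
    static List<String> driverCommits(Path driverDisk, String outDir) throws IOException {
        Path out = driverDisk.resolve(outDir);
        Files.createDirectories(out);
        Path pending = out.resolve("_temporary").resolve("0");
        if (Files.isDirectory(pending)) {
            for (Path taskDir : list(pending)) {
                for (Path part : list(taskDir)) {
                    Files.move(part, out.resolve(part.getFileName()));
                }
            }
        }
        Files.createFile(out.resolve("_SUCCESS"));
        return list(out).stream()
                .filter(Files::isRegularFile)
                .map(p -> p.getFileName().toString())
                .sorted()
                .collect(Collectors.toList());
    }

    private static List<Path> list(Path dir) throws IOException {
        try (Stream<Path> s = Files.list(dir)) {
            return s.collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path driver = Files.createTempDirectory("mynode111");
        Path worker = Files.createTempDirectory("mynode112");
        taskWrites(worker, "tmp3", 0, Arrays.asList("row-1", "row-2"));
        // The part file stays on the worker's disk; the driver's directory only gets the marker.
        System.out.println(driverCommits(driver, "tmp3"));
    }
}
```

If this reading is right, the part file should still be sitting under the same path on mynode112.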
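By the way, I am processing data on HDFS and expect the final output file, after saveAsTextFile(), to be on the local file system of the machine from which I launch the application. I hope Spark is not writing it somewhere else (to the local file system of some other node).
Quick update:
I tried writing to HDFS instead of the local file system, and it works fine: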
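For a result as small as the 260 lines here, one way to get the file onto the launching machine is to bring the rows back to the driver and write them with plain Java I/O. The sketch below shows only the writing half and makes assumptions: the `List<String>` stands in for whatever `result.collect()` returns, rendered as text, and `DriverSideOutput` and `writeOnDriver` are illustrative names, not Spark API. It is only reasonable when the whole result fits in driver memory.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that runs in the driver JVM, so the file lands on the launching machine.
final class DriverSideOutput {

    /** Writes one line per element to target, creating parent directories if needed. */
    static Path writeOnDriver(List<String> lines, Path target) throws IOException {
        Path parent = target.toAbsolutePath().getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        return Files.write(target, lines, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the collected result; in the real job this would come from result.collect().
        List<String> lines = Arrays.asList("row-1", "row-2");
        Path out = Files.createTempDirectory("myapp-out").resolve("result.txt");
        System.out.println("Wrote " + writeOnDriver(lines, out));
    }
}
```

For larger results, writing to HDFS and copying the files down afterwards avoids pulling everything through the driver.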
result.coalesce(1, true).saveAsTextFile("hdfs://mynode20.mydomain.com:8020/user/myuser/tmp");
Output:
[myuser@mynode111 ~]$ hadoop fs -ls /user/myuser/tmp
Found 2 items
-rw-r--r-- 3 myuser myuser 0 2015-04-09 11:53 /user/myuser/tmp/_SUCCESS
-rw-r--r-- 3 myuser myuser 12470 2015-04-09 11:53 /user/myuser/tmp/part-00000
[myuser@mynode111 ~]$
[...] coalesce, the job succeeded, but I saw only the _SUCCESS file in the output directory and no part-xxxxx files. - Bhushan
[...] count()). - Marius Soutier