Spark管道示例

4

我是Spark的新手,正在尝试弄清楚pipe方法的工作原理。我有以下Scala代码:

sc.textFile(hdfsLocation).pipe("preprocess.py").saveAsTextFile(hdfsPreprocessedLocation)

hdfsLocation和hdfsPreprocessedLocation的值是正确的。以下代码可从命令行工作,证明了这一点。

hadoop fs -cat hdfsLocation/* | ./preprocess.py | head

当我运行上述Spark代码时,我会得到以下错误。
14/11/25 09:41:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.IOException: Cannot run program "preprocess.py": error=2, No such file or directory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1041)
at org.apache.spark.rdd.PipedRDD.compute(PipedRDD.scala:119)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.io.IOException: error=2, No such file or directory
at java.lang.UNIXProcess.forkAndExec(Native Method)
at java.lang.UNIXProcess.<init>(UNIXProcess.java:135)
at java.lang.ProcessImpl.start(ProcessImpl.java:130)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1022)
... 12 more

为了解决Hadoop streaming的问题,我会使用--files属性,因此我尝试在Spark中使用相同的方法。我使用以下命令启动Spark:
bin/spark-shell --files ./preprocess.py

但是这还是出现了相同的错误。

我找不到使用Spark与外部进程通过管道进行通信的好例子,所以我不确定我的做法是否正确。希望能得到帮助。

谢谢


你能给我 preprocess.py 文件吗?我需要一些管道的示例。谢谢! - DunkOnly
1个回答

2
我不确定这是否是正确答案,所以我不会最终确定,但似乎在本地和集群模式下运行Spark时文件路径是不同的。当没有使用--master运行Spark时,管道命令的路径相对于本地计算机。当使用--master运行Spark时,管道命令的路径是./。 更新: 实际上这不正确。我使用SparkFiles.get()获取文件名。结果,在RDD上调用.pipe()时,命令字符串在驱动程序上评估,然后传递给工作程序。因此,SparkFiles.get()不是获取文件名的恰当方式。文件名应该是./,因为SparkContext.addFile()应该将该文件放置在每个工作程序从中运行的位置上的./。但我对.pipe非常失望,所以我完全删除了我的代码中的.pipe,转而使用我写的PipeUtils对象的.mapPartitions组合here。这实际上更有效率,因为我只需要针对每个分区一次性地承担脚本启动成本,而不是针对每个示例。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接