绕过 org.apache.hadoop.mapred.InvalidInputException: 输入模式 s3n://[...] 匹配 0 个文件

11

这是我已经在Spark用户邮件列表上提出过的问题,我希望在这里能够得到更多的成功。

虽然Spark与此直接相关,但我不确定它是否与我无法轻松解决该问题有关。

我正在尝试使用各种模式从S3获取一些文件。我的问题是其中一些模式可能什么都不返回,当它们这样做时,我会收到以下异常:

org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3n://bucket/mypattern matches 0 files
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:52)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:52)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:52)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
    at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:335)
    ... 2 more

我想要一种忽略缺失文件并在这种情况下什么也不做的方法。我认为问题在于,我不知道一个模式是否会返回任何内容,直到它实际执行,并且Spark仅在发生操作(这里是reduceByKey部分)时开始处理数据。因此,我不能只在某个地方捕获错误,然后让事情继续进行。

一种解决方案是强制Spark逐个处理每个路径,但这可能会在速度和/或内存方面付出很多代价,因此我正在寻找另一种有效的选择。

我正在使用Spark 0.9.1。 谢谢

2个回答

4

好的,我深入研究了一下Spark,并且感谢某人在Spark用户列表上的指导,我想我明白了:

sc.newAPIHadoopFile("s3n://missingPattern/*", EmptiableTextInputFormat.class, LongWritable.class, Text.class, sc.hadoopConfiguration())
    .map(new Function<Tuple2<LongWritable, Text>, String>() {
        @Override
        public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
            return arg0._2.toString();
        }
    })
    .count();

还有EmptiableTextInputFormat,它可以实现魔法:

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class EmptiableTextInputFormat extends TextInputFormat {
    @Override
    public List<InputSplit> getSplits(JobContext arg0) throws IOException {
        try {
            return super.getSplits(arg0);
        } catch (InvalidInputException e) {
            return Collections.<InputSplit> emptyList();
        }
    }
}

可以最终检查 InvalidInputException 的消息,以获取更精确的信息。


有没有办法在 SparkContext.sequenceFile() 上实现相同的逻辑? - Roee Gavirel

2

如果您想要一个快速的技巧,这里有一个使用sc.wholeTextFiles的示例。

def wholeTextFilesIgnoreErrors(path: String, sc: SparkContext): RDD[(String, String)] = {
    // TODO This is a bit hacky, probabally ought to work out a better way using lower level hadoop api

    sc.wholeTextFiles(path.split(",").filter(subPath => Try(sc.textFile(subPath).take(1)).isSuccess).mkString(","))
  }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接