如何在Hadoop Reduce中获取当前文件名

5

我正在使用WordCount示例,在Reduce函数中,我需要获取文件名。

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    String filename = ((FileSplit)(.getContext()).getInputSplit()).getPath().getName();
    // ----------------------------^ I need to get the context and filename!
    key.set(key.toString() + " (" + filename + ")");
    output.collect(key, new IntWritable(sum));
  }
}

这是目前修改后的代码,我想要获取文件名以便打印输出。我尝试了以下方法:Java Hadoop:如何创建 Mapper 来将输入文件转换为每个文件中的行数? 但我无法获取 context 对象。
我是 Hadoop 新手,需要帮助。有谁可以提供帮助吗?
3个回答

4

因为您正在使用“旧API”,而构造“context”是“新API”的一部分,所以您无法获取“context”。

相反,请查看此单词计数示例:http://wiki.apache.org/hadoop/WordCount

在这种情况下,请查看reduce函数的签名:

public void reduce(Text key, Iterable<IntWritable> values, Context context) 

看!上下文!请注意,在这个示例中,它从.mapreduce.导入,而不是.mapred.

这是新的hadoop用户经常遇到的问题,所以不要感到难过。通常情况下,您应该坚持使用新的API,原因有很多。但是,一定要非常小心您找到的示例。另外,请注意新旧API是不兼容的(例如,您不能同时拥有一个新的API映射器和一个旧的API减速器)。


只是好奇 - 为什么更喜欢新的API而不是旧的API - 我以为两者都会得到支持 - 也许我不是最新的。 - Praveen Sripati
如何在旧API的reduce函数中获取文件名? - Praveen Kumar Purushothaman

4

使用旧的MR API (org.apache.hadoop.mapred package),在mapper/reducer类中添加以下内容。

String fileName = new String();
public void configure(JobConf job)
{
    filename = job.get("map.input.file");
}

使用新的MR API(org.apache.hadoop.mapreduce包),将以下内容添加到mapper / reducer类中。
String fileName = new String();
protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException
{
    fileName = ((FileSplit) context.getInputSplit()).getPath().toString();
}

2
我用了这种方法,它起作用了!
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
      FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
      String filename = fileSplit.getPath().getName();
      word.set(tokenizer.nextToken());
      output.collect(word, one);
    }
  }
}

如果我能改进它,请告诉我!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接