在Hadoop Map Reduce中重命名部分文件

22

我尝试按照http://hadoop.apache.org/docs/mapreduce/r0.21.0/api/index.html?org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html页面上的示例,使用MultipleOutputs类。

驱动程序代码

    Configuration conf = new Configuration();
    Job job = new Job(conf, "Wordcount");
    job.setJarByClass(WordCount.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
            Text.class, IntWritable.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);

减速器代码

public class WordCountReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    private MultipleOutputs<Text, IntWritable> mos;
    public void setup(Context context){
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        //context.write(key, result);
        mos.write("text", key,result);
    }
    public void cleanup(Context context)  {
         try {
            mos.close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
         }
}

Reducer的输出被发现重命名为text-r-00000。

但问题在于我也得到了一个空的part-r-00000文件。这是MultipleOutputs预期的行为吗,还是我的代码出了问题?请给予建议。

我尝试的另一种替代方法是使用FileSystem类遍历我的输出文件夹并手动重命名所有以part开头的文件。

什么是最好的方法?

FileSystem hdfs = FileSystem.get(configuration);
FileStatus fs[] = hdfs.listStatus(new Path(outputPath));
for (FileStatus aFile : fs) {
if (aFile.isDir()) {
hdfs.delete(aFile.getPath(), true);
// delete all directories and sub-directories (if any) in the output directory
} 
else {
if (aFile.getPath().getName().contains("_"))
hdfs.delete(aFile.getPath(), true);
// delete all log files and the _SUCCESS file in the output directory
else {
hdfs.rename(aFile.getPath(), new Path(myCustomName));
}
}
2个回答

21
即使你正在使用MultipleOutputs,默认的OutputFormat(我相信它是TextOutputFormat)仍然会被使用,因此它将会初始化并创建这些你所看到的part-r-xxxxx文件。
它们为空的原因是因为你没有执行任何context.write,因为你正在使用MultipleOutputs。但是这并不会阻止在初始化期间创建它们。
要摆脱它们,你需要定义你的OutputFormat来表明你不希望有任何输出。你可以这样做:
job.setOutputFormat(NullOutputFormat.class);

将该属性设置后,这应该确保您的部件文件根本不会被初始化,但您仍然可以在MultipleOutputs中获得输出。

您还可以使用LazyOutputFormat,这将确保仅在有数据时/如果有数据时才创建输出文件,而不是初始化空文件。您可以这样做:

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; 
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
请注意,您在Reducer中使用的原型是MultipleOutputs.write(String namedOutput, K key, V value),它只使用默认输出路径,该路径将基于您的namedOutput 生成类似于:{namedOutput}-(m|r)-{part-number}的内容。如果您想对输出文件名有更多控制,请使用原型MultipleOutputs.write(String namedOutput, K key, V value, String baseOutputPath),它可以让您根据键/值在运行时生成文件名。

1
@ Charles:我在下面的帖子中看到了LazyOutputFormat的使用,但我不知道如何使用它。非常感谢您的回复。http://stackoverflow.com/questions/10924852/map-reduce-output-files-part-r-and-part?rq=1 - Arun A K
@Charles:当我尝试使用“NullOutputFormat”时,甚至MultipleOutputs也没有被写入输出路径……这有意义吗?还是我漏了什么? - OhadR

11

要更改输出文件的基本名称,您只需在Driver类中执行以下操作: job.getConfiguration().set("mapreduce.output.basename", "text"); 这将导致文件名被称为“text-r-00000”。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接