将多个序列文件合并成一个Hadoop序列文件

6

如何在Hadoop中将多个序列文件合并为一个序列文件

谢谢。

可能是重复的问题:在Reduce阶段后合并输出文件 - Shahryar
4个回答

5

如果您想将多个文件合并为单个文件,则有两种方法:

本地语言


getmerge

用法: hadoop fs -getmerge <src> <localdst>

将源目录和目标文件作为输入,并将src中的文件连接到目标本地文件。可选地,可以设置addnl以启用在每个文件末尾添加换行符。



Java API


org.apache.hadoop.fs.FileUtil.copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String addString);

将一个目录中的所有文件复制到一个输出文件中(合并)

复制到HDFS

put

用法:hadoop dfs -put <localsrc> ... <dst>

将单个源文件或多个源文件从本地文件系统复制到目标文件系统。还可以从stdin读取输入并写入目标文件系统。

copyFromLocal

用法: hadoop dfs -copyFromLocal <localsrc> URI

与put命令类似,不同之处在于源文件限制为本地文件引用。


谢谢。但我想将输出文件输出到HDFS。 - cldo
@cldo 我修改了我的答案,请看一下。 - saurabh shashank
3
我使用getmerge命令将本地文件合并后放入hdfs中,然后在该文件上运行Hadoop作业。但是出现了错误:java.io.IOException: File is corrupt!at org.apache.hadoop.io.SequenceFile$Reader.readBlock(SequenceFile.java:1734)。 - cldo
4
是的,“getmerge”字面上将输入文件的字节连接起来。这对于文本文件来说很好用,但对于序列文件,您需要智能键值合并。最重要的是,您不希望一个文件的头部被复制到另一个文件中,然后被解释为记录条目。 - Ben Sidhom

4
你有没有考虑使用forqlift?我编写了它来处理特定的SequenceFile任务,包括SequenceFile合并。在你的情况下,你可以运行:
forqlift seq2seq --file new_combined_file.seq  \
    original_file1.seq  original_file2.seq original_file3.seq ...

虽然 forqlift 的 seq2seq 工具被标记为“实验性”... 但在我的(尽管有限的)内部测试中表现良好。


2
太好了!你有没有考虑以更容易让其他人贡献的方式托管源代码? - Ben Sidhom

3
不能使用hadoop getmerge来合并序列文件,因为它会将它们作为二进制文件进行合并,而不是作为序列文件(所以在合并后的文件中会有很多头信息...)。
因此,你可以像@Donald-miner建议的那样编写一个只有单个reduce器的小hadoop作业,或者通过使用SequenceFile.ReaderSeuquenceFile.Writer编写一个独立的合并器。

我选择了第二个选项,这是我的代码:

package ru.mail.go.webbase.markov.hadoop.utils;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFilesUtils {
    private static final Configuration conf = HBaseConfiguration.create();

public static <K, V> void merge(Path fromDirectory, Path toFile, Class<K> keyClass, Class<V> valueClass) throws IOException {
    FileSystem fs = FileSystem.get(conf);

    if (!fs.isDirectory(fromDirectory)) {
        throw new IllegalArgumentException("'" + fromDirectory.toString() + "' is not a directory");
    }

    SequenceFile.Writer writer = SequenceFile.createWriter(
            conf,
            SequenceFile.Writer.file(toFile),
            SequenceFile.Writer.keyClass(keyClass),
            SequenceFile.Writer.valueClass(valueClass)
            );

    for (FileStatus status : fs.listStatus(fromDirectory)) {
        if (status.isDirectory()) {
            System.out.println("Skip directory '" + status.getPath().getName() + "'");
            continue;
        }

        Path file = status.getPath();

        if (file.getName().startsWith("_")) {
            System.out.println("Skip \"_\"-file '" + file.getName() + "'"); //There are files such "_SUCCESS"-named in jobs' ouput folders 
            continue;
        }

        System.out.println("Merging '" + file.getName() + "'");

        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file));
        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
        Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);

        while (reader.next(key, value)) {
            writer.append(key, value);
        }

        reader.close();
    }

    writer.close();
}
}

这是我的测试:

public class SequenceFilesUtilsTest {
private static final String OUT_PATH = "./UNIVERSE/SequenceFilesUtilsTest/";

@Before
public void initEnviroment() throws IOException {
    TestUtils.createDirectory(OUT_PATH);
    TestUtils.createDirectory(OUT_PATH + "/in");
}

@Test
public void test() throws Exception {
    Configuration conf = HBaseConfiguration.create();

    Path inPath1 = new Path("file://" + new File(OUT_PATH).getAbsolutePath() + "/in/in1.seq");
    System.out.println("Saving first part to '" + inPath1 + "'");
    SequenceFile.Writer writer1 = SequenceFile.createWriter(
            conf,
            SequenceFile.Writer.file(inPath1),
            SequenceFile.Writer.keyClass(LongWritable.class),
            SequenceFile.Writer.valueClass(Text.class)
            );
    writer1.append(new LongWritable(101), new Text("FIRST1"));
    writer1.append(new LongWritable(102), new Text("FIRST2"));
    writer1.append(new LongWritable(103), new Text("FIRST3"));
    writer1.append(new LongWritable(104), new Text("FIRST4"));
    writer1.close();

    Path inPath2 = new Path("file://" + new File(OUT_PATH).getAbsolutePath() + "/in/in2.seq");
    System.out.println("Saving second part to '" + inPath2 + "'");
    SequenceFile.Writer writer2 = SequenceFile.createWriter(
            conf,
            SequenceFile.Writer.file(inPath2),
            SequenceFile.Writer.keyClass(LongWritable.class),
            SequenceFile.Writer.valueClass(Text.class)
            );
    writer2.append(new LongWritable(201), new Text("SND1"));
    writer2.append(new LongWritable(202), new Text("SND2"));
    writer2.append(new LongWritable(203), new Text("SND3"));
    writer2.close();

    SequenceFilesUtils.merge(
            new Path("file://" + new File(OUT_PATH).getAbsolutePath() + "/in"),
            new Path("file://" + new File(OUT_PATH).getAbsolutePath() + "/merged.seq"),
            LongWritable.class,
            Text.class);

    Path mergedPath = new Path("file://" + new File(OUT_PATH).getAbsolutePath() + "/merged.seq");
    SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(mergedPath));
    LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
    Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);

    reader.next(key, value);
    Assert.assertEquals(101, key.get());
    Assert.assertEquals("FIRST1", value.toString());

    reader.next(key, value);
    Assert.assertEquals(102, key.get());
    Assert.assertEquals("FIRST2", value.toString());

    reader.next(key, value);
    Assert.assertEquals(103, key.get());
    Assert.assertEquals("FIRST3", value.toString());

    reader.next(key, value);
    Assert.assertEquals(104, key.get());
    Assert.assertEquals("FIRST4", value.toString());

    reader.next(key, value);
    Assert.assertEquals(201, key.get());
    Assert.assertEquals("SND1", value.toString());

    reader.next(key, value);
    Assert.assertEquals(202, key.get());
    Assert.assertEquals("SND2", value.toString());

    reader.next(key, value);
    Assert.assertEquals(203, key.get());
    Assert.assertEquals("SND3", value.toString());

    reader.close();
}
}

3

如果您正在处理大量序列文件,则建议编写一个MapReduce作业,使用Mapper作为mapper和Reducer作为reducer。 对于i/o格式,请使用SequenceFileInputFormatSequenceFileOutputFormat将reducer的数量设置为1。 这些都是在driver/main代码中的Configuration和Job对象中设置的内容。请参见如何设置输出格式, 如何设置输入格式, 如何设置mapper如何设置reducer
请注意,MapperReducer的默认行为是不对数据进行任何处理 - 只是将其传递。这就是为什么在这里不编写映射函数或缩减函数的原因。
这将加载您的序列文件,在映射器中不对数据进行任何操作,将所有记录洗牌到缩减器中,然后将它们全部输出到一个文件中。这会导致在输出序列文件中对键进行排序的副作用。

1
将reducers的数量设置为1会导致性能下降吗? - Balaji Boggaram Ramanarayan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接