在Hadoop HDFS目录中解压所有Gzip文件

3
在我的HDFS上,我有一堆gzip文件,我想将它们解压到普通格式。 是否有API可以做到这一点? 或者我该如何编写一个函数来完成这个任务?
我不想使用任何命令行工具; 相反,我想通过编写Java代码来完成此任务。
2个回答

6
你需要一个CompressionCodec来解压文件。gzip的实现是GzipCodec。通过编解码器获取CompressedInputStream,然后使用简单的IO输出结果。就像这样:假设你有一个名为file.gz的文件。
//path of file
String uri = "/uri/to/file.gz";
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);

CompressionCodecFactory factory = new CompressionCodecFactory(conf);
// the correct codec will be discovered by the extension of the file
CompressionCodec codec = factory.getCodec(inputPath);

if (codec == null) {
    System.err.println("No codec found for " + uri);
    System.exit(1);
}

// remove the .gz extension
String outputUri =
    CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());

InputStream is = codec.createInputStream(fs.open(inputPath));
OutputStream out = fs.create(new Path(outputUri));
IOUtils.copyBytes(is, out, conf);

// close streams

更新

如果您需要获取目录中的所有文件,则应获取FileStatus,如下所示:

FileSystem fs = FileSystem.get(new Configuration());
FileStatus[] statuses = fs.listStatus(new Path("hdfs/path/to/dir"));

然后只需循环。
for (FileStatus status: statuses) {
    CompressionCodec codec = factory.getCodec(status.getPath());
    ...
    InputStream is = codec.createInputStream(fs.open(status.getPath());
    ...
}

实际上,我所有的gzip文件都存储在HDFS上的一个目录中,该目录中有一堆文件。我想遍历目录中的每个文件,将其解压缩,然后将结果文件存储在新目录中。为了获取目录中的文件列表,我正在使用以下代码:List<File> listFiles = (List<File>) FileUtils.listFiles(temporaryDirectory,null,true);那么如何更改上述代码以匹配此操作呢?抱歉,但我对所有这些都感到非常困惑,这就是我提出问题的原因。感谢您的帮助。 - user3690321
那么我的OutputStream仍然与以下代码相同: OutputStream out = fs.create(new Path("hdfs/ouput")); 对吗?这样会继续将我的InputStream文件复制到OutputStream路径中吗?还是我错了? - user3690321
只需尝试并查看结果。如果不起作用,请告诉我。 - Paul Samsotha
好的,最后一个问题。那么您是否也删除了 .gz 扩展名的文件?还是它们仍然存在于输入路径中? - user3690321
不,不是删除文件,只是删除路径的扩展名(这样当你保存时,.gz扩展名就不会出现,文件名仍然相同)。实际文件仍将存在。如果您想要删除它们,可以这样做。但我建议您在删除任何内容之前先练习使其正常工作(如果这是您想要的)。但是,像我说的那样,先尝试一下,玩一下,如果有问题,请在实际尝试后让我知道。 - Paul Samsotha
显示剩余2条评论

1
我使用Scalding编写的身份映射Hadoop作业来更改压缩/更改分割大小等。
class IdentityMap(args: Args) extends ConfiguredJob(args) {
  CombineFileMultipleTextLine(args.list("in"): _*).read.mapTo[String, String]('line -> 'line)(identity)
  .write(if (args.boolean("compress")) TsvCompressed(args("out")) else TextLine(args("out")))
}

通用配置抽象类:

abstract class ConfiguredJob(args: Args) extends Job(args) {
  override def config(implicit mode: Mode): Map[AnyRef, AnyRef] = {
    val Megabyte = 1024 * 1024
    val conf = super.config(mode)
    val splitSizeMax = args.getOrElse("splitSizeMax", "1024").toInt * Megabyte
    val splitSizeMin = args.getOrElse("splitSizeMin", "512").toInt * Megabyte
    val jobPriority = args.getOrElse("jobPriority","NORMAL")
    val maxHeap = args.getOrElse("maxHeap","512m")
    conf ++ Map("mapred.child.java.opts" -> ("-Xmx" + maxHeap),
      "mapred.output.compress" -> (if (args.boolean("compress")) "true" else "false"),
      "mapred.min.split.size" -> splitSizeMin.toString,
      "mapred.max.split.size" -> splitSizeMax.toString,
//      "mapred.output.compression.codec" -> args.getOrElse("codec", "org.apache.hadoop.io.compress.BZip2Codec"), //Does not work, has to be -D flag
      "mapred.job.priority" -> jobPriority)
  }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接