如何将小型ORC文件合并成更大的ORC文件?

9
大多数SO和网络上的问题/答案都讨论使用Hive将一堆小ORC文件组合成一个更大的文件,但是我的ORC文件是按天分隔的日志文件,我需要保持它们的分开。我只想每天“卷起”ORC文件(这些文件是HDFS中的目录)。
我很可能需要用Java编写解决方案,并遇到了OrcFileMergeOperator,这可能是我需要使用的东西,但还为时过早。
解决此问题的最佳方法是什么?
3个回答

16
您无需重新发明轮子。 ALTER TABLE table_name [PARTITION partition_spec] CONCATENATEHive 0.14.0 开始可用,可用于将小型 ORC 文件合并为一个大文件。合并是在条带级别进行的,这避免了解压缩和解码数据。它的速度很快。建议创建按日期分区的外部表(分区是目录),然后将它们全部合并指定 PARTITION (day_column) 作为分区规范。
请参见此处:LanguageManual+ORC

1
很不幸,由于数据是“journald -o json”的输出结果,经过转换成ORC格式后存储在一个外部表中,并且在HDFS中的目录结构为 yyyy/mm/dd/file1.orc file2.orc file3.orc 等,因此我没有“day_column”可用。所有时间戳都以epoch时间为准。 - Chris C
3
CONCATENATE函数适用于外部表格吗?据我了解,不适用。 - Omar Ali
2
@OmarAli 当然可以,它是有效的。无论是外部还是托管的表都没有关系。外部表和托管表之间唯一的区别在于DROP表的行为。托管表的DROP将同时删除数据。而外部表的DROP只会删除表定义。此外,您可以同时在HDFS上创建几个不同的表。 - leftjoin
1
@FoxanNg 是的,它是在Hive的3.0.0和2.4.0版本中添加的,具体信息请参见https://issues.apache.org/jira/browse/HIVE-17403。 - leftjoin
1
真不敢相信这么难找。非常感谢! - dz902
显示剩余3条评论

12

这里有一些不错的答案,但没有一个可以让我运行cron作业以便我可以每天进行汇总。我们有journald日志文件每天写入HDFS,我不想每天在工作时都在Hive中运行查询。

我最终采取的做法对我来说似乎更直接。我编写了一个Java程序,使用ORC库扫描目录中的所有文件并创建一个文件列表List。然后打开一个新的Writer,它是“组合”文件(以“.”开头,因此Hive无法看到,否则将失败)。然后程序打开列表中的每个文件,读取内容并写入组合文件。读取完所有文件后,程序会删除这些文件。我还添加了按需要一次处理一个目录的功能。

注意:您需要一个模式文件。Journald日志可以以JSON格式输出“journalctl -o json”,然后可以使用Apache ORC工具生成模式文件,或者手动编写一个。由ORC自动生成的模式很好,但手动编写的模式始终更好。

注意:要原样使用此代码,您需要一个有效的keytab并在类路径中添加-Dkeytab=。

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import com.cloudera.org.joda.time.LocalDate;

public class OrcFileRollUp {

  private final static String SCHEMA = "journald.schema";
  private final static String UTF_8 = "UTF-8";
  private final static String HDFS_BASE_LOGS_DIR = "/<baseDir>/logs";
  private static final String keytabLocation = System.getProperty("keytab");
  private static final String kerberosUser = "<userName>";
  private static Writer writer;

  public static void main(String[] args) throws IOException {

    Configuration conf = new Configuration();
    conf.set("hadoop.security.authentication", "Kerberos");

    InetAddress myHost = InetAddress.getLocalHost();
    String kerberosPrincipal = String.format("%s/%s", kerberosUser, myHost.getHostName());
    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, keytabLocation);

    int currentDay = LocalDate.now().getDayOfMonth();
    int currentMonth = LocalDate.now().getMonthOfYear();
    int currentYear = LocalDate.now().getYear();

    Path path = new Path(HDFS_BASE_LOGS_DIR);

    FileSystem fileSystem = path.getFileSystem(conf);
    System.out.println("The URI is: " + fileSystem.getUri());


    //Get Hosts:
    List<String> allHostsPath = getHosts(path, fileSystem);

    TypeDescription schema = TypeDescription.fromString(getSchema(SCHEMA)
        .replaceAll("\n", ""));

    //Open each file for reading and write contents
    for(int i = 0; i < allHostsPath.size(); i++) {

      String outFile = "." + currentYear + "_" + currentMonth + "_" + currentDay + ".orc.working";            //filename:  .2018_04_24.orc.working

      //Create list of files from directory and today's date OR pass a directory in via the command line in format 
      //hdfs://<namenode>:8020/HDFS_BASE_LOGS_DIR/<hostname>/2018/4/24/
      String directory = "";
      Path outFilePath;
      Path argsPath;
      List<String> orcFiles;

      if(args.length == 0) {
        directory = currentYear + "/" + currentMonth + "/" + currentDay;
        outFilePath = new Path(allHostsPath.get(i) + "/" + directory + "/" + outFile);
        try {
          orcFiles = getAllFilePath(new Path(allHostsPath.get(i) + "/" + directory), fileSystem);
        } catch (Exception e) {
          continue;
        }
      } else {
        outFilePath = new Path(args[0] + "/" + outFile);
        argsPath = new Path(args[0]);
        try {
          orcFiles = getAllFilePath(argsPath, fileSystem);
        } catch (Exception e) {
          continue;
        }
      }

      //Create List of files in the directory

      FileSystem fs = outFilePath.getFileSystem(conf);

      //Writer MUST be below ^^ or the combination file will be deleted as well.
      if(fs.exists(outFilePath)) {
        System.out.println(outFilePath + " exists, delete before continuing.");
      } else {
       writer = OrcFile.createWriter(outFilePath, OrcFile.writerOptions(conf)
            .setSchema(schema));
      }

      for(int j = 0; j < orcFiles.size(); j++ ) { 
        Reader reader = OrcFile.createReader(new Path(orcFiles.get(j)), OrcFile.readerOptions(conf));

        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
        RecordReader rows = reader.rows();

        while (rows.nextBatch(batch)) {
          if (batch != null) {
             writer.addRowBatch(batch);
          }
        }
        rows.close();
        fs.delete(new Path(orcFiles.get(j)), false);
      }
      //Close File
      writer.close();

      //Remove leading "." from ORC file to make visible to Hive
      outFile = fileSystem.getFileStatus(outFilePath)
                                      .getPath()
                                      .getName();

      if (outFile.startsWith(".")) {
        outFile = outFile.substring(1);

        int lastIndexOf = outFile.lastIndexOf(".working");
        outFile = outFile.substring(0, lastIndexOf);
      }

      Path parent = outFilePath.getParent();

      fileSystem.rename(outFilePath, new Path(parent, outFile));

      if(args.length != 0)
        break;
    }
  }

  private static String getSchema(String resource) throws IOException {
    try (InputStream input = OrcFileRollUp.class.getResourceAsStream("/" + resource)) {
      return IOUtils.toString(input, UTF_8);
    }
  }

  public static List<String> getHosts(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
    List<String> hostsList = new ArrayList<String>();
    FileStatus[] fileStatus = fs.listStatus(filePath);
    for (FileStatus fileStat : fileStatus) {
      hostsList.add(fileStat.getPath().toString());
    }
    return hostsList;
  }

  private static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
    List<String> fileList = new ArrayList<String>();
    FileStatus[] fileStatus = fs.listStatus(filePath);
    for (FileStatus fileStat : fileStatus) {
      if (fileStat.isDirectory()) {
        fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
      } else {
        fileList.add(fileStat.getPath()
                             .toString());
      }
    }
    for(int i = 0; i< fileList.size(); i++) {
      if(!fileList.get(i).endsWith(".orc"))
        fileList.remove(i);
    }

    return fileList;
  }

}

1
严肃点,有人给我自己的解决方案点了踩?那就提供一个更好的替代方案,不要只是点踩然后走开。 - Chris C
1
你知道如何禁用认证(Kerberos),如果不需要的话吗? - Ryan
@Ryan,你解决了如何禁用认证(Kerberos)的问题吗? - Riddle
@Riddle 我有一个存储库,里面有一些代码可以分享。你能在星期二提醒我吗?现在正在与 COVID 对抗。 - Ryan
@Ryan 确定 - 希望你感觉好些,保重! - Riddle
显示剩余2条评论

2
这里有一个使用PyORC的Python小脚本,可以将小的ORC文件连接在一起。虽然它不是用Java编写的,但我觉得它比当前的解决方案或使用Hive更简单易懂。请看下面的代码:

PyORC是一个很好用的工具。

import pyorc
import argparse


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-o', '--output', type=argparse.FileType(mode='wb'))
    parser.add_argument('files', type=argparse.FileType(mode='rb'), nargs='+')
    args = parser.parse_args()

    schema = str(pyorc.Reader(args.files[0]).schema)

    with pyorc.Writer(args.output, schema) as writer:
        for i, f in enumerate(args.files):
            reader = pyorc.Reader(f)
            if str(reader.schema) != schema:
                raise RuntimeError(
                    "Inconsistent ORC schemas.\n"
                    "\tFirst file schema: {}\n"
                    "\tFile #{} schema: {}"
                    .format(schema, i, str(reader.schema))
                )
            for line in reader:
                writer.write(line)


if __name__ == '__main__':
    main()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接