How do I Combine or Merge Small ORC files into Lar

2020-01-29 09:57发布

问题:

Most questions/answers on SO and the web discuss using Hive to combine a bunch of small ORC files into a larger one, however, my ORC files are log files which are separated by day and I need to keep them separate. I only want to "roll-up" the ORC files per day (which are directories in HDFS).

I need to write the solution in Java most likely and have come across OrcFileMergeOperator which may be what I need to use, but still too early to tell.

What is the best approach to solving this issue?

回答1:

There are good answers here, but none of those allow me to run a cron job so that I can do daily roll-ups. We have journald log files writing daily to HDFS and I do not want to run a query in Hive every day when I come in.

What I ended up doing seemed more straightforward to me. I wrote a Java program that uses the ORC libraries to scans all files in a directory and creates an List of those files. Then opens a new Writer which is the "combined" file (which starts with a "." so it is hidden from Hive or else Hive will fail). The program then opens each file in the List and reads the contents and writes out to the combined file. When all files have been read, it deletes the files. I also added capability to run one directory at at time in case that was needed.

NOTE: You will need a schema file. Journald logs can be output in json "journalctl -o json" and then you can use the Apache ORC tools to generate a schema file or you can do one by hand. The auto-gen from ORC is good but manual is always better.

NOTE: To use this code as-is, you will need to a valid keytab and add -Dkeytab= in your classpath.

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import com.cloudera.org.joda.time.LocalDate;

public class OrcFileRollUp {

  private final static String SCHEMA = "journald.schema";
  private final static String UTF_8 = "UTF-8";
  private final static String HDFS_BASE_LOGS_DIR = "/<baseDir>/logs";
  private static final String keytabLocation = System.getProperty("keytab");
  private static final String kerberosUser = "<userName>";
  private static Writer writer;

  public static void main(String[] args) throws IOException {

    Configuration conf = new Configuration();
    conf.set("hadoop.security.authentication", "Kerberos");

    InetAddress myHost = InetAddress.getLocalHost();
    String kerberosPrincipal = String.format("%s/%s", kerberosUser, myHost.getHostName());
    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, keytabLocation);

    int currentDay = LocalDate.now().getDayOfMonth();
    int currentMonth = LocalDate.now().getMonthOfYear();
    int currentYear = LocalDate.now().getYear();

    Path path = new Path(HDFS_BASE_LOGS_DIR);

    FileSystem fileSystem = path.getFileSystem(conf);
    System.out.println("The URI is: " + fileSystem.getUri());


    //Get Hosts:
    List<String> allHostsPath = getHosts(path, fileSystem);

    TypeDescription schema = TypeDescription.fromString(getSchema(SCHEMA)
        .replaceAll("\n", ""));

    //Open each file for reading and write contents
    for(int i = 0; i < allHostsPath.size(); i++) {

      String outFile = "." + currentYear + "_" + currentMonth + "_" + currentDay + ".orc.working";            //filename:  .2018_04_24.orc.working

      //Create list of files from directory and today's date OR pass a directory in via the command line in format 
      //hdfs://<namenode>:8020/HDFS_BASE_LOGS_DIR/<hostname>/2018/4/24/
      String directory = "";
      Path outFilePath;
      Path argsPath;
      List<String> orcFiles;

      if(args.length == 0) {
        directory = currentYear + "/" + currentMonth + "/" + currentDay;
        outFilePath = new Path(allHostsPath.get(i) + "/" + directory + "/" + outFile);
        try {
          orcFiles = getAllFilePath(new Path(allHostsPath.get(i) + "/" + directory), fileSystem);
        } catch (Exception e) {
          continue;
        }
      } else {
        outFilePath = new Path(args[0] + "/" + outFile);
        argsPath = new Path(args[0]);
        try {
          orcFiles = getAllFilePath(argsPath, fileSystem);
        } catch (Exception e) {
          continue;
        }
      }

      //Create List of files in the directory

      FileSystem fs = outFilePath.getFileSystem(conf);

      //Writer MUST be below ^^ or the combination file will be deleted as well.
      if(fs.exists(outFilePath)) {
        System.out.println(outFilePath + " exists, delete before continuing.");
      } else {
       writer = OrcFile.createWriter(outFilePath, OrcFile.writerOptions(conf)
            .setSchema(schema));
      }

      for(int j = 0; j < orcFiles.size(); j++ ) { 
        Reader reader = OrcFile.createReader(new Path(orcFiles.get(j)), OrcFile.readerOptions(conf));

        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
        RecordReader rows = reader.rows();

        while (rows.nextBatch(batch)) {
          if (batch != null) {
             writer.addRowBatch(batch);
          }
        }
        rows.close();
        fs.delete(new Path(orcFiles.get(j)), false);
      }
      //Close File
      writer.close();

      //Remove leading "." from ORC file to make visible to Hive
      outFile = fileSystem.getFileStatus(outFilePath)
                                      .getPath()
                                      .getName();

      if (outFile.startsWith(".")) {
        outFile = outFile.substring(1);

        int lastIndexOf = outFile.lastIndexOf(".working");
        outFile = outFile.substring(0, lastIndexOf);
      }

      Path parent = outFilePath.getParent();

      fileSystem.rename(outFilePath, new Path(parent, outFile));

      if(args.length != 0)
        break;
    }
  }

  private static String getSchema(String resource) throws IOException {
    try (InputStream input = OrcFileRollUp.class.getResourceAsStream("/" + resource)) {
      return IOUtils.toString(input, UTF_8);
    }
  }

  public static List<String> getHosts(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
    List<String> hostsList = new ArrayList<String>();
    FileStatus[] fileStatus = fs.listStatus(filePath);
    for (FileStatus fileStat : fileStatus) {
      hostsList.add(fileStat.getPath().toString());
    }
    return hostsList;
  }

  private static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
    List<String> fileList = new ArrayList<String>();
    FileStatus[] fileStatus = fs.listStatus(filePath);
    for (FileStatus fileStat : fileStatus) {
      if (fileStat.isDirectory()) {
        fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
      } else {
        fileList.add(fileStat.getPath()
                             .toString());
      }
    }
    for(int i = 0; i< fileList.size(); i++) {
      if(!fileList.get(i).endsWith(".orc"))
        fileList.remove(i);
    }

    return fileList;
  }

}


回答2:

You do not need to re-invent the wheel.

ALTER TABLE table_name [PARTITION partition_spec] CONCATENATE can be used to merge small ORC files into a larger file since Hive 0.14.0. The merge happens at the stripe level, which avoids decompressing and decoding the data. It works fast. I'd suggest to create an external table partitioned by day (partitions are directories), then merge them all specifying PARTITION (day_column) as a partition spec.

See here: LanguageManual+ORC



标签: java hive hdfs orc