How do I combine or merge small ORC files into a larger ORC file?

Most questions and answers on SO and the web discuss using Hive to combine a bunch of small ORC files into a larger one; however, my ORC files are log files that are separated by day and I need to keep them separate. I only want to "roll up" the ORC files per day (which are directories in HDFS).

I will most likely need to write the solution in Java and have come across OrcFileMergeOperator, which may be what I need to use, but it is still too early to tell.

What is the best approach to solving this issue?

Hudak answered 26/4, 2018 at 11:48 Comment(0)

There are good answers here, but none of them lets me run a cron job to do daily roll-ups. We have journald log files written to HDFS daily, and I do not want to run a query in Hive by hand every day when I come in.

What I ended up doing seemed more straightforward to me. I wrote a Java program that uses the ORC libraries to scan all files in a directory and build a List of them. It then opens a new Writer for the "combined" file (whose name starts with a "." so it is hidden from Hive; otherwise Hive will fail). The program opens each file in the List, reads its contents, and writes them out to the combined file. When all files have been read, it deletes them. I also added the ability to run against a single directory in case that is needed.

NOTE: You will need a schema file. Journald logs can be output as JSON with "journalctl -o json", and you can then use the Apache ORC tools to generate a schema file, or write one by hand. The auto-generated schema from the ORC tools is good, but a hand-written one is always better.

NOTE: To use this code as-is, you will need a valid keytab and must pass -Dkeytab=<path to keytab> as a JVM option (the code reads it via System.getProperty("keytab")).
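For illustration only, here is a minimal sketch of what such a schema string can look like; the field names are hypothetical placeholders rather than the real journald field set, so adjust them to match your "journalctl -o json" output. The same struct string can live in the journald.schema resource that the program loads below.

import org.apache.orc.TypeDescription;

public class SchemaSketch {
  public static void main(String[] args) {
    // Hypothetical journald-style fields; replace with the columns you actually need.
    TypeDescription schema = TypeDescription.fromString(
        "struct<realtime_timestamp:bigint,hostname:string,"
            + "systemd_unit:string,priority:string,message:string>");
    System.out.println(schema);   // prints the parsed type back as a struct string
  }
}
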

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import org.joda.time.LocalDate;   // plain Joda-Time; the Cloudera-shaded copy (com.cloudera.org.joda.time.LocalDate) also works

public class OrcFileRollUp {

  private final static String SCHEMA = "journald.schema";
  private final static String UTF_8 = "UTF-8";
  private final static String HDFS_BASE_LOGS_DIR = "/<baseDir>/logs";
  private static final String keytabLocation = System.getProperty("keytab");
  private static final String kerberosUser = "<userName>";
  private static Writer writer;

  public static void main(String[] args) throws IOException {

    Configuration conf = new Configuration();
    conf.set("hadoop.security.authentication", "Kerberos");

    InetAddress myHost = InetAddress.getLocalHost();
    String kerberosPrincipal = String.format("%s/%s", kerberosUser, myHost.getHostName());
    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, keytabLocation);

    int currentDay = LocalDate.now().getDayOfMonth();
    int currentMonth = LocalDate.now().getMonthOfYear();
    int currentYear = LocalDate.now().getYear();

    Path path = new Path(HDFS_BASE_LOGS_DIR);

    FileSystem fileSystem = path.getFileSystem(conf);
    System.out.println("The URI is: " + fileSystem.getUri());


    //Get Hosts:
    List<String> allHostsPath = getHosts(path, fileSystem);

    TypeDescription schema = TypeDescription.fromString(getSchema(SCHEMA)
        .replaceAll("\n", ""));

    //Open each file for reading and write contents
    for(int i = 0; i < allHostsPath.size(); i++) {

      String outFile = "." + currentYear + "_" + currentMonth + "_" + currentDay + ".orc.working";            //filename:  .2018_04_24.orc.working

      //Create list of files from directory and today's date OR pass a directory in via the command line in format 
      //hdfs://<namenode>:8020/HDFS_BASE_LOGS_DIR/<hostname>/2018/4/24/
      String directory = "";
      Path outFilePath;
      Path argsPath;
      List<String> orcFiles;

      if(args.length == 0) {
        directory = currentYear + "/" + currentMonth + "/" + currentDay;
        outFilePath = new Path(allHostsPath.get(i) + "/" + directory + "/" + outFile);
        try {
          orcFiles = getAllFilePath(new Path(allHostsPath.get(i) + "/" + directory), fileSystem);
        } catch (Exception e) {
          continue;
        }
      } else {
        outFilePath = new Path(args[0] + "/" + outFile);
        argsPath = new Path(args[0]);
        try {
          orcFiles = getAllFilePath(argsPath, fileSystem);
        } catch (Exception e) {
          continue;
        }
      }

      //Create List of files in the directory

      FileSystem fs = outFilePath.getFileSystem(conf);

      //The Writer must be created after the input file list is built above, otherwise the new combined file would be picked up and deleted as an input.
      if(fs.exists(outFilePath)) {
        System.out.println(outFilePath + " exists, delete before continuing.");
        continue;   //skip this directory; without this, a stale or null writer would be used below
      }
      writer = OrcFile.createWriter(outFilePath, OrcFile.writerOptions(conf)
          .setSchema(schema));

      for(int j = 0; j < orcFiles.size(); j++ ) {
        Reader reader = OrcFile.createReader(new Path(orcFiles.get(j)), OrcFile.readerOptions(conf));

        //Each input file is assumed to share the combined file's schema.
        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
        RecordReader rows = reader.rows();

        while (rows.nextBatch(batch)) {
          writer.addRowBatch(batch);
        }
        rows.close();

        //Delete the input file once its rows have been copied into the combined file.
        fs.delete(new Path(orcFiles.get(j)), false);
      }
      //Close File
      writer.close();

      //Remove leading "." from ORC file to make visible to Hive
      outFile = fileSystem.getFileStatus(outFilePath)
                                      .getPath()
                                      .getName();

      if (outFile.startsWith(".")) {
        outFile = outFile.substring(1);

        int lastIndexOf = outFile.lastIndexOf(".working");
        outFile = outFile.substring(0, lastIndexOf);
      }

      Path parent = outFilePath.getParent();

      fileSystem.rename(outFilePath, new Path(parent, outFile));

      if(args.length != 0)
        break;
    }
  }

  private static String getSchema(String resource) throws IOException {
    try (InputStream input = OrcFileRollUp.class.getResourceAsStream("/" + resource)) {
      return IOUtils.toString(input, UTF_8);
    }
  }

  public static List<String> getHosts(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
    List<String> hostsList = new ArrayList<String>();
    FileStatus[] fileStatus = fs.listStatus(filePath);
    for (FileStatus fileStat : fileStatus) {
      hostsList.add(fileStat.getPath().toString());
    }
    return hostsList;
  }

  private static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
    List<String> fileList = new ArrayList<String>();
    FileStatus[] fileStatus = fs.listStatus(filePath);
    for (FileStatus fileStat : fileStatus) {
      if (fileStat.isDirectory()) {
        fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
      } else {
        fileList.add(fileStat.getPath()
                             .toString());
      }
    }
    //Keep only ORC files; removing by forward index while iterating would skip elements.
    fileList.removeIf(f -> !f.endsWith(".orc"));

    return fileList;
  }

}
Hudak answered 27/4, 2018 at 18:51 Comment(7)
Seriously, someone downvoted my own solution? Then offer a better alternative, don't just down vote and move on.Hudak
any chance you know how to disable the auth (Kerberos) if not required?Mutation
@Mutation did you figure out how to disable auth (kerberos)?Swartz
@Swartz I have a repo with some code I could share. Can you remind me Tuesday? Down with covid right now.Mutation
@Swartz Let me know if this snippet helps you at all goonlinetools.com/snapshot/code/#ktak5lzugmoyapeyipuy8eMutation
@Mutation sure - Hope you feel better, tc!Swartz
@Mutation Thanks! Ryan, I did make some progress with the code snippet you shared but now I am running into this error - java.io.IOException: There is no primary group for UGI dummyuser (auth:SIMPLE)Swartz

You do not need to re-invent the wheel.

ALTER TABLE table_name [PARTITION partition_spec] CONCATENATE can be used to merge small ORC files into a larger file since Hive 0.14.0. The merge happens at the stripe level, which avoids decompressing and decoding the data, so it is fast. I'd suggest creating an external table partitioned by day (each partition is a directory), then merging them all by specifying PARTITION (day_column) as the partition spec.
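
A rough HiveQL sketch of that workflow, with a hypothetical table name, column list, and paths (note, per the comments below, that some Hive versions only allow CONCATENATE on managed tables, so you may need to flip the EXTERNAL property temporarily):

-- Hypothetical external table over the existing ORC log directories.
CREATE EXTERNAL TABLE logs (
  hostname STRING,
  message  STRING
)
PARTITIONED BY (load_day STRING)
STORED AS ORC
LOCATION '/<baseDir>/logs/<hostname>';

-- Mount an existing day directory as a partition.
ALTER TABLE logs ADD PARTITION (load_day='2018-04-24')
LOCATION '/<baseDir>/logs/<hostname>/2018/4/24';

-- Stripe-level merge of the small ORC files in that partition.
ALTER TABLE logs PARTITION (load_day='2018-04-24') CONCATENATE;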

See here: LanguageManual+ORC

Quattlebaum answered 26/4, 2018 at 12:19 Comment(8)
Unfortunately, I do not have a "day_column" as the data is an output of "journald -o json" which is converted into ORC and stored in an external table in HDFS with directory structure of yyyy/mm/dd/file1.orc file2.orc file3.orc, etc. All timestamps are in epoch time.Hudak
@ChrisC But you can create new external table on top of data root folder with additional partition column, then mount partitions to the locations yyyy/mm/dd, then use hive to concatenate all partitions. 1) Create external table + partition by load_day. 2) Alter table add partition, etc to mount them all. 3) concatenate using HiveQuattlebaum
Does CONCATENATE work on external tables as well? I understand it doesn't.Commercial
@OmarAli Sure, of course it WORKS. External or managed, it does not matter. The only difference between external and managed is the DROP TABLE behavior. Dropping a managed table drops the data as well; dropping an external table drops only the table definition. You can also create a few different tables on top of the same directory in HDFS at a time.Quattlebaum
Error: Error while compiling statement: FAILED: SemanticException org.apache.hadoop.hive.ql.parse.SemanticException: Concatenate/Merge can only be performed on managed tables (state=42000,code=40000)Aurignacian
@FoxanNg Interesting. What is your Hive version? You can make the table managed with ALTER TABLE abc SET TBLPROPERTIES('EXTERNAL'='FALSE') and, after concatenating, make it external again.Quattlebaum
@FoxanNg yes, it was added in 3.0.0, 2.4.0 versions of Hive in issues.apache.org/jira/browse/HIVE-17403Quattlebaum
Can't believe this is so difficult to find. BIG THANK YOU!Schnitzler

Here is a small script in Python using PyORC to concatenate the small ORC files together. I know it doesn't directly answer your question because it's not in Java, but I find it way simpler than the current solutions, or than using Hive.

import pyorc
import argparse


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-o', '--output', type=argparse.FileType(mode='wb'))
    parser.add_argument('files', type=argparse.FileType(mode='rb'), nargs='+')
    args = parser.parse_args()

    schema = str(pyorc.Reader(args.files[0]).schema)

    with pyorc.Writer(args.output, schema) as writer:
        for i, f in enumerate(args.files):
            reader = pyorc.Reader(f)
            if str(reader.schema) != schema:
                raise RuntimeError(
                    "Inconsistent ORC schemas.\n"
                    "\tFirst file schema: {}\n"
                    "\tFile #{} schema: {}"
                    .format(schema, i, str(reader.schema))
                )
            for line in reader:
                writer.write(line)


if __name__ == '__main__':
    main()
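
If you save this as, say, merge_orc.py (the filename is arbitrary), a run would look like python merge_orc.py -o combined.orc file1.orc file2.orc, which writes every row from the inputs into combined.orc as long as all of the inputs share the same schema.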
Batson answered 7/4, 2021 at 7:18 Comment(0)
