Loading 1 GB of data into HBase is taking 1 hour

I want to load a 1 GB CSV file (10 million records) into HBase. I wrote a MapReduce program for it. My code works fine, but it takes 1 hour to complete, and the last reducer alone takes more than half an hour. Could anyone please help me out?

My code is as follows:

Driver.java


    package com.cloudera.examples.hbase.bulkimport;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * HBase bulk import example
     * <p>
     * Data preparation MapReduce job driver
     * <ol>
     * <li>args[0]: HDFS input path
     * <li>args[1]: HDFS output path
     * <li>args[2]: HBase table name
     * </ol>
     */
    public class Driver {

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        /*
         * NBA Final 2010 game 1 tip-off time (seconds from epoch)
         * Thu, 03 Jun 2010 18:00:00 PDT
         */
        // conf.setInt("epoch.seconds.tipoff", 1275613200);
        conf.set("hbase.table.name", args[2]);

        // Load hbase-site.xml
        HBaseConfiguration.addHbaseResources(conf);

        Job job = new Job(conf, "HBase Bulk Import Example");
        job.setJarByClass(HBaseKVMapper.class);

        job.setMapperClass(HBaseKVMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        job.setInputFormatClass(TextInputFormat.class);

        HTable hTable = new HTable(conf, args[2]);

        // Auto configure partitioner and reducer
        HFileOutputFormat.configureIncrementalLoad(job, hTable);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);

        // Load generated HFiles into table
        // LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        // loader.doBulkLoad(new Path(args[1]), hTable);
      }
    }

HColumnEnum.java


        package com.cloudera.examples.hbase.bulkimport;

    /**
     * HBase table columns for the 'srv' column family
     */
    public enum HColumnEnum {
      SRV_COL_employeeid ("employeeid".getBytes()),
      SRV_COL_eventdesc ("eventdesc".getBytes()),
      SRV_COL_eventdate ("eventdate".getBytes()),
      SRV_COL_objectname ("objectname".getBytes()),
      SRV_COL_objectfolder ("objectfolder".getBytes()),
      SRV_COL_ipaddress ("ipaddress".getBytes());

      private final byte[] columnName;

      HColumnEnum (byte[] column) {
        this.columnName = column;
      }

      public byte[] getColumnName() {
        return this.columnName;
      }
    }

HBaseKVMapper.java

package com.cloudera.examples.hbase.bulkimport;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import au.com.bytecode.opencsv.CSVParser;

/**
 * HBase bulk import example
 * <p>
 * Parses Facebook and Twitter messages from CSV files and outputs
 * <ImmutableBytesWritable, KeyValue>.
 * <p>
 * The ImmutableBytesWritable key is used by the TotalOrderPartitioner to map it
 * into the correct HBase table region.
 * <p>
 * The KeyValue value holds the HBase mutation information (column family,
 * column, and value)
 */
public class HBaseKVMapper extends
    Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

  final static byte[] SRV_COL_FAM = "srv".getBytes();
  final static int NUM_FIELDS = 6;

  CSVParser csvParser = new CSVParser();
  int tipOffSeconds = 0;
  String tableName = "";

  // DateTimeFormatter p = DateTimeFormat.forPattern("MMM dd, yyyy HH:mm:ss")
  //    .withLocale(Locale.US).withZone(DateTimeZone.forID("PST8PDT"));

  ImmutableBytesWritable hKey = new ImmutableBytesWritable();
  KeyValue kv;

  /** {@inheritDoc} */
  @Override
  protected void setup(Context context) throws IOException,
      InterruptedException {
    Configuration c = context.getConfiguration();

  //  tipOffSeconds = c.getInt("epoch.seconds.tipoff", 0);
    tableName = c.get("hbase.table.name");
  }

  /** {@inheritDoc} */
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    /*if (value.find("Service,Term,") > -1) {
      // Skip header
      return;
    }*/

    String[] fields = null;

    try {
      fields = value.toString().split(",");
      //csvParser.parseLine(value.toString());
    } catch (Exception ex) {
      context.getCounter("HBaseKVMapper", "PARSE_ERRORS").increment(1);
      return;
    }

    if (fields.length != NUM_FIELDS) {
      context.getCounter("HBaseKVMapper", "INVALID_FIELD_LEN").increment(1);
      return;
    }

    // Get game offset in seconds from tip-off
  /*  DateTime dt = null;

    try {
      dt = p.parseDateTime(fields[9]);
    } catch (Exception ex) {
      context.getCounter("HBaseKVMapper", "INVALID_DATE").increment(1);
      return;
    }

    int gameOffset = (int) ((dt.getMillis() / 1000) - tipOffSeconds);
    String offsetForKey = String.format("%04d", gameOffset);

    String username = fields[2];
    if (username.equals("")) {
      username = fields[3];
    }*/

    // Key: e.g. "1200:twitter:jrkinley"
    hKey.set(String.format("%s|%s|%s|%s|%s|%s", fields[0], fields[1], fields[2],fields[3],fields[4],fields[5])
        .getBytes());

    // Service columns
    if (!fields[0].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_employeeid.getColumnName(), fields[0].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[1].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_eventdesc.getColumnName(), fields[1].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[2].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_eventdate.getColumnName(), fields[2].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[3].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_objectname.getColumnName(), fields[3].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[4].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_objectfolder.getColumnName(), fields[4].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[5].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_ipaddress.getColumnName(), fields[5].getBytes());
      context.write(hKey, kv);
    }


    context.getCounter("HBaseKVMapper", "NUM_MSGS").increment(1);

    /*
     * Output number of messages per quarter and before/after game. This should
     * correspond to the number of messages per region in HBase
     */
  /*  if (gameOffset < 0) {
      context.getCounter("QStats", "BEFORE_GAME").increment(1);
    } else if (gameOffset < 900) {
      context.getCounter("QStats", "Q1").increment(1);
    } else if (gameOffset < 1800) {
      context.getCounter("QStats", "Q2").increment(1);
    } else if (gameOffset < 2700) {
      context.getCounter("QStats", "Q3").increment(1);
    } else if (gameOffset < 3600) {
      context.getCounter("QStats", "Q4").increment(1);
    } else {
      context.getCounter("QStats", "AFTER_GAME").increment(1);
    }*/
  }
}

Please help me improve the performance, or let me know if you have an alternative solution with sample code.

My mapred-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

<property>
  <name>mapred.job.tracker</name>
  <value>namenode:54311</value>
</property>

<property>
  <name>mapred.reduce.parallel.copies</name>
  <value>20</value>
</property>

<property>
  <name>tasktracker.http.threads</name>
  <value>50</value>
</property>

<property>
  <name>mapred.job.shuffle.input.buffer.percent</name>
  <value>0.70</value>
</property>

<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>4</value>
</property>

<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>4</value>
</property>

<property>
  <name>mapred.map.tasks</name>
  <value>4</value>
</property>

<property>
  <name>reduce.map.tasks</name>
  <value>4</value>
</property>

<property>
  <name>mapred.job.shuffle.merge.percent</name>
  <value>0.65</value>
</property>

<property>
  <name>mapred.task.timeout</name>
  <value>1200000</value>
</property>

<property>
  <name>mapred.child.java.opts</name>
  <value>-Xms1024M -Xmx2048M</value>
</property>

<property>
  <name>mapred.job.reuse.jvm.num.tasks</name>
  <value>-1</value>
</property>

<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>

<property>
  <name>mapred.map.output.compression.codec</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

<property>
  <name>io.sort.mb</name>
  <value>800</value>
</property>

<property>
  <name>mapred.child.ulimit</name>
  <value>unlimited</value>
</property>

<property>
  <name>io.sort.factor</name>
  <value>100</value>
  <description>More streams merged at once while sorting files.</description>
</property>

<property>
  <name>mapreduce.admin.map.child.java.opts</name>
  <value>-Djava.net.preferIPv4Stack=true</value>
</property>

<property>
  <name>mapreduce.admin.reduce.child.java.opts</name>
  <value>-Djava.net.preferIPv4Stack=true</value>
</property>

<property>
  <name>mapred.min.split.size</name>
  <value>0</value>
</property>

<property>
  <name>mapred.job.map.memory.mb</name>
  <value>-1</value>
</property>

<property>
  <name>mapred.jobtracker.maxtasks.per.job</name>
  <value>-1</value>
</property>

</configuration>

hbase-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
    <name>hbase.rootdir</name>
    <value>hdfs://namenode:54310/hbase</value>
    <description>The directory shared by RegionServers.
    </description>
</property>

<property>
    <name>hbase.master</name>
    <value>slave:60000</value>
    <description>The host and port that the HBase master runs at.
    A value of 'local' runs the master and a regionserver
    in a single process.
    </description>
</property>

<property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
    <description>The mode the cluster will be in. Possible values are
    false: standalone and pseudo-distributed setups with managed Zookeeper
    true: fully-distributed with unmanaged Zookeeper Quorum (see hbase-env.sh)
    </description>
</property>

<property>
    <name>hbase.zookeeper.quorum</name>
    <value>slave</value>
    <description>Comma separated list of servers in the ZooKeeper Quorum.
    For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
    By default this is set to localhost for local and pseudo-distributed modes
    of operation. For a fully-distributed setup, this should be set to a full
    list of ZooKeeper quorum servers. If HBASE_MANAGES_ZK is set in hbase-env.sh
    this is the list of servers which we will start/stop ZooKeeper on.
    </description>
</property>

<property>
       <name>hbase.zookeeper.property.clientPort</name>
       <value>2181</value>
</property>

<property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/home/hduser/work/zoo_data</value>
    <description>Property from ZooKeeper's config zoo.cfg.
    The directory where the snapshot is stored.
    </description>
</property>

</configuration>

Please help me out so I can improve the performance.

Sartorius answered 2/5, 2014 at 6:6 Comment(8)
Have you tried profiling and monitoring I/O?Johnsonjohnsonese
Any more than a minute sounds pretty slow to me. It sounds like something is wrong and profiling your application would seem to be the best approach. I would start with the process which is using the most CPU.Doodle
Might have something to do with transaction size, if you're not already doing it then I would try batching the inserts into say 5k per transaction.Burtonburty
I have put the best configuration I could in my mapred-site.xml, but it still hasn't improved the performance. I am using a 5-node cluster; all of my nodes have quad-core processors and 8 GB RAM. Please suggest how I can improve the performance.Sartorius
What is your mapred-site.xml and hbase-site.xml? Do you have any hardware metrics like network, IO, CPU utilization?Leukocyte
@Leukocyte Please find my mapred-site.xml and hbase-site.xml below.Sartorius
How about your JVM heap size? And the Hadoop JVM heap size? Did you try checking that? Try setting it to at least 2048 MB.Nolly
You mentioned the last reducer is taking more time. Is it possible your keys are not partitioning evenly? Meaning the first three reducers get, say, 25% of the workload while the last one gets 75% of the 10 million records. Can you hash the keys or create a custom partitioning method?Ostensive
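
A minimal sketch of the key-hashing ("salting") idea from the comment above; it is not part of the question's code. The bucket count and key layout are illustrative assumptions.

    // Hypothetical sketch of the commenter's suggestion: prefix the row key with a
    // hash-derived bucket so lexicographically adjacent input rows spread across
    // regions and reducers. NUM_BUCKETS and the "NN|key" layout are assumptions.
    import org.apache.hadoop.hbase.util.Bytes;

    public final class SaltedKey {
      private static final int NUM_BUCKETS = 16; // assumed bucket count

      /** Returns e.g. "07|originalKey" so keys no longer cluster by input order. */
      public static byte[] salt(String originalKey) {
        int bucket = (originalKey.hashCode() & Integer.MAX_VALUE) % NUM_BUCKETS;
        return Bytes.toBytes(String.format("%02d|%s", bucket, originalKey));
      }
    }

The mapper would then call something like `hKey.set(SaltedKey.salt(...))` instead of setting the raw key. Note that with HFileOutputFormat the table would also need to be pre-split so that each bucket range maps to its own region (and therefore its own reducer).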

I created a map-only job (mapper class only) and used the HBase output format class. Now it takes 10 minutes. My network speed is very slow, which is why it was taking so long.
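
The answer does not include code, so here is a minimal, hypothetical sketch of what such a map-only load might look like, assuming a CsvToPutMapper that turns each CSV line into a Put. The class names and argument layout are illustrative, not the poster's actual code.

    // Map-only load sketch: no reducers, writes go straight to HBase via TableOutputFormat.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class MapOnlyLoadDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set(TableOutputFormat.OUTPUT_TABLE, args[1]); // target HBase table

        Job job = new Job(conf, "Map-only HBase load");
        job.setJarByClass(MapOnlyLoadDriver.class);
        job.setMapperClass(CsvToPutMapper.class);           // hypothetical mapper emitting <ImmutableBytesWritable, Put>
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        job.setNumReduceTasks(0);                            // skip the shuffle/sort/reduce phase entirely

        FileInputFormat.addInputPath(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

With TableOutputFormat the mappers' Puts go through the normal region server write path, so there is no shuffle, sort, or reducer at all, which matches the "map-only" approach described in this answer.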

Sartorius answered 26/5, 2014 at 14:14 Comment(1)
This can be achieved with a plain Java program in less time than the MapReduce job takes. Please see my answer.Conference

First of all, why do we need a MapReduce program to load such a small file (1 GB) into HBase?

In my experience, I have processed 5 GB of JSON using Jackson streaming (I didn't want to pull the whole JSON into memory) and persisted it into HBase within 8 minutes by using a batching technique.

I used HBase Puts in batched List objects of 100,000 records each.

Below is the code snippet through which I achieved this. The same thing can be done while parsing other formats as well.

You may need to call this method in two places:

1) with a batch of 100,000 records, and

2) to process the remainder when fewer than 100,000 records are left (a hypothetical calling loop is sketched after the code below).

    public void addRecord(final ArrayList<Put> puts, final String tableName) throws Exception {
        HTable table = null;
        try {
            table = new HTable(HBaseConnection.getHBaseConfiguration(), getTable(tableName));
            table.put(puts);
            LOG.info("INSERT record[s] " + puts.size() + " to table " + tableName + " OK.");
        } catch (final Throwable e) {
            e.printStackTrace();
        } finally {
            LOG.info("Processed ---> " + puts.size());
            if (table != null) {
                table.close(); // release the per-batch HTable instead of leaking it
            }
            puts.clear(); // empty the batch so the caller can reuse the same list
        }
    }
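
As a rough illustration of the two call sites described above, a calling loop might look like the following. The CSV reading, row-key choice, column family/qualifier, and table name are assumptions, not part of the original answer; addRecord() is the method shown above.

    // Hypothetical calling loop: build Puts while streaming the CSV, flush every
    // 100,000 records, then flush the remainder at the end.
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.ArrayList;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BatchLoader {
        // addRecord(puts, tableName) from the answer above is assumed to live in this class.

        public void loadCsvInBatches(final String csvPath, final String tableName) throws Exception {
            final int batchSize = 100000;                    // same batch size as the answer
            final ArrayList<Put> puts = new ArrayList<Put>(batchSize);
            final BufferedReader reader = new BufferedReader(new FileReader(csvPath));
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    final String[] f = line.split(",");
                    final Put put = new Put(Bytes.toBytes(f[0]));             // row key from the first field (illustrative)
                    put.add(Bytes.toBytes("srv"), Bytes.toBytes("eventdesc"), // one sample column (illustrative)
                            Bytes.toBytes(f[1]));
                    puts.add(put);
                    if (puts.size() == batchSize) {
                        addRecord(puts, tableName);  // 1) flush a full batch (addRecord clears the list)
                    }
                }
                if (!puts.isEmpty()) {
                    addRecord(puts, tableName);      // 2) flush the remainder (fewer than 100,000 records)
                }
            } finally {
                reader.close();
            }
        }
    }
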
Conference answered 2/5, 2016 at 7:51 Comment(0)

It can be further fine-tuned by specifying the number of region splits to use when creating the HBase table, since the number of reducer instances for the bulk load also depends on the number of regions. This can be done with the following command:

hbase org.apache.hadoop.hbase.util.RegionSplitter -c <number of regions> -f <column families> <New Hbase Table Name> <splitAlgorithm>

For the split algorithm, one can specify:

  • UniformSplit - treats keys as arbitrary bytes

  • HexStringSplit - treats keys as hexadecimal ASCII
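
The same pre-splitting can also be done from Java at table-creation time, sketched below with the older HBaseAdmin API to match the question's code. The table name, column family, and region count are illustrative, and the split algorithm should be chosen to match how the actual row keys are distributed.

    // Hedged sketch: create a pre-split table so the bulk load gets one reducer per region.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.util.RegionSplitter;

    public class PreSplitTable {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        HTableDescriptor desc = new HTableDescriptor("employee_events"); // illustrative table name
        desc.addFamily(new HColumnDescriptor("srv"));

        // HexStringSplit generates evenly spaced hex row-key boundaries for N regions;
        // more regions means more reducers writing HFiles in parallel during the bulk load.
        byte[][] splits = new RegionSplitter.HexStringSplit().split(10); // illustrative region count

        admin.createTable(desc, splits);
        admin.close();
      }
    }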

Pilate answered 25/9, 2014 at 11:39 Comment(0)
