How to prevent a Hadoop job from failing on a corrupted input file

I'm running a Hadoop job over many input files, but if one of the files is corrupted the whole job fails.

How can I make the job ignore the corrupted file? Ideally it would increment a counter or write an error log, but not fail the whole job.

Escalante answered 17/2, 2013 at 10:41 Comment(0)

It depends on where your job is failing. If a line is corrupt, and an exception is thrown somewhere in your map method, then you should just be able to wrap the body of your map method in a try / catch and log the error:

@Override
protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  try {
    // parse the value to an int
    int val = Integer.parseInt(value.toString());

    // do something with key and val..
  } catch (NumberFormatException nfe) {
    // log the error (or increment a counter) and continue with the next record
  }
}

But if the error is thrown by your InputFormat's RecordReader then you'll need to amend the mapper's run(..) method, whose default implementation is as follows:

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);
}

So you could amend this to try and catch the exception on the context.nextKeyValue() call, but you have to be careful about blindly ignoring any error the reader throws - an IOException, for example, may not be 'skippable' by just ignoring it.

If you have written your own InputFormat / RecordReader, and you have a specific exception which denotes record failure but allows you to skip over it and continue parsing, then something like this will probably work:

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (true) {
    try {
      if (!context.nextKeyValue()) {
        break;
      } else {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } catch (SkippableRecordException sre) {
      // log the error and move on to the next record
    }
  }
  cleanup(context);
}

But just to reiterate: your RecordReader must be able to recover on error, otherwise the above code could send you into an infinite loop.

For your specific case - if you just want to ignore a file upon the first failure then you can update the run method to something much simpler:

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } catch (Exception e) {
    // log the error - the rest of this file is skipped
  } finally {
    cleanup(context);
  }
}

Some final words of warning:

  • You need to make sure that it isn't your mapper code which is causing the exception to be thrown, otherwise you'll be ignoring files for the wrong reason.
  • Files that look GZip compressed (a .gz extension, say) but aren't actually valid GZip will fail during initialization of the record reader, so the above will not catch that type of error - you'll need to write your own record reader implementation (see the sketch below). The same is true for any file error thrown while the record reader is being created.
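
A rough sketch of such a record reader follows. The class name and behaviour are my own assumptions, not part of Hadoop: it simply delegates to LineRecordReader and treats a split that fails during initialization (e.g. a corrupt GZip header) or mid-read (e.g. a truncated stream) as exhausted, instead of failing the task:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class TolerantLineRecordReader extends RecordReader<LongWritable, Text> {
  private final LineRecordReader delegate = new LineRecordReader();
  private boolean broken = false;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    try {
      delegate.initialize(split, context);
    } catch (IOException ioe) {
      // log the error and serve this split as empty rather than failing the task
      broken = true;
    }
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (broken) {
      return false;
    }
    try {
      return delegate.nextKeyValue();
    } catch (IOException ioe) {
      // e.g. EOFException from a truncated compressed stream - stop reading this file
      broken = true;
      return false;
    }
  }

  @Override
  public LongWritable getCurrentKey() throws IOException, InterruptedException {
    return delegate.getCurrentKey();
  }

  @Override
  public Text getCurrentValue() throws IOException, InterruptedException {
    return delegate.getCurrentValue();
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return broken ? 1.0f : delegate.getProgress();
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }
}

You would return this reader from createRecordReader(..) in your own FileInputFormat subclass and set that as the job's input format. Because it never re-throws for the same position it also avoids the infinite-loop risk mentioned earlier, but it silently drops the remainder of a broken file, so log loudly (or increment a counter) in the catch blocks.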
Allerie answered 17/2, 2013 at 14:7 Comment(5)
It throws me: cascading.tuple.TupleException: unable to read from input identifier: hdfs://master/input/2013_1_17/file.close.uploaded.gz at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:127) Caused by: java.io.EOFException: Unexpected end of input stream at org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:99) - Escalante
I'm also unable to gunzip this file on Linux - Escalante
Sounds like a GZip file that wasn't closed out properly. You may be able to read line by line until you reach the error and just ignore it (you'll have some data loss). You should also investigate why the GZip file wasn't properly closed out. - Allerie
Thanks a lot. Can I do such a thing in Cascading? - Escalante
I'm not too familiar with Cascading, so I'd post another follow-up question. - Allerie

This is what failure traps are used for in Cascading:

Whenever an operation fails and throws an exception, if there is an associated trap, the offending Tuple is saved to the resource specified by the trap Tap. This allows the job to continue processing without any data loss.

This will essentially let your job continue, and you can check your corrupt files later.

If you are somewhat familiar with Cascading, you attach a trap in your flow definition statement:

    new FlowDef().addTrap( String branchName, Tap trap );

See the Cascading documentation on Failure Traps for more detail.
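
If you are not already building your flow with a FlowDef, a rough sketch of wiring a trap in might look like the following (Cascading 2.x style; the pipe name, schemes and HDFS paths are placeholders):

Pipe parse = new Pipe("parse");   // the branch whose failures should be trapped
// ... assemble your Each / Every operations on the parse pipe ...

Tap source = new Hfs(new TextLine(), "hdfs://master/input/2013_1_17/");
Tap sink   = new Hfs(new TextLine(), "hdfs://master/output/");
Tap trap   = new Hfs(new TextLine(), "hdfs://master/traps/");   // offending tuples land here

FlowDef flowDef = FlowDef.flowDef()
    .addSource(parse, source)
    .addTailSink(parse, sink)
    .addTrap(parse, trap);        // or addTrap("parse", trap)

new HadoopFlowConnector().connect(flowDef).complete();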

Spinule answered 9/4, 2013 at 21:30 Comment(0)

There is also another possibility: you could use the mapred.max.map.failures.percent configuration option, which allows a given percentage of map tasks to fail without failing the whole job. Of course, this way of solving the problem could also hide other problems occurring during the map phase.
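
For example, with the old mapred API (a sketch - the job class and the 10% threshold are placeholders, and note that a map task only counts as failed once it has exhausted its retry attempts):

JobConf conf = new JobConf(MyJob.class);   // MyJob stands in for your job class
// allow up to 10% of map tasks to fail (e.g. on corrupt files) without failing the job
conf.setMaxMapTaskFailuresPercent(10);
// equivalent to: conf.set("mapred.max.map.failures.percent", "10");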

Peeper answered 19/4, 2013 at 12:15 Comment(0)
