Hadoop multiple outputs with speculative execution
I have a task that writes Avro output into multiple directories organized by a few fields of the input records.

For example: process records of countries across years and write them into a country/year directory structure, e.g.:

outputs/usa/2015/outputs_usa_2015.avro
outputs/uk/2014/outputs_uk_2014.avro
    AvroMultipleOutputs multipleOutputs = new AvroMultipleOutputs(context);
    ...
    multipleOutputs.write("output", avroKey, NullWritable.get(),
        OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear()
            + "/outputs_" + record.getCountry() + "_" + record.getYear());

Which output committer would this code use to write the output? Is it unsafe to use with speculative execution? With speculative execution this causes (or may cause) org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException.

In the post Hadoop Reducer: How can I output to multiple directories using speculative execution?, it is suggested to use a custom output committer.

The code below from Hadoop's AvroMultipleOutputs does not mention any problem with speculative execution:

    private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext,
        String baseFileName) throws IOException, InterruptedException {

      writer = ((OutputFormat) ReflectionUtils.newInstance(
          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
          .getRecordWriter(taskContext);
      ...
    }

Nor does the write method document any issues if the base output path is outside the job directory:

public void write(String namedOutput, Object key, Object value, String baseOutputPath)

Is there a real issue with AvroMultipleOutputs (and other outputs) with speculative execution when writing outside the job directory? If so, how do I override AvroMultipleOutputs to have its own output committer? I can't see any OutputFormat inside AvroMultipleOutputs whose output committer it uses.

Mirisola answered 5/5, 2015 at 19:43 Comment(3)
Did you write your own implementation? I have the same question. – Besse
When you say "With speculative execution this causes (may cause) org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException", have you seen this documented anywhere, or are you speaking from experience? We are seeing the same behavior but have not found any explicit references to disabling speculative execution when using multiple outputs. – Adelaadelaida
Yes, it is documented. There is a warning about it here: archive.cloudera.com/cdh5/cdh/5/hadoop/api/org/apache/hadoop/… – Mirisola
AvroMultipleOutputs will use the OutputFormat you registered in the job configuration when adding a named output, e.g. via the addNamedOutput API of AvroMultipleOutputs (for example, AvroKeyValueOutputFormat).

With AvroMultipleOutputs, you might not be able to use the speculative task execution feature. Overriding it would either not help or would not be simple.

Instead, you should write your own OutputFormat (most probably extending one of the available Avro output formats, e.g. AvroKeyValueOutputFormat) and override/implement its getRecordWriter API so that it returns a single RecordWriter instance, say MainRecordWriter (just for reference).

This MainRecordWriter would maintain a map of RecordWriter instances (e.g. AvroKeyValueRecordWriter), each belonging to one of the output files. In the write API of MainRecordWriter, you would look up the actual RecordWriter instance in the map (based on the record you are going to write) and write the record using it. So MainRecordWriter works as just a wrapper over multiple RecordWriter instances.
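The routing idea above can be sketched in plain Java, independent of Hadoop (all names here are illustrative, and the delegates just buffer strings instead of being real AvroKeyValueRecordWriter instances; in a real OutputFormat each delegate would be opened under the task attempt's work path so the committer can still commit or abort atomically):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the wrapper writer described above: one "main"
// writer that lazily creates a delegate per (country, year) key and routes
// each record to it.
public class MainRecordWriter {

  // One delegate per output "file"; here each delegate is just a buffer.
  private final Map<String, StringBuilder> writers = new HashMap<>();

  // Derive the routing key from the record fields, as the answer suggests.
  static String keyFor(String country, int year) {
    return country + "/" + year;
  }

  public void write(String country, int year, String record) {
    // Lazily create the delegate writer for this key on first use.
    writers.computeIfAbsent(keyFor(country, year), k -> new StringBuilder())
           .append(record).append('\n');
  }

  // In a real RecordWriter, close(TaskAttemptContext) would close all
  // delegates; here we just expose the buffered contents per key.
  public Map<String, String> close() {
    Map<String, String> out = new HashMap<>();
    writers.forEach((k, v) -> out.put(k, v.toString()));
    return out;
  }
}
```

The point of the wrapper is that, from the framework's perspective, there is only one RecordWriter per task, so the usual FileOutputCommitter lifecycle still applies.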

For a similar implementation, you might like to study the code of the MultiStorage class from the piggybank library.

Kweiyang answered 14/5, 2015 at 20:1 Comment(0)

When you add a named output to AvroMultipleOutputs, it will call either AvroKeyOutputFormat.getRecordWriter() or AvroKeyValueOutputFormat.getRecordWriter(), which call AvroOutputFormatBase.getAvroFileOutputStream(), whose content is

    protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException {
      Path path = new Path(((FileOutputCommitter) getOutputCommitter(context)).getWorkPath(),
          getUniqueFile(context,
              context.getConfiguration().get("avro.mo.config.namedOutput", "part"),
              org.apache.avro.mapred.AvroOutputFormat.EXT));
      return path.getFileSystem(context.getConfiguration()).create(path);
    }

And AvroOutputFormatBase extends FileOutputFormat (the getOutputCommitter() call in the above method is in fact a call to FileOutputFormat.getOutputCommitter()). Hence, AvroMultipleOutputs should have the same constraints as MultipleOutputs.
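The safety the committer's work path provides comes from it being scoped per task attempt. A minimal sketch (directory layout modeled on FileOutputCommitter's `_temporary` scheme; the attempt IDs and helper are illustrative, not Hadoop API) shows why two speculative attempts never collide under their work paths, while a path resolved outside them is shared:

```java
// Sketch of why FileOutputCommitter's work path isolates speculative attempts.
// Assumed layout: <outputDir>/_temporary/<appAttempt>/_temporary/<taskAttempt>
public class WorkPathDemo {

  // Illustrative helper, not a real Hadoop method.
  static String workPath(String outputDir, String taskAttemptId) {
    return outputDir + "/_temporary/0/_temporary/" + taskAttemptId;
  }

  public static void main(String[] args) {
    String out = "/outputs";
    // Two speculative attempts of the same task get distinct work paths,
    // so whichever finishes first is promoted and the other is discarded...
    String a0 = workPath(out, "attempt_201505051943_0001_r_000000_0");
    String a1 = workPath(out, "attempt_201505051943_0001_r_000000_1");
    System.out.println(a0.equals(a1));
    // ...whereas a baseOutputPath resolved outside the work path (e.g.
    // "usa/2015/outputs_usa_2015") is the same HDFS file for both attempts,
    // and the losing attempt can then hit a LeaseExpiredException.
  }
}
```

This is why writing outside the work path, as the baseOutputPath variant of write() allows, forfeits the isolation that makes speculative execution safe.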

Corvin answered 14/5, 2015 at 23:22 Comment(0)