Create Scalding Source like TextLine that combines multiple files into single mappers
Asked Answered
F

2

4

We have many small files that need combining. In Scalding you can use TextLine to read files as text lines. The problem is we get 1 mapper per file, but we want to combine multiple files so that they are processed by 1 mapper.

I understand we need to change the input format to an implementation of CombineFileInputFormat, and this may involve using cascadings CombinedHfs. We cannot work out how to do this, but it should be just a handful of lines of code to define our own Scalding source called, say, CombineTextLine.

Many thanks to anyone who can provide the code to do this.

As a side question, we have some data that is in s3, it would be great if the solution given works for s3 files - I guess it depends on whether CombineFileInputFormat or CombinedHfs works for s3.

Furry answered 28/5, 2014 at 16:45 Comment(0)
M
9

You get the idea in your question, so here is what possibly is a solution for you.

Create your own input format that extends the CombineFileInputFormat and uses your own custom RecordReader. I am showing you Java code, but you could easily convert it to scala if you want.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class CombinedInputFormat<K, V> extends CombineFileInputFormat<K, V> {

    public static class MyKeyValueLineRecordReader implements RecordReader<LongWritable,Text> {
        private final RecordReader<LongWritable,Text> delegate;

        public MyKeyValueLineRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx) throws IOException {
            FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx), split.getLength(idx), split.getLocations());
            delegate = new LineRecordReader(conf, fileSplit);
        }

        @Override
        public boolean next(LongWritable key, Text value) throws IOException {
            return delegate.next(key, value);
        }

        @Override
        public LongWritable createKey() {
            return delegate.createKey();
        }

        @Override
        public Text createValue() {
            return delegate.createValue();
        }

        @Override
        public long getPos() throws IOException {
            return delegate.getPos();
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }

        @Override
        public float getProgress() throws IOException {
            return delegate.getProgress();
        }
    }

    @Override
    public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
        return new CombineFileRecordReader(job, (CombineFileSplit) split, reporter, (Class) MyKeyValueLineRecordReader.class);
    }

}

Then you need to extend the TextLine class and make it use your own input format you just defined (Scala code from now on).

import cascading.scheme.hadoop.TextLine
import cascading.flow.FlowProcess
import org.apache.hadoop.mapred.{OutputCollector, RecordReader, JobConf}
import cascading.tap.Tap
import com.twitter.scalding.{FixedPathSource, TextLineScheme}
import cascading.scheme.Scheme

class CombineFileTextLine extends TextLine{

  override def sourceConfInit(flowProcess: FlowProcess[JobConf], tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], conf: JobConf) {
    super.sourceConfInit(flowProcess, tap, conf)
    conf.setInputFormat(classOf[CombinedInputFormat[String, String]])
  }
}

Create a scheme for the for your combined input.

trait CombineFileTextLineScheme extends TextLineScheme{

  override def hdfsScheme = new CombineFileTextLine().asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
}

Finally, create your source class:

case class CombineFileMultipleTextLine(p : String*) extends  FixedPathSource(p :_*) with CombineFileTextLineScheme

If you want to use a single path instead of multiple ones, the change to your source class is trivial.

I hope that helps.

Mom answered 29/5, 2014 at 14:6 Comment(1)
Awesome thanks!! Why this is not in Scalding by default! Anyway, I've tried to edit it slightly to change the custom InputFormat name to CombinedInputFormat because having it the same name as the scheme CombineFileTextLine TBH confused the hell out of me when I first saw this. Also I'm going to add the imports because that can throw things off too (in particular TextLine).Furry
O
0

this should do the trick, ya man? - https://wiki.apache.org/hadoop/HowManyMapsAndReduces

Odessaodetta answered 28/5, 2014 at 20:41 Comment(1)
Thanks, but that only helps increase the number of mappers, not decrease it, particularly when there are many small files. "This can be used to increase the number of map tasks, but will not set the number below that which Hadoop determines via splitting the input data."Furry

© 2022 - 2024 — McMap. All rights reserved.