How to directly send the output of a mapper-reducer to another mapper-reducer without saving the output into HDFS

Problem solved eventually; see my solution at the bottom.


Recently I have been trying to run the recommender example from chapter 6 (listings 6.1 ~ 6.4) of Mahout in Action. I ran into a problem, and I have googled around but can't find a solution.

Here is the problem: I have a mapper-reducer pair

public final class WikipediaToItemPrefsMapper extends
        Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

    private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        Matcher m = NUMBERS.matcher(line);
        // the first number on the line is the user ID; the remaining numbers are item IDs
        m.find();
        VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
        VarLongWritable itemID = new VarLongWritable();
        while (m.find()) {
            itemID.set(Long.parseLong(m.group()));
            context.write(userID, itemID);
        }
    }
}

public class WikipediaToUserVectorReducer
        extends
        Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

    @Override
    public void reduce(VarLongWritable userID,
            Iterable<VarLongWritable> itemPrefs, Context context)
            throws IOException, InterruptedException {
        // collect all the items this user expressed a preference for into one sparse vector
        Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
        for (VarLongWritable itemPref : itemPrefs) {
            userVector.set((int) itemPref.get(), 1.0f);
        }
        context.write(userID, new VectorWritable(userVector));
    }
}

The reducer outputs a userID and a userVector, and it looks like this: 98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}

Then I want to use another mapper-reducer pair to process this data:

public class UserVectorSplitterMapper
        extends
        Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

    @Override
    public void map(VarLongWritable key, VectorWritable value, Context context)
            throws IOException, InterruptedException {
        long userID = key.get();
        Vector userVector = value.get();
        // emit one <itemIndex, (userID, preference)> pair per non-zero vector element
        Iterator<Vector.Element> it = userVector.iterateNonZero();
        IntWritable itemIndexWritable = new IntWritable();
        while (it.hasNext()) {
            Vector.Element e = it.next();
            int itemIndex = e.index();
            float preferenceValue = (float) e.get();
            itemIndexWritable.set(itemIndex);
            context.write(itemIndexWritable,
                    new VectorOrPrefWritable(userID, preferenceValue));
        }
    }
}

When I try to run the job, it throws an error that says:

org.apache.hadoop.io.Text cannot be cast to org.apache.mahout.math.VectorWritable

The first mapper-reducer writes its output into HDFS, and the second mapper-reducer tries to read that output. The second mapper can cast 98955 to VarLongWritable, but it can't convert {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0} to VectorWritable. So I am wondering: is there a way to make the first mapper-reducer send its output directly to the second pair, so that no data conversion is needed? I have looked through Hadoop in Action and Hadoop: The Definitive Guide, and there seems to be no way to do that. Any suggestions?


Problem solved

Solution: By using SequenceFileOutputFormat, we can save the reduce result of the first MapReduce workflow on the DFS, and the second MapReduce workflow can then read that temporary file as its input by passing SequenceFileInputFormat as the input format class when creating the mapper. Since the vector is saved in a binary sequence file, which has a specific format, SequenceFileInputFormat can read it and turn it back into vector form.

Here is some example code:

confFactory ToItemPrefsWorkFlow = new confFactory
            (new Path("/dbout"),               // input file path
             new Path("/mahout/output.txt"),   // output file path
             TextInputFormat.class,            // input format
             VarLongWritable.class,            // mapper key format
             Item_Score_Writable.class,        // mapper value format
             VarLongWritable.class,            // reducer key format
             VectorWritable.class,             // reducer value format
             SequenceFileOutputFormat.class    // the reducer output format
    );
    ToItemPrefsWorkFlow.setMapper( WikipediaToItemPrefsMapper.class);
    ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
    JobConf conf1 = ToItemPrefsWorkFlow.getConf();
    
    
    confFactory UserVectorToCooccurrenceWorkFlow = new confFactory
            (new Path("/mahout/output.txt"),
             new Path("/mahout/UserVectorToCooccurrence"),
             SequenceFileInputFormat.class, // note that the input format of the second workflow's mapper is now SequenceFileInputFormat.class
             //UserVectorToCooccurrenceMapper.class,
             IntWritable.class,
             IntWritable.class,
             IntWritable.class,
             VectorWritable.class,
             SequenceFileOutputFormat.class
             );
     UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
     UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
    JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();
    
    JobClient.runJob(conf1);
    JobClient.runJob(conf2);
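
In case you don't have the confFactory class (it is just my own wrapper that assembles a JobConf), here is a minimal sketch of the same two-job chain written directly against the standard org.apache.hadoop.mapreduce.Job API, which is the API the mappers and reducers above extend. The driver class name TwoJobDriver is made up, the second job's reducer is left out, and the VectorOrPrefWritable import may need adjusting for your Mahout version; the paths and the other classes follow the snippets above, and the mapper/reducer classes are assumed to be in the same package.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable; // package may differ in your Mahout version
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

public class TwoJobDriver { // hypothetical driver class name

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Job 1: parse the text input and build one preference vector per user,
        // written to HDFS as a binary sequence file of <VarLongWritable, VectorWritable>.
        Job job1 = new Job(conf, "wikipedia to user vectors");
        job1.setJarByClass(TwoJobDriver.class);
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setMapperClass(WikipediaToItemPrefsMapper.class);
        job1.setReducerClass(WikipediaToUserVectorReducer.class);
        job1.setMapOutputKeyClass(VarLongWritable.class);
        job1.setMapOutputValueClass(VarLongWritable.class);
        job1.setOutputKeyClass(VarLongWritable.class);
        job1.setOutputValueClass(VectorWritable.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class); // keeps the Writables in binary form
        FileInputFormat.addInputPath(job1, new Path("/dbout"));
        FileOutputFormat.setOutputPath(job1, new Path("/mahout/output.txt"));
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }

        // Job 2: read the sequence file back; the mapper receives the very same
        // <VarLongWritable, VectorWritable> pairs, so there is no Text cast at all.
        Job job2 = new Job(conf, "split user vectors");
        job2.setJarByClass(TwoJobDriver.class);
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        job2.setMapperClass(UserVectorSplitterMapper.class);
        job2.setMapOutputKeyClass(IntWritable.class);
        job2.setMapOutputValueClass(VectorOrPrefWritable.class);
        job2.setOutputKeyClass(IntWritable.class);
        job2.setOutputValueClass(VectorOrPrefWritable.class);
        job2.setOutputFormatClass(SequenceFileOutputFormat.class);
        // job2.setReducerClass(...) would take the next reducer of the book's pipeline
        FileInputFormat.addInputPath(job2, new Path("/mahout/output.txt"));
        FileOutputFormat.setOutputPath(job2, new Path("/mahout/UserVectorToCooccurrence"));
        if (!job2.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}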

If you have any problems with this, please feel free to contact me.

Rectocele answered 22/4, 2012 at 8:23 Comment(2)
You have to temporarily save the output of the first map-reduce so that the 2nd one can use it. Andres
Do you know of any API that I can use to temporarily save the output? I appreciate your help! Rectocele

You need to explicitly configure the output of the first job to use the SequenceFileOutputFormat and define the output key and value classes:

job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(VarLongWritable.class);
job.setOutputValueClass(VectorWritable.class);

Without seeing your driver code, I'm guessing you're using TextOutputFormat as the output format of the first job and TextInputFormat as the input to the second, and that input format delivers <LongWritable, Text> pairs to the second mapper.
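
And on the reading side, the second job has to be told to expect a sequence file, otherwise the default TextInputFormat kicks in and you get exactly the Text cast error above. A quick sketch (same old mapred API as the snippet above; job2 and firstJobOutput are placeholders for your second JobConf and the Path the first job wrote to):

job2.setInputFormat(SequenceFileInputFormat.class);   // mapper now receives <VarLongWritable, VectorWritable>
FileInputFormat.setInputPaths(job2, firstJobOutput);  // the directory job 1 wrote its sequence files to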

Dismember answered 22/4, 2012 at 13:12 Comment(1)
Thank you Chris, I have managed to generate the final output your way. The first mapper-reducer still writes the SequenceFileOutputFormat output into HDFS, but the second mapper-reducer is now able to recognize the VectorWritable data. By the way, do you know of any method to directly send the output to the second mapper-reducer? I think the ChainMapper class has implemented some way to do that, since within its map->map->reduce->map->map process we can't see any temporary output in HDFS. Maybe I should take a look at its source code. Rectocele

I am a beginner in Hadoop; this is just my guess at the answer, so please bear with me or point it out if it seems naive.

I think it's not reasonable to send data from a reducer to the next mapper without saving it on HDFS, because "which split of data goes to which mapper" is elegantly designed to meet the locality criterion (a split goes to a mapper on a node that has the data stored locally).

If you don't store it on HDFS, most likely all the data will be transmitted over the network, which is slow and may cause bandwidth problems.

Alienation answered 23/4, 2012 at 23:43 Comment(1)
This seems to be a logical answer. Neural

You have to temporarily save the output of the first map-reduce so that the 2nd one can use it.

This might help you understand how the output of the first map-reduce is passed to the 2nd one (this is based on Generator.java of Apache Nutch).

This is the temporary dir for the output of the first map-reduce:

Path tempDir =
  new Path(getConf().get("mapred.temp.dir", ".")
           + "/job1-temp-"
           + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

Setting up the first map-reduce job:

JobConf job1 = new JobConf(getConf());
job1.setJobName("job 1");
FileInputFormat.addInputPath(job1, ...);
job1.setMapperClass(...);

FileOutputFormat.setOutputPath(job1, tempDir);
job1.setOutputFormat(SequenceFileOutputFormat.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(...);
JobClient.runJob(job1);

Observe that the output dir is set in the job configuration. Use this in the 2nd job:

JobConf job2 = new JobConf(getConf());
job2.setInputFormat(SequenceFileInputFormat.class); // read job 1's sequence-file output back as Writables
FileInputFormat.addInputPath(job2, tempDir);
job2.setReducerClass(...);
JobClient.runJob(job2);

Remember to clean up the temp dirs after you are done:

// clean up
FileSystem fs = FileSystem.get(getConf());
fs.delete(tempDir, true);

Hope this helps.

Andres answered 22/4, 2012 at 10:35 Comment(3)
Thank you for your reply! But I think this way we only make sure that the second mapper-reducer's input directory is the same as the output directory of the first mapper-reducer. We still need to write the output into HDFS first and let the second job read the data from that output. However, in my second mapper-reducer, the mapper cannot convert {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0} to VectorWritable because it treats the data in the output as Text, so if I run the second job, it will throw "org.apache.hadoop.io.Text cannot be cast to org.apache.mahout.math.VectorWritable". Rectocele
Till now I have never come across a way to do that without writing to HDFS. If you find a way, please share it with me. For the 2nd concern: why can't you try setting the output format of the first job to match the input format of the 2nd job? Andres
Actually, they are the same: the output of the first job is "VarLongWritable, VectorWritable" and the input of the second job is "VarLongWritable key, VectorWritable value". But if you write it into HDFS, it becomes text data, and the mapper in the second job can transform the userID (Text) into VarLongWritable but can't cast the following vector into VectorWritable; it treats it as plain text. I think that is the problem, and I want to avoid it by directly outputting the data to the second job without writing it to HDFS. Rectocele
