The reduce fails due to Task attempt failed to report status for 600 seconds. Killing! Solution?
The reduce phase of the job fails with:

# of failed Reduce Tasks exceeded allowed limit.

The reason why each task fails is:

Task attempt_201301251556_1637_r_000005_0 failed to report status for 600 seconds. Killing!

Problem in detail:

The Map phase takes in each record which is of the format: time, rid, data.

The data is of the format: data element, and its count.

e.g. a,1 b,4 c,7 corresponds to the data of one record.

For each data element, the mapper outputs the record's full data, e.g.:

key:(time, a), val:(rid, data)
key:(time, b), val:(rid, data)
key:(time, c), val:(rid, data)

Each reduce call receives the data for the same key from all the records, e.g. key:(time, a), val:(rid1, data) and key:(time, a), val:(rid2, data) reach the same reduce instance.

It does some processing here and outputs similar rids.
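To make the data flow concrete, here is a minimal sketch of the fan-out described above, written as plain Java rather than as a Hadoop Mapper. The class name `RecordFanOut` and the method `emit` are hypothetical, and the `a|1|b|4|c|7` encoding is an assumption taken from how the reducer below tokenizes its values.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: builds the (key, value) pairs described above for one record. */
final class RecordFanOut {
    /**
     * @param time record timestamp
     * @param rid  record id
     * @param data element/count pairs, assumed to be encoded as "a|1|b|4|c|7"
     * @return one "key<TAB>value" line per data element
     */
    static List<String> emit(String time, String rid, String data) {
        String[] parts = data.split("\\|");
        List<String> out = new ArrayList<>();
        // Elements sit at even indexes, their counts at odd indexes.
        for (int i = 0; i + 1 < parts.length; i += 2) {
            String key = time + " " + parts[i]; // key: (time, element)
            String value = rid + " " + data;    // value: (rid, full data of the record)
            out.add(key + "\t" + value);
        }
        return out;
    }
}
```

Note that every element of a record carries a full copy of that record's data, so the shuffle volume grows with the number of elements per record.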

My program runs without trouble on a small dataset such as 10 MB, but fails with the error above when the data grows to, say, 1 GB. I don't know why this happens. Please help!

Reduce code:

There are two classes below:

  • VCLReduce0Split
  • CoreSplit

a. VCLReduce0Split

public class VCLReduce0Split extends MapReduceBase implements Reducer<Text, Text, Text, Text>{
    //  @SuppressWarnings("unchecked")
        public void reduce (Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

            String key_str = key.toString();
            StringTokenizer stk = new StringTokenizer(key_str);
            String t = stk.nextToken();

            HashMap<String, String> hmap = new HashMap<String, String>();

            while(values.hasNext())
            {
                StringBuffer sbuf1 = new StringBuffer(); 
                String val = values.next().toString();
                StringTokenizer st = new StringTokenizer(val);

                String uid = st.nextToken();

                String data = st.nextToken();

                     int total_size = 0;

                     StringTokenizer stx = new StringTokenizer(data,"|");

                     StringBuffer sbuf = new StringBuffer();

                     while(stx.hasMoreTokens())
                     {
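                         String data_part = stx.nextToken();
                         String data_freq = stx.nextToken();
                         // Accumulate the record size; CoreSplit parses it back as "size".
                         total_size += Integer.parseInt(data_freq);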

                    //   System.out.println("data_part:----->"+data_part+" data_freq:----->"+data_freq);
                         sbuf.append(data_part);
                         sbuf.append("|");
                         sbuf.append(data_freq);
                         sbuf.append("|");
                     }
                /*     
                     for(int i = 0; i<parts.length-1; i++)
                     {
                         System.out.println("data:--------------->"+data);
                         int part_size = Integer.parseInt(parts[i+1]);
                         sbuf.append(parts[i]);
                         sbuf.append("|");
                         sbuf.append(part_size);
                         sbuf.append("|");
                         total_size = part_size+total_size;
                         i++;
                     }*/

                sbuf1.append(String.valueOf(total_size));
                sbuf1.append(",");
                sbuf1.append(sbuf);
                if(uid.equals("203664471")){
                //  System.out.println("data:--------------------------->"+data+" tot_size:---->"+total_size+" sbuf:------->"+sbuf);
                }
                hmap.put(uid, sbuf1.toString());

            }

            float threshold = (float)0.8;

            CoreSplit obj = new CoreSplit();


            ArrayList<CustomMapSimilarity> al = obj.similarityCalculation(t, hmap, threshold);

            for(int i = 0; i<al.size(); i++)
            {
                CustomMapSimilarity cmaps = al.get(i);
                String xy_pair = cmaps.getRIDPair();
                String similarity = cmaps.getSimilarity();
                output.collect(new Text(xy_pair), new Text(similarity));
            }


         }
    }

b. CoreSplit

package com.a;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeMap;

import org.apache.commons.collections.map.MultiValueMap;

// Named CoreSplit to match the "new CoreSplit()" call in VCLReduce0Split.
public class CoreSplit {


     public ArrayList<CustomMapSimilarity> similarityCalculation(String time, HashMap<String,String>hmap, float t)
     {

         ArrayList<CustomMapSimilarity> als = new ArrayList<CustomMapSimilarity>();
         ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>();

        Iterator<String> iter = hmap.keySet().iterator();

        MultiValueMap index = new MultiValueMap();

        String RID;
        TreeMap<String, Integer> hmap2;
        Iterator<String> iter1;

        int size;
        float prefix_size;
        HashMap<String, Float> alpha;
        HashMap<String, CustomMapOverlap> hmap_overlap;

        String data;

        while(iter.hasNext())
            {
                RID = (String)iter.next();

                String data_val = hmap.get(RID);

                StringTokenizer st = new StringTokenizer(data_val,",");
            //    System.out.println("data_val:--**********-->"+data_val+" RID:------------>"+RID+" time::---?"+time);
                String RIDsize = st.nextToken();
                size = Integer.parseInt(RIDsize);
                data = st.nextToken();


                StringTokenizer st1 = new StringTokenizer(data,"\\|");


                String[] parts = data.split("\\|");

            //  hmap2 = (TreeMap<String, Integer>)hmap.get(RID);
        //      iter1 = hmap2.keySet().iterator();

            //  size = hmap_size.get(RID);

                prefix_size = (float)(size-(0.8*size)+1); 

                if(size==1)
                {
                    prefix_size = 1;
                }

                alpha = new HashMap<String, Float>();

                hmap_overlap = new HashMap<String, CustomMapOverlap>();

        //      Iterator<String> iter2 = hmap2.keySet().iterator();

                int prefix_index = 0;

                int pi=0;

                for(float j = 0; j<=prefix_size; j++)
                {

                    boolean prefix_chk = false;
                    prefix_index++;
                    String ptoken = parts[pi];
            //      System.out.println("data:---->"+data+" ptoken:---->"+ptoken);
                    float val = Float.parseFloat(parts[pi+1]);
                    float temp_j = j;
                     j = j+val;
                     boolean j_l = false ;
                     float prefix_contri = 0;
                     pi= pi+2;

                     if(j>prefix_size)
                        {

                            // prefix_contri = j-temp_j;
                             prefix_contri = prefix_size-temp_j;

                            if(prefix_contri>0)
                            {
                                 j_l = true;
                                 prefix_chk = false;

                            }
                            else
                            {
                                prefix_chk = true;                              
                            }
                        }                   


                    if(prefix_chk == false){


                        filters(index, ptoken, RID, hmap,t, size, val, j_l, alpha, hmap_overlap, j, prefix_contri);


                    CustomMapPrefixTokens cmapt = new CustomMapPrefixTokens(RID,j);
                    index.put(ptoken, cmapt);

                }

            }


                als = calcSimilarity(time, RID, hmap, alpha, hmap_overlap);

                for(int i = 0; i<als.size(); i++)
                {
                    if(als.get(i).getRIDPair()!=null)
                    {
                        alsim.add(als.get(i));

                    }
                }

            }

         return alsim;

     }


     public void filters(MultiValueMap index, String ptoken, String RID, HashMap<String, String> hmap, float t, int size, float val, boolean j_l, HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap, float j, float prefix_contri)
     {
            @SuppressWarnings("unchecked")

            ArrayList<CustomMapPrefixTokens> positions_list = (ArrayList<CustomMapPrefixTokens>) index.get(ptoken);

            if((positions_list!=null) &&(positions_list.size()!=0))
            {

                CustomMapPrefixTokens cmapt ;
                String y;
                Iterator<String> iter3;
                int y_size = 0;
                float check_size = 0;
            //  TreeMap<String, Integer> hmapy;
                float RID_val=0;
                float y_overlap = 0;
                float ubound = 0;
                ArrayList<Float> fl = new ArrayList<Float>();

              StringTokenizer st;

            for(int k = 0; k<positions_list.size(); k++)
            {
                cmapt = positions_list.get(k);

                if(!cmapt.getRID().equals(RID))
                {

                 y = hmap.get(cmapt.getRID());

                // iter3 = y.keySet().iterator();

                 String yRID = cmapt.getRID();

                 st = new StringTokenizer(y,",");

                 y_size = Integer.parseInt(st.nextToken());

                 check_size = (float)0.8*(size);

                if(y_size>=check_size)
                {

                    //hmapy = hmap.get(yRID);

                    String y_data = st.nextToken();

                    // StringTokenizer takes delimiter characters, not a regex.
                    StringTokenizer st1 = new StringTokenizer(y_data,"|");


                    while(st1.hasMoreTokens())
                    {
                        String token = st1.nextToken();
                        if(token.equals(ptoken))
                        {

                            String nxt_token = st1.nextToken();
                    //      System.out.println("ydata:--->"+y_data+" nxt_token:--->"+nxt_token);
                            RID_val = (float)Integer.parseInt(nxt_token);
                            break;
                        }
                    }

                 //    RID_val = (float) hmapy.get(ptoken); 
                     float alpha1 = (float)(0.8/1.8)*(size+y_size);

                     fl = overlapCalc(alpha1, size, y_size, cmapt, j, alpha, j_l,RID_val,val,prefix_contri);

                     ubound = fl.get(0);
                     y_overlap = fl.get(1);


                    positionFilter(ubound, alpha1, cmapt, y_overlap, hmap_overlap);

                  }

                }   
            }
        }



     }


   public void positionFilter( float ubound,float alpha1, CustomMapPrefixTokens cmapt, float y_overlap, HashMap<String, CustomMapOverlap> hmap_overlap)
   {

     float y_overlap_total = 0;

            if(null!=hmap_overlap.get(cmapt.getRID()))
            {

            y_overlap_total = hmap_overlap.get(cmapt.getRID()).getOverlap();

            if((y_overlap_total+ubound)>=alpha1)
            {

                CustomMapOverlap cmap_tmp = hmap_overlap.get(cmapt.getRID());

                float y_o_t = y_overlap+y_overlap_total;

                cmap_tmp.setOverlap(y_o_t);
                hmap_overlap.put(cmapt.getRID(),cmap_tmp);

            }
            else
            {
                float n = 0;
                hmap_overlap.put(cmapt.getRID(), new CustomMapOverlap(cmapt.getRID(),n));
            }

            }
            else
            {
                CustomMapOverlap cmap_tmp = new CustomMapOverlap(cmapt.getRID(),y_overlap);
                hmap_overlap.put(cmapt.getRID(), cmap_tmp);

            }

   }

   public ArrayList<Float> overlapCalc(float alpha1, int size, int y_size, CustomMapPrefixTokens cmapt, float j, HashMap<String, Float> alpha, boolean j_l, float RID_val, float val, float prefix_contri )
   {

            alpha.put(cmapt.getRID(), alpha1);
            float min1 = y_size-cmapt.getPosition();
            float min2 = size-j;
            float min = 0;

            float y_overlap = 0;

            if(min1<min2)
            {
                min = min1;
            }
            else
            {
                min = min2;
            }
            if(j_l==true)
            {
                val = prefix_contri;    
            }                                       
            if(RID_val<val)
            {
                y_overlap = RID_val;
            }
            else
            {
                y_overlap = val;
            }

            float ubound = y_overlap+min;

            ArrayList<Float> fl = new ArrayList<Float>();
            fl.add(ubound);
            fl.add(y_overlap);

            return fl;

   }


     public ArrayList<CustomMapSimilarity> calcSimilarity( String time, String RID, HashMap<String,String> hmap , HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap)
     {

         float jaccard = 0;

         CustomMapSimilarity cms = new CustomMapSimilarity(null, null);   
         ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>();

        Iterator<String> iter = hmap_overlap.keySet().iterator();

        while(iter.hasNext())
        {
            String key = (String)iter.next();

            CustomMapOverlap val = (CustomMapOverlap)hmap_overlap.get(key);

            float overlap = (float)val.getOverlap();

            if(overlap>0)
            {

               String yRID = val.getRID();

              String RIDpair = RID+" "+yRID;

             jaccard = unionIntersection(hmap, RIDpair);

             if(jaccard>0.8)
                {
                    cms = new CustomMapSimilarity(time+" "+RIDpair, String.valueOf(jaccard));
                    alsim.add(cms);
                }

            }

        }

         return alsim;

     }


     public float unionIntersection( HashMap<String,String> hmap, String RIDpair)
     {


            StringTokenizer st = new StringTokenizer(RIDpair);

            String xRID = st.nextToken();

            String yRID = st.nextToken();

            String xdata = hmap.get(xRID);

            String ydata = hmap.get(yRID);


            int total_union = 0;

            int xval = 0;
            int yval = 0;
            int part_union = 0;

            int total_intersect = 0;

        //  System.out.println("xdata:------*************>"+xdata);

            StringTokenizer xtokenizer = new StringTokenizer(xdata,",");
            StringTokenizer ytokenizer = new StringTokenizer(ydata,",");
        //  String[] xpart = xdata.split(",");
        //  String[] ypart = ydata.split(",");

            xtokenizer.nextToken();
            ytokenizer.nextToken();

            String datax = xtokenizer.nextToken();
            String datay = ytokenizer.nextToken();


            HashMap<String,Integer> x = new HashMap<String, Integer>();
            HashMap<String,Integer> y = new HashMap<String, Integer>();


            String [] xparts;

                 xparts = datax.toString().split("\\|");


              String [] yparts;

                 yparts = datay.toString().split("\\|");


                 for(int i = 0; i<xparts.length-1; i++)
                 {
                     int part_size = Integer.parseInt(xparts[i+1]);
                     x.put(xparts[i], part_size);

                     i++;
                 }

                 for(int i = 0; i<yparts.length-1; i++)
                 {
                     int part_size = Integer.parseInt(yparts[i+1]);
                     y.put(yparts[i], part_size); // was xparts[i]: y must be keyed by its own tokens

                     i++;
                 }


             Set<String> xset = x.keySet();
             Set<String> yset = y.keySet();

            for(String elm:xset )
            {

                yval = 0;

                xval = (Integer)x.get(elm);

                part_union = 0;
                int part_intersect = 0;
                if(yset.contains(elm)){

                    yval = (Integer) y.get(elm);

                if(xval>yval)
                {
                    part_union = xval;
                    part_intersect = yval;
                }
                else
                {
                    part_union = yval;
                    part_intersect = xval;
                }
                total_intersect = total_intersect+part_intersect;
                }
                else
                {
                    part_union = xval;
                }

                total_union = total_union+part_union;


            }


            for(String elm: yset)
            {
                part_union = 0;

                if(!xset.contains(elm))
                {
                    part_union = (Integer) y.get(elm);
                    total_union = total_union+part_union;
                }

            }

            float jaccard = (float)total_intersect/total_union;

         return jaccard;

     }

}
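For reference, the measure that unionIntersection computes is a weighted (multiset) Jaccard similarity: the sum of the per-element minimum counts divided by the sum of the per-element maximum counts. Below is a compact sketch of the same calculation on already-parsed maps. The class name `WeightedJaccard` is hypothetical, and the sketch assumes both records have been parsed into element-to-count maps beforehand.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative only: weighted Jaccard similarity over element -> count maps. */
final class WeightedJaccard {
    static float similarity(Map<String, Integer> x, Map<String, Integer> y) {
        // Visit every element that occurs in either record exactly once.
        Set<String> all = new HashSet<>(x.keySet());
        all.addAll(y.keySet());

        int intersect = 0;
        int union = 0;
        for (String element : all) {
            int xv = x.getOrDefault(element, 0); // a missing element counts as 0
            int yv = y.getOrDefault(element, 0);
            intersect += Math.min(xv, yv);
            union += Math.max(xv, yv);
        }
        // Guard against two empty records to avoid dividing by zero.
        return union == 0 ? 0f : (float) intersect / union;
    }
}
```

Working on maps avoids re-splitting the same strings for every candidate pair, which the string-based version does on each call.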
Adina answered 7/3, 2013 at 20:42 Comment(2)
Can you post your reducer code? - Lumberyard
I've added the code. Can you suggest whether I have to change something in order to make it more CPU efficient, and so on? - Adina

The timeouts are probably caused by a long-running computation in your reducer that never reports progress back to the Hadoop framework. There are several ways to address this:

I. Increasing the timeout in mapred-site.xml:

<property>
  <name>mapred.task.timeout</name>
  <value>1200000</value>
</property>

The default is 600000 ms = 600 seconds.

II. Reporting progress every x records as in the Reducer example in javadoc:

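public void reduce(K key, Iterator<V> values,
                          OutputCollector<K, V> output, 
                          Reporter reporter) throws IOException {
   int noValues = 0;
   while (values.hasNext()) {
     V value = values.next();
     ++noValues;

     // ... process the <key, value> pair (assume this takes a while) ...

     // report progress every 10 values
     if ((noValues%10) == 0) {
       reporter.progress();
     }
   }
}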

Optionally, you can increment a custom counter, as in the example:

reporter.incrCounter(NUM_RECORDS, 1);
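If the expensive work happens after the value loop has finished, the same idea can be applied inside that computation by handing it a callback. The following is only a sketch under that assumption: `ProgressAwareLoop`, `process` and the `heartbeat` parameter are hypothetical names, and a plain `Runnable` stands in for the Hadoop `Reporter` so the snippet runs without Hadoop on the classpath.

```java
import java.util.List;

/** Illustrative only: a long-running loop that pings a callback at a fixed interval. */
final class ProgressAwareLoop {
    /**
     * @param rids      record ids to process (placeholder for the real work list)
     * @param every     invoke the heartbeat after this many records
     * @param heartbeat callback that tells the framework the task is still alive
     * @return how many times the heartbeat fired
     */
    static int process(List<String> rids, int every, Runnable heartbeat) {
        int beats = 0;
        int seen = 0;
        for (String rid : rids) {
            // ... the expensive per-record work would go here ...
            seen++;
            if (seen % every == 0) {
                heartbeat.run();
                beats++;
            }
        }
        return beats;
    }
}
```

Inside a reducer, the callback could be `reporter::progress` (or an anonymous `Runnable` on older Java versions), since `Reporter.progress()` takes no arguments and returns nothing.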
Killian answered 7/3, 2013 at 21:15 Comment(4)
Hi, thanks for the reply! I've pasted my reduce code above. In my reduce class, the main computation starts only after the entire list of reduce values has been read. If the program is tied up in that main computation, outside the while loop over the values, how do I report progress? Also, can you suggest a more CPU-efficient way of writing the code pasted above? Initially I used hashmaps, which were more CPU-efficient, but I removed them because of memory issues. - Adina
I assume the longest and most CPU-intensive computation is done in similarityCalculation(). You should report progress in this method. You should also consider replacing all the string tokenizing with proper classes, so that string parsing and tokenizing is done only once. This could improve your algorithm. - Killian
Another approach would be to rewrite the algorithm so that some of the overlap computation is done in the mappers. The parallel computation would hopefully speed up the algorithm. However, this is something you have to figure out for yourself, validating whether the approach works for the specific algorithm you would like to implement. - Killian
Thanks for the answers... the job doesn't fail now, but it runs for hours because it is CPU intensive! - Adina
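As a sketch of the "parse only once" suggestion from the comments, each reducer value could be turned into a small object as it is read, so that later steps look up counts instead of re-tokenizing strings. The class `ParsedRecord` and its fields are hypothetical, and the input form `rid a|1|b|4|c|7` is an assumption based on how the reducer in the question splits its values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: one reducer value, parsed a single time. */
final class ParsedRecord {
    final String rid;
    final int totalSize;               // sum of all element counts
    final Map<String, Integer> counts; // element -> count, in input order

    private ParsedRecord(String rid, int totalSize, Map<String, Integer> counts) {
        this.rid = rid;
        this.totalSize = totalSize;
        this.counts = counts;
    }

    /** Parses a value of the assumed form "rid a|1|b|4|c|7". */
    static ParsedRecord parse(String value) {
        String[] fields = value.trim().split("\\s+", 2); // [rid, data]
        String[] parts = fields[1].split("\\|");
        Map<String, Integer> counts = new LinkedHashMap<>();
        int total = 0;
        for (int i = 0; i + 1 < parts.length; i += 2) {
            int count = Integer.parseInt(parts[i + 1]);
            counts.put(parts[i], count);
            total += count;
        }
        return new ParsedRecord(fields[0], total, counts);
    }
}
```

This keeps one map per record in memory, so it trades heap for CPU. Whether that trade is acceptable depends on how many records share a key.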

It's possible that you have consumed all of the Java heap space, or that GC is happening so frequently that the reducer gets no chance to report its status to the master and is therefore killed.

Another possibility is that one of the reducers is receiving heavily skewed data, i.e. there are a lot of records for a particular key.

Try increasing the Java heap by setting mapred.child.java.opts to:

-Xmx2048m

Also, try and reduce the number of parallel reducers by setting the following config to a lower value than what it currently has (default value is 2):

mapred.tasktracker.reduce.tasks.maximum
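To get a feel for why skew matters here, assume the worst case in which a reduce call ends up comparing every record for its key with every other one. This is an assumption for illustration only: the prefix filtering in the question's code is meant to prune most pairs, but a very common element can defeat it. The name `SkewEstimate` is hypothetical.

```java
/** Illustrative only: worst-case number of record pairs for a single key. */
final class SkewEstimate {
    /** Number of unordered pairs among n records: n * (n - 1) / 2. */
    static long maxPairs(long recordsForKey) {
        return recordsForKey * (recordsForKey - 1) / 2;
    }
}
```

With 1,000 records under one key that is 499,500 pairs; with 100,000 records it is 4,999,950,000, which can easily keep a single reduce call busy for longer than the task timeout.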

Spanos answered 7/3, 2013 at 21:4 Comment(1)
Thanks for the reply. It was very helpful. I've pasted the reduce code; can you suggest more efficient ways of implementing it? - Adina
