In-depth understanding of the internal workings of the map phase in a MapReduce job in Hadoop?

I am reading Hadoop: The Definitive Guide, 3rd edition, by Tom White. It is an excellent resource for understanding the internals of Hadoop, especially MapReduce, which I am interested in.

From the book (page 205):

Shuffle and Sort

MapReduce makes the guarantee that the input to every reducer is sorted by key. The process by which the system performs the sort—and transfers the map outputs to the reducers as inputs—is known as the shuffle.

What I infer from this is that before keys are sent to the reducer, they are sorted, indicating that the output of the map phase of a job is sorted. Please note: I don't call it the mapper, since the map phase includes both the mapper (written by the programmer) and the built-in sort mechanism of the MR framework.


The Map Side

Each map task has a circular memory buffer that it writes the output to. The buffer is 100 MB by default, a size which can be tuned by changing the io.sort.mb property. When the contents of the buffer reaches a certain threshold size (io.sort.spill.percent, default 0.80, or 80%), a background thread will start to spill the contents to disk. Map outputs will continue to be written to the buffer while the spill takes place, but if the buffer fills up during this time, the map will block until the spill is complete.
Before it writes to disk, the thread first divides the data into partitions corresponding to the reducers that they will ultimately be sent to. Within each partition, the background thread performs an in-memory sort by key, and if there is a combiner function, it is run on the output of the sort. Running the combiner function makes for a more compact map output, so there is less data to write to local disk and to transfer to the reducer.

My understanding of the above paragraph is that as the mapper produces key-value pairs, they are partitioned and sorted by key. A hypothetical example:

Consider mapper-1 for a word-count program:

mapper-1 contents
partition-1
   xxxx: 2
   yyyy: 3
partition-2
   aaaa: 15
   zzzz: 11

(Note: within each partition the data is sorted by key, but it is not necessary that partition-1's data and partition-2's data together follow one sequential order.)
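For what it's worth, the assignment of a key to a partition is done by the job's Partitioner; by default that is Hadoop's HashPartitioner, which hashes the key modulo the number of reducers. A minimal sketch of that logic (the class name here is illustrative; the real one lives in org.apache.hadoop.mapreduce.lib.partition):

    import org.apache.hadoop.mapreduce.Partitioner;

    // Sketch of the default HashPartitioner logic: each (key, value) pair is
    // assigned to one of numReduceTasks partitions based on the key's hash.
    // Pairs with the same key always land in the same partition, which is why
    // partition-1 and partition-2 above hold disjoint sets of keys.
    public class SketchHashPartitioner<K, V> extends Partitioner<K, V> {
        @Override
        public int getPartition(K key, V value, int numReduceTasks) {
            // Mask off the sign bit so the modulo result is never negative.
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }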


Continuing with the chapter:

Each time the memory buffer reaches the spill threshold, a new spill file is created, so after the map task has written its last output record there could be several spill files. Before the task is finished, the spill files are merged into a single partitioned and sorted output file. The configuration property io.sort.factor controls the maximum number of streams to merge at once; the default is 10.
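As an aside, the three knobs quoted above are ordinary job configuration properties. A minimal sketch of tuning them, assuming the Hadoop 1.x property names the book uses (Hadoop 2+ renamed them; for example, io.sort.mb became mapreduce.task.io.sort.mb):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    // Driver fragment: tuning the map-side sort buffer and merge factor.
    Configuration conf = new Configuration();
    conf.setInt("io.sort.mb", 200);                 // 200 MB sort buffer instead of 100 MB
    conf.setFloat("io.sort.spill.percent", 0.90f);  // spill at 90% full instead of 80%
    conf.setInt("io.sort.factor", 20);              // merge up to 20 spill files per round
    Job job = Job.getInstance(conf, "word count");  // Hadoop 2 API; Hadoop 1 used new Job(conf)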

My understanding here is (please note the phrase "merged into a single partitioned and sorted output file" in the above paragraph, which tricked me): within a map task, several files may be spilled to disk, but they are merged into a single file which is still partitioned and sorted. Consider the same example as above:

Before a single map task is finished, its intermediate data could be:

mapper-1 contents

spill 1:               spill 2:               spill 3:
   partition-1            partition-1            partition-1
      xxxx: 2                hhhh: 5                mmmm: 2
      yyyy: 3                xxxx: 3                yyyy: 9
                             yyyy: 7

   partition-2            partition-2            partition-2
      aaaa: 15               bbbb: 15               cccc: 15
      zzzz: 10               zzzz: 15               zzzz: 13

After the map task is completed, the output from the mapper will be a single file (note the three spill files above are now merged, but no combiner is applied, assuming no combiner is specified in the job conf):

mapper-1 contents:
partition-1:
hhhh: 5
mmmm: 2
xxxx: 2
xxxx: 3
yyyy: 3
yyyy: 7
yyyy: 9
partition-2:
aaaa: 15
bbbb: 15
cccc: 15
zzzz: 10
zzzz: 15
zzzz: 13

So here partition-1 may correspond to reducer-1. That is, the data in the partition-1 segment above is sent to reducer-1, and the data in the partition-2 segment is sent to reducer-2.
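Regarding the "no combiner specified in the job conf" caveat above: a combiner is purely opt-in, registered in the driver. A minimal sketch, using Hadoop's stock IntSumReducer (which sums the IntWritable values per key):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

    // Driver fragment: once registered, the framework may run the combiner
    // over each spill's sorted output (and possibly again during the merge),
    // collapsing duplicate keys like the repeated yyyy and zzzz entries above.
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setCombinerClass(IntSumReducer.class);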

If my understanding so far is correct:

  1. How will I be able to get the intermediate file that has both partitions and sorted data from the mapper output?
  2. It is interesting to note that running the mapper alone does not produce sorted output, which seems to contradict the point that the data sent to the reducer is sorted. More details here
  3. Even the combiner is not applied if only the mapper is run (no reducer): More details here (see the sketch after this list)
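For context on points 2 and 3: a "mapper alone" run means a map-only job, i.e. a job whose reducer count is zero. A minimal driver sketch (WordCountMapper is a hypothetical mapper class, not from the book):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Sketch of a map-only driver. With zero reduce tasks, the map output
    // bypasses the buffer/partition/sort/spill machinery described above and
    // is written straight to the output directory, unsorted and uncombined.
    public class MapOnlyDriver {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "map-only job");
            job.setJarByClass(MapOnlyDriver.class);
            job.setMapperClass(WordCountMapper.class); // hypothetical mapper
            job.setNumReduceTasks(0);                  // zero reducers => map-only
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
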
Ingeminate answered 23/7, 2014 at 18:13

Great question and a great answer! - Aegir

Map-only jobs work differently than Map-and-Reduce jobs. It's not inconsistent, just different.

How will I be able to get the intermediate file that has both partitions and sorted data from the mapper output?

You can't. There isn't a hook to be able to get pieces of data from intermediate stages of MapReduce. Same is true for getting data after the partitioner, or after a record reader, etc.

It is interesting to note that running the mapper alone does not produce sorted output, which seems to contradict the point that the data sent to the reducer is sorted. More details here

It does not contradict. Mappers sort because the reducer needs the data sorted to be able to do a merge. If there are no reducers, there is no reason to sort, so it doesn't. This is the right behavior, because I don't want output sorted in a map-only job; that would make my processing slower. I've never had a situation where I wanted my map output to be locally sorted.

Even the combiner is not applied if only the mapper is run: More details here

Combiners are an optimization. There is no guarantee that they actually run, or over what data. Combiners are mostly there to make the reducers more efficient. So, again, just like the local sorting, combiners do not run if there are no reducers, because there is no reason to.

If you want combiner-like behavior, I suggest writing data into a buffer (hashmap perhaps) and then writing out locally-summarized data in the cleanup function that runs when a Mapper finishes. Be careful of memory usage if you want to do this. This is a better approach because combiners are specified as a good-to-have optimization and you can't count on them running... even when they do run.
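A minimal sketch of that in-mapper combining pattern for word count (the class and field names are mine, not the answerer's):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Sketch of in-mapper combining: counts are accumulated in a HashMap and
    // emitted once, in cleanup(), instead of relying on the (optional) combiner.
    // Watch the map's size: many distinct keys can exhaust the task's heap.
    public class InMapperCombiningMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final Map<String, Integer> counts = new HashMap<>();

        @Override
        protected void map(LongWritable key, Text value, Context context) {
            for (String word : value.toString().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            // Emit the locally summarized counts once, when the mapper finishes.
            for (Map.Entry<String, Integer> e : counts.entrySet()) {
                context.write(new Text(e.getKey()), new IntWritable(e.getValue()));
            }
        }
    }

Unlike a registered combiner, this summarization is guaranteed to happen exactly once per map task, at the cost of holding every distinct key in memory.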

Mysterious answered 23/7, 2014 at 18:30

Thanks for the details. Is my understanding of how the data is partitioned and sorted at the map side correct, both at the spill-file level and in the merged file? That is the hypothetical intermediate data I pictured above. - Ingeminate

Yes, partitioned and sorted on the map side. Reducers do the final "sort" by merging the sorted partition files. - Mysterious

And one more point: if I have records that need to be sorted alone, I cannot achieve this with only a mapper. I need to run a "dummy" reducer which just spits out the incoming data. - Ingeminate

Ahh, I just noticed you are the author of the MapReduce Design Patterns book. Gosh, it is a great resource, and I just completed the first chapter of it. - Ingeminate

You can achieve this with only the mapper; it's just a bit of a hack and dangerous memory-wise. Create an empty list in the setup function. Append to that list in the map function. Sort it in the cleanup function and then do context.write in cleanup (not map). Glad you enjoy the book :) - Mysterious
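A rough sketch of that setup/cleanup sorting hack (names are illustrative; as the comment warns, every record from one input split must fit in the task's heap):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Sketch of the map-only sorting hack: buffer every record in a list,
    // then sort and emit in cleanup(). Only safe when one map task's input
    // fits comfortably in memory.
    public class SortingMapper
            extends Mapper<LongWritable, Text, Text, NullWritable> {

        private List<String> records;

        @Override
        protected void setup(Context context) {
            records = new ArrayList<>();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) {
            records.add(value.toString()); // buffer only; nothing is written here
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            Collections.sort(records);
            for (String record : records) {
                context.write(new Text(record), NullWritable.get());
            }
        }
    }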
