I am reading Hadoop: The Definitive Guide, 3rd edition, by Tom White. It is an excellent resource for understanding the internals of Hadoop, especially MapReduce, which I am interested in.
From the book (page 205):
Shuffle and Sort
MapReduce makes the guarantee that the input to every reducer is sorted by key. The process by which the system performs the sort—and transfers the map outputs to the reducers as inputs—is known as the shuffle.
What I infer from this is that before keys are sent to a reducer, they are sorted, indicating that the output of the map phase of a job is sorted. Please note: I don't call it the mapper, since the map phase includes both the mapper (written by the programmer) and the built-in sort mechanism of the MR framework.
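To make the guarantee concrete, here is a toy simulation in Python (not the actual Hadoop code; the function name and hash-based routing are my own assumptions, mirroring the default hash partitioner): every reducer receives its pairs sorted by key.

```python
# Toy simulation of the shuffle guarantee: each reducer's input
# arrives sorted by key. This is NOT the Hadoop API, just a sketch.
from collections import defaultdict

def shuffle(map_outputs, num_reducers):
    """Route each (key, value) pair to a reducer, then sort by key."""
    per_reducer = defaultdict(list)
    for key, value in map_outputs:
        # hash-based routing, in the spirit of the default partitioner
        per_reducer[hash(key) % num_reducers].append((key, value))
    # the framework sorts each reducer's input by key
    return {r: sorted(pairs) for r, pairs in per_reducer.items()}

map_outputs = [("yyyy", 3), ("xxxx", 2), ("aaaa", 15), ("zzzz", 11)]
for reducer, pairs in shuffle(map_outputs, 2).items():
    keys = [k for k, _ in pairs]
    assert keys == sorted(keys)  # every reducer's input is key-sorted
```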
The Map Side
Each map task has a circular memory buffer that it writes the output to. The buffer is 100 MB by default, a size which can be tuned by changing the io.sort.mb property. When the contents of the buffer reaches a certain threshold size (io.sort.spill.percent, default 0.80, or 80%), a background thread will start to spill the contents to disk. Map outputs will continue to be written to the buffer while the spill takes place, but if the buffer fills up during this time, the map will block until the spill is complete.
Before it writes to disk, the thread first divides the data into partitions corresponding to the reducers that they will ultimately be sent to. Within each partition, the background thread performs an in-memory sort by key, and if there is a combiner function, it is run on the output of the sort. Running the combiner function makes for a more compact map output, so there is less data to write to local disk and to transfer to the reducer.
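The spill-time steps described above (partition, per-partition sort, optional combiner) can be sketched as a simulation; note this is my own illustrative code, not Hadoop's, and the hash-modulo partitioning stands in for the default HashPartitioner:

```python
# Sketch of what happens at spill time: buffered pairs are split into
# per-reducer partitions, each partition is sorted by key, and an
# optional combiner collapses duplicate keys. A simulation, not Hadoop.
from itertools import groupby
from operator import itemgetter

def spill(buffered_pairs, num_partitions, combiner=None):
    partitions = [[] for _ in range(num_partitions)]
    for key, value in buffered_pairs:
        partitions[hash(key) % num_partitions].append((key, value))
    spilled = []
    for part in partitions:
        part.sort(key=itemgetter(0))       # in-memory sort by key
        if combiner is not None:           # e.g. sum counts for word count
            part = [(k, combiner(v for _, v in grp))
                    for k, grp in groupby(part, key=itemgetter(0))]
        spilled.append(part)
    return spilled

pairs = [("the", 1), ("cat", 1), ("the", 1), ("sat", 1)]
print(spill(pairs, 2, combiner=sum))
```

With `combiner=sum`, the two `("the", 1)` pairs collapse to `("the", 2)` before anything is written, which is exactly why the book says the combiner makes the map output more compact.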
My understanding of the above paragraph is that as the mapper produces key-value pairs, they are partitioned and sorted. A hypothetical example:
Consider mapper-1 for a word-count program:
>mapper-1 contents
partition-1
xxxx: 2
yyyy: 3
partition-2
aaaa: 15
zzzz: 11
(Note that within each partition the data is sorted by key, but it is not necessary that partition-1's data and partition-2's data together form one sequentially sorted list.)
Continuing reading the chapter:
Each time the memory buffer reaches the spill threshold, a new spill file is created, so after the map task has written its last output record there could be several spill files. Before the task is finished, the spill files are merged into a single partitioned and sorted output file. The configuration property io.sort.factor controls the maximum number of streams to merge at once; the default is 10.
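The bounded fan-in controlled by io.sort.factor can be sketched as follows; this is a simulation of the idea (merge at most `factor` sorted runs per pass, repeat until one run remains), not Hadoop's actual merge implementation:

```python
# Sketch of a multi-pass merge with bounded fan-in, in the spirit of
# io.sort.factor (default 10). Each pass merges at most `factor`
# sorted runs; intermediate runs are re-merged until one remains.
import heapq

def merge_runs(runs, factor=10):
    """Merge sorted runs, never more than `factor` at a time."""
    runs = [list(r) for r in runs]
    while len(runs) > 1:
        merged = []
        for i in range(0, len(runs), factor):
            merged.append(list(heapq.merge(*runs[i:i + factor])))
        runs = merged
    return runs[0] if runs else []

# three sorted runs, fan-in limited to 2 -> two merge passes
print(merge_runs([[1, 4], [2, 5], [3, 6]], factor=2))
# -> [1, 2, 3, 4, 5, 6]
```

With more spill files than the fan-in limit, the framework pays extra passes over the data, which is why io.sort.factor is a tuning knob for jobs that spill heavily.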
My understanding here is (please note the phrase in bold in the paragraph above, which tricked me): within a map task, several files may be spilled to disk, but they are merged into a single file that is still partitioned and sorted within each partition. Consider the same example as above.
Before a single map task is finished, its intermediate data could be:
mapper-1 contents
spill 1:       spill 2:       spill 3:
partition-1    partition-1    partition-1
xxxx: 2        xxxx: 3        hhhh: 5
yyyy: 3        yyyy: 7        mmmm: 2
                              yyyy: 9
partition-2    partition-2    partition-2
aaaa: 15       bbbb: 15       cccc: 15
zzzz: 10       zzzz: 15       zzzz: 13
After the map task is completed, the output from the mapper will be a single file (note that the three spill files above are now merged, but no combiner is applied, assuming none is specified in the job configuration):
>Mapper-1 contents:
partition-1:
hhhh: 5
mmmm: 2
xxxx: 2
xxxx: 3
yyyy: 3
yyyy: 7
yyyy: 9
partition-2:
aaaa: 15
bbbb: 15
cccc: 15
zzzz: 10
zzzz: 15
zzzz: 13
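The merge of the three spill files for partition-1 can be reproduced with a standard k-way merge; here `heapq.merge` stands in for Hadoop's merge (a simulation, not the real implementation), and with no combiner the duplicate keys survive:

```python
# Merging the three hypothetical spill files above for partition-1.
# heapq.merge does a k-way merge of already-sorted runs, the same
# idea Hadoop uses; duplicate keys are kept since no combiner runs.
import heapq

spill1 = [("xxxx", 2), ("yyyy", 3)]
spill2 = [("xxxx", 3), ("yyyy", 7)]
spill3 = [("hhhh", 5), ("mmmm", 2), ("yyyy", 9)]

merged = list(heapq.merge(spill1, spill2, spill3))
for key, value in merged:
    print(f"{key}: {value}")
# keys come out sorted; all three yyyy entries are preserved
```

The printed list matches the partition-1 segment of the merged file shown above: hhhh, mmmm, then both xxxx entries, then all three yyyy entries.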
So here partition-1 may correspond to reducer-1. That is, the data in the partition-1 segment above is sent to reducer-1, and the data in the partition-2 segment is sent to reducer-2.
If my understanding so far is correct,
- how will I be able to get the intermediate file that has both partitions and sorted data from the mapper output?
- It is interesting to note that running the mapper alone does not produce sorted output, which seems to contradict the point that data sent to the reducer is sorted. More details here
- Likewise, no combiner is applied if only the mapper is run (i.e. with no reducer). More details here