In a MapReduce job, as the Map tasks produce output, that output is sorted by key and transferred to the nodes where the reducers are running. This whole process is known as the shuffle phase in Hadoop MapReduce.
Although the shuffle phase is internal to the Hadoop framework, several configuration parameters control it, and tuning them helps your MapReduce job run efficiently. In this post we’ll see what happens during sorting and shuffling at both the mapper end and the reducer end.
Shuffling and sorting at Map end
When the map task starts producing output, it is first written to an in-memory buffer, which is 100 MB by default.
The buffer size is configured using the mapreduce.task.io.sort.mb parameter in mapred-site.xml.
The map output is spilled to disk only when the memory buffer fills up to a certain threshold. The configuration parameter for this threshold is mapreduce.map.sort.spill.percent, which defaults to 0.80 (80% of the allotted buffer size). Once this threshold is reached, a background thread begins spilling the contents to disk.
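To make the arithmetic concrete, here is a minimal sketch of the spill threshold calculation. It is plain Java and does not use any Hadoop class; the class name SpillThresholdSketch and its methods are hypothetical names chosen for illustration, and the real buffer also holds record metadata that this sketch ignores.

```java
// Sketch only: models the map-side buffer arithmetic, not Hadoop's actual buffer code.
class SpillThresholdSketch {
    static final long BYTES_PER_MB = 1024L * 1024L;

    // sortMb mirrors mapreduce.task.io.sort.mb;
    // spillPercent mirrors mapreduce.map.sort.spill.percent.
    static long spillThresholdBytes(int sortMb, double spillPercent) {
        return Math.round(sortMb * BYTES_PER_MB * spillPercent);
    }

    // True once the bytes buffered so far reach the spill threshold.
    static boolean shouldSpill(long bufferedBytes, int sortMb, double spillPercent) {
        return bufferedBytes >= spillThresholdBytes(sortMb, spillPercent);
    }

    public static void main(String[] args) {
        long threshold = spillThresholdBytes(100, 0.80);
        // prints "Spill starts at 80 MB"
        System.out.println("Spill starts at " + threshold / BYTES_PER_MB + " MB");
    }
}
```

With the defaults, the background spill starts when about 80 MB of the 100 MB buffer is in use. Raising either value means fewer, larger spill files.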
Before the map output is written to disk, the following actions are taken:
- The output is divided into partitions, one per reducer. For example, if there are 4 reducers then each map output is divided into 4 partitions. A partition can have data for more than one key, but the data for any specific key resides in a single partition.
If there are 10 mappers running, the output of each mapper is divided into 4 partitions, and the partitions with the same partition number from all 10 mappers are transferred to the same reducer.
- Within each partition, the data is also sorted by key.
- If a combiner is defined, it is also executed on the sorted output.
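The three actions above can be sketched in plain Java. This is a toy model, not Hadoop code: the class MapSideSpillSketch and its methods are hypothetical, the records are simple (word, count) pairs, and the combiner is assumed to be a sum. The partition formula is the one used by Hadoop's default HashPartitioner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch only: a toy model of what happens to buffered map output before a spill.
class MapSideSpillSketch {

    // Same formula as Hadoop's default HashPartitioner:
    // non-negative hash code modulo the number of reducers.
    static int partitionFor(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Partitions (key, count) records, keeps each partition sorted by key (TreeMap),
    // and applies a summing "combiner" to records that share a key.
    static List<TreeMap<String, Integer>> partitionSortCombine(
            List<Map.Entry<String, Integer>> records, int numReducers) {
        List<TreeMap<String, Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            partitions.add(new TreeMap<>());
        }
        for (Map.Entry<String, Integer> record : records) {
            int p = partitionFor(record.getKey(), numReducers);
            partitions.get(p).merge(record.getKey(), record.getValue(), Integer::sum);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = List.of(
                Map.entry("pear", 1), Map.entry("apple", 1),
                Map.entry("fig", 1), Map.entry("apple", 1));
        List<TreeMap<String, Integer>> partitions = partitionSortCombine(records, 4);
        for (int p = 0; p < partitions.size(); p++) {
            System.out.println("partition " + p + " -> " + partitions.get(p));
        }
    }
}
```

Because the partition number depends only on the key, both "apple" records land in the same partition, where the combiner collapses them into a single record with a count of 2.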
Every time the buffer reaches the threshold, a new spill file is created and the actions stated above are executed. Before the map task finishes, all the files spilled to disk are merged into a single file, while still honoring the partition boundaries and the sorting of keys within each partition.
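The final merge can be pictured as a k-way merge done partition by partition. The sketch below is an in-memory illustration under simplifying assumptions: each spill is modeled as a list of sorted key lists (one per partition), values are omitted, and the class SpillMergeSketch is a hypothetical name. Hadoop's real merge works on files and may take several passes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Sketch only: merges sorted spill segments with a k-way merge, entirely in memory.
class SpillMergeSketch {

    // Cursor over one sorted segment: the segment plus the index of its next unread key.
    private static final class Cursor {
        final List<String> segment;
        int next = 0;
        Cursor(List<String> segment) { this.segment = segment; }
        String head() { return segment.get(next); }
    }

    // K-way merge of already sorted segments into one sorted list.
    static List<String> mergeSorted(List<List<String>> segments) {
        PriorityQueue<Cursor> heap =
                new PriorityQueue<>((a, b) -> a.head().compareTo(b.head()));
        for (List<String> segment : segments) {
            if (!segment.isEmpty()) heap.add(new Cursor(segment));
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor c = heap.poll();
            merged.add(c.head());
            c.next++;
            if (c.next < c.segment.size()) heap.add(c); // re-queue with its new head
        }
        return merged;
    }

    // spills.get(s).get(p) is the sorted key list of partition p in spill file s.
    // The result keeps one sorted segment per partition, so partition boundaries survive.
    static List<List<String>> mergeSpills(List<List<List<String>>> spills, int numPartitions) {
        List<List<String>> result = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            List<List<String>> samePartition = new ArrayList<>();
            for (List<List<String>> spill : spills) {
                samePartition.add(spill.get(p));
            }
            result.add(mergeSorted(samePartition));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<List<String>>> spills = List.of(
                List.of(List.of("a", "c"), List.of("x")),       // spill 0: partitions 0 and 1
                List.of(List.of("b"), List.of("w", "z")));      // spill 1: partitions 0 and 1
        // prints [[a, b, c], [w, x, z]]
        System.out.println(mergeSpills(spills, 2));
    }
}
```

Keys from different partitions are never interleaved, which is what lets each reducer later fetch just its own segment of the merged file.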
Shuffle phase at Reduce end
Once the map output is written to the local disk of the node where the map task is running, the partitions have to be transferred to the reducers. Each reducer gets the data for its own partition from all the mappers.
For example, if there are 4 map tasks and 2 reducers, then the output of each of these 4 maps is divided into 2 partitions, one for each reducer.
As soon as a map task finishes and notifies the ApplicationMaster, the reduce tasks start copying the output of that particular map; they don’t wait for all the running map tasks to finish. Each reducer uses threads to copy map outputs in parallel. The number of threads is configured using the “mapreduce.reduce.shuffle.parallelcopies” parameter. The default number of parallel transfers run by a reducer during the copy (shuffle) phase is 5.
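The parallel copy can be approximated with a fixed-size thread pool. This is only a sketch of the idea: ParallelCopySketch and fetchPartition are hypothetical names, the "fetch" just builds a string instead of pulling data over the network, and all maps are assumed to have finished already, whereas a real reducer starts copying as individual maps complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch only: a fixed pool of copier threads standing in for the reducer's fetchers.
class ParallelCopySketch {

    // Hypothetical stand-in for pulling one map's partition over the network.
    static List<String> fetchPartition(int mapId, int partition) {
        return List.of("map" + mapId + "-part" + partition);
    }

    // Copies this reducer's partition from every map, using at most
    // parallelCopies threads (mirrors mapreduce.reduce.shuffle.parallelcopies).
    static List<String> copyAll(int numMaps, int partition, int parallelCopies) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelCopies);
        try {
            List<Future<List<String>>> pending = new ArrayList<>();
            for (int mapId = 0; mapId < numMaps; mapId++) {
                final int id = mapId;
                pending.add(pool.submit(() -> fetchPartition(id, partition)));
            }
            List<String> copied = new ArrayList<>();
            for (Future<List<String>> f : pending) {
                try {
                    copied.addAll(f.get()); // wait for this copy to complete
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("copy interrupted", e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException("copy failed", e.getCause());
                }
            }
            return copied;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Reducer for partition 1 copies from 4 maps with the default of 5 copier threads.
        System.out.println(copyAll(4, 1, 5));
    }
}
```

With more maps than copier threads, the extra fetches simply queue up until a thread is free, which is why increasing the thread count can help jobs with a very large number of map tasks.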
On the reduce side, the data is also kept in a memory buffer; if the data fits in memory, the reduce task executes faster. The size of the memory buffer is configured using the “mapreduce.reduce.shuffle.input.buffer.percent” parameter. It denotes the fraction of the maximum heap size allocated to storing map outputs during the shuffle. The default is 0.70 (70%).
If the data doesn’t fit in memory, it is spilled to disk. The thresholds for that are set using the following 2 configuration parameters:
mapreduce.reduce.merge.inmem.threshold - The threshold, in terms of the number of files, for the in-memory merge process. Once that many map outputs have accumulated in memory, the in-memory merge is initiated and the result is spilled to disk. The default number of files is 1000.
mapreduce.reduce.shuffle.merge.percent - The usage threshold at which an in-memory merge is initiated, expressed as a fraction of the total memory allocated to storing in-memory map outputs, as defined by mapreduce.reduce.shuffle.input.buffer.percent. The default is 0.66.
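A small worked example shows how these settings combine. The sketch below is plain arithmetic, not Hadoop's merge code: the class ReduceMergeThresholdSketch is a hypothetical name, the 1 GB reducer heap is an assumed value, 0.66 is assumed as the default for mapreduce.reduce.shuffle.merge.percent, and the rule that either threshold can start a merge follows the description above.

```java
// Sketch only: models the reduce-side thresholds as plain arithmetic.
class ReduceMergeThresholdSketch {

    // Bytes available for holding map outputs: max heap times
    // mapreduce.reduce.shuffle.input.buffer.percent.
    static long shuffleBufferBytes(long maxHeapBytes, double inputBufferPercent) {
        return Math.round(maxHeapBytes * inputBufferPercent);
    }

    // Buffer usage at which an in-memory merge starts: shuffle buffer times
    // mapreduce.reduce.shuffle.merge.percent.
    static long mergeTriggerBytes(long bufferBytes, double mergePercent) {
        return Math.round(bufferBytes * mergePercent);
    }

    // A merge starts when either threshold is reached, whichever comes first.
    // inMemThreshold mirrors mapreduce.reduce.merge.inmem.threshold.
    static boolean shouldMerge(long usedBytes, int inMemoryOutputs,
                               long triggerBytes, int inMemThreshold) {
        return usedBytes >= triggerBytes || inMemoryOutputs >= inMemThreshold;
    }

    public static void main(String[] args) {
        long heap = 1_000_000_000L;                    // hypothetical 1 GB reducer heap
        long buffer = shuffleBufferBytes(heap, 0.70);  // 70% of heap for map outputs
        long trigger = mergeTriggerBytes(buffer, 0.66);
        // prints "700000000 462000000"
        System.out.println(buffer + " " + trigger);
    }
}
```

Under these assumptions, roughly 700 MB of the heap can hold map outputs, and an in-memory merge starts once about 462 MB of that is in use or 1000 map outputs have accumulated.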
Once the data from all the mappers has been copied, it is merged to create a single sorted file (the partitions from all the mappers, sorted by key), which becomes the input for the reduce task.