How to Chain MapReduce Job in Hadoop

In many scenarios you would like to create a sequence of MapReduce jobs to completely transform and process the data. This is better than putting every thing in a single MapReduce job and making it very complex.
In fact you can get your data through various sources and use a sequence of various applications too. That can be done by creating a work flow using Oozie but that is a topic for another post. In this post we’ll see how to chain MapReduce job in Hadoop using ChainMapper and ChainReducer.

ChainMapper in Hadoop

ChainMapper is one of the predefined MapReduce class in Hadoop. ChainMapper class allows you to use multiple Mapper classes within a single Map task. The Mapper classes are invoked in a chained fashion where the output of the first mapper becomes the input of the second, and so on until the last Mapper, the output of the last Mapper will be written to the task’s output.
You can add mappers to a ChainMapper using addMapper() method.

ChainReducer in Hadoop

The ChainReducer class allows to chain multiple Mapper classes after a Reducer within the Reducer task. For each record output by the Reducer, the Mapper classes are invoked in a chained fashion. The output of the reducer becomes the input of the first mapper and output of first becomes the input of the second, and so on until the last Mapper, the output of the last Mapper will be written to the task’s output.

To add a Mapper class to the chain reducer you can use addMapper() method.
To set the Reducer class to the chain job you can use setReducer() method.

Chaining MapReduce job

Using the ChainMapper and the ChainReducer classes it is possible to compose MapReduce jobs that look like [MAP+ / REDUCE MAP*].

When you are using chained MapReduce you can have a combination as follows-

  1. One or more mappers
  2. Single Reducer
  3. Zero or more mappers (optional and to be used only if chained reducer is used)

When you are using chained MapReduce job the data from mappers or reducer is stored (and used) in the memory rather than on disk that reduces the disk IO to a large extent.

MapReduce chaining example

There is data of stocks with stock symbol, price and transaction in a day in the following format.

AAA		23	5677
BBB		23	12800
aaa		26	23785
.....
.....

In the data symbols are not always in the uppercase. So there are two mappers, in first relevant fields are extracted (symbol and transaction). In the second mapper symbols are converted to upper case.

Then there is a reducer that adds the transaction per symbol. Then with in the reduce task there is an InverseMapper that inverses the key, value pair. Note that InverseMapper is a predefined Mapper class with in the Hadoop framework that is why there is no implementation of it in the example code.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class StockTrans extends Configured implements Tool{
  // Mapper 1
  public static class StockFieldMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text symbol = new Text();
    
    public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException {
      // Splitting the line on tab
      String[] stringArr = value.toString().split("\t");
      //Setting symbol and transaction values
      symbol.set(stringArr[0]);
      Integer trans = Integer.parseInt(stringArr[2]);
      context.write(symbol, new IntWritable(trans));
    }
  }
	
  // Mapper 2
  public static class UpperCaseMapper extends Mapper<Text, IntWritable, Text, IntWritable>{
    public void map(Text key, IntWritable value, Context context) 
        throws IOException, InterruptedException {
    
      String symbol = key.toString().toUpperCase();       
      context.write(new Text(symbol), value);
    }
  }
	
  // Reduce function
  public static class TotalTransReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    
    public void reduce(Text key, Iterable values, Context context) 
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }      
      context.write(key, new IntWritable(sum));
    }
  }	

  public static void main(String[] args) throws Exception {
    int exitFlag = ToolRunner.run(new StockTrans(), args);
    System.exit(exitFlag);
  }
	
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Stock transactio");
    job.setJarByClass(getClass());
    // MapReduce chaining
    Configuration map1Conf = new Configuration(false);
    ChainMapper.addMapper(job, StockFieldMapper.class, LongWritable.class, Text.class,
        Text.class, IntWritable.class,  map1Conf);
    
    Configuration map2Conf = new Configuration(false);
    ChainMapper.addMapper(job, UpperCaseMapper.class, Text.class, IntWritable.class,
           Text.class, IntWritable.class, map2Conf);
    
    Configuration reduceConf = new Configuration(false);		
    ChainReducer.setReducer(job, TotalTransReducer.class, Text.class, IntWritable.class,
        Text.class, IntWritable.class, reduceConf);

    ChainReducer.addMapper(job, InverseMapper.class, Text.class, IntWritable.class,
        IntWritable.class, Text.class, null);
     
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

On running this code after creating the jar.

hadoop jar /home/knpcode/Documents/knpcode/knpcodehadoop.jar org.knpcode.StockTrans /user/input/StockTrans.txt /user/output/stock

Output

hdfs dfs -cat /user/output/stock/part-r-00000

50483	AAA
180809	BBB

Related Posts

That’s all for the topic How to Chain MapReduce Job in Hadoop. If something is missing or you have something to share about the topic please write a comment.


You may also like

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.