This post shows an Avro MapReduce example program using the Avro MapReduce API.
As an example word count MapReduce program is used where the output will be an Avro data file.
Required jars
avro-mapred-1.8.2.jar
Avro word count MapReduce example
Since output is Avro file so an Avro schema has to be defined, we’ll have two fields in the schema "word" and "count".
In the code you can see the use of AvroKey
and AvroValue
for the key and value pairs.
Also for output AvroKeyOutputFormat
class is used.
To define the map output and the output of a MaReduce job AvroJob
class is used for job configuration.
import java.io.IOException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapred.AvroValue; import org.apache.avro.mapreduce.AvroJob; import org.apache.avro.mapreduce.AvroKeyOutputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class AvroWordCount extends Configured implements Tool{ /// Schema private static final Schema AVRO_SCHEMA = new Schema.Parser().parse( "{\n" + " \"type\": \"record\",\n" + " \"name\": \"WordCount\",\n" + " \"doc\": \"word count\",\n" + " \"fields\":\n" + " [\n" + " {\"name\": \"word\", \"type\": \"string\"},\n"+ " {\"name\": \"count\", \"type\": \"int\"}\n"+ " ]\n"+ "}\n"); // Map function public static class AvroWordMapper extends Mapper<LongWritable, Text, AvroKey<Text>, AvroValue<GenericRecord>>{ private Text word = new Text(); private GenericRecord record = new GenericData.Record(AVRO_SCHEMA); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // Splitting the line on spaces String[] stringArr = value.toString().split("\\s+"); for (String str : stringArr) { word.set(str); // creating Avro record record.put("word", str); record.put("count", 1); context.write(new AvroKey<Text>(word), new AvroValue<GenericRecord>(record)); } } } // Reduce function public static class AvroWordReducer extends Reducer<AvroKey<Text>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable>{ public void reduce(AvroKey<Text> key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException { int sum = 0; for (AvroValue<GenericRecord> value : values) { GenericRecord record = value.datum(); sum += (Integer)record.get("count"); } GenericRecord record = new GenericData.Record(AVRO_SCHEMA); record.put("word", key.datum()); record.put("count", sum); context.write(new AvroKey<GenericRecord>(record), NullWritable.get()); } } public static void main(String[] args) throws Exception{ int exitFlag = ToolRunner.run(new AvroWordCount(), args); System.exit(exitFlag); } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "AvroWC"); job.setJarByClass(getClass()); job.setMapperClass(AvroWordMapper.class); job.setReducerClass(AvroWordReducer.class); AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING)); AvroJob.setMapOutputValueSchema(job, AVRO_SCHEMA); AvroJob.setOutputKeySchema(job, AVRO_SCHEMA); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(AvroKeyOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } }
After creating jar you can run this Avro MapReduce program using the following command.
hadoop jar /home/knpcode/knpcodehadoop.jar org.knpcode.AvroWordCount /user/input/count /user/out/result
This program is executed on a simple text file with only two lines.
This is a test file. This is a Hadoop MapReduce program file.
The output file can be checked using the avrotools.jar.
hadoop jar /path/to/avro-tools-1.8.2.jar tojson /user/out/result/part-r-00000.avro {"word":"Hadoop","count":1} {"word":"MapReduce","count":1} {"word":"This","count":2} {"word":"a","count":2} {"word":"file.","count":2} {"word":"is","count":2} {"word":"program","count":1} {"word":"test","count":1}
That's all for the topic Avro MapReduce Example. If something is missing or you have something to share about the topic please write a comment.
You may also like
- What is Data Locality in Hadoop
- How to Use LZO Compression in Hadoop
- YARN Fair Scheduler With Example
- ClassNotFoundException in Java
- Deadlock in Java With Examples
- Java Program to Find Longest Palindrome in The Given String
- Java Multithreading Interview Questions And Answers
- Invoke Getters and Setters Using Java Reflection
No comments:
Post a Comment