MapReduce Basics


MapReduce is a programming model designed to process large volumes of data in parallel by dividing the work across multiple nodes in a Hadoop cluster.

MapReduce works by breaking the processing into two phases: the map phase and the reduce phase. Each phase takes key/value pairs as input and produces key/value pairs as output (the types can be chosen by the programmer). The programmer supplies two functions: the map function and the reduce function.

The map function transforms input key/value pairs into intermediate key/value pairs (which may be of the same types as the input pairs or different).

The reduce function processes the intermediate values for a particular key generated by the map function and produces the output. A job can have one or many reducers.
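
In general form, the two functions have the following type signatures, where (K1, V1) are the input types, (K2, V2) the intermediate types, and (K3, V3) the final output types:

map:    (K1, V1)       -> list(K2, V2)
reduce: (K2, list(V2)) -> list(K3, V3)

In the word count example below, K1 is the byte offset of a line, V1 is the line itself, K2 and K3 are words, and V2 and V3 are counts.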

Let's understand (visualize) how MapReduce works by walking through the classic word count program, and then turn it into code.

Data-set:

hi how are you
happy hadoop learning
hadoop is very interesting
Enjoy hadoop learning

Expected output: count of each word in the data set.

For the mapper, each line of the data set is passed as input (TextInputFormat is the default InputFormat) and the mapper produces the output below:

I/P: hi how are you O/P: (hi,1) (how,1) (are,1) (you,1)
I/P: happy hadoop learning O/P: (happy,1) (hadoop,1) (learning,1)
I/P: hadoop is very interesting O/P: (hadoop,1) (is,1) (very,1) (interesting,1)
I/P: Enjoy hadoop learning O/P: (Enjoy,1) (hadoop,1) (learning,1)

Note: Here the byte offset of the line acts as the key, which we ignore since it is of no use for word counting.

In between the mapper and the reducer there is a phase called shuffle/sort, which the framework handles automatically. This phase collects the output from the mappers, sorts and groups it by key, and then sends it to the reducer.

Shuffle/sort will send input to the reducer as follows (keys are sorted in byte order, which is why the uppercase "Enjoy" comes before "are"):

(Enjoy,1) (are,1) (hadoop,[1,1,1]) (happy,1) (hi,1) (how,1) (interesting,1) (is,1) (learning,[1,1]) (very,1) (you,1)

The input to the reducer is this grouped output of the mappers; the reducer performs the aggregation (summing the counts for each key) and produces the output below:

Enjoy    1
are    1
hadoop    3
happy    1
hi    1
how    1
interesting    1
is    1
learning    2
very    1
you    1

Let's express this in code. We need a map function, a reduce function, and driver code to run the job.

Map function: to implement the map function we extend the Mapper class, which provides a map() method that we override.

Reduce function: to implement the reduce function we extend the Reducer class, which provides a reduce() method that we override.

The mapper takes (LongWritable, Text) as its input key/value types and produces (Text, IntWritable) as its output key/value types.

Text corresponds to Java's String, LongWritable to Java's Long, and IntWritable to Java's Integer.
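
As a quick sketch of how these wrapper types behave (the class name WritableDemo is just illustrative):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        Text word = new Text("hadoop");             // wraps a String
        IntWritable count = new IntWritable(1);     // wraps an int
        LongWritable offset = new LongWritable(7L); // wraps a long
        // unwrap back to plain Java values
        System.out.println(word.toString() + " -> " + count.get() + " (offset " + offset.get() + ")");
    }
}

With these types in place, here is the complete word count program: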

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
      
public class Count {
     
 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one); // emit (word, 1) for each token in the line
        }
    }
 }
        
 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
        // add up the 1s emitted for this word by all the mappers
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
 }
        
 public static void main(String[] args) throws Exception {
   
    Job job = Job.getInstance(new Configuration());
    job.setJarByClass(Count.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // input path (from the command line)
    FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path; must not already exist
    System.exit(job.waitForCompletion(true) ? 0 : 1);
   }

}
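
To run the job (the jar name count.jar and the HDFS paths below are just examples):

hadoop jar count.jar Count /user/hadoop/input /user/hadoop/output
hdfs dfs -cat /user/hadoop/output/part-r-00000

Note that the output directory must not exist before the job runs; otherwise Hadoop fails the job up front.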
