Joins with Map Reduce

Excellent post on understanding Joins with MapReduce

Source Open

I have been reading about the join implementations available for Hadoop for the past few days. In this post I recap some techniques I learnt during the process. Joins can be done on either the map side or the reduce side, depending on the nature of the data sets to be joined.

Reduce Side Join

Let’s take the following tables containing employee and department data.

Let’s see how join query below can be achieved using reduce side join.

The map side is responsible for emitting the join predicate values along with the corresponding record from each table, so that records having the same department id in both tables end up on the same reducer, which then joins them. However, each record must also be tagged to indicate which table it originated from, so that the join happens between records of the two tables. Following…
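
The excerpt is cut off here. As a rough illustration of the tagging idea described above (not the original author's code), here is a minimal plain-Java simulation of a reduce-side join. It uses no Hadoop classes; the class name, the "EMP"/"DEPT" tags and the sample rows are all made up for this sketch, since the tables from the original post are not available.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of a reduce-side join; no Hadoop classes are involved.
class ReduceSideJoinSketch {

    // A record plus a tag naming the table it came from ("EMP" or "DEPT").
    static final class Tagged {
        final String tag;
        final String payload;
        Tagged(String tag, String payload) { this.tag = tag; this.payload = payload; }
    }

    // Simulated map output + shuffle: collect each tagged record under its join key.
    static void emit(Map<String, List<Tagged>> groups, String deptId, Tagged value) {
        groups.computeIfAbsent(deptId, k -> new ArrayList<>()).add(value);
    }

    // Reduce step for one department id: split the values by tag,
    // then pair every employee row with every department row.
    static List<String> reduce(String deptId, List<Tagged> values) {
        List<String> emps = new ArrayList<>();
        List<String> depts = new ArrayList<>();
        for (Tagged t : values) {
            if (t.tag.equals("EMP")) emps.add(t.payload); else depts.add(t.payload);
        }
        List<String> joined = new ArrayList<>();
        for (String e : emps) {
            for (String d : depts) {
                joined.add(e + "," + deptId + "," + d);
            }
        }
        return joined;
    }

    // employees rows are {name, deptId}; departments rows are {deptId, deptName}.
    static List<String> join(List<String[]> employees, List<String[]> departments) {
        Map<String, List<Tagged>> groups = new TreeMap<>();
        for (String[] e : employees) emit(groups, e[1], new Tagged("EMP", e[0]));
        for (String[] d : departments) emit(groups, d[0], new Tagged("DEPT", d[1]));
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Tagged>> g : groups.entrySet()) {
            out.addAll(reduce(g.getKey(), g.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample data.
        List<String[]> employees = List.of(
            new String[]{"alice", "10"}, new String[]{"bob", "20"}, new String[]{"carol", "10"});
        List<String[]> departments = List.of(
            new String[]{"10", "sales"}, new String[]{"20", "hr"});
        join(employees, departments).forEach(System.out::println);
    }
}
```

Because a key with rows from only one table produces no pairs, this sketch behaves like an inner join. In a real job the grouping done by emit() would be performed by the framework's shuffle.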

MapReduce Basics

MapReduce is a programming model designed to process large volumes of data in parallel by dividing the work across multiple nodes in a Hadoop cluster.

MapReduce works by breaking the processing into 2 phases, called the map phase and the reduce phase. Each phase has key/value pairs as input and output (the types can be chosen by the programmer). The programmer specifies 2 functions, known as the map function and the reduce function.

The map function transforms input key/value pairs into intermediate key/value pairs (which can be of the same type as the input pairs or of a different type).

The reduce function processes the intermediate values generated by the map function for a particular key and generates the output. The number of reducers can be 1 or many.

Let's understand (visualize) how MapReduce works by taking the classic word count program, and then go further.


hi how are you
happy hadoop learning
hadoop is very interesting
Enjoy hadoop learning

Expected output: count of each word in the data set.

For the Mapper, each line of the data set is passed as input (TextInputFormat is the default InputFormat), and the mapper produces the output below:

I/P: hi how are you O/P: (hi,1) (how,1) (are,1) (you,1)
I/P: happy hadoop learning O/P: (happy,1) (hadoop,1) (learning,1)
I/P: hadoop is very interesting O/P: (hadoop,1) (is,1) (very,1) (interesting,1)
I/P: Enjoy hadoop learning O/P: (Enjoy,1) (hadoop,1) (learning,1)

Note: Here the byte offset of the line acts as the input key, which we ignore as it is of no use for counting words.

Between the Mapper and the Reducer there is a phase called shuffle/sort, which the framework takes care of. This phase collects the output of the mappers, sorts it by key, groups the values for each key and sends the result to the reducer.

Shuffle/sort will send the following input to the reducer (Text keys are sorted by byte value, so the capitalized Enjoy comes first):

(Enjoy,[1]) (are,[1]) (hadoop,[1,1,1]) (happy,[1]) (hi,[1]) (how,[1]) (interesting,[1]) (is,[1]) (learning,[1,1]) (very,[1]) (you,[1])

The Reducer takes this grouped Mapper output as its input, does the aggregation and produces the output below:

Enjoy    1
are    1
hadoop    3
happy    1
hi    1
how    1
interesting    1
is    1
learning    2
very    1
you    1
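
Before moving to Hadoop code, the three phases walked through above can be imitated in plain Java. This is only a sketch under assumptions: it runs in a single JVM, uses no Hadoop classes, and the class and method names are invented for illustration. A TreeMap stands in for shuffle/sort, since it groups values by key and keeps the keys sorted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Single-process imitation of map -> shuffle/sort -> reduce for word count.
class WordCountSimulation {

    // Map phase: one input line becomes a list of (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            out.add(Map.entry(tokenizer.nextToken(), 1));
        }
        return out;
    }

    // Shuffle/sort: group the values of each key; TreeMap keeps keys in sorted order.
    static TreeMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // Reduce phase: sum the values collected for one key.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Runs all three phases and returns the counts in key order.
    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.addAll(map(line));
        }
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> e : shuffle(pairs).entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "hi how are you",
            "happy hadoop learning",
            "hadoop is very interesting",
            "Enjoy hadoop learning");
        run(lines).forEach((word, count) -> System.out.println(word + "\t" + count));
    }
}
```

On a real cluster the map calls run in parallel on different nodes and the grouping happens over the network; the sketch only mirrors the data flow.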

Let's express this in code. We require a map function, a reduce function and code to run the job.

map function: To implement a map function we extend the Mapper class and override its map() method.

reduce function: To implement a reduce function we extend the Reducer class and override its reduce() method.

The Mapper takes LongWritable, Text as its input key/value pair and produces Text, IntWritable as its output key/value pair.

Text is Hadoop's counterpart of Java String, LongWritable of Java Long and IntWritable of Java Integer.

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Count {

  // Mapper: emits (word, 1) for every token of the input line.
  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken()); // copy the next token into the reusable Text
        context.write(word, one); // output key -> word from input data, value -> 1 (one)
      }
    }
  }

  // Reducer: sums the 1s received for each word.
  public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  // Driver: wires the mapper and reducer into a job; args[0] is the input path, args[1] the output path.
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    job.setJarByClass(Count.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}


Configuring Apache Flume with example


Apache Hadoop is becoming a standard data processing framework in large enterprises. Data is being generated in massive amounts and needs to be written to HDFS, which gave birth to a new project named Apache Flume. Flume’s HDFS and HBase sinks provide a set of features that make it possible to write data in any format supported by these systems, in a MapReduce/Hive/Impala/Pig-friendly way. Here we cover only how to install Flume and get it working:

1. Download Apache Flume from http://archive.apache.org/dist/flume/ to /usr/local/src

2. cd /usr/local/;tar -xzvf src/apache-flume-1.4.0-bin.tar.gz

3. cd /apache;ln -s /usr/local/apache-flume-1.4.0-bin/ flume

4. cd flume; vi conf/flume.conf

agent.sources = logstream
agent.channels = memoryChannel
agent.channels.memoryChannel.type = memory
agent.sources.logstream.channels = memoryChannel
agent.sources.logstream.type = exec
agent.sources.logstream.command = tail -f /apache/flume/test
agent.sinks = hdfsSink
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.channel = memoryChannel
agent.sinks.hdfsSink.hdfs.path = hdfs://hacluster:8020/flumetest
agent.sinks.hdfsSink.hdfs.fileType = SequenceFile
agent.sinks.hdfsSink.hdfs.writeFormat = Text


Hive Shell:

Hive commands can be executed either in the Hive shell in interactive mode or by putting them in a script and invoking it with ‘hive -f script_name’.

Creating a database:

hive> create database hive_test;

Hive prints ‘OK’ if the command is executed successfully. You can also execute ‘show databases’ to list the database names.

Type ‘use hive_test’ to start using this database.

Create table:

CREATE TABLE test_tab(name STRING, id INT)
row format delimited
fields terminated by ',';

This will create a table named ‘test_tab’ in hive_test database.

‘row format delimited’ -> the table's data is stored as delimited text; by default each row is terminated by a newline.
fields terminated by ‘,’ -> here we will load a CSV file, so the fields are terminated by ‘,’ (Hive's default field delimiter is Ctrl-A, \001).

A table can also be created using ‘CREATE EXTERNAL TABLE’.

If we create a table using ‘CREATE TABLE’ without the ‘EXTERNAL’ keyword, Hive manages the data: a load operation moves the file from its Hadoop filesystem path into the Hive warehouse directory (a load from the local file system copies it there). With the ‘EXTERNAL’ keyword, Hive instead reads the data from the location we specify and does not take ownership of it.

CREATE EXTERNAL TABLE test_tab(name STRING, id INT)
row format delimited
fields terminated by ','
LOCATION '/home/external_table';
LOAD DATA INPATH '/home/test/test.txt' INTO TABLE test_tab;

Load data to Hive table:
Data can be loaded from the local file system or from the Hadoop file system.

load data local inpath '/home/test/test.csv' into table test_tab; will load data from the local file system (note the local keyword before inpath).

load data inpath '/home/test/test.txt' into table test_tab; will load data from the Hadoop file system.

Note: Hive won't throw any error related to the table schema during a ‘load’ operation, because the data is checked against the schema only when it is read at query time (schema on read).
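
To make the note above concrete, here is a small plain-Java sketch of the schema-on-read idea for the (name STRING, id INT) table. It does not use any Hive API; the class and method names are invented, and the behaviour it imitates (a field that cannot be converted to the declared type is read back as NULL rather than rejected at load time) is a simplification of what Hive does.

```java
import java.util.Arrays;

// Imitates schema on read: raw text is stored as-is and only interpreted when a row is read.
class SchemaOnReadSketch {

    // Interprets one raw comma-delimited line against the schema (name STRING, id INT).
    // Returns {name, id}; id is null when the field is missing or is not a valid integer.
    static Object[] readRow(String rawLine) {
        String[] fields = rawLine.split(",", -1);
        String name = fields[0];
        Integer id = null;
        if (fields.length > 1) {
            try {
                id = Integer.valueOf(fields[1]);
            } catch (NumberFormatException e) {
                id = null; // unparsable value: surfaced as NULL at read time, not as a load error
            }
        }
        return new Object[]{name, id};
    }

    public static void main(String[] args) {
        // Hypothetical file contents: the "load" step would accept all three lines unchanged.
        String[] storedLines = {"alice,1", "bob,abc", "carol"};
        for (String line : storedLines) {
            System.out.println(Arrays.toString(readRow(line)));
        }
    }
}
```

The point of the design is that loading stays cheap (it is only a file move or copy), while malformed values show up later in query results.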


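A Hive table can be modified using ‘alter’. For instance, we can add or drop columns from a table using alter.

To add a column (the STRING type here is only an example):

ALTER TABLE test_tab ADD COLUMNS (column_test STRING);

Hive has no DROP COLUMN clause; to drop a column, list the columns to keep with REPLACE COLUMNS:

ALTER TABLE test_tab REPLACE COLUMNS (name STRING, id INT);

To rename a table:

ALTER TABLE test_tab RENAME TO test_table;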


We can drop a table in Hive using ‘drop’. When we drop a table, both its metadata and its data are deleted; for external tables only the metadata is deleted (the data is left untouched).

DROP TABLE [ IF EXISTS ] test_table

Partition and buckets:

For efficient querying, Hive supports partitioning and bucketing.

Using partitions, a table can be divided into related parts. For instance, a table can be partitioned on columns such as city, date or country. With partitions, a query that needs only a portion of the table's data can read just the matching partitions.

Bucketing further subdivides a table or partition into buckets. A row is assigned to a bucket based on the value of a hash function applied to some column of the table.
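
The two ideas above can be sketched in a few lines of plain Java. This is an illustration only: the class and method names and the /warehouse/test_tab path are hypothetical, and String.hashCode() stands in for Hive's own hash function, which depends on the column type. What the sketch does reflect is the general scheme: a partition maps to a column=value directory, and a bucket is chosen as the hash of the bucketing column modulo the number of buckets.

```java
// Illustrates partition directories and hash-based bucket assignment.
class BucketingSketch {

    // Directory that would hold one partition, in the column=value naming style.
    static String partitionDir(String tableDir, String column, String value) {
        return tableDir + "/" + column + "=" + value;
    }

    // Bucket index for a column value: non-negative hash modulo the bucket count.
    // String.hashCode() is a stand-in for the real hash function.
    static int bucketFor(String columnValue, int numBuckets) {
        return (columnValue.hashCode() & Integer.MAX_VALUE) % numBuckets;
    }

    public static void main(String[] args) {
        String tableDir = "/warehouse/test_tab"; // hypothetical table location
        String[] names = {"alice", "bob", "carol"};
        for (String name : names) {
            // Every row with the same name always lands in the same bucket.
            System.out.println(partitionDir(tableDir, "country", "IN")
                + " -> bucket " + bucketFor(name, 4) + " for name=" + name);
        }
    }
}
```

Because equal values always hash to the same bucket, bucketing helps with sampling and with joins on the bucketing column.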

< full post under construction >

Pig Installation on Ubuntu

Helpful blog for installing PIG

Big Data World

Execution Modes

Pig has two execution modes :

  • Local Mode – To run Pig in local mode, you need access to a single machine; all files are installed and run using your local host and file system. Specify local mode using the -x flag (pig -x local).
  • MapReduce Mode – To run Pig in MapReduce mode, you need access to a Hadoop cluster and HDFS installation. MapReduce mode is the default mode; you can, but don’t need to, specify it using the -x flag (pig OR pig -x mapreduce).

The pig-0.11.1 installation described here was done on the following versions of Linux and Hadoop:


HADOOP 1.1.2

I have hduser as a dedicated Hadoop system user, and Hadoop is installed in the /home/hduser/hadoop folder. I am going to install Pig in the /usr/lib/pig folder.

  • Download Pig from here.
  • Enter into the directory where the stable version is downloaded. By…
