Deep Dive into Hadoop MapReduce Part 2

Reading Time: 8 minutes

Prerequisites: Hadoop basics and an understanding of the Deep Dive into Hadoop MapReduce Part 1 blog.

MapReduce Tutorial: Introduction

In this MapReduce Tutorial blog, I am going to introduce you to MapReduce, which is one of the core building blocks of processing in the Hadoop framework. Before moving ahead, I would suggest you get familiar with the HDFS concepts, which I have covered in my previous HDFS tutorial blog. This will help you understand the MapReduce concepts quickly and easily.

Before we begin, let us have a brief understanding of the following.

What is Big Data?


Big Data is a collection of data that is huge in volume and keeps growing exponentially with time. Its size and complexity are so large that none of the traditional data management tools can store or process it efficiently. Familiar examples of Big Data sources are the currently trending social media sites such as Facebook, Instagram, WhatsApp, and YouTube.


What is Hadoop?

Hadoop is an open-source, Java-based framework used for storing and processing big data. The data is stored on inexpensive commodity servers that run as clusters. Its distributed file system enables concurrent processing and fault tolerance. Developed by Doug Cutting and Michael J. Cafarella, Hadoop uses the MapReduce programming model for faster storage and retrieval of data from its nodes. The framework is managed by Apache Software Foundation and is licensed under Apache License 2.0.

For years, while the processing power of application servers has been increasing manifold, databases have lagged behind due to their limited capacity and speed. However, today, as many applications are generating big data to be processed, Hadoop plays a significant role in providing a much-needed makeover to the database world.

From a business point of view, too, there are direct and indirect benefits. By using open-source technology on inexpensive servers that are mostly in the cloud (and sometimes on-premises), organizations achieve significant cost savings.

Additionally, the ability to collect massive data, and the insights derived from crunching this data, results in better business decisions in the real world, such as the ability to focus on the right consumer segment, weed out or fix erroneous processes, optimize floor operations, provide relevant search results, perform predictive analytics, and so on.

Google released a paper on MapReduce technology in December 2004. This became the genesis of the Hadoop processing model. So, MapReduce is a programming model that allows us to perform parallel and distributed processing on huge data sets. In this MapReduce tutorial blog, I will walk through the traditional way of doing such processing, the advantages of MapReduce, and a complete MapReduce program with its mapper, reducer, and driver code.

MapReduce Tutorial: Traditional Way

Big data flow

Let us understand how parallel and distributed processing used to happen in the traditional way, before the MapReduce framework existed. Let us take an example where I have an electricity log covering the years 2012 to 2018, containing the daily average and heavy-equipment usage records for each home. A few sample records look like the following, where "HE" stands for heavy equipment:

{"BuildingRegionIDN":"ENWThh1565","HomeID":"15465428421","category":"office","Rooms":[{"ES5":"57","HE2":"105","HE0":"93","HE9":"137","ES2":"58","ES7":"22"},{"HE13":"56","HE14":"109","HE5":"27","ES1":"93","ES12":"45"},{"ES12":"68","ES8":"87","HE6":"79","HE8":"76","ES14":"134","HE9":"62"},{"HE9":"31","HE6":"44","ES5":"59","ES7":"139","HE5":"104"},{"HE13":"10","HE0":"25","HE10":"10","HE8":"60","HE6":"143"},{"ES13":"11","ES7":"127","HE10":"96","ES5":"130"},{"ES12":"68","ES8":"102","ES4":"79","ES13":"125","HE4":"116","HE6":"144"},{"HE7":"38","ES10":"83","ES1":"86","ES5":"53"},{"HE3":"30","EC8":"50","SE6":"25"},{"CH6":"20","EC3":"38","HE6":"55"}],"LOGFORDATE":"25/03/2020"}

{"BuildingRegionIDN":"ENWThh1565","HomeID":"15465428433","category":"Home","Rooms":[{"HE13":"56","HE14":"109","HE5":"27","ES1":"93","ES12":"45"},{"ES12":"68","ES8":"87","HE6":"79","HE8":"76","ES14":"134","HE9":"62"},{"HE9":"31","HE6":"44","ES5":"59","ES7":"139","HE5":"104"},{"ES13":"11","ES7":"127","HE10":"96","ES5":"130"},{"ES12":"68","ES8":"102","ES4":"79","ES13":"125","HE4":"116","HE6":"144"},{"HE7":"38","ES10":"83","ES1":"86","ES5":"53"},{"HE3":"30","EC8":"50","SE6":"25"},{"CH6":"20","EC3":"38","HE6":"55"}],"LOGFORDATE":"25/03/2020"}

{"BuildingRegionIDN":"ENWThh1565","HomeID":"15465428434","category":"Home","Rooms":[{"ES5":"57","HE2":"105","HE0":"93","HE9":"137","ES2":"58","ES7":"22"},{"ES12":"68","ES8":"87","HE6":"79","HE8":"76","ES14":"134","HE9":"62"},{"HE9":"31","HE6":"44","ES5":"59","ES7":"139","HE5":"104"},{"HE13":"10","HE0":"25","HE10":"10","HE8":"60","HE6":"143"},{"ES13":"11","ES7":"127","HE10":"96","ES5":"130"},{"HE3":"30","EC8":"50","SE6":"25"},{"CH6":"20","EC3":"38","HE6":"55"}],"LOGFORDATE":"25/03/2020"}

Here, I want to calculate the total heavy-equipment usage in minutes per day for every home in each month.
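To make the calculation concrete before bringing Hadoop into the picture, here is a minimal single-machine sketch in plain Java: it extracts the HomeID from one log record and adds up every "HE..." value. The class name, method layout, and regular expressions are my own illustration, not part of any library.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-Java sketch of the per-record calculation; no Hadoop involved.
public class HeavyUseSketch {

    // Matches heavy-equipment entries such as "HE9":"137" and captures the minutes.
    private static final Pattern HE_ENTRY = Pattern.compile("\"HE\\d+\":\"(\\d+)\"");
    // Captures the HomeID value, e.g. "HomeID":"15465428421".
    private static final Pattern HOME_ID = Pattern.compile("\"HomeID\":\"(\\d+)\"");

    public static void main(String[] args) {
        // A shortened version of the first sample record, for illustration.
        String record = "{\"HomeID\":\"15465428421\",\"Rooms\":[{\"ES5\":\"57\",\"HE2\":\"105\",\"HE0\":\"93\"}]}";

        Matcher idMatcher = HOME_ID.matcher(record);
        String homeId = idMatcher.find() ? idMatcher.group(1) : "unknown";

        int totalMinutes = 0;
        Matcher heMatcher = HE_ENTRY.matcher(record);
        while (heMatcher.find()) {
            totalMinutes += Integer.parseInt(heMatcher.group(1)); // add each HE value
        }

        // Prints: 15465428421    198  (105 + 93 from the shortened record)
        System.out.println(homeId + "\t" + totalMinutes);
    }
}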

So, just as in the traditional way, I will split the data into smaller parts or blocks and store them on different machines. Then, I will find the sum of all heavy-equipment minutes in each part for the corresponding home. Finally, I will combine the results received from each home to produce the final output. Let us look at the challenges associated with this traditional approach:

  1. Critical path problem: This is the amount of time taken to finish the job without delaying the next milestone or the actual completion date. If any one of the machines delays its part of the job, the whole work gets delayed.
  2. Reliability problem: What if one of the machines working on a part of the data fails? Managing this failover becomes a challenge.
  3. Equal split issue: How will I divide the data into smaller chunks so that each machine gets an even share of the data to work with? In other words, how do I divide the data so that no individual machine is overloaded or underutilized?
  4. A single split may fail: If any of the machines fails to provide its output, I will not be able to calculate the final result. So, there should be a mechanism to ensure the fault tolerance of the system.
  5. Aggregation of the result: There should be a mechanism to aggregate the results generated by each of the machines to produce the final output.

These are the issues that I would have to take care of individually while performing parallel processing of huge data sets with the traditional approach.

To overcome these issues, we have the MapReduce framework, which allows us to perform such parallel computations without worrying about issues like reliability and fault tolerance. Therefore, MapReduce gives you the flexibility to write your code logic without caring about the design issues of the system.

MapReduce Tutorial: Advantages of MapReduce

The two biggest advantages of MapReduce are:

1. Parallel Processing:

In MapReduce, we divide the job among multiple nodes, and each node works on a part of the job simultaneously. So, MapReduce is based on the divide-and-conquer paradigm, which helps us process the data using different machines. As the data is processed by multiple machines in parallel instead of by a single machine, the time taken to process it is reduced tremendously, as shown in the figure below and sketched in the short example after it.

Parallel Processing
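As a rough illustration of the same divide-and-conquer idea on a single machine, the hedged sketch below uses Java parallel streams to split the summing work across CPU cores; Hadoop applies the same principle across whole machines rather than cores. The numbers and class name are illustrative only.

import java.util.Arrays;
import java.util.List;

// Illustration of divide and conquer on one machine: the list is split into chunks,
// each chunk is summed independently, and the partial sums are combined.
// Hadoop MapReduce does the same thing across cluster nodes instead of CPU cores.
public class ParallelSumSketch {
    public static void main(String[] args) {
        // Pretend these are per-record heavy-equipment totals taken from the log.
        List<Integer> perRecordTotals = Arrays.asList(1650, 1067, 1160);

        int grandTotal = perRecordTotals.parallelStream()
                                        .mapToInt(Integer::intValue)
                                        .sum();

        System.out.println("Total heavy-equipment minutes: " + grandTotal);
    }
}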

2. Data Locality: 

Instead of moving data to the processing unit, we move the processing unit to the data in the MapReduce framework. In the traditional system, we used to bring the data to the processing unit and process it there. But as the data grew very large, bringing this huge amount of data to the processing unit posed the following issues:

  • Moving huge data to the processing unit is costly and deteriorates network performance.
  • Processing takes time, as the data is processed by a single unit, which becomes the bottleneck.
  • The master node can get over-burdened and may fail.

Now, MapReduce allows us to overcome the above issues by bringing the processing unit to the data. As you can see in the above image, the data is distributed among multiple nodes, where each node processes the part of the data residing on it. This gives us the following advantages:

  • It is very cost-effective to move the processing unit to the data.
  • The processing time is reduced as all the nodes work on their part of the data in parallel.
  • Every node gets a part of the data to process, so there is no chance of a node getting overburdened.

MapReduce Tutorial: Explanation of MapReduce Program

Before jumping into the details, let us glance at a MapReduce example program to get a basic idea of how things work in a MapReduce environment in practice. I have taken the same heavy-equipment usage example, where I have to find the total minutes of heavy-equipment use for each house. Don't worry if you don't understand the code when you look at it for the first time; just bear with me while I walk you through each part of the MapReduce code.

The entire MapReduce program can be fundamentally divided into three parts:

  • Mapper Phase Code
  • Reducer Phase Code
  • Driver Code

We will understand the code for each of these three parts sequentially.

Recall the sample log records shown earlier; the mapper receives one such JSON record (one line of the input file) at a time.
Mapper code:
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class ES_Mapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        String line = value.toString();
        // Skip blank or malformed lines.
        if (line.isEmpty() || !line.contains("HomeID")) {
            return;
        }

        // Split the record at every heavy-equipment key ("HE...).
        String[] splitStrings = line.split("\"HE");

        String homeID = null;
        int totalMinutes = 0; // running sum of heavy-equipment minutes for this record

        for (int i = 0; i < splitStrings.length; i++) {
            if (i == 0) {
                // The first chunk holds the header fields; pull the HomeID value out of it.
                String[] afterHomeId = splitStrings[i].split("HomeID");
                String[] afterColon = afterHomeId[1].split(":");
                homeID = afterColon[1].split("\"")[1];
            } else {
                // Every other chunk starts with the rest of an HE key; the first quoted
                // token after the colon is that equipment's usage in minutes.
                String[] afterColon = splitStrings[i].split(":");
                totalMinutes += Integer.parseInt(afterColon[1].split("\"")[1]);
            }
        }

        // Emit (HomeID, total heavy-equipment minutes for this log record).
        output.collect(new Text(homeID), new IntWritable(totalMinutes));
    }
}
  • We have created a class ES_Mapper that extends MapReduceBase and implements the Mapper interface, which is already defined in the MapReduce framework.
  • We define the data types of the input and output key/value pairs after the class declaration using angle brackets.
  • Both the input and the output of the Mapper are key/value pairs.
  • Input:
    • The key is nothing but the offset of each line in the text file: LongWritable
    • The value is each individual line, i.e. the log record for one home: Text
  • Output:
    • The key is the extracted HomeID: Text
    • The value is the evaluated result, in our case the sum of all heavy-equipment minutes in that record: IntWritable
    • Example – HomeID 15465428421, total heavy-equipment minutes 1650
Reducer Code:
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class ES_Reducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        int sum = 0;
        // Add up all the per-record totals emitted by the mapper for this HomeID.
        while (values.hasNext()) {
            sum += values.next().get();
        }
        // Emit (HomeID, aggregated heavy-equipment minutes).
        output.collect(key, new IntWritable(sum));
    }
}
  • We have created a class ES_Reducer that extends MapReduceBase and implements the Reducer interface, just like the Mapper.
  • We define the data types of the input and output key/value pairs after the class declaration using angle brackets, as done for the Mapper.
  • Both the input and the output of the Reducer are key/value pairs.
  • Input:
    • The key is a unique HomeID produced after the sorting and shuffling phase: Text
    • The value is the list of integers corresponding to that key, i.e. the per-record totals emitted by the mapper: IntWritable
    • Example – a HomeID along with the list of its daily totals.
  • Output:
    • The key is each unique HomeID present in the input file: Text
    • The value is the aggregated heavy-equipment usage for that HomeID: IntWritable
    • Example – a HomeID and its overall usage for the month.
  • We aggregate the values present in the list corresponding to each key and produce the final answer.
  • In general, the reduce method is called once for each unique key, but you can configure the number of reducer tasks in mapred-site.xml or from the driver (see the snippet below).
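For instance, a minimal (hedged) way to control this from the driver, using the same old mapred API as the rest of this post, is the call below; the cluster-wide default can instead be set through the mapred.reduce.tasks property in mapred-site.xml.

// In ES_Runner, before JobClient.runJob(conf):
conf.setNumReduceTasks(2);   // run two reducer tasks instead of the configured default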
Driver Code:
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class ES_Runner {
    public static void main(String[] args) throws IOException {
        // Job configuration: job name plus the output key/value types.
        JobConf conf = new JobConf(ES_Runner.class);
        conf.setJobName("Heavy Used Total");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        // Mapper, combiner and reducer classes; the reducer doubles as the combiner.
        conf.setMapperClass(ES_Mapper.class);
        conf.setCombinerClass(ES_Reducer.class);
        conf.setReducerClass(ES_Reducer.class);

        // Read plain text lines in, write plain text key/value pairs out.
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // Input and output paths come from the command line.
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
  • In the driver class, we set the configuration of our MapReduce job to run on Hadoop.
  • We specify the name of the job and the data types of the output key/value pair of the mapper and reducer.
  • We also specify the names of the mapper, combiner, and reducer classes. Reusing the reducer as the combiner is safe here because summing minutes is associative and commutative, so partial sums can be pre-aggregated on the map side.
  • The paths of the input and output folders are also specified; they come from the command-line arguments.
  • The method setInputFormat() is used to specify how a Mapper will read the input data, i.e. what the unit of work will be. Here, we have chosen TextInputFormat so that the mapper reads a single line at a time from the input text file.
  • The main() method is the entry point for the driver. In this method, we instantiate a JobConf object for the job and submit it using JobClient.runJob().
Run the MapReduce code:
  1. Create the jar file of this program and name it HeavyUseCalculator.jar.
  2. Run the jar file:
    hadoop jar /home/saurabhd/HeavyUseCalculator.jar /testHOMEAUTOMATE/data.txt /r_output
  3. The output is stored in /r_output/part-00000.
  4. Now execute the following command to see the output; a sample of what it prints is shown below.
    hdfs dfs -cat /r_output/part-00000
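Assuming the input file contained only the three sample records shown earlier, the part-00000 file written by TextOutputFormat would hold tab-separated HomeID and total pairs, roughly like this (the first line is the 1650-minute example from the mapper section; the other two totals are worked out the same way from the remaining sample records):

15465428421	1650
15465428433	1067
15465428434	1160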