How to Write a MapReduce Program in Java

This tutorial walks step by step through writing your first Hadoop MapReduce program in Java. The project uses the Gradle build system, and running the program requires a working Hadoop installation.

Quick Introduction to MapReduce

MapReduce is a programming framework that enables processing of very large data sets on a cluster of commodity hardware. It works by distributing the processing logic across a large number of machines, each of which applies the logic locally to a subset of the data. The final result is consolidated and written to the distributed file system.

A MapReduce program has two main components: the mapper and the reducer. The mapper operates on the input data to produce a set of intermediate key/value pairs. The framework then groups these pairs by key and feeds each key, together with all of its values, to a reducer. The reducer computes the final result from the grouped values.
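
As a rough illustration of this flow, the following plain-Java sketch runs the three stages in memory, without Hadoop. It uses character counting as a small example task. The class name LocalMapReduceSketch and its method names are hypothetical and are not part of the Hadoop API; a real job distributes these stages across machines.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory illustration of map -> group by key -> reduce (no Hadoop involved).
public class LocalMapReduceSketch {

    // "Map" stage: emit one (character, 1) pair for every character in the line.
    public static List<Map.Entry<String, Long>> map(String line) {
        List<Map.Entry<String, Long>> pairs = new ArrayList<>();
        for (int i = 0; i < line.length(); i++) {
            pairs.add(new AbstractMap.SimpleEntry<>(line.substring(i, i + 1), 1L));
        }
        return pairs;
    }

    // Grouping stage: collect the values of each key, with keys kept in sorted order.
    public static Map<String, List<Long>> group(List<Map.Entry<String, Long>> pairs) {
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (Map.Entry<String, Long> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // "Reduce" stage: sum the grouped values of a single key.
    public static long reduce(List<Long> values) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        return sum;
    }

    // Run all three stages over the given input lines.
    public static Map<String, Long> run(List<String> lines) {
        List<Map.Entry<String, Long>> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.addAll(map(line));
        }
        Map<String, Long> result = new TreeMap<>();
        for (Map.Entry<String, List<Long>> e : group(pairs).entrySet()) {
            result.put(e.getKey(), reduce(e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {d=1, e=1, h=1, l=3, o=2, r=1, w=1}
        System.out.println(run(Arrays.asList("hello", "world")));
    }
}
```

Each input line is mapped independently, which is what allows the real framework to process different parts of a file on different machines.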

Problem Statement for the MapReduce Program

Problem Statement: Using the MapReduce framework, find the frequency of characters in a very large file (running into a few terabytes!). The output consists of two columns: the ASCII character and the number of occurrences of that character in the input file.

We solve this problem using three classes: the mapper, the reducer and the driver. The driver is the entry point for the MapReduce program. It configures the job, and Hadoop MapReduce then uses the configured mapper and reducer to compute the desired output.

Prerequisites for Java MapReduce Program

  • Java 1.8 or above
  • Gradle 3.4 or above (the java-library plugin used in the build was introduced in Gradle 3.4)

Creating the MapReduce Java Project in Gradle

Run the following command in a console to create a simple Java project with Gradle. Ensure that Gradle and Java are already installed on the system.

gradle init --type java-application

This creates an initial set of files for the Java Gradle project. Delete App.java and AppTest.java from the new project (they are in the src/main/java and src/test/java folders).

Configuring the MapReduce Gradle Build

Replace the build.gradle in the project with the following:

apply plugin: 'java-library'
apply plugin: 'application'

mainClassName = "AlphaCounter"

jar {
    manifest { attributes 'Main-Class': "$mainClassName" }
}
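
// Maven Central hosts the Hadoop artifacts (JCenter has been sunset)
repositories { mavenCentral() }

// 'implementation' replaces the 'compile' configuration, which was removed in Gradle 7
dependencies { implementation 'org.apache.hadoop:hadoop-client:2.7.3' }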

Writing the Mapper Class

Copy the following class to the src/main/java folder as AlphaMapper.java. This is the mapper class for our MapReduce program. With the text input format, the framework passes each line of the input as the value argument of the map function (the key is the byte offset of the line in the file, which we ignore). Our mapper converts the line into key/value pairs in which each character is a key with the value 1.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Mapper that turns each line of input into key/value pairs:
// every character in the line becomes a key with the value 1.
public class AlphaMapper extends Mapper<Object, Text, Text, LongWritable> {
    // Reused across calls to avoid allocating a new object per character
    private static final LongWritable ONE = new LongWritable(1);
    private final Text character = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // value holds one line of the input; the key (byte offset) is not used
        String line = value.toString();
        for (int i = 0; i < line.length(); i++) {
            // Emit (character, 1) for each character in the line
            character.set(line.substring(i, i + 1));
            context.write(character, ONE);
        }
    }
}

Writing the Reducer Class

Now copy the following reducer class to the src/main/java folder as AlphaReducer.java. The framework collects all the values emitted for a specific key (in our example, a character and the list of 1s emitted for it) and passes them to the reduce function. Our function computes the total number of occurrences by adding up all the values.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reducer that sums the counts emitted for each character
public class AlphaReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    // Reused across calls to avoid allocating a new object per key
    private final LongWritable result = new LongWritable();

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Add up all the counts received for this character
        long sum = 0;
        for (LongWritable val : values) {
            sum += val.get();
        }
        // Emit (character, total occurrences)
        result.set(sum);
        context.write(key, result);
    }
}

Writing the MapReduce Entry Point Class

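Finally, copy the main entry point class for our MapReduce program to the src/main/java folder as AlphaCounter.java. It sets up the MapReduce job, including the mapper and reducer classes, the output key and value types, and the input and output paths.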

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// The driver program for mapreduce job.
public class AlphaCounter extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AlphaCounter(), args);
        System.exit(res);
    }

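    @Override
    public int run(String[] args) throws Exception {
        // Expect the input path and the output directory as arguments
        if (args.length != 2) {
            System.err.println("Usage: AlphaCounter <input path> <output path>");
            return 2;
        }

        Configuration conf = this.getConf();

        // Create job
        Job job = Job.getInstance(conf, "Alpha Counter Job");
        job.setJarByClass(AlphaCounter.class);

        // Register the mapper and reducer classes
        job.setMapperClass(AlphaMapper.class);
        job.setReducerClass(AlphaReducer.class);

        // Key/value types written by the reducer (and, here, by the mapper too)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Read the input as lines of text
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);

        // Write the output as text; the output directory must not already exist
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setOutputFormatClass(TextOutputFormat.class);

        // Submit the job and wait for it to finish
        return job.waitForCompletion(true) ? 0 : 1;
    }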
}

Running the Java MapReduce Program

Run the following command from the project folder to create a jar file for our project:

gradle jar

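Copy the generated jar (Gradle writes it to the build/libs folder) to the Hadoop home folder. The commands in this section assume the jar is named mapreducedemo.jar, which is the case when the project folder is named mapreducedemo. Open a command window and navigate to the Hadoop home folder.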

First, create a simple text file with the content "Hello World" and save it as input.txt. Upload the file to HDFS using the following command, which copies it to your HDFS home folder.

bin/hdfs dfs -put input.txt .

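Finally, run the MapReduce program from the command line. The two arguments are the input path and the output directory in HDFS. The output directory must not already exist, or the job fails: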

bin/hadoop jar mapreducedemo.jar ./input.txt output

Viewing the MapReduce Output

Run the following command to view the output of the MapReduce program:

bin/hdfs dfs -cat output/*

The console output lists every character in "Hello World" and its number of occurrences, as shown below. The first row is the count for the space character.

     1
H    1
W    1
d    1
e    1
l    3
o    2
r    1
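
The order of the rows follows from how the keys are sorted: for ASCII input, the space character and uppercase letters sort before lowercase letters. As a quick local cross-check (not part of the Hadoop job), the plain-Java sketch below counts the characters of the same string in a sorted map and prints them as tab-separated lines, which is the default layout of the text output format. The class name ExpectedOutputCheck and its methods are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Local cross-check of the expected job output for a small input (no Hadoop involved).
public class ExpectedOutputCheck {

    // Count each character of the input; TreeMap keeps the keys in sorted order.
    public static Map<Character, Long> count(String input) {
        Map<Character, Long> counts = new TreeMap<>();
        for (char c : input.toCharArray()) {
            counts.merge(c, 1L, Long::sum);
        }
        return counts;
    }

    // Format the counts as "character<TAB>count" lines.
    public static String format(Map<Character, Long> counts) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Character, Long> e : counts.entrySet()) {
            sb.append(e.getKey()).append('\t').append(e.getValue()).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Prints eight rows, starting with the space character
        System.out.print(format(count("Hello World")));
    }
}
```

If the job output differs from this local result for the same input, the mapper or reducer logic is the first place to look.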

Using a Reducer Program as Combiner

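In MapReduce, it is possible to configure the reducer as a combiner. A combiner runs locally on the output of the mapper, before that output is sent to the reducers. Because it pre-aggregates values on the node where they were produced, it reduces the amount of data transferred across the network and the number of items processed in the final reducer stage, which can substantially improve performance. Note that a reducer can be reused as a combiner only if the operation it performs is commutative and associative, as addition is here. Hadoop may run the combiner zero, one or several times, so the final result must not depend on it.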

Add the following line to the run method of AlphaCounter.java, after the call to setReducerClass, to configure the reducer as the combiner:

job.setCombinerClass(AlphaReducer.class);
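
To see why the operation matters, the plain-Java sketch below compares aggregating all values at once with aggregating per-mapper partial results first. Summation gives the same answer either way, so it is safe in a combiner. Averaging, used here only as a counter-example, does not. The class name CombinerSketch and the sample values are hypothetical, and no Hadoop classes are involved.

```java
import java.util.Arrays;
import java.util.List;

// Illustrates which aggregations survive being applied to partial results first.
public class CombinerSketch {

    // Sum of all values; safe to apply to partial groups and then to the partial sums.
    public static long sum(List<Long> values) {
        long total = 0;
        for (long v : values) {
            total += v;
        }
        return total;
    }

    // Arithmetic mean; NOT safe to apply to partial groups of different sizes.
    public static double mean(List<Double> values) {
        double total = 0;
        for (double v : values) {
            total += v;
        }
        return total / values.size();
    }

    public static void main(String[] args) {
        // Counts for one key, as emitted by two hypothetical mappers
        List<Long> mapperA = Arrays.asList(1L, 1L, 1L);
        List<Long> mapperB = Arrays.asList(1L);

        long withoutCombiner = sum(Arrays.asList(1L, 1L, 1L, 1L));
        long withCombiner = sum(Arrays.asList(sum(mapperA), sum(mapperB)));
        System.out.println(withoutCombiner + " " + withCombiner); // prints 4 4

        double trueMean = mean(Arrays.asList(1.0, 2.0, 3.0, 10.0));
        double meanOfMeans = mean(Arrays.asList(
                mean(Arrays.asList(1.0, 2.0, 3.0)), mean(Arrays.asList(10.0))));
        System.out.println(trueMean + " " + meanOfMeans); // prints 4.0 6.0
    }
}
```

AlphaReducer only adds counts, which is why it can serve as both the combiner and the reducer in this job.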