Aggregation on Text Fields in a MapReduce Example

Introduction

Following is a simple MapReduce example that differs a little from the standard “Word Count” example: it takes tab-delimited text and counts the occurrences of values in a particular field. More details about the implementation are described below.

package com.npntraining.hadoop.mapreduce;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

import java.io.IOException;
import java.util.Iterator;

public class FieldCountsExample {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            // Split the tab-delimited line; field index 1 holds the first name
            String line = value.toString();
            String[] fields = line.split("\t");
            if (fields.length < 2) {
                return; // skip blank or malformed lines
            }
            String firstName = fields[1];
            // Emit (firstName, 1) for every input record
            word.set(firstName);
            output.collect(word, one);
        }
    }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            // Sum all of the 1s emitted for this key (a first name)
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }


    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(FieldCountsExample.class);
        conf.setJobName("Field Counts");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        // Summing counts is associative and commutative, so the reducer can also serve as the combiner
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path("/data"));   // input directory in HDFS
        FileOutputFormat.setOutputPath(conf, new Path("/tmpout")); // must not already exist

        JobClient.runJob(conf);
    }
}

You can see above in the Map class that each line of text is split using split("\t"). The second field in the resulting array (index 1) is used as the map output key, and 1 is used as its value. This data is then aggregated in the optional combiner and in the reducer until the final result, a count by first name, is available in HDFS.
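
For example, given a few hypothetical tab-delimited input rows (the layout of id, first name, last name is an assumption, since the actual file format isn't shown):

1	John	Smith
2	Mary	Jones
3	John	Doe

the mapper would emit (John, 1), (Mary, 1), and (John, 1), which the reducer sums into a count per first name.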

Here is how to set things up to run the above MapReduce job:
Create an executable JAR containing your MapReduce classes

This can be done in a variety of ways. This example assumes Maven is being used.

mvn package  # creates npntraining-hadoop-1.0-SNAPSHOT.jar, used below
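
If you need a starting point, a minimal Maven dependency for the classic org.apache.hadoop.mapred API looks something like the following sketch (the version shown is an assumption; match it to your cluster):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <!-- assumed version; use the one that matches your Hadoop installation -->
    <version>2.7.1</version>
    <scope>provided</scope>
</dependency>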

Create a working Hadoop instance
You must first have a working Hadoop installation to run this on. I personally like to create a Docker container using the sequenceiq/docker-spark image.
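
For example (on Docker Hub the image is published as sequenceiq/spark; the tag here is an assumption, so check the image's README for a current one):

docker run -it -h sandbox sequenceiq/spark:1.6.0 bash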

Create an HDFS directory for your input data
If you do not have an HDFS directory containing the data you want to aggregate, create one.
hadoop fs -mkdir /data

Add data to your HDFS directory

Add text file(s) to your newly created HDFS directory.

hadoop fs -put npndatagen_people_wHeader_v1_5k.txt /data
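
You can confirm the file is in place before running the job:

hadoop fs -ls /data

Note that if the file includes a header row (as the filename suggests), the header's first-name column value will be counted as a record too, unless you filter it out in the mapper.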

Run the program from the command line
hadoop jar npntraining-hadoop-1.0-SNAPSHOT.jar \
com.npntraining.hadoop.mapreduce.FieldCountsExample
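
One thing to watch for: the job will fail if the output directory already exists, so remove it between runs:

hadoop fs -rm -r /tmpout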

Print output from HDFS
hadoop fs -cat /tmpout/*
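
Each reducer writes its results as tab-separated name/count pairs to a part file (part-00000, and so on). With the hypothetical input rows above, the output would look like:

John	2
Mary	1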
