Running a Job on Hadoop

Introduction:
In this post I will demonstrate how to create a sample MapReduce application and run it as a job on Hadoop.

Prerequisite:

Hadoop installed on a VM in pseudo-distributed mode.

Writing a MapReduce word count application in Java-

1) Create a sample Java project

Add the following files-
Map.java

package com.hadoop.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Mapper for the old mapred API: tokenizes each input line and emits (word, 1).
public class Map extends MapReduceBase implements
		Mapper<LongWritable, Text, Text, IntWritable> {

	private final static IntWritable one = new IntWritable(1);
	private Text word = new Text();

	public void map(LongWritable key, Text value,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		String line = value.toString();
		StringTokenizer tokenizer = new StringTokenizer(line);
		while (tokenizer.hasMoreTokens()) {
			word.set(tokenizer.nextToken());
			output.collect(word, one);
		}
	}
}

Reduce.java

package com.hadoop.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Reducer for the old mapred API: sums the counts emitted for each word.
public class Reduce extends MapReduceBase implements
		Reducer<Text, IntWritable, Text, IntWritable> {

	public void reduce(Text key, Iterator<IntWritable> values,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		int sum = 0;
		while (values.hasNext()) {
			sum += values.next().get();
		}
		output.collect(key, new IntWritable(sum));
	}
}

WordCount.java

package com.hadoop.mapreduce;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

// Driver: configures the word count job (old mapred API) and submits it.
public class WordCount {
	public static void main(String[] args) throws Exception {
		JobConf conf = new JobConf(WordCount.class);
		conf.setJobName("wordcount");

		// Output key/value types produced by the reducer.
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);

		// The reducer is also used as a combiner.
		conf.setMapperClass(Map.class);
		conf.setCombinerClass(Reduce.class);
		conf.setReducerClass(Reduce.class);

		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);

		// Input and output paths (in HDFS) come from the command line.
		FileInputFormat.setInputPaths(conf, new Path(args[0]));
		FileOutputFormat.setOutputPath(conf, new Path(args[1]));

		JobClient.runJob(conf);
	}
}

2) Add hadoop-core-x.x.jar to the build path-
This jar is in the top-level directory where you extracted the Hadoop tarball during installation.
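
If you are not using an IDE, you can also compile the sources against this jar from the command line. A minimal sketch, assuming the sources sit under src/com/hadoop/mapreduce and HADOOP_HOME points at the extracted tarball; -d . writes the .class files into com/hadoop/mapreduce under the project's top-level directory, so the jar command in step 3 will pick them up-

# cd <project top-level directory>
# javac -classpath $HADOOP_HOME/hadoop-core-x.x.jar -d . src/com/hadoop/mapreduce/*.java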

3) Creating the jar

When creating the jar file, specify the main class - com.hadoop.mapreduce.WordCount (the class name, not the .java file).

With Eclipse- File -> Export -> JAR file

With the CLI-
Change to the top-level directory of the Java project.
Create a manifest file named manifest with the following content (the file must end with a newline)-
Main-Class: com.hadoop.mapreduce.WordCount

# jar cvfm WordCount.jar manifest *
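
You can check that the compiled classes and the manifest ended up inside the jar-

# jar tf WordCount.jar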

4) Start the Hadoop services-

# start-all.sh
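
You can verify that the daemons are up with jps; in pseudo-distributed mode you should see the NameNode, DataNode, SecondaryNameNode, JobTracker and TaskTracker processes-

# jps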


5) Create the input file in HDFS

The application needs an input text file on which to perform the word count.
First create a text file on the local VM, for example /root/hadoop/temp/file1, and then copy it to HDFS.
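
For example (the contents here are arbitrary, any text file will do)-

# echo "hello hadoop map reduce" > /root/hadoop/temp/file1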

Create a directory in HDFS and copy the file into it from the local machine-

# hadoop dfs -mkdir /root/hadoop/app/
# hadoop dfs -copyFromLocal /root/hadoop/temp/file1 /root/hadoop/app/input

These commands create the input file at /root/hadoop/app/input in HDFS.
You can view the file with-

# hadoop dfs -cat /root/hadoop/app/input

6) Run it as a Hadoop job-

# hadoop jar jar_path input_file_path output_file_path

jar_path is a path on the local machine
input_file_path is a path in HDFS
output_file_path is a path in HDFS

The output directory is created at run time; it must not already exist in HDFS.
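
If the output directory is left over from a previous run, remove it first-

# hadoop dfs -rmr /root/hadoop/app/output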

For Example-

[root@localhost App]# hadoop jar /root/hadoop/app/WordCount.jar /root/hadoop/app/input /root/hadoop/app/output
13/04/03 03:44:31 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/04/03 03:44:31 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/04/03 03:44:31 WARN snappy.LoadSnappy: Snappy native library not loaded
13/04/03 03:44:31 INFO mapred.FileInputFormat: Total input paths to process : 1
13/04/03 03:44:33 INFO mapred.JobClient: Running job: job_201304030247_0012
13/04/03 03:44:34 INFO mapred.JobClient: map 0% reduce 0%
13/04/03 03:45:10 INFO mapred.JobClient: map 100% reduce 0%
13/04/03 03:45:37 INFO mapred.JobClient: map 100% reduce 100%
13/04/03 03:45:42 INFO mapred.JobClient: Job complete: job_201304030247_0012
13/04/03 03:45:42 INFO mapred.JobClient: Counters: 30
13/04/03 03:45:42 INFO mapred.JobClient: Job Counters
13/04/03 03:45:42 INFO mapred.JobClient: Launched reduce tasks=1
13/04/03 03:45:42 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=46083
13/04/03 03:45:42 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/04/03 03:45:42 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/04/03 03:45:42 INFO mapred.JobClient: Launched map tasks=2
13/04/03 03:45:42 INFO mapred.JobClient: Data-local map tasks=2
13/04/03 03:45:42 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=24580
13/04/03 03:45:42 INFO mapred.JobClient: File Input Format Counters
13/04/03 03:45:42 INFO mapred.JobClient: Bytes Read=28
13/04/03 03:45:42 INFO mapred.JobClient: File Output Format Counters
13/04/03 03:45:42 INFO mapred.JobClient: Bytes Written=26
13/04/03 03:45:42 INFO mapred.JobClient: FileSystemCounters
13/04/03 03:45:42 INFO mapred.JobClient: FILE_BYTES_READ=48
13/04/03 03:45:42 INFO mapred.JobClient: HDFS_BYTES_READ=210
13/04/03 03:45:42 INFO mapred.JobClient: FILE_BYTES_WRITTEN=64607
13/04/03 03:45:42 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=26
13/04/03 03:45:42 INFO mapred.JobClient: Map-Reduce Framework
13/04/03 03:45:42 INFO mapred.JobClient: Map output materialized bytes=54
13/04/03 03:45:42 INFO mapred.JobClient: Map input records=1
13/04/03 03:45:42 INFO mapred.JobClient: Reduce shuffle bytes=54
13/04/03 03:45:42 INFO mapred.JobClient: Spilled Records=8
13/04/03 03:45:42 INFO mapred.JobClient: Map output bytes=34
13/04/03 03:45:42 INFO mapred.JobClient: Total committed heap usage (bytes)=246751232
13/04/03 03:45:42 INFO mapred.JobClient: CPU time spent (ms)=4680
13/04/03 03:45:42 INFO mapred.JobClient: Map input bytes=18
13/04/03 03:45:42 INFO mapred.JobClient: SPLIT_RAW_BYTES=182
13/04/03 03:45:42 INFO mapred.JobClient: Combine input records=4
13/04/03 03:45:42 INFO mapred.JobClient: Reduce input records=4
13/04/03 03:45:42 INFO mapred.JobClient: Reduce input groups=4
13/04/03 03:45:42 INFO mapred.JobClient: Combine output records=4
13/04/03 03:45:42 INFO mapred.JobClient: Physical memory (bytes) snapshot=364314624
13/04/03 03:45:42 INFO mapred.JobClient: Reduce output records=4
13/04/03 03:45:42 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2140573696
13/04/03 03:45:42 INFO mapred.JobClient: Map output records=4

7) View the output-

The job writes its results to part files inside the output directory (here part-00000, since there is a single reducer)-

[root@localhost App]# hadoop dfs -cat /root/hadoop/app/output/part-00000

 

Alternatively, you can also use the code below, which is written against the newer org.apache.hadoop.mapreduce API.

package org.apache.hadoop;

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

	public static class TokenizerMapper extends
			Mapper<Object, Text, Text, IntWritable> {

		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}

	public static class IntSumReducer extends
			Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// String[] otherArgs = new GenericOptionsParser(conf,
		// args).getRemainingArgs();
		if (args.length != 2) {
			System.err.println("Usage: wordcount <in> <out>");
			System.exit(2);
		}
		Job job = new Job(conf, "word count");
		job.setJarByClass(WordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
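
This version is packaged and run the same way as before; the only difference is the main class name in the manifest, since this class lives in the org.apache.hadoop package-

Main-Class: org.apache.hadoop.WordCount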