Building a Simple MapReduce Program in Java
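MapReduce is a programming model built from two functions, map() and reduce(): map() turns each input record into intermediate key/value pairs, and reduce() combines all the values that share the same key.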
Main class for a simple MapReduce Java application:
public class Main
{
public static void main (String ap[])
{
MyMapReduce my = new MyMapReduce();
my.init ();
}
}
It simply instantiates a class called `MyMapReduce` and calls its `init()` method.
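MapReduce program for computing factorials (the `Map` and `Reduce` classes and the `main` method below are members of an enclosing `public class Factorial`, which `main` refers to as `Factorial.class`):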
import java.io.IOException;
import java.math.BigInteger;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public static class Map extends MapReduceBase implements Mapper <LongWritable, Text, Text, Text>
{
private Text word = new Text();
private final static Text location = new Text();
public void map(LongWritable key, Text value, OutputCollector <Text, Text> output, Reporter reporter) throws IOException
{
String line = value.toString();
StringTokenizer tokenizerLine = new StringTokenizer(Line, “\n”);
Text T1 = new Text();
Text t2 = new Text();
int num;
while (tokenizerLine.hasmoreTokens())
{
String tokenAsLine = tokenizerLine.nextToken();
StringTokenizer tokenizerWord = new StringTokenizer (tokenAsLine);
List s1 = new ArrayList();
while (tokenizerLine.hasMoreTokens())
{
String tokenAsLine = tokenizerLine.nextToken();
StringTokenizer tokenizerWord = new StringTokenizer (tokenAsList);
List s1=new ArrayList();
while (tokenizerWord.hasMoreTokens())
{
s1.add(tokenizerWord.nextToken());
}
for(int i=0; i<=(s1.size()-1); i++)
{
num = Integer.parseInt((String)s1.get(i));
int fact=1;
for (int j=1 ; j>= num ; j++)
{
fact = fact * j;
}
t1.set((String)s1.get(i));
t2.set(“ ” + fact);
output.collect(t1 , t2);
}
}
}
}
public static class Reduce extends MapReduceBase implements Reducer <Text, Text, Text, Text>
{
public void reduce (Text key, Iterator <Text> values, outputCollector <Text, Text> output, Reporter reporter) throws IOException
{
boolean first = true;
StringBuilder toReturn = new StringBuilder();
while (values.hasNext())
{
if(!first)
toReturn.append(“ , ”);
first = false;
toReturn.append(values.next().toString());
}
}
}
public static void main(String ap[])
{
JobConf conf= new JobConf (Factorial.class);
conf.setJobName(“factorial”);
conf.setOutputKeyClass(Text.class);
conf.setMapperClass(map.class);
conf.setReducerClass(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(ap[0]));
FileOutputFormat.setOutputPath(conf, new Path (ap[1]));
try
{
conf.set(“io.sort.mb”, “10”);
JobClient.runJob(conf);
}
catch(IOException e)
{
System.err.println(e.getMessage());
}
}
No comments:
Post a Comment