Monday, December 6, 2010

Use Hadoop Cluster in Astronomy

                 Using Hadoop Cluster in Astronomy
 
                              Astrophysics is a branch of Physics dealing with many fundamental things about the nature of the universe. e.g. Studying properties of dark matter & the nature of dark matter & the nature of dark energy.
To accomplish this goal it requires new methodologies for analysing & understanding petascale data sets ( i.e. data being collected at a rate 1000X greater than current surveys.) This research focusses on exploring an emerging paradigm for data intensive applications MapReduce & how it scales to the analysis of astronomical images.

Map Reduce in Astronomy :
 
               In order to exploit the elastic nature of the computational cloud, where many computers can be used at the same time, one requires an efficient way of writing parallel programs. The High Performance Computing [HPC] community has been developing such programs for roughly 20 years. The mapreduce model allows programmers to write a map function, which takes a key / value pair (e.g. id and file-name), operates on it (performs object detection) and returns a new set of key / value pairs (source list). The reduce function then aggregates / merges all the intermediate data (builds an object catalog on a stacked source list). Many problems in astronomy naturally fall into this model because of the inherent parallelizability of many astronomical tasks. The benefits are thatmapreduce is easy to write & the framework provides automatic load balancing.

1] Image Mosaicing is a general tool required by Astronomers. The SLOAN DIGITAL SURVEY [ SDSS] alone generated 1.3 million astronomical images. Combining these files to form larger composite images or to stack the individual images to detect faint sources enables a broad range of science questions to be detected (from the detection of moving asteroids that are too faint to be seen on a single image to the identification of very faint, high-red shift galaxies).
Next generation astronomical surveys such as the LSST will generate 30 TB of images per night, detecting transient sources, moving objects & hundreds of millions of stars & galaxies.
The mapreduce model specifies a computation that takes a set of input key / value pairs & produces a set of output values. We divide the computation into 3 distinct phases: Map, Reduce & Shuffle.
Shuffle is optional capability of Reduce mechanism, but since we make heavy use of it, we will therefore consider it a unique phase.
Map takes an input pair & 'emits' a set of intermediate key / value pairs. These intermediate pairs are then shuffled among processors by means of a user-supplied partitioning function. The reduce method then operates on the shuffled key / value pairs and returns a set of values. In this user-supplied methods in each phase can only see local data. Data is transmitted among compute nodes only between phases.

Friday, November 12, 2010

Building Simple MapReduce java Program

         Building Simple MapReduce java Program

Map Reduce is a combination of two functions map() and reduce().

Main class for a simple MapReduce Java Application :

public class Main
{
public static void main (String ap[])
{
MyMapReduce my = new MyMapReduce();
my.init ();
}
}

It just instantiates a class called, 'MyMapReduce'.

MapReduce Program for Factorial :


import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;


public static class Map extends MapReduceBase implements Mapper <LongWritable, Text, Text, Text>
          {
             private Text word = new Text();
             private final static Text location = new Text();
             public void map(LongWritable key, Text value, OutputCollector <Text,                                         Text> output, Reporter reporter) throws IOException
               {
                   String line = value.toString();
                   StringTokenizer tokenizerLine = new StringTokenizer(Line, “\n”);
                   Text T1 = new Text();
                   Text t2 = new Text();
                   int num;
                   while (tokenizerLine.hasmoreTokens())
                      {
                          String tokenAsLine = tokenizerLine.nextToken();
                          StringTokenizer tokenizerWord = new StringTokenizer                                                                      (tokenAsLine);
                          List s1 = new ArrayList();
                          while (tokenizerLine.hasMoreTokens())
                               {
                                     String tokenAsLine = tokenizerLine.nextToken();
                                     StringTokenizer tokenizerWord = new StringTokenizer                                                                              (tokenAsList);
                                     List s1=new ArrayList();
                                     while (tokenizerWord.hasMoreTokens())
                                         {
                                             s1.add(tokenizerWord.nextToken());
                                         }
                                     for(int i=0; i<=(s1.size()-1); i++)
                                         {
                                             num = Integer.parseInt((String)s1.get(i));
                                             int fact=1;
                                     for (int j=1 ; j>= num ; j++)
                                        {
                                            fact = fact * j;
                                        }
                                    t1.set((String)s1.get(i));
                                    t2.set(“ ” + fact);
                                    output.collect(t1 , t2);
                                 }
                           }
                    }
             }


public static class Reduce extends MapReduceBase implements Reducer <Text,                                              Text, Text, Text>
          {
                public void reduce (Text key, Iterator <Text> values, outputCollector                          <Text, Text> output, Reporter reporter) throws IOException
                     {
                           boolean first = true;
                           StringBuilder toReturn = new StringBuilder();
                           while (values.hasNext())
                                {
                                    if(!first)
                                    toReturn.append(“ , ”);
                                    first = false;
                                    toReturn.append(values.next().toString());
                                }
                    }
          }


public static void main(String ap[])
{
    JobConf conf= new JobConf (Factorial.class);
    conf.setJobName(“factorial”);
    conf.setOutputKeyClass(Text.class);
    conf.setMapperClass(map.class);
    conf.setReducerClass(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path(ap[0]));
    FileOutputFormat.setOutputPath(conf, new Path (ap[1]));
    try
      {
          conf.set(“io.sort.mb”, “10”);
          JobClient.runJob(conf);
      }
    catch(IOException e)
       {
           System.err.println(e.getMessage());
       }
}

Tuesday, November 2, 2010

Building HADOOP CLUSTER [ Using 2 Linux Machines ]

Building HADOOP CLUSTER [Using 2 Linux Machines]

STEP 1) Install Java 6 or above on Linux machine ( jdk1.6.0.12 )
I am having 'jdk-6u12-linux-i586.bin' on my REDHAT machine.
To Install follow commands :
# chmod 744 jdk-6u12-linux-i586.bin
# ./ jdk-6u12-linux-i586.bin


STEP 2) Download 'jce-policy-6.zip'
extract it.
# cp -f jce/*.jar $JAVA_HOME/jre/lib/seciruty/
# chmod 444 $JAVA_HOME/jre/lib/seciruty/*.jar


STEP 3) Download hadoop-0.20.0.tar.gz or any latest version
extract it and copy ' hadoop-0.20.0' folder to '/usr/local/' directory.

STEP 4) Set JAVA PATH
# export JAVA_HOME=/java_installation_folder/jdk1.6.0_12
STEP 5) Set HADOOP PATH
# export HADOOP_HOME=/usr/local/hadoop-0.20.2
# export PATH=$PATH:SHADOOP_HOME/bin

Install same on second Linux machine
Then Description of machines is :

Server IP                             HostName                                Role

1) 192.168.100.19             hostmaster         Master [ NameNode and JobTracker ]
2) 192.168.100.17             hostslave            Slave [ Datanode and TaskTracker]


STEP 6) Now do following settings on Master :

# vim /etc/hosts
make changes as...
comment all and write at the end
192.168.100.19 hostmaster
save and exit

Changes to be made on Slave Machine :

# vim /etc/hosts
make changes as...
comment all and write at the end
192.168.100.17 hostslave
192.168.100.19 hostmaster
save and exit

STEP 7) For Communication setup SSH :

Do the steps on master as well as on slave-
# ssh-keygen -t rsa
it generates the RSA public & private keys.
This is because Hadoop Master Node communicates with Slave Node using SSH.
This will generate 'id_rsa.pub' file under '/root/.ssh' directory. Now rename the Master's id_rsa.pub to '19_rsa.pub' and copy it to Slave Node (at same path).
Then execute the following command to add the Master's public key to the Slave's authorized keys.

# cat /root/.ssh/19_rsa.pub >> /root/.ssh/authorized_keys

Now try to ssh the Slave Node. It should be connected without needing any password.

# ssh 192.168.100.17

STEP 8) Setting up MASTER NODE :
Setup Hadoop to work in a fully distributed mode by configuring the configuration files under the $HADOOP_HOME/conf/ directory.

Configuration Property :
Property                                               Explanation
1) fs.default.name                              NameNode URI
2) mapred.job.tracker                       JobTracker URI
3) dfs.replication                                Number of replication
4) hadoop.tmp.dir (optional)              Temp Directory

Let us Start with Configuration files :

1) $HADOOP_HOME/conf/hadoop-env.sh
make change as...
export JAVA_HOME=/java_installation_folder/jdk1.6.0_12

2) $HADOOP_HOME/conf/core-site.xml

<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>

3) $HADOOP_HOME/conf/hdfs-site.xml

<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>

4) $HADOOP_HOME/conf/mapred-site.xml

<configuration>
<property>
<name>mapred.job.tracker</name>
<value>localhost:9001</value>
</property>
</configuration>

5) $HADOOP_HOME/conf/masters
192.168.100.19

6) $HADOOP_HOME/conf/slaves
192.168.100.17

Now copy all these files to /conf directory of SLAVE Machine.

STEP 9) Setup Master and Slave Node : (run on both machines)

# hadoop namenode -format
# start-all.sh


Now your Cluster is Ready to run Jobs

Tuesday, October 26, 2010

Planet Hadoop...!!!

Hadoop

Hadoop is an open source Java platform. It lets one easilt write and run distributed applications on large compter clusters to process vast amounts of data. It implements HDFS [Hadoop Distributed File System].

HDFS:

HDFS is a distributed filesystem that runs on large cluster of commodity machines. HDFS divides applications into many blocks of work. HDFS creates multiple replicas of data blocks for reliability, placing them on compute nodes around the cluster. Mapreduce can then process the data where it is located. HDFS has a Master / Slave architecture. A HDFS cluster consists of a single NameNode- a master server that manages the file system namespaces & regulates access to files by clients. In addition there are no. of DataNodes, usually one per node in the cluster, which manage storage attached to the modes that they run on. HDFS exposes a file system Namespace & allows user data to be stored in files. Internally a file is split into one or more blocks & these blocks are stored in a set of DataNodes. The NameNode executes file system Namespace operations like opening, closing & renaming files & directories. It also determines mapping of blocks to DataNodes. The DataNodes are responsible for serving read & write requests from the file system's clients. The DataNodes also perform block creation, deletion & replication upon instruction from the NameNode.


MapReduce:
 
It is breaking a problem into independent pieces to be worked on in parallel. MapReduce is an abstraction that allows Google's engineers to perform simple computations, while hiding the details of parallelisation, data distribution, load balancing & fault tolerance.

MapReduce as a Programming Model:

MAP: it is written by a user of the MapReduce library, takes on input pair & produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key and passes them to the Reduce function.

REDUCE: it is also written by the user, it accepts intermediate key & a set of values for that key. It merges together these values to form a possibly smaller set of values.
The MapReduce framework consists of a single master Jobtracker and on slave Tasktracker per cluster node. The master is responsible for scheduling the job's component tasks on the slaves, monitoring them and re-executing the failed tasks. The slaves execute the tasks as directed by the master.
The Hadoop job client then submits the job (jar / executables) & configuraions to the Jobtracker which then assumes the responsibility of distributing the software / configuration to the slaves, scheduling tasks & monitoring them, providing status & diagnostic information to the job client.

PIG : A dataflow language and execution environment for exploring very large datasets. Pig runs on HDFS and MapReduce clusters.

HBASE : A distributed column oriented database. Hbase uses HDFS for its underlying storage and supports batch-style computations using MapReduce & point queries.

Zookeepers : A distributed, Highly available coordination service. Zookeeper provides primitives such as distributed locks that can be used for building distributed applications.

Hive : A distributed data warehouse. Hive manages data stored in HDFS & provides a query language based on SQL for querying the data.

Chukwa : A distributed data collection and analysis system. Chukwa runs collectors that store data in HDFS and it uses MapReduce to produce reports.


Hadoop Default Ports :

1) HDFS : Namenode 50070 dfs.http.address
                 Datanode 50075 dfs.datanode.http.addresss
                 secondary Namenode 50090 dfs.secondary.http.address
                 backup/checkpoint node 50105
2) MR :     JobTracker 50030 mapred.job.tracker.http.address
                 Tasktracker 50060 mapred.task.tracker.http.address



Hadoop Installation On Linux:

Installing Java:

before installing hadoop on linux we need to install java 6 or above on linux.
It is available at http://java.sun.com/products/archive/j2se/6u12
I am using jdk-6u12-linux-i586.bin
you can choose the version 32 bit or 64 bit as per your configuration and o/s.
I am currently using redhat linux 5 (32 bit).

Download the java package then allow permissions i.e.
# chmod 744 jdk-6u12-linux-i586.bin
# ./jdk-6u12-linux-i586.bin


After that set $JAVA_HOME path i.e.
# export JAVA_HOME=<build tool directory>/jdk1.6.0_12

we also need some necessary files-
download jce_policy-6.zip
extract it using...
#unzip jce_policy-6.zip
# cp -f jce/*.jar $JAVA_HOME/jre/lib/security/


# chmod 744 $JAVA_HOME/jre/lib/security/*.jar

Installing Hadoop:

Download hadoop-0.20.2.tar.gz file from
http://www.higherpass.com/linux/

or give command-
# wget http://www.gtlib.gatech.edu/pub/apache/hadoop/core/hadoop-0.20.2

After downloading of tar file we need to extract it, for that

# tar -xvzf hadoop-0.20.2.tar.gz

then copy that hadoop installation folder to '/usr/local' location-
# cp -r hadoop-0.20.2/ /usr/local

Hadoop Setup :

Setup HADOOP_HOme environment cariable to the install directory & append $HADOOP_HOME/bin to PATH environment variable.

# export HADOOP_HOME=/usr/local/hadoop-0.20.2
# export PATH=$PATH:$HADOOP_HOME/bin


Now configure JAVA_HOME path at the '/usr/local/hadoop-0.20.2/conf/hadoop-env.sh' file.

Hadoop Pseudo Distributed Cluster:

1) edit /conf/core-site.xml and make changes as...

<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>

2) edit /conf/hdfs-site.xml and make changes as...

<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>

3) edit /conf/mapred-site.xml and make changes as...

<configuration>
<property>
<name>mapred.job.tracker</name>
<value>localhost:9001</value>
</property>
</configuration>

After that setup password less ssh with keys. Hadoop communicates over 'ssh' so we need to setup ssh keys, Using ssh Agents & tunnels.

# ssh-keygen -t dsa -p ' ' -f ~/.ssh/id_dsa

# cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys



Format the HDFS file system to prepare it for use. This creates the needed directory structures for the HDFS filesystem.

# hadoop namenode -format

now everything is configured & setup start the daemons for that give command-

# start-all.sh


Running A Job on Cluster Mode:

1)create a simple file say “test.txt”

# vim test.txt
hello this is a sample file for hadoop
hello this file contains simple data.

Save this file and exit

2) Insert this file into hdfs
# hadoop dfs -put test.txt /

The wordcount program is present at '/usr/local/hadoop-0.20.2/src/examples/org/apache/hadoop/examples/WordCount.java'

The Program WordCount.java is:

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{


private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}



public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}



And the jar file is present at '/usr/local/hadoop-0.20.2/hadoop-0.20.2-examples.jar'

To run program give command :

# hadoop jar hadoop-0.20.2-examples.jar wordcount /test.txt /out

You will get following output on screen

10/10/27 10:26:16 INFO input.FileInputFormat: Total input paths to process : 1
10/10/27 10:26:16 INFO mapred.JobClient: Running job: job_201010270916_0005
10/10/27 10:26:17 INFO mapred.JobClient: map 0% reduce 0%
10/10/27 10:26:27 INFO mapred.JobClient: map 100% reduce 0%
10/10/27 10:26:39 INFO mapred.JobClient: map 100% reduce 100%
10/10/27 10:26:41 INFO mapred.JobClient: Job complete: job_201010270916_0005
10/10/27 10:26:41 INFO mapred.JobClient: Counters: 17
10/10/27 10:26:41 INFO mapred.JobClient: Job Counters
10/10/27 10:26:41 INFO mapred.JobClient: Launched reduce tasks=1
10/10/27 10:26:41 INFO mapred.JobClient: Launched map tasks=1
10/10/27 10:26:41 INFO mapred.JobClient: Data-local map tasks=1
10/10/27 10:26:41 INFO mapred.JobClient: FileSystemCounters
10/10/27 10:26:41 INFO mapred.JobClient: FILE_BYTES_READ=50
10/10/27 10:26:41 INFO mapred.JobClient: HDFS_BYTES_READ=20
10/10/27 10:26:41 INFO mapred.JobClient: FILE_BYTES_WRITTEN=132
10/10/27 10:26:41 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=28
10/10/27 10:26:41 INFO mapred.JobClient: Map-Reduce Framework
10/10/27 10:26:41 INFO mapred.JobClient: Reduce input groups=4
10/10/27 10:26:41 INFO mapred.JobClient: Combine output records=4
10/10/27 10:26:41 INFO mapred.JobClient: Map input records=1
10/10/27 10:26:41 INFO mapred.JobClient: Reduce shuffle bytes=50
10/10/27 10:26:41 INFO mapred.JobClient: Reduce output records=4
10/10/27 10:26:41 INFO mapred.JobClient: Spilled Records=8
10/10/27 10:26:41 INFO mapred.JobClient: Map output bytes=36
10/10/27 10:26:41 INFO mapred.JobClient: Combine input records=4
10/10/27 10:26:41 INFO mapred.JobClient: Map output records=4
10/10/27 10:26:41 INFO mapred.JobClient: Reduce input records=4



the output will be stored at /out/part-00000 file, provided the output folder should not be there before running the program. Otherwise you will get an error.

To remove file from HDFS give command:
# hadoop dfs -rm /test.txt

to remove contents of entire directory recursively give command;
# hadoop dfs -rmr    /

To check the output give command:
# hadoop dfs -cat /out/part-00000    or    # hadoop dfs -cat /out/part-r-00000

you get following data stored in it...
a 1
contains 1
data. 1
file 2
for 1
hadoop 1
hello 2
is 1
sample 1
simple 1
this 2



To check contents of HDFS file system give command:

# hadoop dfs -ls

Explaination of WordCount.java :

The program needs 3 classes to run: a Mapper, a Reducer and a Driver. The Driver tells Hadoop how to run the MapReduce process. The Mapper and Reducer operate on data.

1) Mapping List : The first phase of MapReduce program is 'Mapping'. A list of data elements are provided, one at a time to a function called, ' Mapper', which transforms each element individually to an output data element.

2) Reducing List : Reducing here means aggregating value together and returns a single output.

3) Keys and Values : the mapping & reducing functions receive not just values, but (key, value) pairs. The default input format used by Hadoop presents each line of an Input file as a separate input to the Mapper function and not the entire file at a time.

4) StringTokenizer object : used to break up the line into words.

5) Output.collect() : this method will copy the values it receives as input, so we are free to overwrite variables we use.

6) Driver Method : there is one final component of a Hadoop MapReduce program called, ' Driver'. The Driver initiates the job & instructs the Hadoop platform to execute your code on a set of input files & controls where the output files are placed.

7) InputPath Argument : given input directory.

8) OutputPath Argument : Directory in which output from reducers are written into files.

9) JobClient object : it captures the configuration information to run job. Mapping and Reducing functions are identified by setMapperClass() & setreducerClass() methods.
Data types emitted by the reducer are identified by setOutputKeyClass() and setOutputValueClass(). If this is not the case, the methods of the JobConf class will override these. The input types fed to the Mapper are controlled by the InputFormat used. The default Input Format, “TextInputFormat” will load data in as (LongWritable, Text) pairs. The Long value is byte offset of the line in file. The text object holds the string contents of the line of the file.
The text object holds the string contents of the line of the file. The call to JobClient.runJob(conf) will submit the job to Mapreduce. This will block until the job completes. If the job fails, it will throw an IOException. JobClient also provides a non-blocking version called, 'submitJob()'.

10) InputFormat : it is a class that provides following functionality:-
     a) Selects the files or other objects that should be used for input.
     b) Defines the InputSplits that break a file into tasks.
     c) Provides a factory for RecordReader Object that read the file.
         e.g. FileInputFormat : it is provided with a path containing files to read. The FileInputFormat will read all files in this directory. Then divides these files into one or more InputSplit each. We can choose which InputFormat to apply to our input files for a job by calling the setInputFormat() method of the JobConf object that defines the job.

11) Input Split : An InputSplit describes a unit of work that comprises a single map task in a MapReduce program.

12) RecordReader : the InputSplit has defined a slice of work, but does not describe how to access it. The RecordReader class actually loads the data from its source & converts it into (key,value) pairs, suitable for reading by the Mapper. The Recordreader instance is defined by InputFormat. The default InputFormat is TextInputFormat, provides a LineRecordReader, which treats each line of the Input file as a new value. The RecordReader is invoke repeatedly on the input until the entire InputSplit has been consumed. Each invocation of the RecordReader leads to another call to to the map() method of the Mapper.

13) Mapper : Given a key and value the map() method emits (key,value) pairs which are forwarded to the Reducers. The map() method receives 2 parameters in addition to the key & the value.
       a) the OutputCollector object has a method named collect() which will forward a (key,value) pair to the reduce phase of the job.
       b) the Reporter object provides information about the current task; its getInputSplit() method will return an object describing the current InputSplit. The setStatus() method allows you to emit a status message back to the user. The incrCounter() method allows you to increment shared performance counters. Each Mapper can increment the counters & JobTracker will collect the increments made by different processes & aggregate them for later retrieval when the job ends.

14) Partition and Shuffle : the process of moving map outputs to the reducers is known as, 'Shuffling'. The Partitioner class determines which partition a given (key, value) pair will go to.

15) Sort : each reduce task is responsible for reducing the values associated with several intermediate keys. The set of intermediate keys on a single node is automatically sorted to the Reducer.
16) Reducer : a Reducer instance is created for each reduce task. For each key in the partition assigned to a Reducer, the Reducer's reduce() method is called once. The Reducer also receives as parameters 'OutputCollector' and Reporter objects; they are used in the same manner as in the map() method.

17) OutputFormat : the (key,value) pairs provided to this OutputCollector are then written to output files. The instances of OutputFormat provided by Hadoop write to files on the local disk or in HDFS. The output directory is set by the FileOutputFormat.setOutputPath() method. We can control which particular OutputFormat is used by calling the setOutputFormat() method of the JobConf object that defines your MapReduce job.

18) RecordWriter : these are used to write the individual records to the files as directed by OutputFormat.

19) Combiner : it runs after Mapper & before Reducer. Its usage is optional.

                       WordCount.java Program Description

a) Class TokenizerMapper : this class is having methods -
I) map (object key, Text value, Mapper.Content content)
It is called once per each key / value pair in the input split.
II) StringTokenizer class : it is used to split the string

b) Class IntSumReducer : it is having method -
reduce (Key key, Iterable <IntWritable> values, Reducer.Content content)
This method is called once for each key. Most applications will define their reduce class by overriding this method.

c) Class GenericOptionParser : It is a utility to parse command line arguments generic to the Hadoop framework. This class recognizes several standard command line arguments, enabling applications to specify a namenode, a jobtracker, additional configuration resources etc.

d) GenericOptionsParser (Configuration conf, String [] args) : this create a GenericOptionsParser to parse only the generic Hadoop arguments.
Method : getPemainingArgs () : It returns an array of strings containing only application specific arguments.

e) Job Class :
Methods :  
I) FileInputFormat.addInputPath (Job job, Path path) : Add a path to the list of inputs for the map-reduce job.
II) FileOutputFormat.setOutputPath ( Job job, Path outputDir) : set the path of the output directory for the map-reduce job.
III) setMapperClass () : sets the applications mapper class.
IV) setCombinerClass() : set the combiner class for the job.
V) setJarByClass() : set the Jars by finding where a given class came from.
VI) setReducerClass() : set the Reducer for the job.
VII) setOutputKeyClass () : set the key class for the job output data.
VIII) setOutputValueClass() : set the value class for the job outputs.
IX) waitForCompletion() : Submit the job to the cluster and wait for it to finish.

                           Simple Map Reduce Program

map ( string key, String value) :
// key : document name
// value : document contents
for each word x in value :
EmitIntermediate (x, “1”);

reduce (String key, Iterator values) :
// key : a word
// value : a list of counts
int result = 0;
for each v in values :
result t = ParseInt (v);
Emit (AsString (result));

i.e. the map() emits each word plus an associated count of occurrences.
The reduce() sums together all counts emitted for a particular word.
In addition we need to write code to fill a mapreduce specification object with the names of the input and output files, and optional tuning parameters. The user then invokes the “mapreduce()” passing it he specification object. The users code is linked together with MapReduce library.

Inverted Index : The map function parses each document and emits a sequence of (word, document Id) pairs. The reduce function accepts all pairs for a given word, sorts the corresponding document Ids and emits a (word, list (document Id)) pair. The set of all output pairs forms a single inverted index.




                      Designing  HADOOP CLUSTER

                    It Needs 3 Linux Machines to make a cluster. (Even you can make it on 2 machines but 3 are better).
           The root of the distribution is referred to as HADOOP_HOME. All machines in the cluster must have the same HADOOP_HOME path. Export HADOOP_HOME in login script /etc/bashrc to make it persistent.
Hadoop configuration is driven by 2 configuration files in HADOOP_HOME/conf directory. The default configuration settings appear in the read-only 'hadoop-default.xml' file. Node specific configuration settings appear in the 'hadoop-site.xml'.
               Another important file is conf/slaves. On the JobTracker this file lists all the hosts on which the TaskTracker daemon has to be started. On NameNode it lists all the hosts on which the DataNode daemon has to be started. You must maintain this file manually, even if you are scaling uoto a large no. of nodes.

                  Finally conf/hadoop-env.sh contains configuration options such as JAVA_HOME, the location of logs & the directory where process Id's are stored. NameNode & JobTracker on 2 separate nodes & DataNode & TaskTracker on a third node. [ In case of 2 machines you can install NameNode & JobTracker on one machine and DataNode & TaskTracker on second machine].
The conf/slaves file on first 2 nodes contained the Ip address of the 3rd machine(NameNode). All 4 daemons used the same conf/hadoop-site.xml file. Specifically hadoop-site-namenode.xml

Setup PassPhrase SSH :

If you want to use hostnames to refer to the nodes, you must also edit the /etc/hosts file on each node to reflect the proper mapping between the hostnames & Ip addresses.

Hadoop Startup :
To start cluster you need to start both the HDFS & MapReduce.

1) First on NameNode :
Navigate to HADOOP_HOME
2) Format a new Distributed filesystem using,
$ hadoop namenode -format
3) Start HDFS by running command on NameNode,
$ start-all.sh
this script also consult conf/slaves file on NameNode and start the DataNode daemon on all the listed slaves.
4) Start MapReduce with following command on the designated JobTracker.
$ start-mapred.sh
this script also consult the conf/slaves file on the JobTracker & starts the TaskTracker daemon on all the listed slaves.
5) To cross check whether the cluster is running properly, you can look at the process running on each node, using jps, on NameNode you should see the processes Jps, NameNode and if you have only a 3 node cluster, SecondaryNameNode.
6) On JobTracker check for Jps and JobTracker.
7) On TaskTracker / DataNode you should see Jps, DataNode and TaskTracker.

Running MapReduce Jobs:
  
               Once you have Hadoop Cluster running, you can see it in action by executing one of the example MapReduce Java class files bundled in hadoop-0.20.2-examples.jar. As an example take Grep which extracts matching strings from text files and counts how many time they occurred. To begin create an input set for Grep. In this case input will be a set of files in the /conf directory.

# hadoop dfs -copyFromLocal conf input
# hadoop dfs -ls
# hadoop jar hadoop-0.20.2-examples.jar grep input grep-output 'dfs[a-z]+'
# hadoop dfs -get grep-output output
# cd output
# ls


 Some Important Hadoop Commands

# start-all.sh
this command starts all the parameters like jobtracker, namenode, task tracker, datanode etc. on Hadoop cluster

# hadoop dfs -put sample.txt input
this command inserts 'sample.txt' file into HDFS (/user/root/input folder).

# hadoop dfs -rm input/sample.txt
Used to delete sample.txt from HDFS.
To remove all files in input directory recursively give command :
# hadoop dfs -rmr input
To run java program on Hadoop give command :
# hadoop jar hadoop-0.20.2-examples.jar wordcount input/sample.txt /out
the output will be stored at '/user/root/out/part-00000' file or '/user/root/out/part-r-00000' file