Thursday, November 10, 2011

Hadoop Simple Indexer source code with Map reduce

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */

package lineindexer;

/**
 *
 * @author masteruser
 */
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Main extends Configured implements Tool{



  public static class LineIndexMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, Text> {

    private final static Text word = new Text();
    private final static Text location = new Text();

    public void map(LongWritable key, Text val,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {

      FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
      String fileName = fileSplit.getPath().getName();
      location.set(fileName);

      String line = val.toString();
      StringTokenizer itr = new StringTokenizer(line.toLowerCase());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, location);
      }
    }
  }



  public static class LineIndexReducer extends MapReduceBase
      implements Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {

      boolean first = true;
      StringBuilder toReturn = new StringBuilder();
      while (values.hasNext()){
        if (!first)
          toReturn.append(", ");
        first=false;
        toReturn.append(values.next().toString());
      }

      output.collect(key, new Text(toReturn.toString()));
    }
  }


  /**
   * The actual main() method for our program; this is the
   * "driver" for the MapReduce job.
   */


  public int run(String[] args) throws Exception {
         JobConf conf = new JobConf(getConf(), Main.class);
         conf.setJobName("LineIndexer");

          conf.setOutputKeyClass(Text.class);
          conf.setOutputValueClass(Text.class);

          conf.setMapperClass(LineIndexMapper.class);
             conf.setReducerClass(LineIndexReducer.class);

          conf.setInputFormat(TextInputFormat.class);
             conf.setOutputFormat(TextOutputFormat.class);

     
         int i=args.length;
           
         

          FileInputFormat.setInputPaths(conf, new Path(args[i-2]));
          FileOutputFormat.setOutputPath(conf, new Path(args[i-1]));

          JobClient.runJob(conf);
          return 0;
       }


  public static void main(String[] args) throws Exception {
  

          int res = ToolRunner.run(new Configuration(), new Main(), args);
          System.exit(res);
    
  }
}

No comments:

Post a Comment

How to Generate and use the ssh key on Gerrit, github.io, gitlab, and bitbucket.

 Details can be found here -