diff options
Diffstat (limited to 'hadoop/wordcount')
| -rw-r--r-- | hadoop/wordcount/README | 6 | ||||
| -rw-r--r-- | hadoop/wordcount/WordCount.java | 57 | ||||
| -rw-r--r-- | hadoop/wordcount/pipes/Makefile | 12 | ||||
| -rw-r--r-- | hadoop/wordcount/pipes/jobconf.xml | 16 | ||||
| -rwxr-xr-x | hadoop/wordcount/pipes/run.sh | 12 | ||||
| -rw-r--r-- | hadoop/wordcount/pipes/wordcount.cc | 38 | ||||
| -rw-r--r-- | hadoop/wordcount/pipes/wordcount.hh | 34 | ||||
| -rwxr-xr-x | hadoop/wordcount/run.sh | 5 | 
8 files changed, 180 insertions, 0 deletions
| diff --git a/hadoop/wordcount/README b/hadoop/wordcount/README new file mode 100644 index 0000000..b13f384 --- /dev/null +++ b/hadoop/wordcount/README @@ -0,0 +1,6 @@ +source http://hadoop.apache.org/common/docs/current/mapred_tutorial.html + javac -classpath /usr/lib/hadoop/hadoop-0.20.2-cdh3u1-core.jar -d wordcount_classes WordCount.java + mkdir wordcount_classes + jar -cvf wordcount.jar -C wordcount_classes/ . + hadoop jar wordcount.jar org.myorg.WordCount in/infile out/outfile + diff --git a/hadoop/wordcount/WordCount.java b/hadoop/wordcount/WordCount.java new file mode 100644 index 0000000..8917aee --- /dev/null +++ b/hadoop/wordcount/WordCount.java @@ -0,0 +1,57 @@ +package org.myorg; + +import java.io.IOException; +import java.util.*; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.util.*; + +public class WordCount { + +   // Mapper +   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { +     private final static IntWritable one = new IntWritable(1); +     private Text word = new Text(); +     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { +       String line = value.toString(); // TextInputFormat +       StringTokenizer tokenizer = new StringTokenizer(line); +       while (tokenizer.hasMoreTokens()) { +         word.set(tokenizer.nextToken()); +         output.collect(word, one); +       } +     } +   } + +   // Reducer +   public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { +     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { +       int sum = 0; +       while (values.hasNext()) { +         sum += values.next().get(); +       } +       output.collect(key, new IntWritable(sum)); +     } +   } + +   // Driver +   public static void main(String[] args) throws Exception { +     JobConf conf = new JobConf(WordCount.class); +     conf.setJobName("wordcount"); +     conf.setNumReduceTasks(200); +     conf.setOutputKeyClass(Text.class); +     conf.setOutputValueClass(IntWritable.class); +     conf.setMapperClass(Map.class); +     conf.setCombinerClass(Reduce.class); // Combiner +     conf.setReducerClass(Reduce.class); +     conf.setInputFormat(TextInputFormat.class); +     conf.setOutputFormat(TextOutputFormat.class); +     FileInputFormat.setInputPaths(conf, new Path(args[0])); +     FileOutputFormat.setOutputPath(conf, new Path(args[1])); +     JobClient.runJob(conf); +   } + +} + diff --git a/hadoop/wordcount/pipes/Makefile b/hadoop/wordcount/pipes/Makefile new file mode 100644 index 0000000..c892bb8 --- /dev/null +++ b/hadoop/wordcount/pipes/Makefile @@ -0,0 +1,12 @@ +CC = g++ +HADOOP_INSTALL = ../hadoop-0.20.205.0 +PLATFORM = Linux-amd64-64 +CPPFLAGS = -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include + +wordcount: wordcount.cc +	$(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \ +	-lhadooputils -lcrypto -lpthread -g -O2 -static -o $@ + +wordcountc: wordcountc.cc +	$(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \ +	-lhadooputils -lcrypto -lpthread -g -O2 -static -o $@ diff --git a/hadoop/wordcount/pipes/jobconf.xml b/hadoop/wordcount/pipes/jobconf.xml new file mode 100644 index 0000000..facdbce --- /dev/null +++ b/hadoop/wordcount/pipes/jobconf.xml @@ -0,0 +1,16 @@ +<?xml version="1.0"?> +<configuration> +  <property> +    <name>hadoop.pipes.executable</name> +    <value>path/to/dp_hadoop_pipes_test</value> +  </property> +  <property> +    <name>hadoop.pipes.java.recordreader</name> +    <value>true</value> +  </property> +  <property> +    <name>hadoop.pipes.java.recordwriter</name> +    <value>true</value> +  </property> +</configuration> + diff --git a/hadoop/wordcount/pipes/run.sh b/hadoop/wordcount/pipes/run.sh new file mode 100755 index 0000000..0b02be7 --- /dev/null +++ b/hadoop/wordcount/pipes/run.sh @@ -0,0 +1,12 @@ +#!/bin/sh + +HADOOP=/usr/lib/hadoop/ + +$HADOOP/bin/hadoop pipes \ +                   -D hadoop.pipes.java.recordreader=true  \ +                   -D hadoop.pipes.java.recordwriter=true \ +                   -D mapred.reduce.tasks=30 \ +                   -input in/bible.txt \ +                   -output out/bible-wc  \ +                   -program ./wordcount + diff --git a/hadoop/wordcount/pipes/wordcount.cc b/hadoop/wordcount/pipes/wordcount.cc new file mode 100644 index 0000000..c9394d5 --- /dev/null +++ b/hadoop/wordcount/pipes/wordcount.cc @@ -0,0 +1,38 @@ +#include "wordcount.hh" + + +void +WordcountMapper::map(HadoopPipes::MapContext &context) +{ +  typedef boost::tokenizer<> tokenizer_t; +  tokenizer_t tokenizer(context.getInputValue()); + +  for(tokenizer_t::const_iterator i = tokenizer.begin(); +      tokenizer.end() != i; ++i) { +    context.emit(boost::to_lower_copy(*i), "1"); +  } +} + +void +WordcountReducer::reduce(HadoopPipes::ReduceContext &context) +{ +  uint32_t count(0); + +  do { +	  ++count; +  } while(context.nextValue()); + +  //std::cout << context.getInputKey() << endl; +  context.emit(context.getInputKey(), +               boost::lexical_cast<std::string>(count)); +} + + +int +main(int argc, char *argv[]) +{ +  HadoopPipes::TemplateFactory2<WordcountMapper, +                                WordcountReducer> factory; +  return HadoopPipes::runTask(factory); +} + diff --git a/hadoop/wordcount/pipes/wordcount.hh b/hadoop/wordcount/pipes/wordcount.hh new file mode 100644 index 0000000..629acf6 --- /dev/null +++ b/hadoop/wordcount/pipes/wordcount.hh @@ -0,0 +1,34 @@ +#ifndef __WORDCOUNT_HH__ +#define __WORDCOUNT_HH__ + + +#include <iostream> +#include <string> + +#include "hadoop/Pipes.hh" +#include "hadoop/TemplateFactory.hh" + +#include <boost/algorithm/string.hpp> +#include <boost/tokenizer.hpp> +#include <boost/lexical_cast.hpp> + +using namespace std; + + +class WordcountMapper : public HadoopPipes::Mapper +{ +  public: +    WordcountMapper(const HadoopPipes::TaskContext &) {}; +    void map(HadoopPipes::MapContext &context); +}; + +class WordcountReducer : public HadoopPipes::Reducer +{ +  public: +    WordcountReducer(const HadoopPipes::TaskContext &) {}; +    void reduce(HadoopPipes::ReduceContext & context); +}; + + +#endif + diff --git a/hadoop/wordcount/run.sh b/hadoop/wordcount/run.sh new file mode 100755 index 0000000..2906bfb --- /dev/null +++ b/hadoop/wordcount/run.sh @@ -0,0 +1,5 @@ +#!/bin/sh + + +hadoop jar wordcount.jar org.myorg.WordCount in/marec.lc.en out/marec.lc.en-wc + | 
