Diffstat (limited to 'hadoop/wordcount')
-rw-r--r--  hadoop/wordcount/README              |  6
-rw-r--r--  hadoop/wordcount/WordCount.java      | 57
-rw-r--r--  hadoop/wordcount/pipes/Makefile      | 12
-rw-r--r--  hadoop/wordcount/pipes/jobconf.xml   | 16
-rwxr-xr-x  hadoop/wordcount/pipes/run.sh        | 12
-rw-r--r--  hadoop/wordcount/pipes/wordcount.cc  | 38
-rw-r--r--  hadoop/wordcount/pipes/wordcount.hh  | 34
-rwxr-xr-x  hadoop/wordcount/run.sh              |  5
8 files changed, 180 insertions, 0 deletions
diff --git a/hadoop/wordcount/README b/hadoop/wordcount/README
new file mode 100644
index 0000000..b13f384
--- /dev/null
+++ b/hadoop/wordcount/README
@@ -0,0 +1,6 @@
+source: http://hadoop.apache.org/common/docs/current/mapred_tutorial.html
+ mkdir wordcount_classes
+ javac -classpath /usr/lib/hadoop/hadoop-0.20.2-cdh3u1-core.jar -d wordcount_classes WordCount.java
+ jar -cvf wordcount.jar -C wordcount_classes/ .
+ hadoop jar wordcount.jar org.myorg.WordCount in/infile out/outfile
+
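Note: the job writes its result to HDFS, one part file per reducer. A quick
sanity check after a run (assuming the default output layout):

    hadoop fs -ls out/outfile
    hadoop fs -cat out/outfile/part-00000 | head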
diff --git a/hadoop/wordcount/WordCount.java b/hadoop/wordcount/WordCount.java
new file mode 100644
index 0000000..8917aee
--- /dev/null
+++ b/hadoop/wordcount/WordCount.java
@@ -0,0 +1,57 @@
+package org.myorg;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+
+public class WordCount {
+
+ // Mapper
+ public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
+ private final static IntWritable one = new IntWritable(1);
+ private Text word = new Text();
+ public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
+ String line = value.toString(); // TextInputFormat
+ StringTokenizer tokenizer = new StringTokenizer(line);
+ while (tokenizer.hasMoreTokens()) {
+ word.set(tokenizer.nextToken());
+ output.collect(word, one);
+ }
+ }
+ }
+
+ // Reducer
+ public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
+ public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
+ int sum = 0;
+ while (values.hasNext()) {
+ sum += values.next().get();
+ }
+ output.collect(key, new IntWritable(sum));
+ }
+ }
+
+ // Driver
+ public static void main(String[] args) throws Exception {
+ JobConf conf = new JobConf(WordCount.class);
+ conf.setJobName("wordcount");
+ conf.setNumReduceTasks(200); // 200 reducers => 200 part-NNNNN files in the output dir
+ conf.setOutputKeyClass(Text.class);
+ conf.setOutputValueClass(IntWritable.class);
+ conf.setMapperClass(Map.class);
+ conf.setCombinerClass(Reduce.class); // Combiner
+ conf.setReducerClass(Reduce.class);
+ conf.setInputFormat(TextInputFormat.class);
+ conf.setOutputFormat(TextOutputFormat.class);
+ FileInputFormat.setInputPaths(conf, new Path(args[0]));
+ FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+ JobClient.runJob(conf);
+ }
+
+}
+
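Note: because the driver sets 200 reduce tasks, the output directory ends up
with 200 part files. To look at the counts as a single sorted list, something
like this works (a sketch; the local file name is an assumption):

    hadoop fs -getmerge out/outfile wordcount-merged.txt
    sort -k2,2nr wordcount-merged.txt | head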
diff --git a/hadoop/wordcount/pipes/Makefile b/hadoop/wordcount/pipes/Makefile
new file mode 100644
index 0000000..c892bb8
--- /dev/null
+++ b/hadoop/wordcount/pipes/Makefile
@@ -0,0 +1,12 @@
+CC = g++
+HADOOP_INSTALL = ../hadoop-0.20.205.0
+PLATFORM = Linux-amd64-64
+CPPFLAGS = -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include
+
+wordcount: wordcount.cc
+ $(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \
+ -lhadooputils -lcrypto -lpthread -g -O2 -static -o $@
+
+wordcountc: wordcountc.cc
+ $(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \
+ -lhadooputils -lcrypto -lpthread -g -O2 -static -o $@
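Note: this is a plain static link against the pipes libraries shipped inside
the Hadoop tarball; HADOOP_INSTALL and PLATFORM above describe one particular
local layout. A typical build and check:

    make wordcount
    file wordcount    # should report a statically linked executable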
diff --git a/hadoop/wordcount/pipes/jobconf.xml b/hadoop/wordcount/pipes/jobconf.xml
new file mode 100644
index 0000000..facdbce
--- /dev/null
+++ b/hadoop/wordcount/pipes/jobconf.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0"?>
+<configuration>
+ <property>
+ <name>hadoop.pipes.executable</name>
+ <value>path/to/dp_hadoop_pipes_test</value>
+ </property>
+ <property>
+ <name>hadoop.pipes.java.recordreader</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hadoop.pipes.java.recordwriter</name>
+ <value>true</value>
+ </property>
+</configuration>
+
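Note: instead of repeating -D flags on the command line, the job can be
submitted with this file via the -conf option (hadoop.pipes.executable must
then name a binary the cluster can read, e.g. a path on HDFS):

    hadoop pipes -conf jobconf.xml -input in/bible.txt -output out/bible-wc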
diff --git a/hadoop/wordcount/pipes/run.sh b/hadoop/wordcount/pipes/run.sh
new file mode 100755
index 0000000..0b02be7
--- /dev/null
+++ b/hadoop/wordcount/pipes/run.sh
@@ -0,0 +1,12 @@
+#!/bin/sh
+
+HADOOP=/usr/lib/hadoop
+
+$HADOOP/bin/hadoop pipes \
+ -D hadoop.pipes.java.recordreader=true \
+ -D hadoop.pipes.java.recordwriter=true \
+ -D mapred.reduce.tasks=30 \
+ -input in/bible.txt \
+ -output out/bible-wc \
+ -program ./wordcount
+
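Note: on a real cluster the task trackers cannot read ./wordcount off the
submitting machine; the usual workflow (an assumed setup step, not shown in
this repo) is to ship the binary to HDFS first and point -program at it:

    hadoop fs -mkdir bin
    hadoop fs -put wordcount bin/wordcount
    # then submit with: -program bin/wordcount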
diff --git a/hadoop/wordcount/pipes/wordcount.cc b/hadoop/wordcount/pipes/wordcount.cc
new file mode 100644
index 0000000..c9394d5
--- /dev/null
+++ b/hadoop/wordcount/pipes/wordcount.cc
@@ -0,0 +1,38 @@
+#include "wordcount.hh"
+
+
+void
+WordcountMapper::map(HadoopPipes::MapContext &context)
+{
+ typedef boost::tokenizer<> tokenizer_t;
+ tokenizer_t tokenizer(context.getInputValue());
+
+ for(tokenizer_t::const_iterator i = tokenizer.begin();
+ tokenizer.end() != i; ++i) {
+ context.emit(boost::to_lower_copy(*i), "1");
+ }
+}
+
+void
+WordcountReducer::reduce(HadoopPipes::ReduceContext &context)
+{
+ uint32_t count(0);
+
+ do {                          // the context arrives positioned on the first
+ ++count;                      // value, so count it before calling nextValue()
+ } while(context.nextValue());
+
+ //std::cout << context.getInputKey() << endl;
+ context.emit(context.getInputKey(),
+ boost::lexical_cast<std::string>(count));
+}
+
+
+int
+main(int argc, char *argv[])
+{
+ HadoopPipes::TemplateFactory2<WordcountMapper,
+ WordcountReducer> factory;
+ return HadoopPipes::runTask(factory);
+}
+
diff --git a/hadoop/wordcount/pipes/wordcount.hh b/hadoop/wordcount/pipes/wordcount.hh
new file mode 100644
index 0000000..629acf6
--- /dev/null
+++ b/hadoop/wordcount/pipes/wordcount.hh
@@ -0,0 +1,34 @@
+#ifndef WORDCOUNT_HH
+#define WORDCOUNT_HH
+
+
+#include <iostream>
+#include <string>
+
+#include "hadoop/Pipes.hh"
+#include "hadoop/TemplateFactory.hh"
+
+#include <boost/algorithm/string.hpp>
+#include <boost/tokenizer.hpp>
+#include <boost/lexical_cast.hpp>
+
+using namespace std;
+
+
+class WordcountMapper : public HadoopPipes::Mapper
+{
+ public:
+ WordcountMapper(const HadoopPipes::TaskContext &) {}
+ void map(HadoopPipes::MapContext &context);
+};
+
+class WordcountReducer : public HadoopPipes::Reducer
+{
+ public:
+ WordcountReducer(const HadoopPipes::TaskContext &) {}
+ void reduce(HadoopPipes::ReduceContext & context);
+};
+
+
+#endif // WORDCOUNT_HH
+
diff --git a/hadoop/wordcount/run.sh b/hadoop/wordcount/run.sh
new file mode 100755
index 0000000..2906bfb
--- /dev/null
+++ b/hadoop/wordcount/run.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+
+hadoop jar wordcount.jar org.myorg.WordCount in/marec.lc.en out/marec.lc.en-wc
+
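Note: as with the pipes variant, the input is expected on HDFS before the job
is submitted (an assumed setup step):

    hadoop fs -put marec.lc.en in/marec.lc.en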