Diffstat (limited to 'hadoop')
-rw-r--r--  hadoop/cacheArchive/input                          |   8
-rw-r--r--  hadoop/cacheArchive/mapper.py                      |  15
-rw-r--r--  hadoop/cacheArchive/my_module.zip                  | bin 0 -> 524 bytes
-rw-r--r--  hadoop/cacheArchive/my_module/__init__.py          |   0
-rw-r--r--  hadoop/cacheArchive/my_module/mod_a.py             |   3
-rw-r--r--  hadoop/cacheArchive/my_module/mod_b.py             |   3
-rw-r--r--  hadoop/cacheArchive/other_module.zip               | bin 0 -> 347 bytes
-rw-r--r--  hadoop/cacheArchive/other_module/__init__.py       |   0
-rw-r--r--  hadoop/cacheArchive/other_module/other.py          |   3
-rw-r--r--  hadoop/cacheArchive/output/part-00000              |  12
-rw-r--r--  hadoop/cacheArchive/output/part-00001              |  12
-rwxr-xr-x  hadoop/cacheArchive/streaming.sh                   |  28
-rw-r--r--  hadoop/streaming/mapper/input                      |  10
-rw-r--r--  hadoop/streaming/mapper/mapper_test.py             |   9
-rwxr-xr-x  hadoop/streaming/mapper/mapper_test.sh             |  23
-rw-r--r--  hadoop/streaming/mapper/mapper_test1.py            |   9
-rwxr-xr-x  hadoop/streaming/mapper/mapper_test1.sh            |  24
-rw-r--r--  hadoop/streaming/no_reducer/input                  |   8
-rwxr-xr-x  hadoop/streaming/no_reducer/no_reducer.rb          |   9
-rwxr-xr-x  hadoop/streaming/no_reducer/no_reducer.sh          |  23
-rw-r--r--  hadoop/streaming/partitioner/input                 |   8
-rwxr-xr-x  hadoop/streaming/partitioner/partitioner_test.sh   |  22
-rwxr-xr-x  hadoop/streaming/partitioner/partitioner_test1.sh  |  27
-rw-r--r--  hadoop/streaming/secondary_sort/input              |   8
-rwxr-xr-x  hadoop/streaming/secondary_sort/secondary_sort.sh  |  30
-rw-r--r--  hadoop/streaming/test/input                        |  10
-rwxr-xr-x  hadoop/streaming/test/test.sh                      |  23
-rw-r--r--  hadoop/wordcount/README                            |   6
-rw-r--r--  hadoop/wordcount/WordCount.java                    |  57
-rw-r--r--  hadoop/wordcount/pipes/Makefile                    |  12
-rw-r--r--  hadoop/wordcount/pipes/jobconf.xml                 |  16
-rwxr-xr-x  hadoop/wordcount/pipes/run.sh                      |  12
-rw-r--r--  hadoop/wordcount/pipes/wordcount.cc                |  38
-rw-r--r--  hadoop/wordcount/pipes/wordcount.hh                |  34
-rwxr-xr-x  hadoop/wordcount/run.sh                            |   5
35 files changed, 507 insertions, 0 deletions
diff --git a/hadoop/cacheArchive/input b/hadoop/cacheArchive/input
new file mode 100644
index 0000000..71ac1b5
--- /dev/null
+++ b/hadoop/cacheArchive/input
@@ -0,0 +1,8 @@
+a
+b
+c
+d
+e
+f
+g
+h
diff --git a/hadoop/cacheArchive/mapper.py b/hadoop/cacheArchive/mapper.py
new file mode 100644
index 0000000..a7dd9f4
--- /dev/null
+++ b/hadoop/cacheArchive/mapper.py
@@ -0,0 +1,15 @@
+#!/usr/bin/env python2
+
+import sys
+
+sys.path.append('.')
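+# '.' is the task's working directory; when run via streaming.sh, the
+# -cacheArchive symlinks for my_module and other_module are unpacked there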
+import my_module, other_module
+from my_module import mod_a
+from my_module import mod_b
+from other_module import other
+
+for line in sys.stdin:
+    mod_a.bla(line)
+    mod_b.blubb(line)
+    other.foo(line)
+
diff --git a/hadoop/cacheArchive/my_module.zip b/hadoop/cacheArchive/my_module.zip
new file mode 100644
index 0000000..65d960b
--- /dev/null
+++ b/hadoop/cacheArchive/my_module.zip
Binary files differ
diff --git a/hadoop/cacheArchive/my_module/__init__.py b/hadoop/cacheArchive/my_module/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/hadoop/cacheArchive/my_module/__init__.py
diff --git a/hadoop/cacheArchive/my_module/mod_a.py b/hadoop/cacheArchive/my_module/mod_a.py
new file mode 100644
index 0000000..ab65e6e
--- /dev/null
+++ b/hadoop/cacheArchive/my_module/mod_a.py
@@ -0,0 +1,3 @@
+def bla(a):
+    print a.strip()
+
diff --git a/hadoop/cacheArchive/my_module/mod_b.py b/hadoop/cacheArchive/my_module/mod_b.py
new file mode 100644
index 0000000..d684220
--- /dev/null
+++ b/hadoop/cacheArchive/my_module/mod_b.py
@@ -0,0 +1,3 @@
+def blubb(a):
+    print a.strip()
+
diff --git a/hadoop/cacheArchive/other_module.zip b/hadoop/cacheArchive/other_module.zip
new file mode 100644
index 0000000..af99f7d
--- /dev/null
+++ b/hadoop/cacheArchive/other_module.zip
Binary files differ
diff --git a/hadoop/cacheArchive/other_module/__init__.py b/hadoop/cacheArchive/other_module/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/hadoop/cacheArchive/other_module/__init__.py
diff --git a/hadoop/cacheArchive/other_module/other.py b/hadoop/cacheArchive/other_module/other.py
new file mode 100644
index 0000000..fa55d0b
--- /dev/null
+++ b/hadoop/cacheArchive/other_module/other.py
@@ -0,0 +1,3 @@
+def foo(a):
+    print a.strip()
+
diff --git a/hadoop/cacheArchive/output/part-00000 b/hadoop/cacheArchive/output/part-00000
new file mode 100644
index 0000000..89955ae
--- /dev/null
+++ b/hadoop/cacheArchive/output/part-00000
@@ -0,0 +1,12 @@
+a
+a
+a
+b
+b
+b
+c
+c
+c
+d
+d
+d
diff --git a/hadoop/cacheArchive/output/part-00001 b/hadoop/cacheArchive/output/part-00001
new file mode 100644
index 0000000..04576d4
--- /dev/null
+++ b/hadoop/cacheArchive/output/part-00001
@@ -0,0 +1,12 @@
+e
+e
+e
+f
+f
+f
+g
+g
+g
+h
+h
+h
diff --git a/hadoop/cacheArchive/streaming.sh b/hadoop/cacheArchive/streaming.sh
new file mode 100755
index 0000000..6bc9cda
--- /dev/null
+++ b/hadoop/cacheArchive/streaming.sh
@@ -0,0 +1,28 @@
+#!/bin/sh
+
+HADOOP_HOME=/usr/lib/hadoop
+HADOOP_VERSION=0.20.2-cdh3u1
+JAR=contrib/streaming/hadoop-streaming-$HADOOP_VERSION.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+hadoop dfs -put input input
+hadoop dfs -put my_module.zip my_module.zip
+hadoop dfs -put other_module.zip other_module.zip
+
+IN=input
+OUT=output
+
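+# each -cacheArchive below pulls a zip from HDFS into the task's working
+# directory and unpacks it behind its '#name' symlink, which is what lets
+# mapper.py import my_module and other_module after sys.path.append('.')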
+$HSTREAMING \
+ -input $IN \
+ -output $OUT \
+ -mapper "python mapper.py" \
+ -reducer "NONE" \
+ -file mapper.py \
+ -cacheArchive 'hdfs:///user/simianer/my_module.zip#my_module' \
+ -cacheArchive 'hdfs:///user/simianer/other_module.zip#other_module' \
+ -jobconf mapred.reduce.tasks=30 # 30 this time instead of 3 (moot here: -reducer NONE makes the job map-only)
+
+hadoop dfs -get $OUT .
+hadoop dfs -rm $IN
+hadoop dfs -rmr $OUT
+
diff --git a/hadoop/streaming/mapper/input b/hadoop/streaming/mapper/input
new file mode 100644
index 0000000..338fd87
--- /dev/null
+++ b/hadoop/streaming/mapper/input
@@ -0,0 +1,10 @@
+0 1 a c
+0 2 b c
+0 3 c c
+1 4 a c
+1 5 b c
+1 6 c c
+1 7 d c
+2 8 a c
+2 9 b c
+2 10 c c
diff --git a/hadoop/streaming/mapper/mapper_test.py b/hadoop/streaming/mapper/mapper_test.py
new file mode 100644
index 0000000..d358bda
--- /dev/null
+++ b/hadoop/streaming/mapper/mapper_test.py
@@ -0,0 +1,9 @@
+#!/usr/bin/env python2
+
+import sys
+
+
+if __name__ == "__main__":
+    for line in sys.stdin:
+        print line.strip().upper()
+
diff --git a/hadoop/streaming/mapper/mapper_test.sh b/hadoop/streaming/mapper/mapper_test.sh
new file mode 100755
index 0000000..f0c5da3
--- /dev/null
+++ b/hadoop/streaming/mapper/mapper_test.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+
+hadoop dfs -put input input
+
+HADOOP_HOME=/usr/lib/hadoop
+HADOOP_VERSION=0.20.2-cdh3u1
+JAR=contrib/streaming/hadoop-streaming-$HADOOP_VERSION.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+OUT=mapper_test_out
+
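+# IdentityReducer passes the map output straight through, so the job just
+# shows how the upper-cased records get partitioned and sorted across the
+# 3 reduce outputs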
+$HSTREAMING \
+ -input input \
+ -output $OUT \
+ -mapper "python mapper.py" \
+ -file mapper_test.py \
+ -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
+ -jobconf mapred.reduce.tasks=3
+
+hadoop dfs -get $OUT .
+hadoop dfs -rmr $OUT
+hadoop dfs -rm input
+
diff --git a/hadoop/streaming/mapper/mapper_test1.py b/hadoop/streaming/mapper/mapper_test1.py
new file mode 100644
index 0000000..79c8aa6
--- /dev/null
+++ b/hadoop/streaming/mapper/mapper_test1.py
@@ -0,0 +1,9 @@
+#!/usr/bin/env python2
+
+import sys
+
+
+if __name__ == "__main__":
+    for line in sys.stdin:
+        print "MYKEY\t%s" % line.strip().upper()
+
diff --git a/hadoop/streaming/mapper/mapper_test1.sh b/hadoop/streaming/mapper/mapper_test1.sh
new file mode 100755
index 0000000..475699f
--- /dev/null
+++ b/hadoop/streaming/mapper/mapper_test1.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+
+
+hadoop dfs -put input input
+
+HADOOP_HOME=/usr/lib/hadoop
+HADOOP_VERSION=0.20.2-cdh3u1
+JAR=contrib/streaming/hadoop-streaming-$HADOOP_VERSION.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+OUT=mapper_test1_out
+
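+# the mapper prefixes every record with the constant key MYKEY, so all
+# records hash to the same partition and end up in a single reduce output
+# (the other 2 of the 3 reduce files stay empty)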
+$HSTREAMING \
+ -input input \
+ -output $OUT \
+ -mapper "python mapper1.py" \
+ -file mapper_test1.py \
+ -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
+ -jobconf mapred.reduce.tasks=3
+
+hadoop dfs -get $OUT .
+hadoop dfs -rmr $OUT
+hadoop dfs -rm input
+
diff --git a/hadoop/streaming/no_reducer/input b/hadoop/streaming/no_reducer/input
new file mode 100644
index 0000000..71ac1b5
--- /dev/null
+++ b/hadoop/streaming/no_reducer/input
@@ -0,0 +1,8 @@
+a
+b
+c
+d
+e
+f
+g
+h
diff --git a/hadoop/streaming/no_reducer/no_reducer.rb b/hadoop/streaming/no_reducer/no_reducer.rb
new file mode 100755
index 0000000..4410b93
--- /dev/null
+++ b/hadoop/streaming/no_reducer/no_reducer.rb
@@ -0,0 +1,9 @@
+#!/usr/bin/env ruby
+
+
+i = 0
+while l = STDIN.gets
+ puts "line #{i} (#{l.strip})"
+ i+=1
+end
+
diff --git a/hadoop/streaming/no_reducer/no_reducer.sh b/hadoop/streaming/no_reducer/no_reducer.sh
new file mode 100755
index 0000000..c32bfdd
--- /dev/null
+++ b/hadoop/streaming/no_reducer/no_reducer.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+
+
+hadoop dfs -put input input
+
+HADOOP_HOME=/usr/lib/hadoop
+HADOOP_VERSION=0.20.2-cdh3u1
+JAR=contrib/streaming/hadoop-streaming-$HADOOP_VERSION.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+OUT=no_reducer_out
+
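+# -reducer NONE makes this a map-only job: mapper output is written
+# directly to HDFS, with no sort or shuffle phase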
+$HSTREAMING \
+ -input input \
+ -output $OUT \
+ -mapper "no_reducer.rb" \
+ -file "no_reducer.rb" \
+ -reducer NONE
+
+hadoop dfs -get $OUT .
+hadoop dfs -rmr $OUT
+hadoop dfs -rm input
+
diff --git a/hadoop/streaming/partitioner/input b/hadoop/streaming/partitioner/input
new file mode 100644
index 0000000..06c6a28
--- /dev/null
+++ b/hadoop/streaming/partitioner/input
@@ -0,0 +1,8 @@
+1.1 a
+2.2 b
+3.1 c
+4.2 d
+4.1 e
+1.2 x
+2.1 y
+4.3 q
diff --git a/hadoop/streaming/partitioner/partitioner_test.sh b/hadoop/streaming/partitioner/partitioner_test.sh
new file mode 100755
index 0000000..bfb2185
--- /dev/null
+++ b/hadoop/streaming/partitioner/partitioner_test.sh
@@ -0,0 +1,22 @@
+#!/bin/sh
+
+
+hadoop dfs -put input input
+
+HADOOP_HOME=/usr/lib/hadoop
+HADOOP_VERSION=0.20.2-cdh3u1
+JAR=contrib/streaming/hadoop-streaming-$HADOOP_VERSION.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+OUT=partitioner_test_out
+
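+# baseline run without an explicit partitioner: since /bin/cat emits no
+# tabs, the default HashPartitioner hashes the whole line, so related keys
+# like 1.1 and 1.2 can land in different reduce outputs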
+$HSTREAMING \
+ -input input \
+ -output $OUT \
+ -mapper /bin/cat \
+ -jobconf mapred.reduce.tasks=2
+
+hadoop dfs -get $OUT .
+hadoop dfs -rmr $OUT
+hadoop dfs -rm input
+
diff --git a/hadoop/streaming/partitioner/partitioner_test1.sh b/hadoop/streaming/partitioner/partitioner_test1.sh
new file mode 100755
index 0000000..759f823
--- /dev/null
+++ b/hadoop/streaming/partitioner/partitioner_test1.sh
@@ -0,0 +1,27 @@
+#!/bin/sh
+
+
+hadoop dfs -put input input
+
+HADOOP_HOME=/usr/lib/hadoop
+HADOOP_VERSION=0.20.2-cdh3u1
+JAR=contrib/streaming/hadoop-streaming-$HADOOP_VERSION.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+OUT=partitioner_test1_out
+
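+# KeyFieldBasedPartitioner with '.' as the key-field separator partitions
+# on the first subfield only (-k1,1), so 1.1 and 1.2 go to the same reducer
+# while the full 2-field key still takes part in the sort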
+$HSTREAMING \
+ -input input \
+ -output $OUT \
+ -mapper /bin/cat \
+ -jobconf mapred.reduce.tasks=2 \
+ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
+ -jobconf stream.map.output.field.separator="\t" \
+ -jobconf map.output.key.field.separator=. \
+ -jobconf stream.num.map.output.key.fields=2 \
+ -jobconf mapred.text.key.partitioner.options=-k1,1
+
+hadoop dfs -get $OUT .
+hadoop dfs -rmr $OUT
+hadoop dfs -rm input
+
diff --git a/hadoop/streaming/secondary_sort/input b/hadoop/streaming/secondary_sort/input
new file mode 100644
index 0000000..5aa7ec9
--- /dev/null
+++ b/hadoop/streaming/secondary_sort/input
@@ -0,0 +1,8 @@
+0-*-1 a:1 a:2 a:3
+0-*-2 a:1 a:2 a:3
+0-*-10 a:1 a:2 a:3
+1-*-2 a:1 a:2 a:3
+2-*-0 a:1 a:2 a:3
+2-*-2 a:1 a:2 a:3
+3-*-3 a:1 a:2 a:3
+10-*-0 a:1 a:2 a:3
diff --git a/hadoop/streaming/secondary_sort/secondary_sort.sh b/hadoop/streaming/secondary_sort/secondary_sort.sh
new file mode 100755
index 0000000..c45f381
--- /dev/null
+++ b/hadoop/streaming/secondary_sort/secondary_sort.sh
@@ -0,0 +1,30 @@
+#!/bin/sh
+
+
+hadoop dfs -put input input
+
+HADOOP_HOME=/usr/lib/hadoop
+HADOOP_VERSION=0.20.2-cdh3u1
+JAR=contrib/streaming/hadoop-streaming-$HADOOP_VERSION.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+OUT=secondary_sort_out
+
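+# secondary sort: partition on the first '-*-' field only (numeric), then
+# let the comparator sort each partition by field 1 ascending and field 2
+# descending (-k1,1n -k2,2nr) before the IdentityReducer writes it out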
+$HSTREAMING \
+ -input input \
+ -output $OUT \
+ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
+ -jobconf map.output.key.field.separator="-*-" \
+ -jobconf mapred.text.key.partitioner.options="-k1,1n" \
+ -mapper /bin/cat \
+ -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
+ -jobconf mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
+ -jobconf stream.num.map.output.key.fields=2 \
+ -jobconf stream.map.output.field.separator="\t" \
+ -jobconf mapred.text.key.comparator.options="-k1,1n -k2,2nr" \
+ -jobconf mapred.reduce.tasks=3
+
+hadoop dfs -get $OUT .
+hadoop dfs -rmr $OUT
+hadoop dfs -rm input
+
diff --git a/hadoop/streaming/test/input b/hadoop/streaming/test/input
new file mode 100644
index 0000000..338fd87
--- /dev/null
+++ b/hadoop/streaming/test/input
@@ -0,0 +1,10 @@
+0 1 a c
+0 2 b c
+0 3 c c
+1 4 a c
+1 5 b c
+1 6 c c
+1 7 d c
+2 8 a c
+2 9 b c
+2 10 c c
diff --git a/hadoop/streaming/test/test.sh b/hadoop/streaming/test/test.sh
new file mode 100755
index 0000000..2f09bac
--- /dev/null
+++ b/hadoop/streaming/test/test.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+
+
+hadoop dfs -put input input
+
+HADOOP_HOME=/usr/lib/hadoop
+HADOOP_VERSION=0.20.2-cdh3u1
+JAR=contrib/streaming/hadoop-streaming-$HADOOP_VERSION.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+OUT=test_out
+
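+# minimal streaming job: /bin/cat as mapper plus IdentityReducer means the
+# framework's own partition/sort/shuffle alone produces the 3 output files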
+$HSTREAMING \
+ -input input \
+ -output $OUT \
+ -mapper /bin/cat \
+ -jobconf "mapred.reduce.tasks=3" \
+ -reducer org.apache.hadoop.mapred.lib.IdentityReducer
+
+hadoop dfs -get $OUT .
+hadoop dfs -rmr $OUT
+hadoop dfs -rm input
+
diff --git a/hadoop/wordcount/README b/hadoop/wordcount/README
new file mode 100644
index 0000000..b13f384
--- /dev/null
+++ b/hadoop/wordcount/README
@@ -0,0 +1,6 @@
+source: http://hadoop.apache.org/common/docs/current/mapred_tutorial.html
+ mkdir wordcount_classes
+ javac -classpath /usr/lib/hadoop/hadoop-0.20.2-cdh3u1-core.jar -d wordcount_classes WordCount.java
+ jar -cvf wordcount.jar -C wordcount_classes/ .
+ hadoop jar wordcount.jar org.myorg.WordCount in/infile out/outfile
+
diff --git a/hadoop/wordcount/WordCount.java b/hadoop/wordcount/WordCount.java
new file mode 100644
index 0000000..8917aee
--- /dev/null
+++ b/hadoop/wordcount/WordCount.java
@@ -0,0 +1,57 @@
+package org.myorg;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+
+public class WordCount {
+
+    // Mapper
+    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
+        private final static IntWritable one = new IntWritable(1);
+        private Text word = new Text();
+        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
+            String line = value.toString(); // TextInputFormat
+            StringTokenizer tokenizer = new StringTokenizer(line);
+            while (tokenizer.hasMoreTokens()) {
+                word.set(tokenizer.nextToken());
+                output.collect(word, one);
+            }
+        }
+    }
+
+    // Reducer
+    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
+        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
+            int sum = 0;
+            while (values.hasNext()) {
+                sum += values.next().get();
+            }
+            output.collect(key, new IntWritable(sum));
+        }
+    }
+
+    // Driver
+    public static void main(String[] args) throws Exception {
+        JobConf conf = new JobConf(WordCount.class);
+        conf.setJobName("wordcount");
+        conf.setNumReduceTasks(200);
+        conf.setOutputKeyClass(Text.class);
+        conf.setOutputValueClass(IntWritable.class);
+        conf.setMapperClass(Map.class);
+        conf.setCombinerClass(Reduce.class); // Combiner: summing is associative and commutative, so the reducer doubles as a map-side combiner
+        conf.setReducerClass(Reduce.class);
+        conf.setInputFormat(TextInputFormat.class);
+        conf.setOutputFormat(TextOutputFormat.class);
+        FileInputFormat.setInputPaths(conf, new Path(args[0]));
+        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+        JobClient.runJob(conf);
+    }
+
+}
+
diff --git a/hadoop/wordcount/pipes/Makefile b/hadoop/wordcount/pipes/Makefile
new file mode 100644
index 0000000..c892bb8
--- /dev/null
+++ b/hadoop/wordcount/pipes/Makefile
@@ -0,0 +1,12 @@
+CC = g++
+HADOOP_INSTALL = ../hadoop-0.20.205.0
+PLATFORM = Linux-amd64-64
+CPPFLAGS = -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include
+
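+# linked -static so the binary runs on task nodes without a locally
+# installed Hadoop C++ runtime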
+wordcount: wordcount.cc
+	$(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \
+		-lhadooputils -lcrypto -lpthread -g -O2 -static -o $@
+
+wordcountc: wordcountc.cc
+	$(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \
+		-lhadooputils -lcrypto -lpthread -g -O2 -static -o $@
diff --git a/hadoop/wordcount/pipes/jobconf.xml b/hadoop/wordcount/pipes/jobconf.xml
new file mode 100644
index 0000000..facdbce
--- /dev/null
+++ b/hadoop/wordcount/pipes/jobconf.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0"?>
+<configuration>
+ <property>
+ <name>hadoop.pipes.executable</name>
+ <value>path/to/dp_hadoop_pipes_test</value>
+ </property>
+ <property>
+ <name>hadoop.pipes.java.recordreader</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>hadoop.pipes.java.recordwriter</name>
+ <value>true</value>
+ </property>
+</configuration>
+
diff --git a/hadoop/wordcount/pipes/run.sh b/hadoop/wordcount/pipes/run.sh
new file mode 100755
index 0000000..0b02be7
--- /dev/null
+++ b/hadoop/wordcount/pipes/run.sh
@@ -0,0 +1,12 @@
+#!/bin/sh
+
+HADOOP=/usr/lib/hadoop/
+
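+# the java.recordreader/recordwriter switches keep record I/O on the Java
+# side; the wordcount binary passed via -program only implements map and
+# reduce over the Pipes protocol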
+$HADOOP/bin/hadoop pipes \
+ -D hadoop.pipes.java.recordreader=true \
+ -D hadoop.pipes.java.recordwriter=true \
+ -D mapred.reduce.tasks=30 \
+ -input in/bible.txt \
+ -output out/bible-wc \
+ -program ./wordcount
+
diff --git a/hadoop/wordcount/pipes/wordcount.cc b/hadoop/wordcount/pipes/wordcount.cc
new file mode 100644
index 0000000..c9394d5
--- /dev/null
+++ b/hadoop/wordcount/pipes/wordcount.cc
@@ -0,0 +1,38 @@
+#include "wordcount.hh"
+
+
+void
+WordcountMapper::map(HadoopPipes::MapContext &context)
+{
+  typedef boost::tokenizer<> tokenizer_t;
+  tokenizer_t tokenizer(context.getInputValue());
+
+  for (tokenizer_t::const_iterator i = tokenizer.begin();
+       tokenizer.end() != i; ++i) {
+    context.emit(boost::to_lower_copy(*i), "1");
+  }
+}
+
+void
+WordcountReducer::reduce(HadoopPipes::ReduceContext &context)
+{
+  uint32_t count(0);
+
+  do {
+    ++count;
+  } while (context.nextValue());
+
+  //std::cout << context.getInputKey() << endl;
+  context.emit(context.getInputKey(),
+               boost::lexical_cast<std::string>(count));
+}
+
+
+int
+main(int argc, char *argv[])
+{
+  HadoopPipes::TemplateFactory2<WordcountMapper,
+                                WordcountReducer> factory;
+  return HadoopPipes::runTask(factory);
+}
+
diff --git a/hadoop/wordcount/pipes/wordcount.hh b/hadoop/wordcount/pipes/wordcount.hh
new file mode 100644
index 0000000..629acf6
--- /dev/null
+++ b/hadoop/wordcount/pipes/wordcount.hh
@@ -0,0 +1,34 @@
+#ifndef __WORDCOUNT_HH__
+#define __WORDCOUNT_HH__
+
+
+#include <iostream>
+#include <string>
+
+#include "hadoop/Pipes.hh"
+#include "hadoop/TemplateFactory.hh"
+
+#include <boost/algorithm/string.hpp>
+#include <boost/tokenizer.hpp>
+#include <boost/lexical_cast.hpp>
+
+using namespace std;
+
+
+class WordcountMapper : public HadoopPipes::Mapper
+{
+  public:
+    WordcountMapper(const HadoopPipes::TaskContext &) {}
+    void map(HadoopPipes::MapContext &context);
+};
+
+class WordcountReducer : public HadoopPipes::Reducer
+{
+  public:
+    WordcountReducer(const HadoopPipes::TaskContext &) {}
+    void reduce(HadoopPipes::ReduceContext &context);
+};
+
+
+#endif
+
diff --git a/hadoop/wordcount/run.sh b/hadoop/wordcount/run.sh
new file mode 100755
index 0000000..2906bfb
--- /dev/null
+++ b/hadoop/wordcount/run.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+
+hadoop jar wordcount.jar org.myorg.WordCount in/marec.lc.en out/marec.lc.en-wc
+