Diffstat (limited to 'hadoop')
35 files changed, 507 insertions, 0 deletions
diff --git a/hadoop/cacheArchive/input b/hadoop/cacheArchive/input
new file mode 100644
index 0000000..71ac1b5
--- /dev/null
+++ b/hadoop/cacheArchive/input
@@ -0,0 +1,8 @@
+a
+b
+c
+d
+e
+f
+g
+h
diff --git a/hadoop/cacheArchive/mapper.py b/hadoop/cacheArchive/mapper.py
new file mode 100644
index 0000000..a7dd9f4
--- /dev/null
+++ b/hadoop/cacheArchive/mapper.py
@@ -0,0 +1,15 @@
+#!/usr/bin/env python2
+
+import sys
+
+sys.path.append('.')
+import my_module, other_module
+from my_module import mod_a
+from my_module import mod_b
+from other_module import other
+
+for line in sys.stdin:
+    mod_a.bla(line)
+    mod_b.blubb(line)
+    other.foo(line)
+
diff --git a/hadoop/cacheArchive/my_module.zip b/hadoop/cacheArchive/my_module.zip
new file mode 100644
index 0000000..65d960b
--- /dev/null
+++ b/hadoop/cacheArchive/my_module.zip
Binary files differ
diff --git a/hadoop/cacheArchive/my_module/__init__.py b/hadoop/cacheArchive/my_module/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/hadoop/cacheArchive/my_module/__init__.py
diff --git a/hadoop/cacheArchive/my_module/mod_a.py b/hadoop/cacheArchive/my_module/mod_a.py
new file mode 100644
index 0000000..ab65e6e
--- /dev/null
+++ b/hadoop/cacheArchive/my_module/mod_a.py
@@ -0,0 +1,3 @@
+def bla(a):
+    print a.strip()
+
diff --git a/hadoop/cacheArchive/my_module/mod_b.py b/hadoop/cacheArchive/my_module/mod_b.py
new file mode 100644
index 0000000..d684220
--- /dev/null
+++ b/hadoop/cacheArchive/my_module/mod_b.py
@@ -0,0 +1,3 @@
+def blubb(a):
+    print a.strip()
+
diff --git a/hadoop/cacheArchive/other_module.zip b/hadoop/cacheArchive/other_module.zip
new file mode 100644
index 0000000..af99f7d
--- /dev/null
+++ b/hadoop/cacheArchive/other_module.zip
Binary files differ
diff --git a/hadoop/cacheArchive/other_module/__init__.py b/hadoop/cacheArchive/other_module/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/hadoop/cacheArchive/other_module/__init__.py
diff --git a/hadoop/cacheArchive/other_module/other.py b/hadoop/cacheArchive/other_module/other.py
new file mode 100644
index 0000000..fa55d0b
--- /dev/null
+++ b/hadoop/cacheArchive/other_module/other.py
@@ -0,0 +1,3 @@
+def foo(a):
+    print a.strip()
+
diff --git a/hadoop/cacheArchive/output/part-00000 b/hadoop/cacheArchive/output/part-00000
new file mode 100644
index 0000000..89955ae
--- /dev/null
+++ b/hadoop/cacheArchive/output/part-00000
@@ -0,0 +1,12 @@
+a
+a
+a
+b
+b
+b
+c
+c
+c
+d
+d
+d
diff --git a/hadoop/cacheArchive/output/part-00001 b/hadoop/cacheArchive/output/part-00001
new file mode 100644
index 0000000..04576d4
--- /dev/null
+++ b/hadoop/cacheArchive/output/part-00001
@@ -0,0 +1,12 @@
+e
+e
+e
+f
+f
+f
+g
+g
+g
+h
+h
+h
diff --git a/hadoop/cacheArchive/streaming.sh b/hadoop/cacheArchive/streaming.sh
new file mode 100755
index 0000000..6bc9cda
--- /dev/null
+++ b/hadoop/cacheArchive/streaming.sh
@@ -0,0 +1,28 @@
+#!/bin/sh
+
+HADOOP_HOME=/usr/lib/hadoop
+HADOOP_VERSION=0.20.2-cdh3u1
+JAR=contrib/streaming/hadoop-streaming-$HADOOP_VERSION.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+hadoop dfs -put input input
+hadoop dfs -put my_module.zip my_module.zip
+hadoop dfs -put other_module.zip other_module.zip
+
+IN=input
+OUT=output
+
+$HSTREAMING \
+    -input $IN \
+    -output $OUT \
+    -mapper "python mapper.py" \
+    -reducer "NONE" \
+    -file mapper.py \
+    -cacheArchive 'hdfs:///user/simianer/my_module.zip#my_module' \
+    -cacheArchive 'hdfs:///user/simianer/other_module.zip#other_module' \
+    -jobconf mapred.reduce.tasks=30 # this time 30 instead of 3
+
+hadoop dfs -get $OUT .
+hadoop dfs -rm $IN
+hadoop dfs -rmr $OUT
+
diff --git a/hadoop/streaming/mapper/input b/hadoop/streaming/mapper/input
new file mode 100644
index 0000000..338fd87
--- /dev/null
+++ b/hadoop/streaming/mapper/input
@@ -0,0 +1,10 @@
+0 1 a c
+0 2 b c
+0 3 c c
+1 4 a c
+1 5 b c
+1 6 c c
+1 7 d c
+2 8 a c
+2 9 b c
+2 10 c c
diff --git a/hadoop/streaming/mapper/mapper_test.py b/hadoop/streaming/mapper/mapper_test.py
new file mode 100644
index 0000000..d358bda
--- /dev/null
+++ b/hadoop/streaming/mapper/mapper_test.py
@@ -0,0 +1,9 @@
+#!/usr/bin/env python2
+
+import sys
+
+
+if __name__ == "__main__":
+    for line in sys.stdin:
+        print line.upper()
+
diff --git a/hadoop/streaming/mapper/mapper_test.sh b/hadoop/streaming/mapper/mapper_test.sh
new file mode 100755
index 0000000..f0c5da3
--- /dev/null
+++ b/hadoop/streaming/mapper/mapper_test.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+
+hadoop dfs -put input input
+
+HADOOP_HOME=/usr/lib/hadoop
+HADOOP_VERSION=0.20.2-cdh3u1
+JAR=contrib/streaming/hadoop-streaming-$HADOOP_VERSION.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+OUT=mapper_test_out
+
+$HSTREAMING \
+    -input input \
+    -output $OUT \
+    -mapper "python mapper_test.py" \
+    -file mapper_test.py \
+    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
+    -jobconf mapred.reduce.tasks=3
+
+hadoop dfs -get $OUT .
+hadoop dfs -rmr $OUT
+hadoop dfs -rm input
+
diff --git a/hadoop/streaming/mapper/mapper_test1.py b/hadoop/streaming/mapper/mapper_test1.py
new file mode 100644
index 0000000..79c8aa6
--- /dev/null
+++ b/hadoop/streaming/mapper/mapper_test1.py
@@ -0,0 +1,9 @@
+#!/usr/bin/env python2
+
+import sys
+
+
+if __name__ == "__main__":
+    for line in sys.stdin:
+        print "MYKEY\t%s" % line.upper()
+
diff --git a/hadoop/streaming/mapper/mapper_test1.sh b/hadoop/streaming/mapper/mapper_test1.sh
new file mode 100755
index 0000000..475699f
--- /dev/null
+++ b/hadoop/streaming/mapper/mapper_test1.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+
+
+hadoop dfs -put input input
+
+HADOOP_HOME=/usr/lib/hadoop
+HADOOP_VERSION=0.20.2-cdh3u1
+JAR=contrib/streaming/hadoop-streaming-$HADOOP_VERSION.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+OUT=mapper_test1_out
+
+$HSTREAMING \
+    -input input \
+    -output $OUT \
+    -mapper "python mapper_test1.py" \
+    -file mapper_test1.py \
+    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
+    -jobconf mapred.reduce.tasks=3
+
+hadoop dfs -get $OUT .
+hadoop dfs -rmr $OUT
+hadoop dfs -rm input
+
diff --git a/hadoop/streaming/no_reducer/input b/hadoop/streaming/no_reducer/input
new file mode 100644
index 0000000..71ac1b5
--- /dev/null
+++ b/hadoop/streaming/no_reducer/input
@@ -0,0 +1,8 @@
+a
+b
+c
+d
+e
+f
+g
+h
diff --git a/hadoop/streaming/no_reducer/no_reducer.rb b/hadoop/streaming/no_reducer/no_reducer.rb
new file mode 100755
index 0000000..4410b93
--- /dev/null
+++ b/hadoop/streaming/no_reducer/no_reducer.rb
@@ -0,0 +1,9 @@
+#!/usr/bin/env ruby
+
+
+i = 0
+while l = STDIN.gets
+  puts "line #{i} (#{l.strip})"
+  i += 1
+end
+
diff --git a/hadoop/streaming/no_reducer/no_reducer.sh b/hadoop/streaming/no_reducer/no_reducer.sh
new file mode 100755
index 0000000..c32bfdd
--- /dev/null
+++ b/hadoop/streaming/no_reducer/no_reducer.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+
+
+hadoop dfs -put input input
+
+HADOOP_HOME=/usr/lib/hadoop
+HADOOP_VERSION=0.20.2-cdh3u1
+JAR=contrib/streaming/hadoop-streaming-$HADOOP_VERSION.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+OUT=no_reducer_out
+
+$HSTREAMING \
+    -input input \
+    -output $OUT \
+    -mapper "no_reducer.rb" \
+    -file "no_reducer.rb" \
+    -reducer NONE
+
+hadoop dfs -get $OUT .
+hadoop dfs -rmr $OUT
+hadoop dfs -rm input
+
diff --git a/hadoop/streaming/partitioner/input b/hadoop/streaming/partitioner/input
new file mode 100644
index 0000000..06c6a28
--- /dev/null
+++ b/hadoop/streaming/partitioner/input
@@ -0,0 +1,8 @@
+1.1 a
+2.2 b
+3.1 c
+4.2 d
+4.1 e
+1.2 x
+2.1 y
+4.3 q
diff --git a/hadoop/streaming/partitioner/partitioner_test.sh b/hadoop/streaming/partitioner/partitioner_test.sh
new file mode 100755
index 0000000..bfb2185
--- /dev/null
+++ b/hadoop/streaming/partitioner/partitioner_test.sh
@@ -0,0 +1,22 @@
+#!/bin/sh
+
+
+hadoop dfs -put input input
+
+HADOOP_HOME=/usr/lib/hadoop
+HADOOP_VERSION=0.20.2-cdh3u1
+JAR=contrib/streaming/hadoop-streaming-$HADOOP_VERSION.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+OUT=partitioner_test_out
+
+$HSTREAMING \
+    -input input \
+    -output $OUT \
+    -mapper /bin/cat \
+    -jobconf mapred.reduce.tasks=2
+
+hadoop dfs -get $OUT .
+hadoop dfs -rmr $OUT
+hadoop dfs -rm input
+
diff --git a/hadoop/streaming/partitioner/partitioner_test1.sh b/hadoop/streaming/partitioner/partitioner_test1.sh
new file mode 100755
index 0000000..759f823
--- /dev/null
+++ b/hadoop/streaming/partitioner/partitioner_test1.sh
@@ -0,0 +1,27 @@
+#!/bin/sh
+
+
+hadoop dfs -put input input
+
+HADOOP_HOME=/usr/lib/hadoop
+HADOOP_VERSION=0.20.2-cdh3u1
+JAR=contrib/streaming/hadoop-streaming-$HADOOP_VERSION.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+OUT=partitioner_test1_out
+
+$HSTREAMING \
+    -input input \
+    -output $OUT \
+    -mapper /bin/cat \
+    -jobconf mapred.reduce.tasks=2 \
+    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
+    -jobconf stream.map.output.field.separator="\t" \
+    -jobconf map.output.key.field.separator=. \
+    -jobconf stream.num.map.output.key.fields=2 \
+    -jobconf mapred.text.key.partitioner.options=-k1,1
+
+hadoop dfs -get $OUT .
+hadoop dfs -rmr $OUT
+hadoop dfs -rm input
+
diff --git a/hadoop/streaming/secondary_sort/input b/hadoop/streaming/secondary_sort/input
new file mode 100644
index 0000000..5aa7ec9
--- /dev/null
+++ b/hadoop/streaming/secondary_sort/input
@@ -0,0 +1,8 @@
+0-*-1 a:1 a:2 a:3
+0-*-2 a:1 a:2 a:3
+0-*-10 a:1 a:2 a:3
+1-*-2 a:1 a:2 a:3
+2-*-0 a:1 a:2 a:3
+2-*-2 a:1 a:2 a:3
+3-*-3 a:1 a:2 a:3
+10-*-0 a:1 a:2 a:3
diff --git a/hadoop/streaming/secondary_sort/secondary_sort.sh b/hadoop/streaming/secondary_sort/secondary_sort.sh
new file mode 100755
index 0000000..c45f381
--- /dev/null
+++ b/hadoop/streaming/secondary_sort/secondary_sort.sh
@@ -0,0 +1,30 @@
+#!/bin/sh
+
+
+hadoop dfs -put input input
+
+HADOOP_HOME=/usr/lib/hadoop
+HADOOP_VERSION=0.20.2-cdh3u1
+JAR=contrib/streaming/hadoop-streaming-$HADOOP_VERSION.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+OUT=secondary_sort_out
+
+$HSTREAMING \
+    -input input \
+    -output $OUT \
+    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
+    -jobconf map.output.key.field.separator="-*-" \
+    -jobconf mapred.text.key.partitioner.options="-k1,1n" \
+    -mapper /bin/cat \
+    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
+    -jobconf mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
+    -jobconf stream.num.map.output.key.fields=2 \
+    -jobconf stream.map.output.field.separator="\t" \
+    -jobconf mapred.text.key.comparator.options="-k1,1n -k2,2nr" \
+    -jobconf mapred.reduce.tasks=3
+
+hadoop dfs -get $OUT .
+hadoop dfs -rmr $OUT
+hadoop dfs -rm input
+
diff --git a/hadoop/streaming/test/input b/hadoop/streaming/test/input
new file mode 100644
index 0000000..338fd87
--- /dev/null
+++ b/hadoop/streaming/test/input
@@ -0,0 +1,10 @@
+0 1 a c
+0 2 b c
+0 3 c c
+1 4 a c
+1 5 b c
+1 6 c c
+1 7 d c
+2 8 a c
+2 9 b c
+2 10 c c
diff --git a/hadoop/streaming/test/test.sh b/hadoop/streaming/test/test.sh
new file mode 100755
index 0000000..2f09bac
--- /dev/null
+++ b/hadoop/streaming/test/test.sh
@@ -0,0 +1,23 @@
+#!/bin/sh
+
+
+hadoop dfs -put input input
+
+HADOOP_HOME=/usr/lib/hadoop
+HADOOP_VERSION=0.20.2-cdh3u1
+JAR=contrib/streaming/hadoop-streaming-$HADOOP_VERSION.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+OUT=test_out
+
+$HSTREAMING \
+    -input input \
+    -output $OUT \
+    -mapper /bin/cat \
+    -jobconf "mapred.reduce.tasks=3" \
+    -reducer org.apache.hadoop.mapred.lib.IdentityReducer
+
+hadoop dfs -get $OUT .
+hadoop dfs -rmr $OUT
+hadoop dfs -rm input
+
diff --git a/hadoop/wordcount/README b/hadoop/wordcount/README
new file mode 100644
index 0000000..b13f384
--- /dev/null
+++ b/hadoop/wordcount/README
@@ -0,0 +1,6 @@
+source http://hadoop.apache.org/common/docs/current/mapred_tutorial.html
+	mkdir wordcount_classes
+	javac -classpath /usr/lib/hadoop/hadoop-0.20.2-cdh3u1-core.jar -d wordcount_classes WordCount.java
+	jar -cvf wordcount.jar -C wordcount_classes/ .
+	hadoop jar wordcount.jar org.myorg.WordCount in/infile out/outfile
+
diff --git a/hadoop/wordcount/WordCount.java b/hadoop/wordcount/WordCount.java
new file mode 100644
index 0000000..8917aee
--- /dev/null
+++ b/hadoop/wordcount/WordCount.java
@@ -0,0 +1,57 @@
+package org.myorg;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+
+public class WordCount {
+
+  // Mapper
+  public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
+      String line = value.toString(); // TextInputFormat
+      StringTokenizer tokenizer = new StringTokenizer(line);
+      while (tokenizer.hasMoreTokens()) {
+        word.set(tokenizer.nextToken());
+        output.collect(word, one);
+      }
+    }
+  }
+
+  // Reducer
+  public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
+    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
+      int sum = 0;
+      while (values.hasNext()) {
+        sum += values.next().get();
+      }
+      output.collect(key, new IntWritable(sum));
+    }
+  }
+
+  // Driver
+  public static void main(String[] args) throws Exception {
+    JobConf conf = new JobConf(WordCount.class);
+    conf.setJobName("wordcount");
+    conf.setNumReduceTasks(200);
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(IntWritable.class);
+    conf.setMapperClass(Map.class);
+    conf.setCombinerClass(Reduce.class); // Combiner
+    conf.setReducerClass(Reduce.class);
+    conf.setInputFormat(TextInputFormat.class);
+    conf.setOutputFormat(TextOutputFormat.class);
+    FileInputFormat.setInputPaths(conf, new Path(args[0]));
+    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+    JobClient.runJob(conf);
+  }
+
+}
+
diff --git a/hadoop/wordcount/pipes/Makefile b/hadoop/wordcount/pipes/Makefile
new file mode 100644
index 0000000..c892bb8
--- /dev/null
+++ b/hadoop/wordcount/pipes/Makefile
@@ -0,0 +1,12 @@
+CC = g++
+HADOOP_INSTALL = ../hadoop-0.20.205.0
+PLATFORM = Linux-amd64-64
+CPPFLAGS = -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include
+
+wordcount: wordcount.cc
+	$(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \
+	-lhadooputils -lcrypto -lpthread -g -O2 -static -o $@
+
+wordcountc: wordcountc.cc
+	$(CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \
+	-lhadooputils -lcrypto -lpthread -g -O2 -static -o $@
diff --git a/hadoop/wordcount/pipes/jobconf.xml b/hadoop/wordcount/pipes/jobconf.xml
new file mode 100644
index 0000000..facdbce
--- /dev/null
+++ b/hadoop/wordcount/pipes/jobconf.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0"?>
+<configuration>
+  <property>
+    <name>hadoop.pipes.executable</name>
+    <value>path/to/dp_hadoop_pipes_test</value>
+  </property>
+  <property>
+    <name>hadoop.pipes.java.recordreader</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>hadoop.pipes.java.recordwriter</name>
+    <value>true</value>
+  </property>
+</configuration>
+
diff --git a/hadoop/wordcount/pipes/run.sh b/hadoop/wordcount/pipes/run.sh
new file mode 100755
index 0000000..0b02be7
--- /dev/null
+++ b/hadoop/wordcount/pipes/run.sh
@@ -0,0 +1,12 @@
+#!/bin/sh
+
+HADOOP=/usr/lib/hadoop/
+
+$HADOOP/bin/hadoop pipes \
+    -D hadoop.pipes.java.recordreader=true \
+    -D hadoop.pipes.java.recordwriter=true \
+    -D mapred.reduce.tasks=30 \
+    -input in/bible.txt \
+    -output out/bible-wc \
+    -program ./wordcount
+
diff --git a/hadoop/wordcount/pipes/wordcount.cc b/hadoop/wordcount/pipes/wordcount.cc
new file mode 100644
index 0000000..c9394d5
--- /dev/null
+++ b/hadoop/wordcount/pipes/wordcount.cc
@@ -0,0 +1,38 @@
+#include "wordcount.hh"
+
+
+void
+WordcountMapper::map(HadoopPipes::MapContext &context)
+{
+  typedef boost::tokenizer<> tokenizer_t;
+  tokenizer_t tokenizer(context.getInputValue());
+
+  for(tokenizer_t::const_iterator i = tokenizer.begin();
+      tokenizer.end() != i; ++i) {
+    context.emit(boost::to_lower_copy(*i), "1");
+  }
+}
+
+void
+WordcountReducer::reduce(HadoopPipes::ReduceContext &context)
+{
+  uint32_t count(0);
+
+  do {
+    ++count;
+  } while(context.nextValue());
+
+  //std::cout << context.getInputKey() << endl;
+  context.emit(context.getInputKey(),
+               boost::lexical_cast<std::string>(count));
+}
+
+
+int
+main(int argc, char *argv[])
+{
+  HadoopPipes::TemplateFactory2<WordcountMapper,
+                                WordcountReducer> factory;
+  return HadoopPipes::runTask(factory);
+}
+
diff --git a/hadoop/wordcount/pipes/wordcount.hh b/hadoop/wordcount/pipes/wordcount.hh
new file mode 100644
index 0000000..629acf6
--- /dev/null
+++ b/hadoop/wordcount/pipes/wordcount.hh
@@ -0,0 +1,34 @@
+#ifndef __WORDCOUNT_HH__
+#define __WORDCOUNT_HH__
+
+
+#include <iostream>
+#include <string>
+
+#include "hadoop/Pipes.hh"
+#include "hadoop/TemplateFactory.hh"
+
+#include <boost/algorithm/string.hpp>
+#include <boost/tokenizer.hpp>
+#include <boost/lexical_cast.hpp>
+
+using namespace std;
+
+
+class WordcountMapper : public HadoopPipes::Mapper
+{
+  public:
+    WordcountMapper(const HadoopPipes::TaskContext &) {};
+    void map(HadoopPipes::MapContext &context);
+};
+
+class WordcountReducer : public HadoopPipes::Reducer
+{
+  public:
+    WordcountReducer(const HadoopPipes::TaskContext &) {};
+    void reduce(HadoopPipes::ReduceContext & context);
+};
+
+
+#endif
+
diff --git a/hadoop/wordcount/run.sh b/hadoop/wordcount/run.sh
new file mode 100755
index 0000000..2906bfb
--- /dev/null
+++ b/hadoop/wordcount/run.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+
+hadoop jar wordcount.jar org.myorg.WordCount in/marec.lc.en out/marec.lc.en-wc
+