Diffstat (limited to 'dtrain/hstreaming')
-rwxr-xr-x  dtrain/hstreaming/avg.rb                     33
-rw-r--r--  dtrain/hstreaming/cdec.ini                   23
-rw-r--r--  dtrain/hstreaming/dtrain.ini                 15
-rwxr-xr-x  dtrain/hstreaming/dtrain.sh                  10
-rwxr-xr-x  dtrain/hstreaming/hadoop-streaming-job.sh    31
-rwxr-xr-x  dtrain/hstreaming/lplp.rb                   135
-rw-r--r--  dtrain/hstreaming/red-test                    9
7 files changed, 256 insertions, 0 deletions
diff --git a/dtrain/hstreaming/avg.rb b/dtrain/hstreaming/avg.rb
new file mode 100755
index 00000000..2599c732
--- /dev/null
+++ b/dtrain/hstreaming/avg.rb
@@ -0,0 +1,33 @@
+#!/usr/bin/env ruby
+# the first argument may give a custom shard count (overriding __SHARD_COUNT__ from the input)
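+# usage: ruby avg.rb [shard_count] < input (e.g. ruby avg.rb < red-test)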
+
+shard_count_key = "__SHARD_COUNT__"
+
+STDIN.set_encoding 'utf-8'
+STDOUT.set_encoding 'utf-8'
+
+w = {}
+c = {}
+w.default = 0
+c.default = 0
+while line = STDIN.gets
+  key, val = line.split /\s+/
+  w[key] += val.to_f
+  c[key] += 1
+end
+
+if ARGV.size == 0
+  shard_count = w[shard_count_key]
+else
+  shard_count = ARGV[0].to_f
+end
+w.each_key { |k|
+  if k == shard_count_key
+    next
+  else
+    puts "#{k}\t#{w[k]/shard_count}"
+    #puts "# #{c[k]}"
+  end
+}
+
diff --git a/dtrain/hstreaming/cdec.ini b/dtrain/hstreaming/cdec.ini
new file mode 100644
index 00000000..d4f5cecd
--- /dev/null
+++ b/dtrain/hstreaming/cdec.ini
@@ -0,0 +1,23 @@
+formalism=scfg
+add_pass_through_rules=true
+scfg_max_span_limit=15
+intersection_strategy=cube_pruning
+cubepruning_pop_limit=30
+feature_function=WordPenalty
+feature_function=KLanguageModel nc-wmt11.en.srilm.gz
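+# more of cdec's feature functions, disabled for this test: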
+#feature_function=ArityPenalty
+#feature_function=CMR2008ReorderingFeatures
+#feature_function=Dwarf
+#feature_function=InputIndicator
+#feature_function=LexNullJump
+#feature_function=NewJump
+#feature_function=NgramFeatures
+#feature_function=NonLatinCount
+#feature_function=OutputIndicator
+#feature_function=RuleIdentityFeatures
+#feature_function=RuleNgramFeatures
+#feature_function=RuleShape
+#feature_function=SourceSpanSizeFeatures
+#feature_function=SourceWordPenalty
+#feature_function=SpanFeatures
diff --git a/dtrain/hstreaming/dtrain.ini b/dtrain/hstreaming/dtrain.ini
new file mode 100644
index 00000000..a2c219a1
--- /dev/null
+++ b/dtrain/hstreaming/dtrain.ini
@@ -0,0 +1,15 @@
+input=-
+output=-
+decoder_config=cdec.ini
+tmp=/var/hadoop/mapred/local/
+epochs=1
+k=100
+N=4
+learning_rate=0.0001
+gamma=0
+scorer=stupid_bleu
+sample_from=kbest
+filter=uniq
+pair_sampling=XYX
+pair_threshold=0
+select_weights=last
diff --git a/dtrain/hstreaming/dtrain.sh b/dtrain/hstreaming/dtrain.sh
new file mode 100755
index 00000000..877ff94c
--- /dev/null
+++ b/dtrain/hstreaming/dtrain.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+# run dtrain with the hadoop task id (the basename of the task's working directory)
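+# dtrain.ini sets input=- and output=-: training data comes in on stdin, the learned weights go out on stdout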
+
+pushd . &>/dev/null
+cd ..
+ID=$(basename "$(pwd)") # attempt_...
+popd &>/dev/null
+./dtrain -c dtrain.ini --hstreaming "$ID"
+
diff --git a/dtrain/hstreaming/hadoop-streaming-job.sh b/dtrain/hstreaming/hadoop-streaming-job.sh
new file mode 100755
index 00000000..92419956
--- /dev/null
+++ b/dtrain/hstreaming/hadoop-streaming-job.sh
@@ -0,0 +1,31 @@
+#!/bin/sh
+
+EXP=a_simple_test
+
+# change these vars to fit your hadoop installation
+HADOOP_HOME=/usr/lib/hadoop-0.20
+JAR=contrib/streaming/hadoop-streaming-0.20.2-cdh3u1.jar
+HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
+
+ IN=input_on_hdfs
+OUT=output_weights_on_hdfs
+
+# you can set -reducer to NONE if you want to
+# do the feature selection/averaging locally
+# (e.g. to keep the weights of all epochs)
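+# a local run could then look like: ruby lplp.rb l2 select_k 100000 30 < collected_map_output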
+$HSTREAMING \
+ -mapper "dtrain.sh" \
+ -reducer "ruby lplp.rb l2 select_k 100000" \
+ -input $IN \
+ -output $OUT \
+ -file dtrain.sh \
+ -file lplp.rb \
+ -file ../dtrain \
+ -file dtrain.ini \
+ -file cdec.ini \
+ -file ../test/example/nc-wmt11.en.srilm.gz \
+ -jobconf mapred.reduce.tasks=30 \
+ -jobconf mapred.max.map.failures.percent=0 \
+ -jobconf mapred.job.name="dtrain $EXP"
+
diff --git a/dtrain/hstreaming/lplp.rb b/dtrain/hstreaming/lplp.rb
new file mode 100755
index 00000000..f0cd58c5
--- /dev/null
+++ b/dtrain/hstreaming/lplp.rb
@@ -0,0 +1,135 @@
+# lplp.rb
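+# feature selection and averaging of the per-shard weights
+# (used as the reducer in hadoop-streaming-job.sh)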
+
+# norms
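+# each norm takes a feature's column vector (its weight in each shard) and the shard count n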
+def l0(feature_column, n)
+  if feature_column.size >= n then return 1 else return 0 end
+end
+
+def l1(feature_column, n=-1)
+  return feature_column.map { |i| i.abs }.reduce { |sum,i| sum+i }
+end
+
+def l2(feature_column, n=-1)
+  return Math.sqrt feature_column.map { |i| i.abs2 }.reduce { |sum,i| sum+i }
+end
+
+def linfty(feature_column, n=-1)
+  return feature_column.map { |i| i.abs }.max
+end
+
+# stats
+def median(feature_column, n)
+  return feature_column.concat(0.step(n-feature_column.size-1).map{|i|0}).sort[feature_column.size/2]
+end
+
+def mean(feature_column, n)
+  return feature_column.reduce { |sum, i| sum+i } / n
+end
+
+# selection
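+# both print "feature\tmean weight" for the features they keep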
+def select_k(weights, norm_fun, n, k=10000)
+  weights.sort{|a,b| norm_fun.call(b[1], n) <=> norm_fun.call(a[1], n)}.each { |p|
+    puts "#{p[0]}\t#{mean(p[1], n)}"
+    k -= 1
+    if k == 0 then break end
+  }
+end
+
+def cut(weights, norm_fun, n, epsilon=0.0001)
+  weights.each { |k,v|
+    if norm_fun.call(v, n).abs >= epsilon
+      puts "#{k}\t#{mean(v, n)}"
+    end
+  }
+end
+
+# test
+def _test()
+  puts
+  w = {}
+  w["a"] = [1, 2, 3]
+  w["b"] = [1, 2]
+  w["c"] = [66]
+  w["d"] = [10, 20, 30]
+  n = 3
+  puts w.to_s
+  puts
+  puts "select_k"
+  puts "l0 expect ad"
+  select_k(w, method(:l0), n, 2)
+  puts "l1 expect cd"
+  select_k(w, method(:l1), n, 2)
+  puts "l2 expect c"
+  select_k(w, method(:l2), n, 1)
+  puts
+  puts "cut"
+  puts "l1 expect cd"
+  cut(w, method(:l1), n, 7)
+  puts
+  puts "median"
+  a = [1,2,3,4,5]
+  puts a.to_s
+  puts median(a, 5)
+  puts
+  puts "#{median(a, 7)} <- that's because we add missing 0s:"
+  puts a.concat(0.step(7-a.size-1).map{|i|0}).to_s
+  puts
+  puts "mean expect bc"
+  w.clear
+  w["a"] = [2]
+  w["b"] = [2.1]
+  w["c"] = [2.2]
+  cut(w, method(:mean), 1, 2.05)
+  exit
+end
+#_test()
+
+# actually do something
+def usage()
+  puts "lplp.rb <l0,l1,l2,linfty,mean,median> <cut|select_k> <k|threshold> [n] < <input>"
+  puts "   l0...: norms for selection"
+  puts "select_k: output only the top k features (ranked by the norm of their column vector)"
+  puts "     cut: output features whose norm is >= threshold"
+  puts "       n: shard count to use for averaging if the input has no __SHARD_COUNT__ entries"
+  exit
+end
+
+if ARGV.size < 3 then usage end
+norm_fun = method(ARGV[0].to_sym)
+type = ARGV[1]
+x = ARGV[2].to_f
+
+shard_count_key = "__SHARD_COUNT__"
+
+STDIN.set_encoding 'utf-8'
+STDOUT.set_encoding 'utf-8'
+
+w = {}
+shard_count = 0
+while line = STDIN.gets
+  key, val = line.split /\s+/
+  if key == shard_count_key
+    shard_count += 1
+    next
+  end
+  if w.has_key? key
+    w[key].push val.to_f
+  else
+    w[key] = [val.to_f]
+  end
+end
+
+if ARGV.size == 4 then shard_count = ARGV[3].to_f end
+
+if type == 'cut'
+  cut(w, norm_fun, shard_count, x)
+elsif type == 'select_k'
+  select_k(w, norm_fun, shard_count, x)
+else
+  puts "unknown action: #{type}"
+end
+
diff --git a/dtrain/hstreaming/red-test b/dtrain/hstreaming/red-test
new file mode 100644
index 00000000..2623d697
--- /dev/null
+++ b/dtrain/hstreaming/red-test
@@ -0,0 +1,9 @@
+a 1
+b 2
+c 3.5
+a 1
+b 2
+c 3.5
+d 1
+e 2
+__SHARD_COUNT__ 2