summaryrefslogtreecommitdiff
path: root/dtrain/hstreaming
diff options
context:
space:
mode:
Diffstat (limited to 'dtrain/hstreaming')
-rwxr-xr-xdtrain/hstreaming/avg.rb (renamed from dtrain/hstreaming/red-avg.rb)14
-rwxr-xr-xdtrain/hstreaming/lplp.rb112
-rwxr-xr-xdtrain/hstreaming/red-all.rb26
3 files changed, 78 insertions, 74 deletions
diff --git a/dtrain/hstreaming/red-avg.rb b/dtrain/hstreaming/avg.rb
index 9326ffbe..e0899144 100755
--- a/dtrain/hstreaming/red-avg.rb
+++ b/dtrain/hstreaming/avg.rb
@@ -1,4 +1,4 @@
-#!/usr/bin/env ruby1.9.1
+# avg.rb
shard_count_key = "__SHARD_COUNT__"
@@ -10,18 +10,22 @@ c = {}
w.default = 0
c.default = 0
while line = STDIN.gets
- key, val = line.split /\t/
+ key, val = line.split /\s/
w[key] += val.to_f
c[key] += 1
end
-puts "# dtrain reducer: average"
-shard_count = w["__SHARD_COUNT__"]
+if ARGV.size == 0
+ shard_count = w["__SHARD_COUNT__"]
+else
+ shard_count = ARGV[0].to_f
+end
w.each_key { |k|
if k == shard_count_key
puts "# shard count: #{shard_count.to_i}"
else
- puts "#{k}\t#{w[k]/shard_count}\t# #{c[k]}"
+ puts "#{k}\t#{w[k]/shard_count}"
+ puts "# #{c[k]}"
end
}
diff --git a/dtrain/hstreaming/lplp.rb b/dtrain/hstreaming/lplp.rb
index edb93e77..0ec21a46 100755
--- a/dtrain/hstreaming/lplp.rb
+++ b/dtrain/hstreaming/lplp.rb
@@ -2,15 +2,15 @@
# norms
def l0(feature_column, n)
- if feature_column.size == n then return 1 else return 0 end
+ if feature_column.size >= n then return 1 else return 0 end
end
def l1(feature_column, n=-1)
- return feature_column.reduce { |sum, i| i.abs }
+ return feature_column.map { |i| i.abs }.reduce { |sum,i| sum+i }
end
def l2(feature_column, n=-1)
- return Math.sqrt feature_column.reduce { |sum, i| i**2 }
+ return Math.sqrt feature_column.map { |i| i.abs2 }.reduce { |sum,i| sum+i }
end
def linfty(feature_column, n=-1)
@@ -18,7 +18,7 @@ def linfty(feature_column, n=-1)
end
# stats
-def M(feature_column, n)
+def median(feature_column, n)
return feature_column.concat(0.step(n-feature_column.size-1).map{|i|0}).sort[feature_column.size/2]
end
@@ -27,22 +27,73 @@ def mean(feature_column, n)
end
# selection
-def select_k(weights, normfn, n, k=10000)
- weights.sort{|a,b| normfn.call(b[1], n) <=> normfn.call(a[1], n)}.each { |p|
+def select_k(weights, norm_fun, n, k=10000)
+ weights.sort{|a,b| norm_fun.call(b[1], n) <=> norm_fun.call(a[1], n)}.each { |p|
puts "#{p[0]}\t#{mean(p[1], n)}"
k -= 1
if k == 0 then break end
}
end
-def cut(weights, normfn, n, epsilon=0.0001)
+def cut(weights, norm_fun, n, epsilon=0.0001)
weights.each { |k,v|
- if normfn.call(v).abs > epsilon
+ if norm_fun.call(v, n).abs > epsilon
puts "#{k}\t#{mean(v, n)}"
end
}
end
+# test
+def _test()
+ puts
+ w = {}
+ w["a"] = [1, 2, 3]
+ w["b"] = [1, 2]
+ w["c"] = [66]
+ w["d"] = [10, 20, 30]
+ n = 3
+ puts w.to_s
+ puts
+ puts "select_k"
+ puts "l0 expect ad"
+ select_k(w, method(:l0), n, 2)
+ puts "l1 expect cd"
+ select_k(w, method(:l1), n, 2)
+ puts "l2 expect c"
+ select_k(w, method(:l2), n, 1)
+ puts
+ puts "cut"
+ puts "l1 expect cd"
+ cut(w, method(:l1), n, 7)
+ puts
+ puts "median"
+ a = [1,2,3,4,5]
+ puts a.to_s
+ puts median(a, 5)
+ puts
+ puts "#{median(a, 7)} <- that's because we add missing 0s:"
+ puts a.concat(0.step(7-a.size-1).map{|i|0}).to_s
+ puts
+ puts "mean expect bc"
+ w.clear
+ w["a"] = [2]
+ w["b"] = [2.1]
+ w["c"] = [2.2]
+ cut(w, method(:mean), 1, 2.05)
+ exit
+end
+_test()
+
+# actually do something
+def usage()
+ puts "lplp.rb <l0,l1,l2,linfty,mean,median> <cut|select_k> <k|threshold> [n] < <input>"
+ exit
+end
+
+if ARGV.size < 3 then usage end
+norm_fun = method(ARGV[0].to_sym)
+type = ARGV[1]
+x = ARGV[2].to_f
shard_count_key = "__SHARD_COUNT__"
@@ -53,49 +104,24 @@ w = {}
shard_count = 0
while line = STDIN.gets
key, val = line.split /\t/
- if k = shard_count_key
+ if key == shard_count_key
shard_count += 1
next
end
if w.has_key? key
- w[key].push val
+ w[key].push val.to_f
else
- w[key] = [val]
+ w[key] = [val.to_f]
end
end
-select_k(w, method(:l1), shard_count, 100000)
+if ARGV.size == 4 then shard_count = ARGV[3].to_f end
-def _test()
-puts
-w = {}
-w["a"] = [1, 2, 3]
-w["b"] = [1, 2]
-w["c"] = [66]
-w["d"] = [10, 20, 30]
-n = 3
-puts w.to_s
-puts
-puts "select_k"
-puts "l0 expect ad"
-select_k(w, method(:l0), n, 2)
-puts "l1 expect c"
-select_k(w, method(:l1), n, 1)
-puts "l2 expect d"
-select_k(w, method(:l2), n, 1)
-puts
-puts "cut"
-puts "l1 expect cd"
-cut(w, method(:l1), n, 7)
-puts
-puts "M"
-a = [1,3,4,5,6]
-puts a.to_s
-puts M(a, 7)
-puts "that's because we add missing 0s"
-puts a.concat(0.step(7-a.size-1).map{|i|0}).to_s
-puts
+if type == 'cut'
+ cut(w, norm_fun, shard_count, x)
+elsif type == 'select_k'
+ select_k(w, norm_fun, shard_count, x)
+else
+ puts "oh oh"
end
-#_test()
-
diff --git a/dtrain/hstreaming/red-all.rb b/dtrain/hstreaming/red-all.rb
deleted file mode 100755
index bbc65945..00000000
--- a/dtrain/hstreaming/red-all.rb
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/usr/bin/env ruby1.9.1
-
-
-shard_count_key = "__SHARD_COUNT__"
-
-STDIN.set_encoding 'utf-8'
-STDOUT.set_encoding 'utf-8'
-
-w = {}
-c = {}
-w.default = 0
-c.default = 0
-while line = STDIN.gets
- key, val = line.split /\t/
- w[key] += val.to_f
- c[key] += 1
-end
-
-puts "# dtrain reducer: active on all"
-shard_count = w["__SHARD_COUNT__"]
-puts "shard count #{shard_count}"
-w.each_key { |k|
- if k == shard_count_key then next end
- if c[k] == shard_count then puts "#{k}\t#{w[k]/shard_count}" end
-}
-