diff options
Diffstat (limited to 'dtrain/hstreaming')
-rwxr-xr-x | dtrain/hstreaming/avg.rb (renamed from dtrain/hstreaming/red-avg.rb) | 14 | ||||
-rwxr-xr-x | dtrain/hstreaming/lplp.rb | 112 | ||||
-rwxr-xr-x | dtrain/hstreaming/red-all.rb | 26 |
3 files changed, 78 insertions, 74 deletions
diff --git a/dtrain/hstreaming/red-avg.rb b/dtrain/hstreaming/avg.rb index 9326ffbe..e0899144 100755 --- a/dtrain/hstreaming/red-avg.rb +++ b/dtrain/hstreaming/avg.rb @@ -1,4 +1,4 @@ -#!/usr/bin/env ruby1.9.1 +# avg.rb shard_count_key = "__SHARD_COUNT__" @@ -10,18 +10,22 @@ c = {} w.default = 0 c.default = 0 while line = STDIN.gets - key, val = line.split /\t/ + key, val = line.split /\s/ w[key] += val.to_f c[key] += 1 end -puts "# dtrain reducer: average" -shard_count = w["__SHARD_COUNT__"] +if ARGV.size == 0 + shard_count = w["__SHARD_COUNT__"] +else + shard_count = ARGV[0].to_f +end w.each_key { |k| if k == shard_count_key puts "# shard count: #{shard_count.to_i}" else - puts "#{k}\t#{w[k]/shard_count}\t# #{c[k]}" + puts "#{k}\t#{w[k]/shard_count}" + puts "# #{c[k]}" end } diff --git a/dtrain/hstreaming/lplp.rb b/dtrain/hstreaming/lplp.rb index edb93e77..0ec21a46 100755 --- a/dtrain/hstreaming/lplp.rb +++ b/dtrain/hstreaming/lplp.rb @@ -2,15 +2,15 @@ # norms def l0(feature_column, n) - if feature_column.size == n then return 1 else return 0 end + if feature_column.size >= n then return 1 else return 0 end end def l1(feature_column, n=-1) - return feature_column.reduce { |sum, i| i.abs } + return feature_column.map { |i| i.abs }.reduce { |sum,i| sum+i } end def l2(feature_column, n=-1) - return Math.sqrt feature_column.reduce { |sum, i| i**2 } + return Math.sqrt feature_column.map { |i| i.abs2 }.reduce { |sum,i| sum+i } end def linfty(feature_column, n=-1) @@ -18,7 +18,7 @@ def linfty(feature_column, n=-1) end # stats -def M(feature_column, n) +def median(feature_column, n) return feature_column.concat(0.step(n-feature_column.size-1).map{|i|0}).sort[feature_column.size/2] end @@ -27,22 +27,73 @@ def mean(feature_column, n) end # selection -def select_k(weights, normfn, n, k=10000) - weights.sort{|a,b| normfn.call(b[1], n) <=> normfn.call(a[1], n)}.each { |p| +def select_k(weights, norm_fun, n, k=10000) + weights.sort{|a,b| norm_fun.call(b[1], n) <=> norm_fun.call(a[1], n)}.each { |p| puts "#{p[0]}\t#{mean(p[1], n)}" k -= 1 if k == 0 then break end } end -def cut(weights, normfn, n, epsilon=0.0001) +def cut(weights, norm_fun, n, epsilon=0.0001) weights.each { |k,v| - if normfn.call(v).abs > epsilon + if norm_fun.call(v, n).abs > epsilon puts "#{k}\t#{mean(v, n)}" end } end +# test +def _test() + puts + w = {} + w["a"] = [1, 2, 3] + w["b"] = [1, 2] + w["c"] = [66] + w["d"] = [10, 20, 30] + n = 3 + puts w.to_s + puts + puts "select_k" + puts "l0 expect ad" + select_k(w, method(:l0), n, 2) + puts "l1 expect cd" + select_k(w, method(:l1), n, 2) + puts "l2 expect c" + select_k(w, method(:l2), n, 1) + puts + puts "cut" + puts "l1 expect cd" + cut(w, method(:l1), n, 7) + puts + puts "median" + a = [1,2,3,4,5] + puts a.to_s + puts median(a, 5) + puts + puts "#{median(a, 7)} <- that's because we add missing 0s:" + puts a.concat(0.step(7-a.size-1).map{|i|0}).to_s + puts + puts "mean expect bc" + w.clear + w["a"] = [2] + w["b"] = [2.1] + w["c"] = [2.2] + cut(w, method(:mean), 1, 2.05) + exit +end +_test() + +# actually do something +def usage() + puts "lplp.rb <l0,l1,l2,linfty,mean,median> <cut|select_k> <k|threshold> [n] < <input>" + exit +end + +if ARGV.size < 3 then usage end +norm_fun = method(ARGV[0].to_sym) +type = ARGV[1] +x = ARGV[2].to_f shard_count_key = "__SHARD_COUNT__" @@ -53,49 +104,24 @@ w = {} shard_count = 0 while line = STDIN.gets key, val = line.split /\t/ - if k = shard_count_key + if key == shard_count_key shard_count += 1 next end if w.has_key? key - w[key].push val + w[key].push val.to_f else - w[key] = [val] + w[key] = [val.to_f] end end -select_k(w, method(:l1), shard_count, 100000) +if ARGV.size == 4 then shard_count = ARGV[3].to_f end -def _test() -puts -w = {} -w["a"] = [1, 2, 3] -w["b"] = [1, 2] -w["c"] = [66] -w["d"] = [10, 20, 30] -n = 3 -puts w.to_s -puts -puts "select_k" -puts "l0 expect ad" -select_k(w, method(:l0), n, 2) -puts "l1 expect c" -select_k(w, method(:l1), n, 1) -puts "l2 expect d" -select_k(w, method(:l2), n, 1) -puts -puts "cut" -puts "l1 expect cd" -cut(w, method(:l1), n, 7) -puts -puts "M" -a = [1,3,4,5,6] -puts a.to_s -puts M(a, 7) -puts "that's because we add missing 0s" -puts a.concat(0.step(7-a.size-1).map{|i|0}).to_s -puts +if type == 'cut' + cut(w, norm_fun, shard_count, x) +elsif type == 'select_k' + select_k(w, norm_fun, shard_count, x) +else + puts "oh oh" end -#_test() - diff --git a/dtrain/hstreaming/red-all.rb b/dtrain/hstreaming/red-all.rb deleted file mode 100755 index bbc65945..00000000 --- a/dtrain/hstreaming/red-all.rb +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env ruby1.9.1 - - -shard_count_key = "__SHARD_COUNT__" - -STDIN.set_encoding 'utf-8' -STDOUT.set_encoding 'utf-8' - -w = {} -c = {} -w.default = 0 -c.default = 0 -while line = STDIN.gets - key, val = line.split /\t/ - w[key] += val.to_f - c[key] += 1 -end - -puts "# dtrain reducer: active on all" -shard_count = w["__SHARD_COUNT__"] -puts "shard count #{shard_count}" -w.each_key { |k| - if k == shard_count_key then next end - if c[k] == shard_count then puts "#{k}\t#{w[k]/shard_count}" end -} - |