summaryrefslogtreecommitdiff
path: root/dtrain/parallelize.rb
diff options
context:
space:
mode:
authorChris Dyer <cdyer@cs.cmu.edu>2012-11-05 21:34:22 -0500
committerChris Dyer <cdyer@cs.cmu.edu>2012-11-05 21:34:22 -0500
commit709bf16fbf7e88d15c9a9f3356c63e9ff38fa05d (patch)
tree3ec67e4084b6be65555d70766ebfe5d2b18cd410 /dtrain/parallelize.rb
parent782fb27af98ed98256cc25c832131c59c8e9ce9c (diff)
parent2eb77b77d28f6d9dc88ecb6ca999994bdb555445 (diff)
Merge branch 'master' of https://github.com/redpony/cdec
Diffstat (limited to 'dtrain/parallelize.rb')
-rwxr-xr-xdtrain/parallelize.rb79
1 files changed, 79 insertions, 0 deletions
diff --git a/dtrain/parallelize.rb b/dtrain/parallelize.rb
new file mode 100755
index 00000000..1d277ff6
--- /dev/null
+++ b/dtrain/parallelize.rb
@@ -0,0 +1,79 @@
+#!/usr/bin/env ruby
+
+
+if ARGV.size != 5
+ STDERR.write "Usage: "
+ STDERR.write "ruby parallelize.rb <#shards> <input> <refs> <epochs> <dtrain.ini>\n"
+ exit
+end
+
+dtrain_bin = '/home/pks/bin/dtrain_local'
+ruby = '/usr/bin/ruby'
+lplp_rb = '/home/pks/mt/cdec-dtrain/dtrain/hstreaming/lplp.rb'
+lplp_args = 'l2 select_k 100000'
+gzip = '/bin/gzip'
+
+num_shards = ARGV[0].to_i
+input = ARGV[1]
+refs = ARGV[2]
+epochs = ARGV[3].to_i
+ini = ARGV[4]
+
+
+`mkdir work`
+
+def make_shards(input, refs, num_shards)
+ lc = `wc -l #{input}`.split.first.to_i
+ shard_sz = lc / num_shards
+ leftover = lc % num_shards
+ in_f = File.new input, 'r'
+ refs_f = File.new refs, 'r'
+ shard_in_files = []
+ shard_refs_files = []
+ 0.upto(num_shards-1) { |shard|
+ shard_in = File.new "work/shard.#{shard}.in", 'w+'
+ shard_refs = File.new "work/shard.#{shard}.refs", 'w+'
+ 0.upto(shard_sz-1) { |i|
+ shard_in.write in_f.gets
+ shard_refs.write refs_f.gets
+ }
+ shard_in_files << shard_in
+ shard_refs_files << shard_refs
+ }
+ while leftover > 0
+ shard_in_files[-1].write in_f.gets
+ shard_refs_files[-1].write refs_f.gets
+ leftover -= 1
+ end
+ (shard_in_files + shard_refs_files).each do |f| f.close end
+ in_f.close
+ refs_f.close
+end
+
+make_shards input, refs, num_shards
+
+0.upto(epochs-1) { |epoch|
+ pids = []
+ input_weights = ''
+ if epoch > 0 then input_weights = "--input_weights work/weights.#{epoch-1}" end
+ weights_files = []
+ 0.upto(num_shards-1) { |shard|
+ pids << Kernel.fork {
+ `#{dtrain_bin} -c #{ini}\
+ --input work/shard.#{shard}.in\
+ --refs work/shard.#{shard}.refs #{input_weights}\
+ --output work/weights.#{shard}.#{epoch}\
+ &> work/out.#{shard}.#{epoch}`
+ }
+ weights_files << "work/weights.#{shard}.#{epoch}"
+ }
+ pids.each { |pid| Process.wait(pid) }
+ cat = File.new('work/weights_cat', 'w+')
+ weights_files.each { |f| cat.write File.new(f, 'r').read }
+ cat.close
+ `#{ruby} #{lplp_rb} #{lplp_args} #{num_shards} < work/weights_cat &> work/weights.#{epoch}`
+}
+
+`rm work/weights_cat`
+`#{gzip} work/*`
+