summaryrefslogtreecommitdiff
path: root/training/dtrain/parallelize.rb
diff options
context:
space:
mode:
Diffstat (limited to 'training/dtrain/parallelize.rb')
-rwxr-xr-xtraining/dtrain/parallelize.rb118
1 files changed, 83 insertions, 35 deletions
diff --git a/training/dtrain/parallelize.rb b/training/dtrain/parallelize.rb
index eb4148f5..9b0923f6 100755
--- a/training/dtrain/parallelize.rb
+++ b/training/dtrain/parallelize.rb
@@ -1,80 +1,128 @@
#!/usr/bin/env ruby
-if ARGV.size != 5
+if ARGV.size != 7
STDERR.write "Usage: "
- STDERR.write "ruby parallelize.rb <#shards> <input> <refs> <epochs> <dtrain.ini>\n"
+ STDERR.write "ruby parallelize.rb <dtrain.ini> <epochs> <rand=true|false> <#shards|predef> <at once> <input> <refs> <qsub>\n"
exit
end
-cdec_dir = '/path/to/cdec_dir'
-dtrain_bin = "#{cdec_dir}/training/dtrain/dtrain_local"
+dtrain_dir = File.expand_path File.dirname(__FILE__)
+dtrain_bin = "#{dtrain_dir}/dtrain"
ruby = '/usr/bin/ruby'
-lplp_rb = "#{cdec_dir}/training/dtrain/hstreaming/lplp.rb"
+lplp_rb = "#{dtrain_dir}/hstreaming/lplp.rb"
lplp_args = 'l2 select_k 100000'
-gzip = '/bin/gzip'
-
-num_shards = ARGV[0].to_i
-input = ARGV[1]
-refs = ARGV[2]
-epochs = ARGV[3].to_i
-ini = ARGV[4]
+cat = '/bin/cat'
+ini = ARGV[0]
+epochs = ARGV[1].to_i
+rand = false
+rand = true if ARGV[2]=='true'
+predefined_shards = false
+if ARGV[3] == 'predef'
+ predefined_shards = true
+ num_shards = -1
+else
+ num_shards = ARGV[3].to_i
+end
+shards_at_once = ARGV[4].to_i
+input = ARGV[5]
+refs = ARGV[6]
+use_qsub = false
+use_qsub = true if ARGV[7]
`mkdir work`
-def make_shards(input, refs, num_shards)
+def make_shards(input, refs, num_shards, epoch, rand)
lc = `wc -l #{input}`.split.first.to_i
+ index = (0..lc-1).to_a
+ index.reverse!
+ index.shuffle! if rand
shard_sz = lc / num_shards
leftover = lc % num_shards
in_f = File.new input, 'r'
+ in_lines = in_f.readlines
refs_f = File.new refs, 'r'
+ refs_lines = refs_f.readlines
shard_in_files = []
shard_refs_files = []
+ in_fns = []
+ refs_fns = []
0.upto(num_shards-1) { |shard|
- shard_in = File.new "work/shard.#{shard}.in", 'w+'
- shard_refs = File.new "work/shard.#{shard}.refs", 'w+'
+ in_fn = "work/shard.#{shard}.#{epoch}.in"
+ shard_in = File.new in_fn, 'w+'
+ in_fns << in_fn
+ refs_fn = "work/shard.#{shard}.#{epoch}.refs"
+ shard_refs = File.new refs_fn, 'w+'
+ refs_fns << refs_fn
0.upto(shard_sz-1) { |i|
- shard_in.write in_f.gets
- shard_refs.write refs_f.gets
+ j = index.pop
+ shard_in.write in_lines[j]
+ shard_refs.write refs_lines[j]
}
shard_in_files << shard_in
shard_refs_files << shard_refs
}
while leftover > 0
- shard_in_files[-1].write in_f.gets
- shard_refs_files[-1].write refs_f.gets
+ j = index.pop
+ shard_in_files[-1].write in_lines[j]
+ shard_refs_files[-1].write refs_lines[j]
leftover -= 1
end
(shard_in_files + shard_refs_files).each do |f| f.close end
in_f.close
refs_f.close
+ return [in_fns, refs_fns]
end
-make_shards input, refs, num_shards
+input_files = []
+refs_files = []
+if predefined_shards
+ input_files = File.new(input).readlines.map {|i| i.strip }
+ refs_files = File.new(refs).readlines.map {|i| i.strip }
+ num_shards = input_files.size
+else
+ input_files, refs_files = make_shards input, refs, num_shards, 0, rand
+end
0.upto(epochs-1) { |epoch|
+ puts "epoch #{epoch+1}"
pids = []
input_weights = ''
if epoch > 0 then input_weights = "--input_weights work/weights.#{epoch-1}" end
weights_files = []
- 0.upto(num_shards-1) { |shard|
- pids << Kernel.fork {
- `#{dtrain_bin} -c #{ini}\
- --input work/shard.#{shard}.in\
- --refs work/shard.#{shard}.refs #{input_weights}\
- --output work/weights.#{shard}.#{epoch}\
- &> work/out.#{shard}.#{epoch}`
+ shard = 0
+ remaining_shards = num_shards
+ while remaining_shards > 0
+ shards_at_once.times {
+ qsub_str_start = qsub_str_end = ''
+ local_end = ''
+ if use_qsub
+ qsub_str_start = "qsub -cwd -sync y -b y -j y -o work/out.#{shard}.#{epoch} -N dtrain.#{shard}.#{epoch} \""
+ qsub_str_end = "\""
+ local_end = ''
+ else
+ local_end = "&>work/out.#{shard}.#{epoch}"
+ end
+ pids << Kernel.fork {
+ `#{qsub_str_start}#{dtrain_bin} -c #{ini}\
+ --input #{input_files[shard]}\
+ --refs #{refs_files[shard]} #{input_weights}\
+ --output work/weights.#{shard}.#{epoch}#{qsub_str_end} #{local_end}`
+ }
+ weights_files << "work/weights.#{shard}.#{epoch}"
+ shard += 1
+ remaining_shards -= 1
}
- weights_files << "work/weights.#{shard}.#{epoch}"
- }
- pids.each { |pid| Process.wait(pid) }
- cat = File.new('work/weights_cat', 'w+')
- weights_files.each { |f| cat.write File.new(f, 'r').read }
- cat.close
- `#{ruby} #{lplp_rb} #{lplp_args} #{num_shards} < work/weights_cat &> work/weights.#{epoch}`
+ pids.each { |pid| Process.wait(pid) }
+ pids.clear
+ end
+ `#{cat} work/weights.*.#{epoch} > work/weights_cat`
+ `#{ruby} #{lplp_rb} #{lplp_args} #{num_shards} < work/weights_cat > work/weights.#{epoch}`
+ if rand and epoch+1!=epochs
+ input_files, refs_files = make_shards input, refs, num_shards, epoch+1, rand
+ end
}
`rm work/weights_cat`
-`#{gzip} work/*`