diff options
Diffstat (limited to 'training/dtrain/parallelize.rb')
-rwxr-xr-x | training/dtrain/parallelize.rb | 118 |
1 files changed, 83 insertions, 35 deletions
diff --git a/training/dtrain/parallelize.rb b/training/dtrain/parallelize.rb index eb4148f5..9b0923f6 100755 --- a/training/dtrain/parallelize.rb +++ b/training/dtrain/parallelize.rb @@ -1,80 +1,128 @@ #!/usr/bin/env ruby -if ARGV.size != 5 +if ARGV.size != 7 STDERR.write "Usage: " - STDERR.write "ruby parallelize.rb <#shards> <input> <refs> <epochs> <dtrain.ini>\n" + STDERR.write "ruby parallelize.rb <dtrain.ini> <epochs> <rand=true|false> <#shards|predef> <at once> <input> <refs> <qsub>\n" exit end -cdec_dir = '/path/to/cdec_dir' -dtrain_bin = "#{cdec_dir}/training/dtrain/dtrain_local" +dtrain_dir = File.expand_path File.dirname(__FILE__) +dtrain_bin = "#{dtrain_dir}/dtrain" ruby = '/usr/bin/ruby' -lplp_rb = "#{cdec_dir}/training/dtrain/hstreaming/lplp.rb" +lplp_rb = "#{dtrain_dir}/hstreaming/lplp.rb" lplp_args = 'l2 select_k 100000' -gzip = '/bin/gzip' - -num_shards = ARGV[0].to_i -input = ARGV[1] -refs = ARGV[2] -epochs = ARGV[3].to_i -ini = ARGV[4] +cat = '/bin/cat' +ini = ARGV[0] +epochs = ARGV[1].to_i +rand = false +rand = true if ARGV[2]=='true' +predefined_shards = false +if ARGV[3] == 'predef' + predefined_shards = true + num_shards = -1 +else + num_shards = ARGV[3].to_i +end +shards_at_once = ARGV[4].to_i +input = ARGV[5] +refs = ARGV[6] +use_qsub = false +use_qsub = true if ARGV[7] `mkdir work` -def make_shards(input, refs, num_shards) +def make_shards(input, refs, num_shards, epoch, rand) lc = `wc -l #{input}`.split.first.to_i + index = (0..lc-1).to_a + index.reverse! + index.shuffle! if rand shard_sz = lc / num_shards leftover = lc % num_shards in_f = File.new input, 'r' + in_lines = in_f.readlines refs_f = File.new refs, 'r' + refs_lines = refs_f.readlines shard_in_files = [] shard_refs_files = [] + in_fns = [] + refs_fns = [] 0.upto(num_shards-1) { |shard| - shard_in = File.new "work/shard.#{shard}.in", 'w+' - shard_refs = File.new "work/shard.#{shard}.refs", 'w+' + in_fn = "work/shard.#{shard}.#{epoch}.in" + shard_in = File.new in_fn, 'w+' + in_fns << in_fn + refs_fn = "work/shard.#{shard}.#{epoch}.refs" + shard_refs = File.new refs_fn, 'w+' + refs_fns << refs_fn 0.upto(shard_sz-1) { |i| - shard_in.write in_f.gets - shard_refs.write refs_f.gets + j = index.pop + shard_in.write in_lines[j] + shard_refs.write refs_lines[j] } shard_in_files << shard_in shard_refs_files << shard_refs } while leftover > 0 - shard_in_files[-1].write in_f.gets - shard_refs_files[-1].write refs_f.gets + j = index.pop + shard_in_files[-1].write in_lines[j] + shard_refs_files[-1].write refs_lines[j] leftover -= 1 end (shard_in_files + shard_refs_files).each do |f| f.close end in_f.close refs_f.close + return [in_fns, refs_fns] end -make_shards input, refs, num_shards +input_files = [] +refs_files = [] +if predefined_shards + input_files = File.new(input).readlines.map {|i| i.strip } + refs_files = File.new(refs).readlines.map {|i| i.strip } + num_shards = input_files.size +else + input_files, refs_files = make_shards input, refs, num_shards, 0, rand +end 0.upto(epochs-1) { |epoch| + puts "epoch #{epoch+1}" pids = [] input_weights = '' if epoch > 0 then input_weights = "--input_weights work/weights.#{epoch-1}" end weights_files = [] - 0.upto(num_shards-1) { |shard| - pids << Kernel.fork { - `#{dtrain_bin} -c #{ini}\ - --input work/shard.#{shard}.in\ - --refs work/shard.#{shard}.refs #{input_weights}\ - --output work/weights.#{shard}.#{epoch}\ - &> work/out.#{shard}.#{epoch}` + shard = 0 + remaining_shards = num_shards + while remaining_shards > 0 + shards_at_once.times { + qsub_str_start = qsub_str_end = '' + local_end = '' + if use_qsub + qsub_str_start = "qsub -cwd -sync y -b y -j y -o work/out.#{shard}.#{epoch} -N dtrain.#{shard}.#{epoch} \"" + qsub_str_end = "\"" + local_end = '' + else + local_end = "&>work/out.#{shard}.#{epoch}" + end + pids << Kernel.fork { + `#{qsub_str_start}#{dtrain_bin} -c #{ini}\ + --input #{input_files[shard]}\ + --refs #{refs_files[shard]} #{input_weights}\ + --output work/weights.#{shard}.#{epoch}#{qsub_str_end} #{local_end}` + } + weights_files << "work/weights.#{shard}.#{epoch}" + shard += 1 + remaining_shards -= 1 } - weights_files << "work/weights.#{shard}.#{epoch}" - } - pids.each { |pid| Process.wait(pid) } - cat = File.new('work/weights_cat', 'w+') - weights_files.each { |f| cat.write File.new(f, 'r').read } - cat.close - `#{ruby} #{lplp_rb} #{lplp_args} #{num_shards} < work/weights_cat &> work/weights.#{epoch}` + pids.each { |pid| Process.wait(pid) } + pids.clear + end + `#{cat} work/weights.*.#{epoch} > work/weights_cat` + `#{ruby} #{lplp_rb} #{lplp_args} #{num_shards} < work/weights_cat > work/weights.#{epoch}` + if rand and epoch+1!=epochs + input_files, refs_files = make_shards input, refs, num_shards, epoch+1, rand + end } `rm work/weights_cat` -`#{gzip} work/*` |