From 17b7d11c8801e86868e61128b4f939fded1410e5 Mon Sep 17 00:00:00 2001 From: Patrick Simianer
Date: Fri, 8 Apr 2016 23:15:32 +0200 Subject: dtrain: parallelize.rb -- always gzip, adadelta support --- training/dtrain/parallelize.rb | 76 +++++++++++++++++++++++++++++------------- 1 file changed, 53 insertions(+), 23 deletions(-) diff --git a/training/dtrain/parallelize.rb b/training/dtrain/parallelize.rb index a1c2108f..3159a888 100755 --- a/training/dtrain/parallelize.rb +++ b/training/dtrain/parallelize.rb @@ -4,19 +4,20 @@ require 'trollop' require 'zipf' conf = Trollop::options do - opt :conf, "dtrain configuration", :type => :string, :short => '-c' - opt :input, "input as bitext (f ||| e)", :type => :string, :short => '-i' - opt :epochs, "number of epochs", :type => :int, :default => 10 - opt :randomize, "randomize shards once", :type => :bool, :default => false, :short => '-z' - opt :reshard, "randomize after each epoch", :type => :bool, :default => false, :short => '-y' - opt :shards, "number of shards", :type => :int, :short => '-s' - opt :weights, "input weights for first epoch", :type => :string, :default => '', :short => '-w' + opt :conf, "dtrain configuration", :type => :string, :short => '-c' + opt :input, "input as bitext (f ||| e)", :type => :string, :short => '-i' + opt :epochs, "number of epochs", :type => :int, :default => 10, :short => '-e' + opt :randomize, "randomize shards once", :type => :bool, :default => false, :short => '-z' + opt :reshard, "randomize after each epoch", :type => :bool, :default => false, :short => '-y' + opt :shards, "number of shards", :type => :int, :short => '-s' + opt :weights, "input weights for first epoch", :type => :string, :default => '', :short => '-w' opt :lplp_args, "arguments for lplp.rb", :type => :string, :default => "l2 select_k 100000", :short => '-l' - opt :per_shard_decoder_configs, "give custom decoder config per shard", :type => :string, :short => '-o' - opt :processes_at_once, "jobs to run at oce", :type => :int, :default => 9999, :short => '-p' - opt :qsub, "use qsub", :type => :bool, :default => false, :short => '-q' - opt :qsub_args, "extra args for qsub", :type => :string, :default => "h_vmem=5G", :short => 'r' - opt :dtrain_binary, "path to dtrain binary", :type => :string, :short => '-d' + opt :per_shard_decoder_configs, "give custom decoder config per shard", :type => :string, :short => '-o' + opt :processes_at_once, "jobs to run at oce", :type => :int, :default => 9999, :short => '-p' + opt :qsub, "use qsub", :type => :bool, :default => false, :short => '-q' + opt :qsub_args, "extra args for qsub", :type => :string, :default => "h_vmem=5G", :short => '-r' + opt :dtrain_binary, "path to dtrain binary", :type => :string, :short => '-d' + opt :adadelta, "use adadelta", :type => :bool, :default => false, :short => '-D' end dtrain_dir = File.expand_path File.dirname(__FILE__) @@ -45,6 +46,7 @@ input = conf[:input] use_qsub = conf[:qsub] shards_at_once = conf[:processes_at_once] first_input_weights = conf[:weights] +use_adadelta = conf[:adadelta] `mkdir work` @@ -64,8 +66,8 @@ def make_shards input, num_shards, epoch, rand 0.upto(num_shards-1) { |shard| break if index.size==0 real_num_shards += 1 - in_fn = "work/shard.#{shard}.#{epoch}" - shard_in = File.new in_fn, 'w+' + in_fn = "work/shard.#{shard}.#{epoch}.gz" + shard_in = WriteFile.new in_fn in_fns << in_fn 0.upto(shard_sz-1) { |i| j = index.pop @@ -101,8 +103,7 @@ end puts "epoch #{epoch+1}" pids = [] input_weights = '' - input_weights = "--input_weights work/weights.#{epoch-1}" if epoch>0 - weights_files = [] + input_weights = "--input_weights work/weights.#{epoch-1}.gz" if epoch>0 shard = 0 remaining_shards = num_shards while remaining_shards > 0 @@ -123,29 +124,58 @@ end else cdec_conf = "" end + adadelta_input = "" + adadelta_output = "" + if use_adadelta + adadelta_output = "--adadelta_output work/adadelta.#{shard}.#{epoch}" + if epoch > 0 + adadelta_input = "--adadelta_input work/adadelta.#{epoch-1}" + end + end if first_input_weights != '' && epoch == 0 input_weights = "--input_weights #{first_input_weights}" end pids << Kernel.fork { `#{qsub_str_start}#{dtrain_bin} -c #{dtrain_conf} #{cdec_conf}\ #{input_weights}\ + #{adadelta_output} #{adadelta_input}\ --bitext #{input_files[shard]}\ - --output work/weights.#{shard}.#{epoch}#{qsub_str_end} #{local_end}` + --output work/weights.#{shard}.#{epoch}.gz#{qsub_str_end} #{local_end}` } - weights_files << "work/weights.#{shard}.#{epoch}" shard += 1 remaining_shards -= 1 } pids.each { |pid| Process.wait(pid) } pids.clear end - `cat work/weights.*.#{epoch} > work/weights_cat` - `ruby #{lplp_rb} #{lplp_args} #{num_shards} < work/weights_cat\ - > work/weights.#{epoch}` + `zcat work/weights.*.#{epoch}.gz \ + | ruby #{lplp_rb} #{lplp_args} #{num_shards} \ + | gzip -c \ + > work/weights.#{epoch}.gz` + if use_adadelta + h = {} + ReadFile.readlines_strip("work/weights.#{epoch}.gz").map { |line| + h[line.split.first] = true + } + max = (2**(0.size * 8 -2) -1) + ["gradient", "update"].each { |i| + `zcat work/adadelta.*.#{epoch}.#{i}.gz \ + | ruby #{lplp_rb} l0 select_k #{max} #{num_shards} \ + | gzip -c \ + > work/adadelta_avg.#{i}.gz` + o = WriteFile.new "work/adadelta.#{epoch}.#{i}.gz" + ReadFile.readlines_strip("work/adadelta_avg.#{i}.gz").each { |line| + k,v = line.split + if h.has_key? k + o.write "#{k} #{v}\n" + end + } + `rm work/adadelta_avg.#{i}.gz` + o.close + } + end if rand and reshard and epoch+1!=epochs input_files, num_shards = make_shards input, num_shards, epoch+1, rand end } -`rm work/weights_cat` - -- cgit v1.2.3