summaryrefslogtreecommitdiff
path: root/training/dtrain/parallelize.rb
diff options
context:
space:
mode:
authorPatrick Simianer <p@simianer.de>2016-04-08 23:15:32 +0200
committerPatrick Simianer <p@simianer.de>2016-04-08 23:15:32 +0200
commit17b7d11c8801e86868e61128b4f939fded1410e5 (patch)
tree63fb80dcdf0443573db922195832508ee6a6eabc /training/dtrain/parallelize.rb
parent7e583a0880347caf4e9ec84c2c801d1b280cffdd (diff)
dtrain: parallelize.rb -- always gzip, adadelta support
Diffstat (limited to 'training/dtrain/parallelize.rb')
-rwxr-xr-xtraining/dtrain/parallelize.rb76
1 files changed, 53 insertions, 23 deletions
diff --git a/training/dtrain/parallelize.rb b/training/dtrain/parallelize.rb
index a1c2108f..3159a888 100755
--- a/training/dtrain/parallelize.rb
+++ b/training/dtrain/parallelize.rb
@@ -4,19 +4,20 @@ require 'trollop'
require 'zipf'
conf = Trollop::options do
- opt :conf, "dtrain configuration", :type => :string, :short => '-c'
- opt :input, "input as bitext (f ||| e)", :type => :string, :short => '-i'
- opt :epochs, "number of epochs", :type => :int, :default => 10
- opt :randomize, "randomize shards once", :type => :bool, :default => false, :short => '-z'
- opt :reshard, "randomize after each epoch", :type => :bool, :default => false, :short => '-y'
- opt :shards, "number of shards", :type => :int, :short => '-s'
- opt :weights, "input weights for first epoch", :type => :string, :default => '', :short => '-w'
+ opt :conf, "dtrain configuration", :type => :string, :short => '-c'
+ opt :input, "input as bitext (f ||| e)", :type => :string, :short => '-i'
+ opt :epochs, "number of epochs", :type => :int, :default => 10, :short => '-e'
+ opt :randomize, "randomize shards once", :type => :bool, :default => false, :short => '-z'
+ opt :reshard, "randomize after each epoch", :type => :bool, :default => false, :short => '-y'
+ opt :shards, "number of shards", :type => :int, :short => '-s'
+ opt :weights, "input weights for first epoch", :type => :string, :default => '', :short => '-w'
opt :lplp_args, "arguments for lplp.rb", :type => :string, :default => "l2 select_k 100000", :short => '-l'
- opt :per_shard_decoder_configs, "give custom decoder config per shard", :type => :string, :short => '-o'
- opt :processes_at_once, "jobs to run at oce", :type => :int, :default => 9999, :short => '-p'
- opt :qsub, "use qsub", :type => :bool, :default => false, :short => '-q'
- opt :qsub_args, "extra args for qsub", :type => :string, :default => "h_vmem=5G", :short => 'r'
- opt :dtrain_binary, "path to dtrain binary", :type => :string, :short => '-d'
+ opt :per_shard_decoder_configs, "give custom decoder config per shard", :type => :string, :short => '-o'
+ opt :processes_at_once, "jobs to run at oce", :type => :int, :default => 9999, :short => '-p'
+ opt :qsub, "use qsub", :type => :bool, :default => false, :short => '-q'
+ opt :qsub_args, "extra args for qsub", :type => :string, :default => "h_vmem=5G", :short => '-r'
+ opt :dtrain_binary, "path to dtrain binary", :type => :string, :short => '-d'
+ opt :adadelta, "use adadelta", :type => :bool, :default => false, :short => '-D'
end
dtrain_dir = File.expand_path File.dirname(__FILE__)
@@ -45,6 +46,7 @@ input = conf[:input]
use_qsub = conf[:qsub]
shards_at_once = conf[:processes_at_once]
first_input_weights = conf[:weights]
+use_adadelta = conf[:adadelta]
`mkdir work`
@@ -64,8 +66,8 @@ def make_shards input, num_shards, epoch, rand
0.upto(num_shards-1) { |shard|
break if index.size==0
real_num_shards += 1
- in_fn = "work/shard.#{shard}.#{epoch}"
- shard_in = File.new in_fn, 'w+'
+ in_fn = "work/shard.#{shard}.#{epoch}.gz"
+ shard_in = WriteFile.new in_fn
in_fns << in_fn
0.upto(shard_sz-1) { |i|
j = index.pop
@@ -101,8 +103,7 @@ end
puts "epoch #{epoch+1}"
pids = []
input_weights = ''
- input_weights = "--input_weights work/weights.#{epoch-1}" if epoch>0
- weights_files = []
+ input_weights = "--input_weights work/weights.#{epoch-1}.gz" if epoch>0
shard = 0
remaining_shards = num_shards
while remaining_shards > 0
@@ -123,29 +124,58 @@ end
else
cdec_conf = ""
end
+ adadelta_input = ""
+ adadelta_output = ""
+ if use_adadelta
+ adadelta_output = "--adadelta_output work/adadelta.#{shard}.#{epoch}"
+ if epoch > 0
+ adadelta_input = "--adadelta_input work/adadelta.#{epoch-1}"
+ end
+ end
if first_input_weights != '' && epoch == 0
input_weights = "--input_weights #{first_input_weights}"
end
pids << Kernel.fork {
`#{qsub_str_start}#{dtrain_bin} -c #{dtrain_conf} #{cdec_conf}\
#{input_weights}\
+ #{adadelta_output} #{adadelta_input}\
--bitext #{input_files[shard]}\
- --output work/weights.#{shard}.#{epoch}#{qsub_str_end} #{local_end}`
+ --output work/weights.#{shard}.#{epoch}.gz#{qsub_str_end} #{local_end}`
}
- weights_files << "work/weights.#{shard}.#{epoch}"
shard += 1
remaining_shards -= 1
}
pids.each { |pid| Process.wait(pid) }
pids.clear
end
- `cat work/weights.*.#{epoch} > work/weights_cat`
- `ruby #{lplp_rb} #{lplp_args} #{num_shards} < work/weights_cat\
- > work/weights.#{epoch}`
+ `zcat work/weights.*.#{epoch}.gz \
+ | ruby #{lplp_rb} #{lplp_args} #{num_shards} \
+ | gzip -c \
+ > work/weights.#{epoch}.gz`
+ if use_adadelta
+ h = {}
+ ReadFile.readlines_strip("work/weights.#{epoch}.gz").map { |line|
+ h[line.split.first] = true
+ }
+ max = (2**(0.size * 8 -2) -1)
+ ["gradient", "update"].each { |i|
+ `zcat work/adadelta.*.#{epoch}.#{i}.gz \
+ | ruby #{lplp_rb} l0 select_k #{max} #{num_shards} \
+ | gzip -c \
+ > work/adadelta_avg.#{i}.gz`
+ o = WriteFile.new "work/adadelta.#{epoch}.#{i}.gz"
+ ReadFile.readlines_strip("work/adadelta_avg.#{i}.gz").each { |line|
+ k,v = line.split
+ if h.has_key? k
+ o.write "#{k} #{v}\n"
+ end
+ }
+ `rm work/adadelta_avg.#{i}.gz`
+ o.close
+ }
+ end
if rand and reshard and epoch+1!=epochs
input_files, num_shards = make_shards input, num_shards, epoch+1, rand
end
}
-`rm work/weights_cat`
-