diff options
Diffstat (limited to 'training/dtrain')
-rw-r--r-- | training/dtrain/Makefile.am | 4 | ||||
-rwxr-xr-x | training/dtrain/downpour.rb | 89 | ||||
-rwxr-xr-x | training/dtrain/downpour_delayed.rb | 163 | ||||
-rwxr-xr-x | training/dtrain/downpour_no_cma.rb | 163 | ||||
-rw-r--r-- | training/dtrain/dtrain_net.h | 2 |
5 files changed, 395 insertions, 26 deletions
diff --git a/training/dtrain/Makefile.am b/training/dtrain/Makefile.am index 590218be..82aac988 100644 --- a/training/dtrain/Makefile.am +++ b/training/dtrain/Makefile.am @@ -4,7 +4,7 @@ dtrain_SOURCES = dtrain.cc dtrain.h sample.h score.h update.h dtrain_LDADD = ../../decoder/libcdec.a ../../klm/search/libksearch.a ../../mteval/libmteval.a ../../utils/libutils.a ../../klm/lm/libklm.a ../../klm/util/libklm_util.a ../../klm/util/double-conversion/libklm_util_double.a dtrain_net_SOURCES = dtrain_net.cc dtrain_net.h dtrain.h sample.h score.h update.h -dtrain_net_LDADD = ../../decoder/libcdec.a ../../klm/search/libksearch.a ../../mteval/libmteval.a ../../utils/libutils.a ../../klm/lm/libklm.a ../../klm/util/libklm_util.a ../../klm/util/double-conversion/libklm_util_double.a /usr/lib64/libnanomsg.so +dtrain_net_LDADD = ../../decoder/libcdec.a ../../klm/search/libksearch.a ../../mteval/libmteval.a ../../utils/libutils.a ../../klm/lm/libklm.a ../../klm/util/libklm_util.a ../../klm/util/double-conversion/libklm_util_double.a /usr/local/lib/libnanomsg.so -AM_CPPFLAGS = -W -Wall -Wno-sign-compare -I$(top_srcdir)/utils -I$(top_srcdir)/decoder -I$(top_srcdir)/mteval +AM_CPPFLAGS = -W -Wall -Wno-sign-compare -I$(top_srcdir)/utils -I$(top_srcdir)/decoder -I$(top_srcdir)/mteval -I /usr/local/include diff --git a/training/dtrain/downpour.rb b/training/dtrain/downpour.rb index d6af6707..f5477988 100755 --- a/training/dtrain/downpour.rb +++ b/training/dtrain/downpour.rb @@ -4,14 +4,24 @@ require 'trollop' require 'zipf' require 'socket' require 'nanomsg' +require 'tempfile' + +def l2_select v, k=10000 + return if v.size<=k + min = v.values.map { |i| i.abs2 }.sort.reverse[k-1] + v.delete_if { |k,v| v.abs2 < min } +end conf = Trollop::options do - opt :conf, "dtrain configuration", :type => :string, :required => true, :short => '-c' - opt :input, "input as bitext (f ||| e)", :type => :string, :required => true, :short => '-i' - opt :epochs, "number of epochs", :type => :int, :default => 10, :short => '-e' - opt :learning_rate, "learning rate", :type => :float, :default => 1.0, :short => '-l' - opt :slaves, "number of parallel learners", :type => :int, :default => 1, :short => '-p' - opt :dtrain_binary, "path to dtrain_net binary", :type => :string, :short => '-d' + opt :conf, "dtrain configuration", :type => :string, :required => true, :short => '-c' + opt :input, "input as bitext (f ||| e)", :type => :string, :required => true, :short => '-i' + opt :epochs, "number of epochs", :type => :int, :default => 10, :short => '-e' + opt :learning_rate, "learning rate", :type => :float, :default => 1.0, :short => '-l' + opt :slaves, "number of parallel learners", :type => :int, :default => 1, :short => '-p' + opt :dtrain_binary, "path to dtrain_net binary", :type => :string, :short => '-d' + opt :shuffle, "shuffle data before each epoch", :short => '-z' + opt :select_freq, "l2 feature selection: frequency", :type => :int, :default => 100, :short => '-f' + opt :select_k, "l2 feature selection: k", :type => :int, :default => 0, :short => '-k' end dtrain_conf = conf[:conf] @@ -19,6 +29,10 @@ input = conf[:input] epochs = conf[:epochs] learning_rate = conf[:learning_rate] num_slaves = conf[:slaves] +shuf = conf[:shuffle] +freq = conf[:select_freq] +k = conf[:select_k] +select = k>0 dtrain_dir = File.expand_path File.dirname(__FILE__) if not conf[:dtrain_binary] @@ -28,22 +42,25 @@ else end socks = [] -port = 60666 # last port = port+slaves slave_pids = [] -master_ip = Socket.ip_address_list[0].ip_address +#port = 60666 # last port = port+slaves +#master_ip = Socket.ip_address_list[0].ip_address `mkdir work` +socks_files = [] num_slaves.times { |i| socks << NanoMsg::PairSocket.new - addr = "tcp://#{master_ip}:#{port}" - socks.last.bind addr - STDERR.write "listening on #{addr}\n" + #addr = "tcp://#{master_ip}:#{port}" + socks_files << Tempfile.new('downpour') + url = "ipc://#{socks_files.last.path}" + socks.last.bind url + STDERR.write "listening on #{url}\n" slave_pids << Kernel.fork { - cmd = "#{dtrain_bin} -c #{dtrain_conf} -a #{addr} &>work/out.#{i}" - `#{cmd}` + `LD_LIBRARY_PATH=/scratch/simianer/downpour/nanomsg-0.5-beta/lib \ + #{dtrain_bin} -c #{dtrain_conf} -a #{url} 2>work/out.#{i}` } - port += 1 + #port += 1 } threads = [] @@ -56,7 +73,23 @@ socks.each_with_index { |n,i| threads.each { |thr| thr.join } # timeout? threads.clear -inf = ReadFile.new input +def shuffle_file fn_in, fn_out + a = ReadFile.readlines_strip fn_in + o = WriteFile.new fn_out + a.shuffle! + o.write a.join("\n")+"\n" + o.close + + return fn_out +end + +inf = nil +if shuf + input = shuffle_file input, "work/input.0.gz" + inf = ReadFile.new input +else + inf = ReadFile.new input +end buf = [] j = 0 m = Mutex.new @@ -66,7 +99,13 @@ ready = num_slaves.times.map { true } cma = 1 epochs.times { |epoch| STDERR.write "---\nepoch #{epoch}\n" -inf.rewind +if shuf && epoch>0 + inf.close + input = shuffle_file input, "work/input.#{epoch}.gz" + inf = ReadFile.new input +else + inf.rewind +end i = 0 while true # round-robin d = inf.gets @@ -78,9 +117,7 @@ while true # round-robin end STDERR.write "sending source ##{i} to slave ##{j}\n" socks[j].send d - n.synchronize { - ready[j] = false - } + n.synchronize { ready[j] = false } threads << Thread.new { me = j moment = cma @@ -93,17 +130,23 @@ while true # round-robin STDERR.write "T sending new weights to slave ##{me}\n" socks[me].send w.to_kv STDERR.write "T sent new weights to slave ##{me}\n" - n.synchronize { - ready[me] = true - } + n.synchronize { ready[me] = true } } sleep 1 + if select && i>0 && (i+1)%freq==0 + before = w.size + m.synchronize { l2_select w, k } + STDERR.write "l2 feature selection, before=#{before}, after=#{w.size}\n" + end i += 1 cma += 1 j += 1 j = 0 if j==num_slaves threads.delete_if { |thr| !thr.status } end +wf = WriteFile.new "weights.#{epoch}.gz" +wf.write w.to_kv(" ", "\n")+"\n" +wf.close } threads.each { |thr| thr.join } @@ -116,5 +159,5 @@ socks.each { |n| slave_pids.each { |pid| Process.wait(pid) } -puts w.to_kv " ", "\n" +socks_files.each { |f| f.unlink } diff --git a/training/dtrain/downpour_delayed.rb b/training/dtrain/downpour_delayed.rb new file mode 100755 index 00000000..f5477988 --- /dev/null +++ b/training/dtrain/downpour_delayed.rb @@ -0,0 +1,163 @@ +#!/usr/bin/env ruby + +require 'trollop' +require 'zipf' +require 'socket' +require 'nanomsg' +require 'tempfile' + +def l2_select v, k=10000 + return if v.size<=k + min = v.values.map { |i| i.abs2 }.sort.reverse[k-1] + v.delete_if { |k,v| v.abs2 < min } +end + +conf = Trollop::options do + opt :conf, "dtrain configuration", :type => :string, :required => true, :short => '-c' + opt :input, "input as bitext (f ||| e)", :type => :string, :required => true, :short => '-i' + opt :epochs, "number of epochs", :type => :int, :default => 10, :short => '-e' + opt :learning_rate, "learning rate", :type => :float, :default => 1.0, :short => '-l' + opt :slaves, "number of parallel learners", :type => :int, :default => 1, :short => '-p' + opt :dtrain_binary, "path to dtrain_net binary", :type => :string, :short => '-d' + opt :shuffle, "shuffle data before each epoch", :short => '-z' + opt :select_freq, "l2 feature selection: frequency", :type => :int, :default => 100, :short => '-f' + opt :select_k, "l2 feature selection: k", :type => :int, :default => 0, :short => '-k' +end + +dtrain_conf = conf[:conf] +input = conf[:input] +epochs = conf[:epochs] +learning_rate = conf[:learning_rate] +num_slaves = conf[:slaves] +shuf = conf[:shuffle] +freq = conf[:select_freq] +k = conf[:select_k] +select = k>0 +dtrain_dir = File.expand_path File.dirname(__FILE__) + +if not conf[:dtrain_binary] + dtrain_bin = "#{dtrain_dir}/dtrain_net" +else + dtrain_bin = conf[:dtrain_binary] +end + +socks = [] +slave_pids = [] +#port = 60666 # last port = port+slaves +#master_ip = Socket.ip_address_list[0].ip_address + +`mkdir work` + +socks_files = [] +num_slaves.times { |i| + socks << NanoMsg::PairSocket.new + #addr = "tcp://#{master_ip}:#{port}" + socks_files << Tempfile.new('downpour') + url = "ipc://#{socks_files.last.path}" + socks.last.bind url + STDERR.write "listening on #{url}\n" + slave_pids << Kernel.fork { + `LD_LIBRARY_PATH=/scratch/simianer/downpour/nanomsg-0.5-beta/lib \ + #{dtrain_bin} -c #{dtrain_conf} -a #{url} 2>work/out.#{i}` + } + #port += 1 +} + +threads = [] +socks.each_with_index { |n,i| + threads << Thread.new { + n.recv + STDERR.write "got hello from slave ##{i}\n" + } +} +threads.each { |thr| thr.join } # timeout? +threads.clear + +def shuffle_file fn_in, fn_out + a = ReadFile.readlines_strip fn_in + o = WriteFile.new fn_out + a.shuffle! + o.write a.join("\n")+"\n" + o.close + + return fn_out +end + +inf = nil +if shuf + input = shuffle_file input, "work/input.0.gz" + inf = ReadFile.new input +else + inf = ReadFile.new input +end +buf = [] +j = 0 +m = Mutex.new +n = Mutex.new +w = SparseVector.new +ready = num_slaves.times.map { true } +cma = 1 +epochs.times { |epoch| +STDERR.write "---\nepoch #{epoch}\n" +if shuf && epoch>0 + inf.close + input = shuffle_file input, "work/input.#{epoch}.gz" + inf = ReadFile.new input +else + inf.rewind +end +i = 0 +while true # round-robin + d = inf.gets + break if !d + d.strip! + while !ready[j] + j += 1 + j = 0 if j==num_slaves + end + STDERR.write "sending source ##{i} to slave ##{j}\n" + socks[j].send d + n.synchronize { ready[j] = false } + threads << Thread.new { + me = j + moment = cma + update = SparseVector::from_kv socks[me].recv + STDERR.write "T update from slave ##{me}\n" + update *= learning_rate + update -= w + update /= moment + m.synchronize { w += update } + STDERR.write "T sending new weights to slave ##{me}\n" + socks[me].send w.to_kv + STDERR.write "T sent new weights to slave ##{me}\n" + n.synchronize { ready[me] = true } + } + sleep 1 + if select && i>0 && (i+1)%freq==0 + before = w.size + m.synchronize { l2_select w, k } + STDERR.write "l2 feature selection, before=#{before}, after=#{w.size}\n" + end + i += 1 + cma += 1 + j += 1 + j = 0 if j==num_slaves + threads.delete_if { |thr| !thr.status } +end +wf = WriteFile.new "weights.#{epoch}.gz" +wf.write w.to_kv(" ", "\n")+"\n" +wf.close +} + +threads.each { |thr| thr.join } + +socks.each { |n| + Thread.new { + n.send "shutdown" + } +} + +slave_pids.each { |pid| Process.wait(pid) } + +socks_files.each { |f| f.unlink } + diff --git a/training/dtrain/downpour_no_cma.rb b/training/dtrain/downpour_no_cma.rb new file mode 100755 index 00000000..6638cdfe --- /dev/null +++ b/training/dtrain/downpour_no_cma.rb @@ -0,0 +1,163 @@ +#!/usr/bin/env ruby + +require 'trollop' +require 'zipf' +require 'socket' +require 'nanomsg' +require 'tempfile' + +def l2_select v, k=10000 + return if v.size<=k + min = v.values.map { |i| i.abs2 }.sort.reverse[k-1] + v.delete_if { |k,v| v.abs2 < min } +end + +conf = Trollop::options do + opt :conf, "dtrain configuration", :type => :string, :required => true, :short => '-c' + opt :input, "input as bitext (f ||| e)", :type => :string, :required => true, :short => '-i' + opt :epochs, "number of epochs", :type => :int, :default => 10, :short => '-e' + opt :learning_rate, "learning rate", :type => :float, :default => 1.0, :short => '-l' + opt :slaves, "number of parallel learners", :type => :int, :default => 1, :short => '-p' + opt :dtrain_binary, "path to dtrain_net binary", :type => :string, :short => '-d' + opt :shuffle, "shuffle data before each epoch", :short => '-z' + opt :select_freq, "l2 feature selection: frequency", :type => :int, :default => 100, :short => '-f' + opt :select_k, "l2 feature selection: k", :type => :int, :default => 0, :short => '-k' +end + +dtrain_conf = conf[:conf] +input = conf[:input] +epochs = conf[:epochs] +learning_rate = conf[:learning_rate] +num_slaves = conf[:slaves] +shuf = conf[:shuffle] +freq = conf[:select_freq] +k = conf[:select_k] +select = k>0 +dtrain_dir = File.expand_path File.dirname(__FILE__) + +if not conf[:dtrain_binary] + dtrain_bin = "#{dtrain_dir}/dtrain_net" +else + dtrain_bin = conf[:dtrain_binary] +end + +socks = [] +slave_pids = [] +#port = 60666 # last port = port+slaves +#master_ip = Socket.ip_address_list[0].ip_address + +`mkdir work` + +socks_files = [] +num_slaves.times { |i| + socks << NanoMsg::PairSocket.new + #addr = "tcp://#{master_ip}:#{port}" + socks_files << Tempfile.new('downpour') + url = "ipc://#{socks_files.last.path}" + socks.last.bind url + STDERR.write "listening on #{url}\n" + slave_pids << Kernel.fork { + `LD_LIBRARY_PATH=/scratch/simianer/downpour/nanomsg-0.5-beta/lib \ + #{dtrain_bin} -c #{dtrain_conf} -a #{url} 2>work/out.#{i}` + } + #port += 1 +} + +threads = [] +socks.each_with_index { |n,i| + threads << Thread.new { + n.recv + STDERR.write "got hello from slave ##{i}\n" + } +} +threads.each { |thr| thr.join } # timeout? +threads.clear + +def shuffle_file fn_in, fn_out + a = ReadFile.readlines_strip fn_in + o = WriteFile.new fn_out + a.shuffle! + o.write a.join("\n")+"\n" + o.close + + return fn_out +end + +inf = nil +if shuf + input = shuffle_file input, "work/input.0.gz" + inf = ReadFile.new input +else + inf = ReadFile.new input +end +buf = [] +j = 0 +m = Mutex.new +n = Mutex.new +w = SparseVector.new +ready = num_slaves.times.map { true } +cma = 1 +epochs.times { |epoch| +STDERR.write "---\nepoch #{epoch}\n" +if shuf && epoch>0 + inf.close + input = shuffle_file input, "work/input.#{epoch}.gz" + inf = ReadFile.new input +else + inf.rewind +end +i = 0 +while true # round-robin + d = inf.gets + break if !d + d.strip! + while !ready[j] + j += 1 + j = 0 if j==num_slaves + end + STDERR.write "sending source ##{i} to slave ##{j}\n" + socks[j].send d + n.synchronize { ready[j] = false } + threads << Thread.new { + me = j + moment = cma + update = SparseVector::from_kv socks[me].recv + STDERR.write "T update from slave ##{me}\n" + update *= learning_rate + #update -= w + #update /= moment + m.synchronize { w += update } + STDERR.write "T sending new weights to slave ##{me}\n" + socks[me].send w.to_kv + STDERR.write "T sent new weights to slave ##{me}\n" + n.synchronize { ready[me] = true } + } + sleep 1 + if select && i>0 && (i+1)%freq==0 + before = w.size + m.synchronize { l2_select w, k } + STDERR.write "l2 feature selection, before=#{before}, after=#{w.size}\n" + end + i += 1 + cma += 1 + j += 1 + j = 0 if j==num_slaves + threads.delete_if { |thr| !thr.status } +end +wf = WriteFile.new "weights.#{epoch}.gz" +wf.write w.to_kv(" ", "\n")+"\n" +wf.close +} + +threads.each { |thr| thr.join } + +socks.each { |n| + Thread.new { + n.send "shutdown" + } +} + +slave_pids.each { |pid| Process.wait(pid) } + +socks_files.each { |f| f.unlink } + diff --git a/training/dtrain/dtrain_net.h b/training/dtrain/dtrain_net.h index ecacf3ee..24f95500 100644 --- a/training/dtrain/dtrain_net.h +++ b/training/dtrain/dtrain_net.h @@ -24,7 +24,7 @@ inline void updateVectorFromString(string& s, SparseVector<T>& v) { string buf; - istringstream ss; + istringstream ss(s); while (ss >> buf) { size_t p = buf.find_last_of("="); istringstream c(buf.substr(p+1,buf.size())); |