summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorpks <pks@users.noreply.github.com>2019-05-12 20:11:52 +0200
committerGitHub <noreply@github.com>2019-05-12 20:11:52 +0200
commit66b39771d9298c533f1c77fed3e45c12cc7c3421 (patch)
tree0218f41c350a626f5af9909d77406309fa873fdf
parentf64746ac87fc7338629b19de9fa2da0f03fa2790 (diff)
parent4a13b41700f34c15c30b551f98dbea9cb41f67c3 (diff)
Merge pull request #1 from pks/origin/netnet
makefile changes, downpour variants, current downpour version, feed s…
-rw-r--r--decoder/Makefile.am7
-rwxr-xr-xdecoder/feed.rb123
-rw-r--r--training/dtrain/Makefile.am5
-rwxr-xr-xtraining/dtrain/downpour.rb89
-rwxr-xr-xtraining/dtrain/downpour_delayed.rb163
-rwxr-xr-xtraining/dtrain/downpour_no_cma.rb163
-rw-r--r--training/dtrain/dtrain_net.h2
7 files changed, 523 insertions, 29 deletions
diff --git a/decoder/Makefile.am b/decoder/Makefile.am
index bcdc34ed..4c05b8f4 100644
--- a/decoder/Makefile.am
+++ b/decoder/Makefile.am
@@ -27,9 +27,12 @@ minimal_decoder_SOURCES = minimal_decoder.cc
minimal_decoder_LDADD = libcdec.a ../utils/libutils.a
network_decoder_SOURCES = network_decoder.cc nn.hpp
-network_decoder_LDADD = libcdec.a ../klm/search/libksearch.a ../mteval/libmteval.a ../utils/libutils.a ../klm/lm/libklm.a ../klm/util/libklm_util.a ../klm/util/double-conversion/libklm_util_double.a /srv/postedit/lib/nanomsg-0.5-beta/lib/libnanomsg.so
+network_decoder_LDADD = libcdec.a ../klm/search/libksearch.a ../mteval/libmteval.a ../utils/libutils.a ../klm/lm/libklm.a ../klm/util/libklm_util.a ../klm/util/double-conversion/libklm_util_double.a /usr/local/lib/libnanomsg.so
-AM_CPPFLAGS = -DTEST_DATA=\"$(top_srcdir)/decoder/test_data\" -DBOOST_TEST_DYN_LINK -W -Wno-sign-compare -I$(top_srcdir) -I$(top_srcdir)/mteval -I$(top_srcdir)/utils -I$(top_srcdir)/klm -I/srv/postedit/lib/nanomsg-0.5-beta/include -I/srv/postedit/lib/cppnanomsg
+local_SOURCES = local.cc nn.hpp
+local_LDADD = /usr/lib64/libnanomsg.so
+
+AM_CPPFLAGS = -DTEST_DATA=\"$(top_srcdir)/decoder/test_data\" -DBOOST_TEST_DYN_LINK -W -Wno-sign-compare -I$(top_srcdir) -I$(top_srcdir)/mteval -I$(top_srcdir)/utils -I$(top_srcdir)/klm -I /usr/include
rule_lexer.cc: rule_lexer.ll
$(LEX) -s -CF -8 -o$@ $<
diff --git a/decoder/feed.rb b/decoder/feed.rb
new file mode 100755
index 00000000..4ac20730
--- /dev/null
+++ b/decoder/feed.rb
@@ -0,0 +1,123 @@
+#!/usr/bin/env ruby
+
+require 'trollop'
+require 'zipf'
+require 'nanomsg'
+require 'tempfile'
+
+conf = Trollop::options do
+ opt :conf, "dtrain configuration", :type => :string, :required => true, :short => '-c'
+ opt :input, "input as bitext (f ||| e)", :type => :string, :required => true, :short => '-i'
+ opt :slaves, "number of parallel learners", :type => :int, :default => 1, :short => '-p'
+end
+
+dtrain_conf = conf[:conf]
+input = conf[:input]
+epochs = conf[:epochs]
+learning_rate = conf[:learning_rate]
+num_slaves = conf[:slaves]
+dtrain_dir = File.expand_path File.dirname(__FILE__)
+
+if not conf[:dtrain_binary]
+ dtrain_bin = "#{dtrain_dir}/dtrain_net"
+else
+ dtrain_bin = conf[:dtrain_binary]
+end
+
+socks = []
+slave_pids = []
+#port = 60666 # last port = port+slaves
+#master_ip = Socket.ip_address_list[0].ip_address
+
+`mkdir work`
+
+socks_files = []
+num_slaves.times { |i|
+ socks << NanoMsg::PairSocket.new
+ #addr = "tcp://#{master_ip}:#{port}"
+ socks_files << Tempfile.new('downpour')
+ url = "ipc://#{socks_files.last.path}"
+ socks.last.bind url
+ STDERR.write "listening on #{url}\n"
+ slave_pids << Kernel.fork {
+ `LD_LIBRARY_PATH=/scratch/simianer/downpour/nanomsg-0.5-beta/lib \
+ #{dtrain_bin} -c #{dtrain_conf} -a #{url} 2>work/out.#{i}`
+ }
+ #port += 1
+}
+
+threads = []
+socks.each_with_index { |n,i|
+ threads << Thread.new {
+ n.recv
+ STDERR.write "got hello from slave ##{i}\n"
+ }
+}
+threads.each { |thr| thr.join } # timeout?
+threads.clear
+
+inf = ReadFile.new input
+buf = []
+j = 0
+m = Mutex.new
+n = Mutex.new
+w = SparseVector.new
+ready = num_slaves.times.map { true }
+cma = 1
+epochs.times { |epoch|
+STDERR.write "---\nepoch #{epoch}\n"
+inf.rewind
+i = 0
+while true # round-robin
+ d = inf.gets
+ break if !d
+ d.strip!
+ while !ready[j]
+ j += 1
+ j = 0 if j==num_slaves
+ end
+ STDERR.write "sending source ##{i} to slave ##{j}\n"
+ socks[j].send d
+ n.synchronize {
+ ready[j] = false
+ }
+ threads << Thread.new {
+ me = j
+ moment = cma
+ update = SparseVector::from_kv socks[me].recv
+ STDERR.write "T update from slave ##{me}\n"
+ update *= learning_rate
+ update -= w
+ update /= moment
+ m.synchronize { w += update }
+ STDERR.write "T sending new weights to slave ##{me}\n"
+ socks[me].send w.to_kv
+ STDERR.write "T sent new weights to slave ##{me}\n"
+ n.synchronize {
+ ready[me] = true
+ }
+ }
+ sleep 1
+ i += 1
+ cma += 1
+ j += 1
+ j = 0 if j==num_slaves
+ threads.delete_if { |thr| !thr.status }
+end
+wf = WriteFile.new "weights.#{epoch}.gz"
+wf.write w.to_kv(" ", "\n")
+wf.close
+}
+
+threads.each { |thr| thr.join }
+
+socks.each { |n|
+ Thread.new {
+ n.send "shutdown"
+ }
+}
+
+slave_pids.each { |pid| Process.wait(pid) }
+
+socks_files.each { |f| f.unlink }
+
diff --git a/training/dtrain/Makefile.am b/training/dtrain/Makefile.am
index d22a8594..74c2a4b2 100644
--- a/training/dtrain/Makefile.am
+++ b/training/dtrain/Makefile.am
@@ -4,11 +4,10 @@ dtrain_SOURCES = dtrain.cc dtrain.h sample.h score.h update.h
dtrain_LDADD = ../../decoder/libcdec.a ../../klm/search/libksearch.a ../../mteval/libmteval.a ../../utils/libutils.a ../../klm/lm/libklm.a ../../klm/util/libklm_util.a ../../klm/util/double-conversion/libklm_util_double.a
dtrain_net_SOURCES = dtrain_net.cc dtrain_net.h dtrain.h sample.h score.h update.h
-dtrain_net_LDADD = ../../decoder/libcdec.a ../../klm/search/libksearch.a ../../mteval/libmteval.a ../../utils/libutils.a ../../klm/lm/libklm.a ../../klm/util/libklm_util.a ../../klm/util/double-conversion/libklm_util_double.a /srv/postedit/lib/nanomsg-0.5-beta/lib/libnanomsg.so
+dtrain_net_LDADD = ../../decoder/libcdec.a ../../klm/search/libksearch.a ../../mteval/libmteval.a ../../utils/libutils.a ../../klm/lm/libklm.a ../../klm/util/libklm_util.a ../../klm/util/double-conversion/libklm_util_double.a /usr/local/lib/libnanomsg.so
dtrain_net_interface_SOURCES = dtrain_net_interface.cc dtrain_net_interface.h dtrain.h sample_net_interface.h score_net_interface.h update.h
dtrain_net_interface_LDFLAGS = -rdynamic
dtrain_net_interface_LDADD = ../../decoder/libcdec.a ../../klm/search/libksearch.a ../../mteval/libmteval.a ../../utils/libutils.a ../../klm/lm/libklm.a ../../klm/util/libklm_util.a ../../klm/util/double-conversion/libklm_util_double.a /srv/postedit/lib/nanomsg-0.5-beta/lib/libnanomsg.so
-AM_CPPFLAGS = -W -Wall -Wno-sign-compare -I$(top_srcdir)/utils -I$(top_srcdir)/decoder -I$(top_srcdir)/mteval -I/srv/postedit/lib/nanomsg-0.5-beta/include -I/srv/postedit/lib/cppnanomsg
-
+AM_CPPFLAGS = -W -Wall -Wno-sign-compare -I$(top_srcdir)/utils -I$(top_srcdir)/decoder -I$(top_srcdir)/mteval -I/usr/local/include -I/srv/postedit/lib/cppnanomsg
diff --git a/training/dtrain/downpour.rb b/training/dtrain/downpour.rb
index d6af6707..f5477988 100755
--- a/training/dtrain/downpour.rb
+++ b/training/dtrain/downpour.rb
@@ -4,14 +4,24 @@ require 'trollop'
require 'zipf'
require 'socket'
require 'nanomsg'
+require 'tempfile'
+
+def l2_select v, k=10000
+ return if v.size<=k
+ min = v.values.map { |i| i.abs2 }.sort.reverse[k-1]
+ v.delete_if { |k,v| v.abs2 < min }
+end
conf = Trollop::options do
- opt :conf, "dtrain configuration", :type => :string, :required => true, :short => '-c'
- opt :input, "input as bitext (f ||| e)", :type => :string, :required => true, :short => '-i'
- opt :epochs, "number of epochs", :type => :int, :default => 10, :short => '-e'
- opt :learning_rate, "learning rate", :type => :float, :default => 1.0, :short => '-l'
- opt :slaves, "number of parallel learners", :type => :int, :default => 1, :short => '-p'
- opt :dtrain_binary, "path to dtrain_net binary", :type => :string, :short => '-d'
+ opt :conf, "dtrain configuration", :type => :string, :required => true, :short => '-c'
+ opt :input, "input as bitext (f ||| e)", :type => :string, :required => true, :short => '-i'
+ opt :epochs, "number of epochs", :type => :int, :default => 10, :short => '-e'
+ opt :learning_rate, "learning rate", :type => :float, :default => 1.0, :short => '-l'
+ opt :slaves, "number of parallel learners", :type => :int, :default => 1, :short => '-p'
+ opt :dtrain_binary, "path to dtrain_net binary", :type => :string, :short => '-d'
+ opt :shuffle, "shuffle data before each epoch", :short => '-z'
+ opt :select_freq, "l2 feature selection: frequency", :type => :int, :default => 100, :short => '-f'
+ opt :select_k, "l2 feature selection: k", :type => :int, :default => 0, :short => '-k'
end
dtrain_conf = conf[:conf]
@@ -19,6 +29,10 @@ input = conf[:input]
epochs = conf[:epochs]
learning_rate = conf[:learning_rate]
num_slaves = conf[:slaves]
+shuf = conf[:shuffle]
+freq = conf[:select_freq]
+k = conf[:select_k]
+select = k>0
dtrain_dir = File.expand_path File.dirname(__FILE__)
if not conf[:dtrain_binary]
@@ -28,22 +42,25 @@ else
end
socks = []
-port = 60666 # last port = port+slaves
slave_pids = []
-master_ip = Socket.ip_address_list[0].ip_address
+#port = 60666 # last port = port+slaves
+#master_ip = Socket.ip_address_list[0].ip_address
`mkdir work`
+socks_files = []
num_slaves.times { |i|
socks << NanoMsg::PairSocket.new
- addr = "tcp://#{master_ip}:#{port}"
- socks.last.bind addr
- STDERR.write "listening on #{addr}\n"
+ #addr = "tcp://#{master_ip}:#{port}"
+ socks_files << Tempfile.new('downpour')
+ url = "ipc://#{socks_files.last.path}"
+ socks.last.bind url
+ STDERR.write "listening on #{url}\n"
slave_pids << Kernel.fork {
- cmd = "#{dtrain_bin} -c #{dtrain_conf} -a #{addr} &>work/out.#{i}"
- `#{cmd}`
+ `LD_LIBRARY_PATH=/scratch/simianer/downpour/nanomsg-0.5-beta/lib \
+ #{dtrain_bin} -c #{dtrain_conf} -a #{url} 2>work/out.#{i}`
}
- port += 1
+ #port += 1
}
threads = []
@@ -56,7 +73,23 @@ socks.each_with_index { |n,i|
threads.each { |thr| thr.join } # timeout?
threads.clear
-inf = ReadFile.new input
+def shuffle_file fn_in, fn_out
+ a = ReadFile.readlines_strip fn_in
+ o = WriteFile.new fn_out
+ a.shuffle!
+ o.write a.join("\n")+"\n"
+ o.close
+
+ return fn_out
+end
+
+inf = nil
+if shuf
+ input = shuffle_file input, "work/input.0.gz"
+ inf = ReadFile.new input
+else
+ inf = ReadFile.new input
+end
buf = []
j = 0
m = Mutex.new
@@ -66,7 +99,13 @@ ready = num_slaves.times.map { true }
cma = 1
epochs.times { |epoch|
STDERR.write "---\nepoch #{epoch}\n"
-inf.rewind
+if shuf && epoch>0
+ inf.close
+ input = shuffle_file input, "work/input.#{epoch}.gz"
+ inf = ReadFile.new input
+else
+ inf.rewind
+end
i = 0
while true # round-robin
d = inf.gets
@@ -78,9 +117,7 @@ while true # round-robin
end
STDERR.write "sending source ##{i} to slave ##{j}\n"
socks[j].send d
- n.synchronize {
- ready[j] = false
- }
+ n.synchronize { ready[j] = false }
threads << Thread.new {
me = j
moment = cma
@@ -93,17 +130,23 @@ while true # round-robin
STDERR.write "T sending new weights to slave ##{me}\n"
socks[me].send w.to_kv
STDERR.write "T sent new weights to slave ##{me}\n"
- n.synchronize {
- ready[me] = true
- }
+ n.synchronize { ready[me] = true }
}
sleep 1
+ if select && i>0 && (i+1)%freq==0
+ before = w.size
+ m.synchronize { l2_select w, k }
+ STDERR.write "l2 feature selection, before=#{before}, after=#{w.size}\n"
+ end
i += 1
cma += 1
j += 1
j = 0 if j==num_slaves
threads.delete_if { |thr| !thr.status }
end
+wf = WriteFile.new "weights.#{epoch}.gz"
+wf.write w.to_kv(" ", "\n")+"\n"
+wf.close
}
threads.each { |thr| thr.join }
@@ -116,5 +159,5 @@ socks.each { |n|
slave_pids.each { |pid| Process.wait(pid) }
-puts w.to_kv " ", "\n"
+socks_files.each { |f| f.unlink }
diff --git a/training/dtrain/downpour_delayed.rb b/training/dtrain/downpour_delayed.rb
new file mode 100755
index 00000000..f5477988
--- /dev/null
+++ b/training/dtrain/downpour_delayed.rb
@@ -0,0 +1,163 @@
+#!/usr/bin/env ruby
+
+require 'trollop'
+require 'zipf'
+require 'socket'
+require 'nanomsg'
+require 'tempfile'
+
+def l2_select v, k=10000
+ return if v.size<=k
+ min = v.values.map { |i| i.abs2 }.sort.reverse[k-1]
+ v.delete_if { |k,v| v.abs2 < min }
+end
+
+conf = Trollop::options do
+ opt :conf, "dtrain configuration", :type => :string, :required => true, :short => '-c'
+ opt :input, "input as bitext (f ||| e)", :type => :string, :required => true, :short => '-i'
+ opt :epochs, "number of epochs", :type => :int, :default => 10, :short => '-e'
+ opt :learning_rate, "learning rate", :type => :float, :default => 1.0, :short => '-l'
+ opt :slaves, "number of parallel learners", :type => :int, :default => 1, :short => '-p'
+ opt :dtrain_binary, "path to dtrain_net binary", :type => :string, :short => '-d'
+ opt :shuffle, "shuffle data before each epoch", :short => '-z'
+ opt :select_freq, "l2 feature selection: frequency", :type => :int, :default => 100, :short => '-f'
+ opt :select_k, "l2 feature selection: k", :type => :int, :default => 0, :short => '-k'
+end
+
+dtrain_conf = conf[:conf]
+input = conf[:input]
+epochs = conf[:epochs]
+learning_rate = conf[:learning_rate]
+num_slaves = conf[:slaves]
+shuf = conf[:shuffle]
+freq = conf[:select_freq]
+k = conf[:select_k]
+select = k>0
+dtrain_dir = File.expand_path File.dirname(__FILE__)
+
+if not conf[:dtrain_binary]
+ dtrain_bin = "#{dtrain_dir}/dtrain_net"
+else
+ dtrain_bin = conf[:dtrain_binary]
+end
+
+socks = []
+slave_pids = []
+#port = 60666 # last port = port+slaves
+#master_ip = Socket.ip_address_list[0].ip_address
+
+`mkdir work`
+
+socks_files = []
+num_slaves.times { |i|
+ socks << NanoMsg::PairSocket.new
+ #addr = "tcp://#{master_ip}:#{port}"
+ socks_files << Tempfile.new('downpour')
+ url = "ipc://#{socks_files.last.path}"
+ socks.last.bind url
+ STDERR.write "listening on #{url}\n"
+ slave_pids << Kernel.fork {
+ `LD_LIBRARY_PATH=/scratch/simianer/downpour/nanomsg-0.5-beta/lib \
+ #{dtrain_bin} -c #{dtrain_conf} -a #{url} 2>work/out.#{i}`
+ }
+ #port += 1
+}
+
+threads = []
+socks.each_with_index { |n,i|
+ threads << Thread.new {
+ n.recv
+ STDERR.write "got hello from slave ##{i}\n"
+ }
+}
+threads.each { |thr| thr.join } # timeout?
+threads.clear
+
+def shuffle_file fn_in, fn_out
+ a = ReadFile.readlines_strip fn_in
+ o = WriteFile.new fn_out
+ a.shuffle!
+ o.write a.join("\n")+"\n"
+ o.close
+
+ return fn_out
+end
+
+inf = nil
+if shuf
+ input = shuffle_file input, "work/input.0.gz"
+ inf = ReadFile.new input
+else
+ inf = ReadFile.new input
+end
+buf = []
+j = 0
+m = Mutex.new
+n = Mutex.new
+w = SparseVector.new
+ready = num_slaves.times.map { true }
+cma = 1
+epochs.times { |epoch|
+STDERR.write "---\nepoch #{epoch}\n"
+if shuf && epoch>0
+ inf.close
+ input = shuffle_file input, "work/input.#{epoch}.gz"
+ inf = ReadFile.new input
+else
+ inf.rewind
+end
+i = 0
+while true # round-robin
+ d = inf.gets
+ break if !d
+ d.strip!
+ while !ready[j]
+ j += 1
+ j = 0 if j==num_slaves
+ end
+ STDERR.write "sending source ##{i} to slave ##{j}\n"
+ socks[j].send d
+ n.synchronize { ready[j] = false }
+ threads << Thread.new {
+ me = j
+ moment = cma
+ update = SparseVector::from_kv socks[me].recv
+ STDERR.write "T update from slave ##{me}\n"
+ update *= learning_rate
+ update -= w
+ update /= moment
+ m.synchronize { w += update }
+ STDERR.write "T sending new weights to slave ##{me}\n"
+ socks[me].send w.to_kv
+ STDERR.write "T sent new weights to slave ##{me}\n"
+ n.synchronize { ready[me] = true }
+ }
+ sleep 1
+ if select && i>0 && (i+1)%freq==0
+ before = w.size
+ m.synchronize { l2_select w, k }
+ STDERR.write "l2 feature selection, before=#{before}, after=#{w.size}\n"
+ end
+ i += 1
+ cma += 1
+ j += 1
+ j = 0 if j==num_slaves
+ threads.delete_if { |thr| !thr.status }
+end
+wf = WriteFile.new "weights.#{epoch}.gz"
+wf.write w.to_kv(" ", "\n")+"\n"
+wf.close
+}
+
+threads.each { |thr| thr.join }
+
+socks.each { |n|
+ Thread.new {
+ n.send "shutdown"
+ }
+}
+
+slave_pids.each { |pid| Process.wait(pid) }
+
+socks_files.each { |f| f.unlink }
+
diff --git a/training/dtrain/downpour_no_cma.rb b/training/dtrain/downpour_no_cma.rb
new file mode 100755
index 00000000..6638cdfe
--- /dev/null
+++ b/training/dtrain/downpour_no_cma.rb
@@ -0,0 +1,163 @@
+#!/usr/bin/env ruby
+
+require 'trollop'
+require 'zipf'
+require 'socket'
+require 'nanomsg'
+require 'tempfile'
+
+def l2_select v, k=10000
+ return if v.size<=k
+ min = v.values.map { |i| i.abs2 }.sort.reverse[k-1]
+ v.delete_if { |k,v| v.abs2 < min }
+end
+
+conf = Trollop::options do
+ opt :conf, "dtrain configuration", :type => :string, :required => true, :short => '-c'
+ opt :input, "input as bitext (f ||| e)", :type => :string, :required => true, :short => '-i'
+ opt :epochs, "number of epochs", :type => :int, :default => 10, :short => '-e'
+ opt :learning_rate, "learning rate", :type => :float, :default => 1.0, :short => '-l'
+ opt :slaves, "number of parallel learners", :type => :int, :default => 1, :short => '-p'
+ opt :dtrain_binary, "path to dtrain_net binary", :type => :string, :short => '-d'
+ opt :shuffle, "shuffle data before each epoch", :short => '-z'
+ opt :select_freq, "l2 feature selection: frequency", :type => :int, :default => 100, :short => '-f'
+ opt :select_k, "l2 feature selection: k", :type => :int, :default => 0, :short => '-k'
+end
+
+dtrain_conf = conf[:conf]
+input = conf[:input]
+epochs = conf[:epochs]
+learning_rate = conf[:learning_rate]
+num_slaves = conf[:slaves]
+shuf = conf[:shuffle]
+freq = conf[:select_freq]
+k = conf[:select_k]
+select = k>0
+dtrain_dir = File.expand_path File.dirname(__FILE__)
+
+if not conf[:dtrain_binary]
+ dtrain_bin = "#{dtrain_dir}/dtrain_net"
+else
+ dtrain_bin = conf[:dtrain_binary]
+end
+
+socks = []
+slave_pids = []
+#port = 60666 # last port = port+slaves
+#master_ip = Socket.ip_address_list[0].ip_address
+
+`mkdir work`
+
+socks_files = []
+num_slaves.times { |i|
+ socks << NanoMsg::PairSocket.new
+ #addr = "tcp://#{master_ip}:#{port}"
+ socks_files << Tempfile.new('downpour')
+ url = "ipc://#{socks_files.last.path}"
+ socks.last.bind url
+ STDERR.write "listening on #{url}\n"
+ slave_pids << Kernel.fork {
+ `LD_LIBRARY_PATH=/scratch/simianer/downpour/nanomsg-0.5-beta/lib \
+ #{dtrain_bin} -c #{dtrain_conf} -a #{url} 2>work/out.#{i}`
+ }
+ #port += 1
+}
+
+threads = []
+socks.each_with_index { |n,i|
+ threads << Thread.new {
+ n.recv
+ STDERR.write "got hello from slave ##{i}\n"
+ }
+}
+threads.each { |thr| thr.join } # timeout?
+threads.clear
+
+def shuffle_file fn_in, fn_out
+ a = ReadFile.readlines_strip fn_in
+ o = WriteFile.new fn_out
+ a.shuffle!
+ o.write a.join("\n")+"\n"
+ o.close
+
+ return fn_out
+end
+
+inf = nil
+if shuf
+ input = shuffle_file input, "work/input.0.gz"
+ inf = ReadFile.new input
+else
+ inf = ReadFile.new input
+end
+buf = []
+j = 0
+m = Mutex.new
+n = Mutex.new
+w = SparseVector.new
+ready = num_slaves.times.map { true }
+cma = 1
+epochs.times { |epoch|
+STDERR.write "---\nepoch #{epoch}\n"
+if shuf && epoch>0
+ inf.close
+ input = shuffle_file input, "work/input.#{epoch}.gz"
+ inf = ReadFile.new input
+else
+ inf.rewind
+end
+i = 0
+while true # round-robin
+ d = inf.gets
+ break if !d
+ d.strip!
+ while !ready[j]
+ j += 1
+ j = 0 if j==num_slaves
+ end
+ STDERR.write "sending source ##{i} to slave ##{j}\n"
+ socks[j].send d
+ n.synchronize { ready[j] = false }
+ threads << Thread.new {
+ me = j
+ moment = cma
+ update = SparseVector::from_kv socks[me].recv
+ STDERR.write "T update from slave ##{me}\n"
+ update *= learning_rate
+ #update -= w
+ #update /= moment
+ m.synchronize { w += update }
+ STDERR.write "T sending new weights to slave ##{me}\n"
+ socks[me].send w.to_kv
+ STDERR.write "T sent new weights to slave ##{me}\n"
+ n.synchronize { ready[me] = true }
+ }
+ sleep 1
+ if select && i>0 && (i+1)%freq==0
+ before = w.size
+ m.synchronize { l2_select w, k }
+ STDERR.write "l2 feature selection, before=#{before}, after=#{w.size}\n"
+ end
+ i += 1
+ cma += 1
+ j += 1
+ j = 0 if j==num_slaves
+ threads.delete_if { |thr| !thr.status }
+end
+wf = WriteFile.new "weights.#{epoch}.gz"
+wf.write w.to_kv(" ", "\n")+"\n"
+wf.close
+}
+
+threads.each { |thr| thr.join }
+
+socks.each { |n|
+ Thread.new {
+ n.send "shutdown"
+ }
+}
+
+slave_pids.each { |pid| Process.wait(pid) }
+
+socks_files.each { |f| f.unlink }
+
diff --git a/training/dtrain/dtrain_net.h b/training/dtrain/dtrain_net.h
index cbf171d6..e0d33d64 100644
--- a/training/dtrain/dtrain_net.h
+++ b/training/dtrain/dtrain_net.h
@@ -24,7 +24,7 @@ inline void
updateVectorFromString(string& s, SparseVector<T>& v)
{
string buf;
- istringstream ss;
+ istringstream ss(s);
while (ss >> buf) {
size_t p = buf.find_last_of("=");
istringstream c(buf.substr(p+1,buf.size()));