summaryrefslogtreecommitdiff
path: root/training/dtrain/downpour.rb
diff options
context:
space:
mode:
Diffstat (limited to 'training/dtrain/downpour.rb')
-rwxr-xr-xtraining/dtrain/downpour.rb89
1 files changed, 66 insertions, 23 deletions
diff --git a/training/dtrain/downpour.rb b/training/dtrain/downpour.rb
index d6af6707..f5477988 100755
--- a/training/dtrain/downpour.rb
+++ b/training/dtrain/downpour.rb
@@ -4,14 +4,24 @@ require 'trollop'
require 'zipf'
require 'socket'
require 'nanomsg'
+require 'tempfile'
+
+def l2_select v, k=10000
+ return if v.size<=k
+ min = v.values.map { |i| i.abs2 }.sort.reverse[k-1]
+ v.delete_if { |k,v| v.abs2 < min }
+end
conf = Trollop::options do
- opt :conf, "dtrain configuration", :type => :string, :required => true, :short => '-c'
- opt :input, "input as bitext (f ||| e)", :type => :string, :required => true, :short => '-i'
- opt :epochs, "number of epochs", :type => :int, :default => 10, :short => '-e'
- opt :learning_rate, "learning rate", :type => :float, :default => 1.0, :short => '-l'
- opt :slaves, "number of parallel learners", :type => :int, :default => 1, :short => '-p'
- opt :dtrain_binary, "path to dtrain_net binary", :type => :string, :short => '-d'
+ opt :conf, "dtrain configuration", :type => :string, :required => true, :short => '-c'
+ opt :input, "input as bitext (f ||| e)", :type => :string, :required => true, :short => '-i'
+ opt :epochs, "number of epochs", :type => :int, :default => 10, :short => '-e'
+ opt :learning_rate, "learning rate", :type => :float, :default => 1.0, :short => '-l'
+ opt :slaves, "number of parallel learners", :type => :int, :default => 1, :short => '-p'
+ opt :dtrain_binary, "path to dtrain_net binary", :type => :string, :short => '-d'
+ opt :shuffle, "shuffle data before each epoch", :short => '-z'
+ opt :select_freq, "l2 feature selection: frequency", :type => :int, :default => 100, :short => '-f'
+ opt :select_k, "l2 feature selection: k", :type => :int, :default => 0, :short => '-k'
end
dtrain_conf = conf[:conf]
@@ -19,6 +29,10 @@ input = conf[:input]
epochs = conf[:epochs]
learning_rate = conf[:learning_rate]
num_slaves = conf[:slaves]
+shuf = conf[:shuffle]
+freq = conf[:select_freq]
+k = conf[:select_k]
+select = k>0
dtrain_dir = File.expand_path File.dirname(__FILE__)
if not conf[:dtrain_binary]
@@ -28,22 +42,25 @@ else
end
socks = []
-port = 60666 # last port = port+slaves
slave_pids = []
-master_ip = Socket.ip_address_list[0].ip_address
+#port = 60666 # last port = port+slaves
+#master_ip = Socket.ip_address_list[0].ip_address
`mkdir work`
+socks_files = []
num_slaves.times { |i|
socks << NanoMsg::PairSocket.new
- addr = "tcp://#{master_ip}:#{port}"
- socks.last.bind addr
- STDERR.write "listening on #{addr}\n"
+ #addr = "tcp://#{master_ip}:#{port}"
+ socks_files << Tempfile.new('downpour')
+ url = "ipc://#{socks_files.last.path}"
+ socks.last.bind url
+ STDERR.write "listening on #{url}\n"
slave_pids << Kernel.fork {
- cmd = "#{dtrain_bin} -c #{dtrain_conf} -a #{addr} &>work/out.#{i}"
- `#{cmd}`
+ `LD_LIBRARY_PATH=/scratch/simianer/downpour/nanomsg-0.5-beta/lib \
+ #{dtrain_bin} -c #{dtrain_conf} -a #{url} 2>work/out.#{i}`
}
- port += 1
+ #port += 1
}
threads = []
@@ -56,7 +73,23 @@ socks.each_with_index { |n,i|
threads.each { |thr| thr.join } # timeout?
threads.clear
-inf = ReadFile.new input
+def shuffle_file fn_in, fn_out
+ a = ReadFile.readlines_strip fn_in
+ o = WriteFile.new fn_out
+ a.shuffle!
+ o.write a.join("\n")+"\n"
+ o.close
+
+ return fn_out
+end
+
+inf = nil
+if shuf
+ input = shuffle_file input, "work/input.0.gz"
+ inf = ReadFile.new input
+else
+ inf = ReadFile.new input
+end
buf = []
j = 0
m = Mutex.new
@@ -66,7 +99,13 @@ ready = num_slaves.times.map { true }
cma = 1
epochs.times { |epoch|
STDERR.write "---\nepoch #{epoch}\n"
-inf.rewind
+if shuf && epoch>0
+ inf.close
+ input = shuffle_file input, "work/input.#{epoch}.gz"
+ inf = ReadFile.new input
+else
+ inf.rewind
+end
i = 0
while true # round-robin
d = inf.gets
@@ -78,9 +117,7 @@ while true # round-robin
end
STDERR.write "sending source ##{i} to slave ##{j}\n"
socks[j].send d
- n.synchronize {
- ready[j] = false
- }
+ n.synchronize { ready[j] = false }
threads << Thread.new {
me = j
moment = cma
@@ -93,17 +130,23 @@ while true # round-robin
STDERR.write "T sending new weights to slave ##{me}\n"
socks[me].send w.to_kv
STDERR.write "T sent new weights to slave ##{me}\n"
- n.synchronize {
- ready[me] = true
- }
+ n.synchronize { ready[me] = true }
}
sleep 1
+ if select && i>0 && (i+1)%freq==0
+ before = w.size
+ m.synchronize { l2_select w, k }
+ STDERR.write "l2 feature selection, before=#{before}, after=#{w.size}\n"
+ end
i += 1
cma += 1
j += 1
j = 0 if j==num_slaves
threads.delete_if { |thr| !thr.status }
end
+wf = WriteFile.new "weights.#{epoch}.gz"
+wf.write w.to_kv(" ", "\n")+"\n"
+wf.close
}
threads.each { |thr| thr.join }
@@ -116,5 +159,5 @@ socks.each { |n|
slave_pids.each { |pid| Process.wait(pid) }
-puts w.to_kv " ", "\n"
+socks_files.each { |f| f.unlink }