1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
#!/usr/bin/env ruby
require 'trollop'
require 'zipf'
require 'nanomsg'
require 'tempfile'
conf = Trollop::options do
opt :conf, "dtrain configuration", :type => :string, :required => true, :short => '-c'
opt :input, "input as bitext (f ||| e)", :type => :string, :required => true, :short => '-i'
opt :slaves, "number of parallel learners", :type => :int, :default => 1, :short => '-p'
end
dtrain_conf = conf[:conf]
input = conf[:input]
epochs = conf[:epochs]
learning_rate = conf[:learning_rate]
num_slaves = conf[:slaves]
dtrain_dir = File.expand_path File.dirname(__FILE__)
if not conf[:dtrain_binary]
dtrain_bin = "#{dtrain_dir}/dtrain_net"
else
dtrain_bin = conf[:dtrain_binary]
end
socks = []
slave_pids = []
#port = 60666 # last port = port+slaves
#master_ip = Socket.ip_address_list[0].ip_address
`mkdir work`
socks_files = []
num_slaves.times { |i|
socks << NanoMsg::PairSocket.new
#addr = "tcp://#{master_ip}:#{port}"
socks_files << Tempfile.new('downpour')
url = "ipc://#{socks_files.last.path}"
socks.last.bind url
STDERR.write "listening on #{url}\n"
slave_pids << Kernel.fork {
`LD_LIBRARY_PATH=/scratch/simianer/downpour/nanomsg-0.5-beta/lib \
#{dtrain_bin} -c #{dtrain_conf} -a #{url} 2>work/out.#{i}`
}
#port += 1
}
threads = []
socks.each_with_index { |n,i|
threads << Thread.new {
n.recv
STDERR.write "got hello from slave ##{i}\n"
}
}
threads.each { |thr| thr.join } # timeout?
threads.clear
inf = ReadFile.new input
buf = []
j = 0
m = Mutex.new
n = Mutex.new
w = SparseVector.new
ready = num_slaves.times.map { true }
cma = 1
epochs.times { |epoch|
STDERR.write "---\nepoch #{epoch}\n"
inf.rewind
i = 0
while true # round-robin
d = inf.gets
break if !d
d.strip!
while !ready[j]
j += 1
j = 0 if j==num_slaves
end
STDERR.write "sending source ##{i} to slave ##{j}\n"
socks[j].send d
n.synchronize {
ready[j] = false
}
threads << Thread.new {
me = j
moment = cma
update = SparseVector::from_kv socks[me].recv
STDERR.write "T update from slave ##{me}\n"
update *= learning_rate
update -= w
update /= moment
m.synchronize { w += update }
STDERR.write "T sending new weights to slave ##{me}\n"
socks[me].send w.to_kv
STDERR.write "T sent new weights to slave ##{me}\n"
n.synchronize {
ready[me] = true
}
}
sleep 1
i += 1
cma += 1
j += 1
j = 0 if j==num_slaves
threads.delete_if { |thr| !thr.status }
end
wf = WriteFile.new "weights.#{epoch}.gz"
wf.write w.to_kv(" ", "\n")
wf.close
}
threads.each { |thr| thr.join }
socks.each { |n|
Thread.new {
n.send "shutdown"
}
}
slave_pids.each { |pid| Process.wait(pid) }
socks_files.each { |f| f.unlink }
|