summaryrefslogtreecommitdiff
path: root/training/dtrain/downpour.rb
blob: f5477988d9f358c822167ec1c5ed59726ad5d119 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#!/usr/bin/env ruby

require 'trollop'
require 'zipf'
require 'socket'
require 'nanomsg'
require 'tempfile'

# Prunes the hash-like vector +v+ in place, keeping the entries whose squared
# magnitude (+abs2+) is at least that of the k-th largest entry.
#
# Entries tied with the k-th largest are kept, so more than +k+ entries may
# survive. When +v+ holds at most +k+ entries nothing is removed and nil is
# returned; otherwise the result of +delete_if+ on +v+ is returned.
def l2_select v, k=10000
  return unless v.size > k
  squares = v.values.map(&:abs2)
  # Ascending sort: index -k is the k-th largest squared magnitude.
  threshold = squares.sort[-k]
  v.delete_if { |_feature, weight| weight.abs2 < threshold }
end

# Command-line options of the master process.
conf = Trollop::options do
  opt :conf,          "dtrain configuration",           :type => :string, :required => true, :short => '-c'
  opt :input,         "input as bitext (f ||| e)",      :type => :string, :required => true, :short => '-i'
  opt :epochs,        "number of epochs",               :type => :int,    :default => 10,    :short => '-e'
  opt :learning_rate, "learning rate",                  :type => :float,  :default => 1.0,   :short => '-l'
  opt :slaves,        "number of parallel learners",    :type => :int,    :default => 1,     :short => '-p'
  opt :dtrain_binary, "path to dtrain_net binary",      :type => :string,                    :short => '-d'
  opt :shuffle,       "shuffle data before each epoch",                                      :short => '-z'
  opt :select_freq,   "l2 feature selection: frequency", :type => :int,    :default => 100,  :short => '-f'
  opt :select_k,      "l2 feature selection: k",         :type => :int,    :default => 0,    :short => '-k'
end

dtrain_conf   = conf[:conf]
input         = conf[:input]
epochs        = conf[:epochs]
learning_rate = conf[:learning_rate]
num_slaves    = conf[:slaves]
shuf          = conf[:shuffle]
freq          = conf[:select_freq]
k             = conf[:select_k]
select        = k>0 # l2 feature selection is only active for a positive k
dtrain_dir    = File.expand_path File.dirname(__FILE__)

# Without an explicit -d, use the dtrain_net binary next to this script.
dtrain_bin = conf[:dtrain_binary] || "#{dtrain_dir}/dtrain_net"

socks      = []
slave_pids = []
# Disabled TCP transport (kept for reference):
#port       = 60666 # last port = port+slaves
#master_ip  = Socket.ip_address_list[0].ip_address

# Scratch directory for the slaves' stderr logs (and the shuffled inputs).
# Created from Ruby rather than via a shell `mkdir` so that an existing
# directory is silently reused while a real failure raises right here,
# instead of leaving the slaves unable to open work/out.N and the master
# waiting forever for their hello message.
Dir.mkdir 'work' unless File.directory? 'work'

# One pair socket per slave, each bound to an ipc:// URL derived from a fresh
# temp file path; the slave is started with that URL as its -a argument.
socks_files = []
num_slaves.times do |i|
  sock = NanoMsg::PairSocket.new
  socks << sock
  #addr = "tcp://#{master_ip}:#{port}"
  sock_file = Tempfile.new('downpour')
  socks_files << sock_file
  url = "ipc://#{sock_file.path}"
  sock.bind url
  STDERR.write "listening on #{url}\n"
  # NOTE(review): LD_LIBRARY_PATH is hard-coded to one specific machine's
  # nanomsg install -- confirm before running elsewhere.
  slave_pids << Kernel.fork {
    `LD_LIBRARY_PATH=/scratch/simianer/downpour/nanomsg-0.5-beta/lib \
      #{dtrain_bin} -c #{dtrain_conf} -a #{url} 2>work/out.#{i}`
  }
  #port += 1
end

# Block until every slave has sent its first message on its socket.
threads = socks.each_with_index.map { |sock, i|
  Thread.new {
    sock.recv
    STDERR.write "got hello from slave ##{i}\n"
  }
}
threads.each(&:join) # timeout?
threads.clear

# Writes a randomly permuted copy of the lines of +fn_in+ to +fn_out+.
#
# Lines are read with ReadFile.readlines_strip, shuffled, and written through
# WriteFile joined by newlines with one trailing newline.
# Returns +fn_out+ so the caller can use it as the new input path.
def shuffle_file fn_in, fn_out
  shuffled = ReadFile.readlines_strip(fn_in).shuffle
  out = WriteFile.new fn_out
  out.write shuffled.join("\n")+"\n"
  out.close

  fn_out
end

# Open the training data; with --shuffle the first epoch already reads a
# shuffled copy written to work/input.0.gz.
input = shuffle_file input, "work/input.0.gz" if shuf
inf = ReadFile.new input

j = 0             # slave to try next (round-robin cursor)
m = Mutex.new     # guards updates of the weight vector w
n = Mutex.new     # guards writes to the ready flags
w = SparseVector.new
ready = num_slaves.times.map { true }
cma = 1           # number of examples dispatched so far, plus one
epochs.times { |epoch|
  STDERR.write "---\nepoch #{epoch}\n"
  if shuf && epoch>0
    inf.close
    input = shuffle_file input, "work/input.#{epoch}.gz"
    inf = ReadFile.new input
  else
    inf.rewind
  end
  i = 0
  while (d = inf.gets) # round-robin
    d.strip!
    # Look for a free slave, starting at the cursor. Pause briefly after
    # each full pass so this loop does not spin at full speed (and compete
    # with the worker threads) while every slave is busy.
    until ready[j]
      j += 1
      if j==num_slaves
        j = 0
        sleep 0.01
      end
    end
    STDERR.write "sending source ##{i} to slave ##{j}\n"
    socks[j].send d
    n.synchronize { ready[j] = false }
    # j and cma are handed over as thread arguments so that each worker
    # keeps the values of its own dispatch; reading the shared locals from
    # inside the thread would race with the main loop, which changes both.
    threads << Thread.new(j, cma) { |me, moment|
      update = SparseVector::from_kv socks[me].recv
      STDERR.write "T update from slave ##{me}\n"
      # Move w towards the scaled update by 1/moment of the difference.
      update *= learning_rate
      update -= w
      update /= moment
      m.synchronize { w += update }
      STDERR.write "T sending new weights to slave ##{me}\n"
      socks[me].send w.to_kv
      STDERR.write "T sent new weights to slave ##{me}\n"
      n.synchronize { ready[me] = true }
    }
    # NOTE(review): fixed one-second pause per example, kept from the
    # original; purpose not evident from this file -- confirm it is needed.
    sleep 1
    if select && i>0 && (i+1)%freq==0
      before = w.size
      m.synchronize { l2_select w, k }
      STDERR.write "l2 feature selection, before=#{before}, after=#{w.size}\n"
    end
    i += 1
    cma += 1
    j += 1
    j = 0 if j==num_slaves
    # Forget workers that have finished.
    threads.delete_if { |thr| !thr.status }
  end
  # NOTE(review): workers still in flight are not waited for here, so this
  # snapshot may not contain their updates -- confirm this is intended.
  wf = WriteFile.new "weights.#{epoch}.gz"
  wf.write w.to_kv(" ", "\n")+"\n"
  wf.close
}

threads.each { |thr| thr.join }

# Tell every slave to stop; each send runs in its own thread so one slave
# cannot hold up the others.
socks.each { |sock|
  Thread.new {
    sock.send "shutdown"
  }
}

slave_pids.each { |pid| Process.wait(pid) }

socks_files.each { |f| f.unlink }