summaryrefslogtreecommitdiff
path: root/decoder/feed.rb
blob: 4ac2073021de57f7dbef3a40aa94368af99fa87d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/env ruby

require 'tempfile'
require 'thread'

require 'trollop'
require 'zipf'
require 'nanomsg'

# Command-line configuration.
# :epochs, :learning_rate and :dtrain_binary are read below, so they must be
# declared here; otherwise Trollop leaves them nil and `epochs.times` crashes.
conf = Trollop::options do
  opt :conf,          "dtrain configuration",                   :type => :string, :required => true, :short => '-c'
  opt :input,         "input as bitext (f ||| e)",              :type => :string, :required => true, :short => '-i'
  opt :slaves,        "number of parallel learners",            :type => :int,    :default => 1,     :short => '-p'
  opt :epochs,        "number of passes over the input",        :type => :int,    :default => 1,     :short => '-e'
  opt :learning_rate, "factor applied to every slave update",   :type => :float,  :default => 1.0,   :short => '-l'
  opt :dtrain_binary, "dtrain_net binary (default: next to this script)", :type => :string,  :short => '-d'
end
# With zero slaves the round-robin dispatch below can never find an idle slave.
Trollop::die :slaves, "must be at least 1" if conf[:slaves] < 1

dtrain_conf   = conf[:conf]
input         = conf[:input]
epochs        = conf[:epochs]
learning_rate = conf[:learning_rate]
num_slaves    = conf[:slaves]
dtrain_dir    = File.expand_path File.dirname(__FILE__)

# Fall back to the dtrain_net binary located next to this script.
dtrain_bin = conf[:dtrain_binary] || "#{dtrain_dir}/dtrain_net"

# One nanomsg pair socket per slave, bound to an ipc endpoint whose path
# comes from a Tempfile; each slave is a dtrain_net process told (-a) to
# connect to that endpoint.
socks       = []
slave_pids  = []
socks_files = [] # kept referenced so the Tempfiles are not GC'd (and unlinked) early
#port       = 60666 # last port = port+slaves
#master_ip  = Socket.ip_address_list[0].ip_address

# Per-slave stderr logs go to work/out.<i>; do not fail if the dir exists.
Dir.mkdir('work') unless File.directory?('work')

num_slaves.times { |i|
  socks << NanoMsg::PairSocket.new
  #addr = "tcp://#{master_ip}:#{port}"
  socks_files << Tempfile.new('downpour')
  url = "ipc://#{socks_files.last.path}"
  socks.last.bind url
  STDERR.write "listening on #{url}\n"
  # Start dtrain directly: no forked copy of this interpreter, no shell, so
  # paths are passed verbatim. stdout is discarded, as the former backticks
  # did; stderr goes to work/out.<i>. Raises at once if the binary is missing.
  # NOTE(review): the LD_LIBRARY_PATH below is machine-specific.
  slave_pids << Process.spawn(
    { 'LD_LIBRARY_PATH' => '/scratch/simianer/downpour/nanomsg-0.5-beta/lib' },
    dtrain_bin, '-c', dtrain_conf, '-a', url,
    :out => File::NULL, :err => "work/out.#{i}"
  )
  #port += 1
}

# Wait for the initial hello message from every slave. Each socket gets its
# own receiving thread, so the hellos may arrive in any order.
threads = socks.each_with_index.map { |sock, idx|
  Thread.new {
    sock.recv
    STDERR.write "got hello from slave ##{idx}\n"
  }
}
threads.each(&:join) # TODO: no timeout; a slave that never says hello blocks forever
threads.clear

# Training: for each epoch, send every input line to an idle slave
# (round-robin). For each line sent, a worker thread waits for that slave's
# update, folds it into the shared weights w and sends the new weights back.
# The weights are written to weights.<epoch>.gz after each epoch.
inf   = ReadFile.new input
j     = 0                           # round-robin cursor: next slave to try
m     = Mutex.new                   # guards w
n     = Mutex.new                   # guards ready
freed = ConditionVariable.new       # signalled (under n) when a slave becomes idle
w     = SparseVector.new            # shared weights
ready = Array.new(num_slaves, true) # ready[k]: slave k is idle
cma   = 1                           # count of lines dispatched so far (CMA denominator)
epochs.times { |epoch|
  STDERR.write "---\nepoch #{epoch}\n"
  inf.rewind
  i = 0
  while (d = inf.gets) # round-robin
    d.strip!
    # Claim the first idle slave at or after j (wrapping around), sleeping
    # on the condition variable instead of spinning while all are busy.
    n.synchronize {
      loop do
        idle = (0...num_slaves).map { |k| (j + k) % num_slaves }.find { |k| ready[k] }
        if idle
          j = idle
          break
        end
        freed.wait(n)
      end
      ready[j] = false
    }
    STDERR.write "sending source ##{i} to slave ##{j}\n"
    socks[j].send d
    # j and cma are passed as thread arguments: the main loop keeps mutating
    # the outer variables, so the thread must not read them later.
    threads << Thread.new(j, cma) { |me, moment|
      update = SparseVector::from_kv socks[me].recv
      STDERR.write "T update from slave ##{me}\n"
      update *= learning_rate
      # w <- w + (learning_rate * update - w) / moment (cumulative moving
      # average). Done entirely under m so concurrent updates are not lost.
      new_w = m.synchronize {
        update -= w
        update /= moment
        w += update
      }
      STDERR.write "T sending new weights to slave ##{me}\n"
      socks[me].send new_w.to_kv
      STDERR.write "T sent new weights to slave ##{me}\n"
      n.synchronize {
        ready[me] = true
        freed.signal
      }
    }
    sleep 1 # throttle dispatch (kept from the original design)
    i += 1
    cma += 1
    j = (j + 1) % num_slaves
    threads.delete_if { |thr| !thr.status }
  end
  # Merge all in-flight updates before snapshotting; otherwise each file
  # (including the final one) would miss the most recent updates.
  threads.each(&:join)
  threads.clear
  wf = WriteFile.new "weights.#{epoch}.gz"
  begin
    wf.write w.to_kv(" ", "\n")
  ensure
    wf.close
  end
}

# Wait for any remaining update threads, tell every slave to shut down,
# reap the slave processes and remove the ipc socket files.
threads.each { |thr| thr.join }

# Each shutdown message is sent from its own thread so that a send which
# blocks on one slave does not hold up the others (assumes PairSocket#send
# can block until the peer receives; confirm). The senders are joined after
# the slaves exit, so a failed send is reported rather than silently lost.
shutdown_senders = socks.map { |sock|
  Thread.new { sock.send "shutdown" }
}

slave_pids.each { |pid| Process.wait(pid) }
shutdown_senders.each { |thr| thr.join }

# close! closes the Tempfile's descriptor as well as unlinking its path.
socks_files.each { |f| f.close! }