diff options
Diffstat (limited to 'decoder')
| -rw-r--r-- | decoder/Makefile.am | 7 | ||||
| -rwxr-xr-x | decoder/feed.rb | 123 | 
2 files changed, 128 insertions, 2 deletions
diff --git a/decoder/Makefile.am b/decoder/Makefile.am
index bcdc34ed..4c05b8f4 100644
--- a/decoder/Makefile.am
+++ b/decoder/Makefile.am
@@ -27,9 +27,12 @@ minimal_decoder_SOURCES = minimal_decoder.cc
 minimal_decoder_LDADD = libcdec.a ../utils/libutils.a
 
 network_decoder_SOURCES = network_decoder.cc nn.hpp
-network_decoder_LDADD = libcdec.a ../klm/search/libksearch.a ../mteval/libmteval.a ../utils/libutils.a ../klm/lm/libklm.a ../klm/util/libklm_util.a ../klm/util/double-conversion/libklm_util_double.a /srv/postedit/lib/nanomsg-0.5-beta/lib/libnanomsg.so
+network_decoder_LDADD = libcdec.a ../klm/search/libksearch.a ../mteval/libmteval.a ../utils/libutils.a ../klm/lm/libklm.a ../klm/util/libklm_util.a ../klm/util/double-conversion/libklm_util_double.a /usr/local/lib/libnanomsg.so
 
-AM_CPPFLAGS = -DTEST_DATA=\"$(top_srcdir)/decoder/test_data\" -DBOOST_TEST_DYN_LINK -W -Wno-sign-compare -I$(top_srcdir) -I$(top_srcdir)/mteval -I$(top_srcdir)/utils -I$(top_srcdir)/klm -I/srv/postedit/lib/nanomsg-0.5-beta/include -I/srv/postedit/lib/cppnanomsg
+local_SOURCES = local.cc nn.hpp
+local_LDADD = /usr/lib64/libnanomsg.so
+
+AM_CPPFLAGS = -DTEST_DATA=\"$(top_srcdir)/decoder/test_data\" -DBOOST_TEST_DYN_LINK -W -Wno-sign-compare -I$(top_srcdir) -I$(top_srcdir)/mteval -I$(top_srcdir)/utils -I$(top_srcdir)/klm -I /usr/include
 
 rule_lexer.cc: rule_lexer.ll
 	$(LEX) -s -CF -8 -o$@ $<
diff --git a/decoder/feed.rb b/decoder/feed.rb
new file mode 100755
index 00000000..4ac20730
--- /dev/null
+++ b/decoder/feed.rb
@@ -0,0 +1,123 @@
+#!/usr/bin/env ruby
+
+require 'trollop'
+require 'zipf'
+require 'nanomsg'
+require 'tempfile'
+
+conf = Trollop::options do
+  opt :conf,          "dtrain configuration",        :type => :string, :required => true, :short => '-c'
+  opt :input,         "input as bitext (f ||| e)",   :type => :string, :required => true, :short => '-i'
+  opt :slaves,        "number of parallel learners", :type => :int,    :default => 1,     :short => '-p'
+end
+
+dtrain_conf   = conf[:conf]
+input         = conf[:input]
+epochs        = conf[:epochs]
+learning_rate = conf[:learning_rate]
+num_slaves    = conf[:slaves]
+dtrain_dir    = File.expand_path File.dirname(__FILE__)
+
+if not conf[:dtrain_binary]
+  dtrain_bin = "#{dtrain_dir}/dtrain_net"
+else
+  dtrain_bin = conf[:dtrain_binary]
+end
+
+socks      = []
+slave_pids = []
+#port       = 60666 # last port = port+slaves
+#master_ip  = Socket.ip_address_list[0].ip_address
+
+`mkdir work`
+
+socks_files = []
+num_slaves.times { |i|
+  socks << NanoMsg::PairSocket.new
+  #addr = "tcp://#{master_ip}:#{port}"
+  socks_files << Tempfile.new('downpour')
+  url = "ipc://#{socks_files.last.path}"
+  socks.last.bind url
+  STDERR.write "listening on #{url}\n"
+  slave_pids << Kernel.fork {
+    `LD_LIBRARY_PATH=/scratch/simianer/downpour/nanomsg-0.5-beta/lib \
+      #{dtrain_bin} -c #{dtrain_conf} -a #{url} 2>work/out.#{i}`
+  }
+  #port += 1
+}
+
+threads = []
+socks.each_with_index { |n,i|
+  threads << Thread.new {
+    n.recv
+    STDERR.write "got hello from slave ##{i}\n"
+  }
+}
+threads.each { |thr| thr.join } # timeout?
+threads.clear
+
+inf = ReadFile.new input
+buf = []
+j = 0
+m = Mutex.new
+n = Mutex.new
+w = SparseVector.new
+ready = num_slaves.times.map { true }
+cma = 1
+epochs.times { |epoch|
+STDERR.write "---\nepoch #{epoch}\n"
+inf.rewind
+i = 0
+while true # round-robin
+  d = inf.gets
+  break if !d
+  d.strip!
+  while !ready[j]
+    j += 1
+    j = 0 if j==num_slaves
+  end
+  STDERR.write "sending source ##{i} to slave ##{j}\n"
+  socks[j].send d
+  n.synchronize {
+    ready[j] = false
+  }
+  threads << Thread.new {
+    me = j
+    moment = cma
+    update = SparseVector::from_kv socks[me].recv
+    STDERR.write "T update from slave ##{me}\n"
+    update *= learning_rate
+    update -= w
+    update /= moment
+    m.synchronize { w += update }
+    STDERR.write "T sending new weights to slave ##{me}\n"
+    socks[me].send w.to_kv
+    STDERR.write "T sent new weights to slave ##{me}\n"
+    n.synchronize {
+      ready[me] = true
+    }
+  }
+  sleep 1
+  i += 1
+  cma += 1
+  j += 1
+  j = 0 if j==num_slaves
+  threads.delete_if { |thr| !thr.status }
+end
+wf = WriteFile.new "weights.#{epoch}.gz"
+wf.write w.to_kv(" ", "\n")
+wf.close
+}
+
+threads.each { |thr| thr.join }
+
+socks.each { |n|
+  Thread.new {
+    n.send "shutdown"
+  }
+}
+
+slave_pids.each { |pid| Process.wait(pid) }
+
+socks_files.each { |f| f.unlink }
+
