summaryrefslogtreecommitdiff
path: root/nanomsg/bus.rb
blob: 8a561e305566f759538b30aa19ca3ade8452b9ac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
require 'nanomsg'

# Create the hub ("srv") bus socket and bind it to the first port of the
# range. `port` is then advanced so the slaves created below take the
# following ports.
port = 60_000
srv = NanoMsg::BusSocket.new.tap do |hub|
  hub.bind("tcp://127.0.0.1:#{port}")
end
port += 1

# Create ten slave bus sockets. Each one binds to its own consecutive local
# port, and the hub (`srv`) connects to that same endpoint.
# `slaves[k]` is the socket bound on `ports[k]`.
slaves = []
ports = []
10.times do
  node = NanoMsg::BusSocket.new
  slaves << node
  endpoint = "tcp://127.0.0.1:#{port}"
  node.bind endpoint
  srv.connect endpoint
  ports << port
  port += 1
end

# Link the slaves to one another: each slave connects to every slave that was
# bound on a later port, so every pair of slaves is connected exactly once.
slaves.each_with_index do |node, idx|
  later_ports = ports.drop(idx + 1)
  later_ports.each do |peer_port|
    node.connect "tcp://127.0.0.1:#{peer_port}"
  end
end

# Start one receiver thread per slave. Each thread blocks on its socket and
# prints only the messages whose header names that slave's index.
#
# Messages are expected to start with a "for=<index>" token (the format the
# sender loop in this script produces). Messages without a numeric recipient
# in that first token are ignored instead of crashing the thread or being
# mistaken for slave 0 (nil.to_i and "abc".to_i are both 0).
slaves.each_with_index do |node, idx|
  Thread.new do
    loop do
      msg = node.recv
      # First whitespace-delimited token, e.g. "for=3"; "" for a blank message.
      header = msg.split.first.to_s
      # Digits right after the first '=' of the header, or nil if absent.
      dest = header[/\A[^=]*=(\d+)/, 1]
      puts "slave #{idx} recv: '#{msg}'" if dest && dest.to_i == idx
      # Throttle the receive loop to one message per second.
      sleep 1
    end
  end
end

# Read lines from standard input until EOF and publish each one through the
# hub socket, addressed to the slaves in round-robin order.
#
# i - running sequence number of the message (never wraps).
# j - index of the slave the next message is addressed to; it wraps at the
#     number of slaves rather than at a hard-coded 10, so the routing stays
#     correct if the slave count is changed.
i = 0
j = 0
while (line = STDIN.gets)
  puts "src: sending #{i}"
  srv.send("for=#{j} #{i} #{line.strip}")
  # Pace the sender at one message per second.
  sleep 1
  i += 1
  j = (j + 1) % slaves.size
end