From e76d0b1194c912f7329908d6f799eeccf9ab5456 Mon Sep 17 00:00:00 2001 From: Patrick Simianer Date: Wed, 13 May 2015 20:32:08 +0200 Subject: nanomsg --- nanomsg/bus.rb | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 nanomsg/bus.rb (limited to 'nanomsg/bus.rb') diff --git a/nanomsg/bus.rb b/nanomsg/bus.rb new file mode 100644 index 0000000..8a561e3 --- /dev/null +++ b/nanomsg/bus.rb @@ -0,0 +1,46 @@ +require 'nanomsg' + +port = 60000 +srv = NanoMsg::BusSocket.new +srv.bind("tcp://127.0.0.1:#{port}") +port += 1 + +slaves = [] +ports = [] +10.times { + slaves << NanoMsg::BusSocket.new + slaves.last.bind "tcp://127.0.0.1:#{port}" + srv.connect "tcp://127.0.0.1:#{port}" + ports << port + port += 1 +} + +slaves.each_with_index { |n,i| + ports[i+1..ports.size].each { |p| + slaves[i].connect "tcp://127.0.0.1:#{p}" + } +} + +slaves.each_with_index { |n,i| + Thread.new { + me = i + while true + msg = n.recv + to = msg.split.first.split('=')[1].to_i + puts "slave #{i} recv: '#{msg}'" if me==to + sleep 1 + end + } +} + +i = 0 +j = 0 +while line = STDIN.gets + puts "src: sending #{i}" + srv.send("for=#{j} #{i} #{line.strip}") + sleep 1 + i += 1 + j += 1 + j = 0 if j==10 +end + -- cgit v1.2.3