From e76d0b1194c912f7329908d6f799eeccf9ab5456 Mon Sep 17 00:00:00 2001 From: Patrick Simianer Date: Wed, 13 May 2015 20:32:08 +0200 Subject: nanomsg --- nanomsg/bus.rb | 46 ++++++++++++++++++++++++++++++++++++++++++++ nanomsg/feed.cc | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++ nanomsg/feed.rb | 22 +++++++++++++++++++++ nanomsg/master.rb | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ nanomsg/slave.rb | 25 ++++++++++++++++++++++++ 5 files changed, 203 insertions(+) create mode 100644 nanomsg/bus.rb create mode 100644 nanomsg/feed.cc create mode 100755 nanomsg/feed.rb create mode 100644 nanomsg/master.rb create mode 100644 nanomsg/slave.rb (limited to 'nanomsg') diff --git a/nanomsg/bus.rb b/nanomsg/bus.rb new file mode 100644 index 0000000..8a561e3 --- /dev/null +++ b/nanomsg/bus.rb @@ -0,0 +1,46 @@ +require 'nanomsg' + +port = 60000 +srv = NanoMsg::BusSocket.new +srv.bind("tcp://127.0.0.1:#{port}") +port += 1 + +slaves = [] +ports = [] +10.times { + slaves << NanoMsg::BusSocket.new + slaves.last.bind "tcp://127.0.0.1:#{port}" + srv.connect "tcp://127.0.0.1:#{port}" + ports << port + port += 1 +} + +slaves.each_with_index { |n,i| + ports[i+1..ports.size].each { |p| + slaves[i].connect "tcp://127.0.0.1:#{p}" + } +} + +slaves.each_with_index { |n,i| + Thread.new { + me = i + while true + msg = n.recv + to = msg.split.first.split('=')[1].to_i + puts "slave #{i} recv: '#{msg}'" if me==to + sleep 1 + end + } +} + +i = 0 +j = 0 +while line = STDIN.gets + puts "src: sending #{i}" + srv.send("for=#{j} #{i} #{line.strip}") + sleep 1 + i += 1 + j += 1 + j = 0 if j==10 +end + diff --git a/nanomsg/feed.cc b/nanomsg/feed.cc new file mode 100644 index 0000000..f8923fc --- /dev/null +++ b/nanomsg/feed.cc @@ -0,0 +1,53 @@ +#include +#include +#include +#include +#include + +#include +#include +#include "nn.hpp" + +using namespace std; + +void +recv(nn::socket& sock) +{ + char *buf = NULL; + size_t sz = sock.recv(&buf, NN_MSG, 0); + if (buf) { + string translation(buf, buf+sz); + cout << "got translation '" << translation << "'" << endl << endl; + } +} + +void +send(nn::socket& sock, const string& msg) +{ + cout << "sending source '" << msg << "'" << endl; + sock.send(msg.c_str(), msg.size()+1, 0); +} + +void +loop(nn::socket& sock) +{ + int to = 100; + sock.setsockopt(NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof(to)); + for (string line; getline(cin, line);) { + send(sock, line); + sleep(1); + recv(sock); + } +} + +int main(int argc, char const* argv[]) +{ + nn::socket sock(AF_SP, NN_PAIR); + //string url = "ipc:///tmp/network_decoder.ipc"; + string url = "tcp://127.0.0.1:60666"; + sock.connect(url.c_str()); + loop(sock); + + return 0; +} + diff --git a/nanomsg/feed.rb b/nanomsg/feed.rb new file mode 100755 index 0000000..5ea9737 --- /dev/null +++ b/nanomsg/feed.rb @@ -0,0 +1,22 @@ +#!/usr/bin/env ruby + +require 'nanomsg' + +port = 60666 +sock = NanoMsg::PairSocket.new +addr = "tcp://127.0.0.1:#{port}" +#addr = "ipc:///tmp/network_decoder.ipc" +sock.connect addr + +while true + line = STDIN.gets + if !line + sock.send 'shutdown' + break + end + puts "sending source '#{line.strip}'" + sock.send line.strip + sleep 1 + puts "got translation: #{sock.recv}\n\n" +end + diff --git a/nanomsg/master.rb b/nanomsg/master.rb new file mode 100644 index 0000000..e21f88d --- /dev/null +++ b/nanomsg/master.rb @@ -0,0 +1,57 @@ +require 'nanomsg' + +port = 60000 +socks = [] +m = 1 +m.times { |i| + socks << NanoMsg::PairSocket.new + addr = "tcp://127.0.0.1:#{port}" + socks.last.bind addr + puts "listening on #{addr}" + port += 1 +} + +threads = [] +socks.each_with_index { |n,i| + threads << Thread.new { + puts "sending hello to #{i}" + n.send "hello #{i}" + sleep 1 + n.recv + puts "got hello from #{i}" + } +} + +threads.each { |thr| thr.join } +threads.clear + +socks.each_with_index {|n,i| + threads << Thread.new { + while true + msg = n.recv + puts "message from #{i}: #{msg}" + break if msg == "shutting down" + sleep 1 + end + } +} + +i = 0 +j = 0 +while line = STDIN.gets + puts "sending source #{i} to #{j}" + socks[j].send "#{j} #{i} #{line.strip}" + sleep 1 + i += 1 + j += 1 + j = 0 if j==m +end + +socks.each { |n| + Thread.new { + n.send "shutdown" + } +} + +threads.each { |thr| thr.join } + diff --git a/nanomsg/slave.rb b/nanomsg/slave.rb new file mode 100644 index 0000000..3183df8 --- /dev/null +++ b/nanomsg/slave.rb @@ -0,0 +1,25 @@ +require 'nanomsg' + +sock = NanoMsg::PairSocket.new +sock.connect "tcp://127.0.0.1:#{ARGV[0]}" + +sock.recv +sleep 1 +sock.send "hello there" + +i = 0 +while true + msg = sock.recv + if msg == "shutdown" + sock.send "shutting down" + break + end + me, id, input = msg.split + sleep 1 + sock.send "#{me} answers #{id}" + sleep 1 + sock.send "----2#{i}----" if i%2==0 + sleep 1 + i += 1 +end + -- cgit v1.2.3