summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Simianer <p@simianer.de>2015-05-13 20:32:08 +0200
committerPatrick Simianer <p@simianer.de>2015-05-13 20:32:08 +0200
commite76d0b1194c912f7329908d6f799eeccf9ab5456 (patch)
tree7f99524a31e86cb11e3661b97f3caa2c33230fca
parent717bead50c8b209dac3f6ac8cb2c081613850d26 (diff)
nanomsg
-rw-r--r--nanomsg/bus.rb46
-rw-r--r--nanomsg/feed.cc53
-rwxr-xr-xnanomsg/feed.rb22
-rw-r--r--nanomsg/master.rb57
-rw-r--r--nanomsg/slave.rb25
5 files changed, 203 insertions, 0 deletions
diff --git a/nanomsg/bus.rb b/nanomsg/bus.rb
new file mode 100644
index 0000000..8a561e3
--- /dev/null
+++ b/nanomsg/bus.rb
@@ -0,0 +1,46 @@
+require 'nanomsg'
+
+port = 60000
+srv = NanoMsg::BusSocket.new
+srv.bind("tcp://127.0.0.1:#{port}")
+port += 1
+
+slaves = []
+ports = []
+10.times {
+ slaves << NanoMsg::BusSocket.new
+ slaves.last.bind "tcp://127.0.0.1:#{port}"
+ srv.connect "tcp://127.0.0.1:#{port}"
+ ports << port
+ port += 1
+}
+
+slaves.each_with_index { |n,i|
+ ports[i+1..ports.size].each { |p|
+ slaves[i].connect "tcp://127.0.0.1:#{p}"
+ }
+}
+
+slaves.each_with_index { |n,i|
+ Thread.new {
+ me = i
+ while true
+ msg = n.recv
+ to = msg.split.first.split('=')[1].to_i
+ puts "slave #{i} recv: '#{msg}'" if me==to
+ sleep 1
+ end
+ }
+}
+
+i = 0
+j = 0
+while line = STDIN.gets
+ puts "src: sending #{i}"
+ srv.send("for=#{j} #{i} #{line.strip}")
+ sleep 1
+ i += 1
+ j += 1
+ j = 0 if j==10
+end
+
diff --git a/nanomsg/feed.cc b/nanomsg/feed.cc
new file mode 100644
index 0000000..f8923fc
--- /dev/null
+++ b/nanomsg/feed.cc
@@ -0,0 +1,53 @@
+#include <iostream>
+#include <string>
+#include <sstream>
+#include <stdio.h>
+#include <unistd.h>
+
+#include <nanomsg/nn.h>
+#include <nanomsg/pair.h>
+#include "nn.hpp"
+
+using namespace std;
+
+void
+recv(nn::socket& sock)
+{
+ char *buf = NULL;
+ size_t sz = sock.recv(&buf, NN_MSG, 0);
+ if (buf) {
+ string translation(buf, buf+sz);
+ cout << "got translation '" << translation << "'" << endl << endl;
+ }
+}
+
+void
+send(nn::socket& sock, const string& msg)
+{
+ cout << "sending source '" << msg << "'" << endl;
+ sock.send(msg.c_str(), msg.size()+1, 0);
+}
+
+void
+loop(nn::socket& sock)
+{
+ int to = 100;
+ sock.setsockopt(NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof(to));
+ for (string line; getline(cin, line);) {
+ send(sock, line);
+ sleep(1);
+ recv(sock);
+ }
+}
+
+int main(int argc, char const* argv[])
+{
+ nn::socket sock(AF_SP, NN_PAIR);
+ //string url = "ipc:///tmp/network_decoder.ipc";
+ string url = "tcp://127.0.0.1:60666";
+ sock.connect(url.c_str());
+ loop(sock);
+
+ return 0;
+}
+
diff --git a/nanomsg/feed.rb b/nanomsg/feed.rb
new file mode 100755
index 0000000..5ea9737
--- /dev/null
+++ b/nanomsg/feed.rb
@@ -0,0 +1,22 @@
+#!/usr/bin/env ruby
+
+require 'nanomsg'
+
+port = 60666
+sock = NanoMsg::PairSocket.new
+addr = "tcp://127.0.0.1:#{port}"
+#addr = "ipc:///tmp/network_decoder.ipc"
+sock.connect addr
+
+while true
+ line = STDIN.gets
+ if !line
+ sock.send 'shutdown'
+ break
+ end
+ puts "sending source '#{line.strip}'"
+ sock.send line.strip
+ sleep 1
+ puts "got translation: #{sock.recv}\n\n"
+end
+
diff --git a/nanomsg/master.rb b/nanomsg/master.rb
new file mode 100644
index 0000000..e21f88d
--- /dev/null
+++ b/nanomsg/master.rb
@@ -0,0 +1,57 @@
+require 'nanomsg'
+
+port = 60000
+socks = []
+m = 1
+m.times { |i|
+ socks << NanoMsg::PairSocket.new
+ addr = "tcp://127.0.0.1:#{port}"
+ socks.last.bind addr
+ puts "listening on #{addr}"
+ port += 1
+}
+
+threads = []
+socks.each_with_index { |n,i|
+ threads << Thread.new {
+ puts "sending hello to #{i}"
+ n.send "hello #{i}"
+ sleep 1
+ n.recv
+ puts "got hello from #{i}"
+ }
+}
+
+threads.each { |thr| thr.join }
+threads.clear
+
+socks.each_with_index {|n,i|
+ threads << Thread.new {
+ while true
+ msg = n.recv
+ puts "message from #{i}: #{msg}"
+ break if msg == "shutting down"
+ sleep 1
+ end
+ }
+}
+
+i = 0
+j = 0
+while line = STDIN.gets
+ puts "sending source #{i} to #{j}"
+ socks[j].send "#{j} #{i} #{line.strip}"
+ sleep 1
+ i += 1
+ j += 1
+ j = 0 if j==m
+end
+
+socks.each { |n|
+ Thread.new {
+ n.send "shutdown"
+ }
+}
+
+threads.each { |thr| thr.join }
+
diff --git a/nanomsg/slave.rb b/nanomsg/slave.rb
new file mode 100644
index 0000000..3183df8
--- /dev/null
+++ b/nanomsg/slave.rb
@@ -0,0 +1,25 @@
+require 'nanomsg'
+
+sock = NanoMsg::PairSocket.new
+sock.connect "tcp://127.0.0.1:#{ARGV[0]}"
+
+sock.recv
+sleep 1
+sock.send "hello there"
+
+i = 0
+while true
+ msg = sock.recv
+ if msg == "shutdown"
+ sock.send "shutting down"
+ break
+ end
+ me, id, input = msg.split
+ sleep 1
+ sock.send "#{me} answers #{id}"
+ sleep 1
+ sock.send "----2#{i}----" if i%2==0
+ sleep 1
+ i += 1
+end
+