summaryrefslogtreecommitdiff
path: root/nanomsg
diff options
context:
space:
mode:
Diffstat (limited to 'nanomsg')
-rw-r--r--nanomsg/Makefile11
-rw-r--r--nanomsg/README5
-rw-r--r--nanomsg/bus.rb46
m---------nanomsg/cppnanomsg0
-rw-r--r--nanomsg/feed.cc53
-rwxr-xr-xnanomsg/feed.rb22
-rw-r--r--nanomsg/master.rb57
-rwxr-xr-xnanomsg/pairbin0 -> 13049 bytes
-rw-r--r--nanomsg/pair.c73
-rwxr-xr-xnanomsg/pipelinebin0 -> 12805 bytes
-rw-r--r--nanomsg/pipeline.c52
-rw-r--r--nanomsg/pipeline.cc54
-rw-r--r--nanomsg/slave.rb25
13 files changed, 398 insertions, 0 deletions
diff --git a/nanomsg/Makefile b/nanomsg/Makefile
new file mode 100644
index 0000000..634c8cc
--- /dev/null
+++ b/nanomsg/Makefile
@@ -0,0 +1,11 @@
+all: pipeline pipelinec pair
+
+pipeline:
+ g++ -I cppnanomsg pipeline.cc -o pipeline /usr/lib64/libnanomsg.so
+
+pipelinec: pipeline.c
+ gcc pipeline.c -o pipeline /usr/lib64/libnanomsg.so
+
+pair: pair.c
+ gcc pair.c -o pair /usr/lib64/libnanomsg.so
+
diff --git a/nanomsg/README b/nanomsg/README
new file mode 100644
index 0000000..31cc054
--- /dev/null
+++ b/nanomsg/README
@@ -0,0 +1,5 @@
+examples from [1]
+
+
+[1] http://tim.dysinger.net/posts/2013-09-16-getting-started-with-nanomsg.html
+
diff --git a/nanomsg/bus.rb b/nanomsg/bus.rb
new file mode 100644
index 0000000..8a561e3
--- /dev/null
+++ b/nanomsg/bus.rb
@@ -0,0 +1,46 @@
+require 'nanomsg'
+
+port = 60000
+srv = NanoMsg::BusSocket.new
+srv.bind("tcp://127.0.0.1:#{port}")
+port += 1
+
+slaves = []
+ports = []
+10.times {
+ slaves << NanoMsg::BusSocket.new
+ slaves.last.bind "tcp://127.0.0.1:#{port}"
+ srv.connect "tcp://127.0.0.1:#{port}"
+ ports << port
+ port += 1
+}
+
+slaves.each_with_index { |n,i|
+ ports[i+1..ports.size].each { |p|
+ slaves[i].connect "tcp://127.0.0.1:#{p}"
+ }
+}
+
+slaves.each_with_index { |n,i|
+ Thread.new {
+ me = i
+ while true
+ msg = n.recv
+ to = msg.split.first.split('=')[1].to_i
+ puts "slave #{i} recv: '#{msg}'" if me==to
+ sleep 1
+ end
+ }
+}
+
+i = 0
+j = 0
+while line = STDIN.gets
+ puts "src: sending #{i}"
+ srv.send("for=#{j} #{i} #{line.strip}")
+ sleep 1
+ i += 1
+ j += 1
+ j = 0 if j==10
+end
+
diff --git a/nanomsg/cppnanomsg b/nanomsg/cppnanomsg
new file mode 160000
+Subproject a36d44db1827a36bbd3868825c1b82d23f10e49
diff --git a/nanomsg/feed.cc b/nanomsg/feed.cc
new file mode 100644
index 0000000..f8923fc
--- /dev/null
+++ b/nanomsg/feed.cc
@@ -0,0 +1,53 @@
+#include <iostream>
+#include <string>
+#include <sstream>
+#include <stdio.h>
+#include <unistd.h>
+
+#include <nanomsg/nn.h>
+#include <nanomsg/pair.h>
+#include "nn.hpp"
+
+using namespace std;
+
+void
+recv(nn::socket& sock)
+{
+ char *buf = NULL;
+ size_t sz = sock.recv(&buf, NN_MSG, 0);
+ if (buf) {
+ string translation(buf, buf+sz);
+ cout << "got translation '" << translation << "'" << endl << endl;
+ }
+}
+
+void
+send(nn::socket& sock, const string& msg)
+{
+ cout << "sending source '" << msg << "'" << endl;
+ sock.send(msg.c_str(), msg.size()+1, 0);
+}
+
+void
+loop(nn::socket& sock)
+{
+ int to = 100;
+ sock.setsockopt(NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof(to));
+ for (string line; getline(cin, line);) {
+ send(sock, line);
+ sleep(1);
+ recv(sock);
+ }
+}
+
+int main(int argc, char const* argv[])
+{
+ nn::socket sock(AF_SP, NN_PAIR);
+ //string url = "ipc:///tmp/network_decoder.ipc";
+ string url = "tcp://127.0.0.1:60666";
+ sock.connect(url.c_str());
+ loop(sock);
+
+ return 0;
+}
+
diff --git a/nanomsg/feed.rb b/nanomsg/feed.rb
new file mode 100755
index 0000000..5ea9737
--- /dev/null
+++ b/nanomsg/feed.rb
@@ -0,0 +1,22 @@
+#!/usr/bin/env ruby
+
+require 'nanomsg'
+
+port = 60666
+sock = NanoMsg::PairSocket.new
+addr = "tcp://127.0.0.1:#{port}"
+#addr = "ipc:///tmp/network_decoder.ipc"
+sock.connect addr
+
+while true
+ line = STDIN.gets
+ if !line
+ sock.send 'shutdown'
+ break
+ end
+ puts "sending source '#{line.strip}'"
+ sock.send line.strip
+ sleep 1
+ puts "got translation: #{sock.recv}\n\n"
+end
+
diff --git a/nanomsg/master.rb b/nanomsg/master.rb
new file mode 100644
index 0000000..e21f88d
--- /dev/null
+++ b/nanomsg/master.rb
@@ -0,0 +1,57 @@
+require 'nanomsg'
+
+port = 60000
+socks = []
+m = 1
+m.times { |i|
+ socks << NanoMsg::PairSocket.new
+ addr = "tcp://127.0.0.1:#{port}"
+ socks.last.bind addr
+ puts "listening on #{addr}"
+ port += 1
+}
+
+threads = []
+socks.each_with_index { |n,i|
+ threads << Thread.new {
+ puts "sending hello to #{i}"
+ n.send "hello #{i}"
+ sleep 1
+ n.recv
+ puts "got hello from #{i}"
+ }
+}
+
+threads.each { |thr| thr.join }
+threads.clear
+
+socks.each_with_index {|n,i|
+ threads << Thread.new {
+ while true
+ msg = n.recv
+ puts "message from #{i}: #{msg}"
+ break if msg == "shutting down"
+ sleep 1
+ end
+ }
+}
+
+i = 0
+j = 0
+while line = STDIN.gets
+ puts "sending source #{i} to #{j}"
+ socks[j].send "#{j} #{i} #{line.strip}"
+ sleep 1
+ i += 1
+ j += 1
+ j = 0 if j==m
+end
+
+socks.each { |n|
+ Thread.new {
+ n.send "shutdown"
+ }
+}
+
+threads.each { |thr| thr.join }
+
diff --git a/nanomsg/pair b/nanomsg/pair
new file mode 100755
index 0000000..978ab21
--- /dev/null
+++ b/nanomsg/pair
Binary files differ
diff --git a/nanomsg/pair.c b/nanomsg/pair.c
new file mode 100644
index 0000000..f252518
--- /dev/null
+++ b/nanomsg/pair.c
@@ -0,0 +1,73 @@
+#include <assert.h>
+#include <unistd.h>
+#include <string.h>
+#include <nanomsg/nn.h>
+#include <nanomsg/pair.h>
+#include <stdio.h>
+
+#define NODE0 "node0"
+#define NODE1 "node1"
+
+int send_name(int sock, const char *name)
+{
+ printf ("%s: SENDING \"%s\"\n", name, name);
+ int sz_n = strlen (name) + 1; // '\0' too
+ return nn_send (sock, name, sz_n, 0);
+}
+
+int recv_name(int sock, const char *name)
+{
+ char *buf = NULL;
+ int result = nn_recv (sock, &buf, NN_MSG, 0);
+ if (result > 0)
+ {
+ printf ("%s: RECEIVED \"%s\"\n", name, buf);
+ nn_freemsg (buf);
+ }
+ return result;
+}
+
+int send_recv(int sock, const char *name)
+{
+ int to = 100;
+ assert (nn_setsockopt (sock, NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof (to)) >= 0);
+ while(1)
+ {
+ recv_name(sock, name);
+ sleep(1);
+ send_name(sock, name);
+ }
+}
+
+int node0 (const char *url)
+{
+ int sock = nn_socket (AF_SP, NN_PAIR);
+ assert (sock >= 0);
+ assert (nn_bind (sock, url) >= 0);
+ send_recv(sock, NODE0);
+ return nn_shutdown (sock, 0);
+}
+
+int node1 (const char *url)
+{
+ int sock = nn_socket (AF_SP, NN_PAIR);
+ assert (sock >= 0);
+ assert (nn_connect (sock, url) >= 0);
+ send_recv(sock, NODE1);
+ return nn_shutdown (sock, 0);
+}
+
+int main (const int argc, const char **argv)
+{
+ if (strncmp (NODE0, argv[1], strlen (NODE0)) == 0 && argc > 1)
+ return node0 (argv[2]);
+ else if (strncmp (NODE1, argv[1], strlen (NODE1)) == 0 && argc > 1)
+ return node1 (argv[2]);
+ else
+ {
+ fprintf (stderr, "Usage: pair %s|%s <URL> <ARG> ...\n",
+ NODE0, NODE1);
+ return 1;
+ }
+}
+
diff --git a/nanomsg/pipeline b/nanomsg/pipeline
new file mode 100755
index 0000000..cacc226
--- /dev/null
+++ b/nanomsg/pipeline
Binary files differ
diff --git a/nanomsg/pipeline.c b/nanomsg/pipeline.c
new file mode 100644
index 0000000..37340fa
--- /dev/null
+++ b/nanomsg/pipeline.c
@@ -0,0 +1,52 @@
+#include <assert.h>
+#include <unistd.h>
+#include <string.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <nanomsg/nn.h>
+#include <nanomsg/pipeline.h>
+
+#define NODE0 "node0"
+#define NODE1 "node1"
+
+int node0 (const char *url)
+{
+ int sock = nn_socket (AF_SP, NN_PULL);
+ assert (sock >= 0);
+ assert (nn_bind (sock, url) >= 0);
+ while (1)
+ {
+ char *buf = NULL;
+ int bytes = nn_recv (sock, &buf, NN_MSG, 0);
+ assert (bytes >= 0);
+ printf ("NODE0: RECEIVED \"%s\"\n", buf);
+ nn_freemsg (buf);
+ }
+}
+
+int node1 (const char *url, const char *msg)
+{
+ int sz_msg = strlen (msg) + 1; // '\0' too
+ int sock = nn_socket (AF_SP, NN_PUSH);
+ assert (sock >= 0);
+ assert (nn_connect (sock, url) >= 0);
+ printf ("NODE1: SENDING \"%s\"\n", msg);
+ int bytes = nn_send (sock, msg, sz_msg, 0);
+ assert (bytes == sz_msg);
+ return nn_shutdown (sock, 0);
+}
+
+int main (const int argc, const char **argv)
+{
+ if (strncmp (NODE0, argv[1], strlen (NODE0)) == 0 && argc > 1)
+ return node0 (argv[2]);
+ else if (strncmp (NODE1, argv[1], strlen (NODE1)) == 0 && argc > 2)
+ return node1 (argv[2], argv[3]);
+ else
+ {
+ fprintf (stderr, "Usage: pipeline %s|%s <URL> <ARG> ...'\n",
+ NODE0, NODE1);
+ return 1;
+ }
+}
+
diff --git a/nanomsg/pipeline.cc b/nanomsg/pipeline.cc
new file mode 100644
index 0000000..704f24c
--- /dev/null
+++ b/nanomsg/pipeline.cc
@@ -0,0 +1,54 @@
+/*
+ * template.cpp
+ *
+ * Patrick Simianer <p@simianer.de>
+ * YYYY-MM-DD
+ */
+
+#include <iostream>
+#include <nanomsg/nn.h>
+#include <nanomsg/pipeline.h>
+#include <nn.hpp>
+#include <sstream>
+
+using namespace std;
+
+void
+receiver(const string url)
+{
+ nn::socket s(AF_SP, NN_PULL);
+ s.bind(url.c_str());
+ while (1) {
+ char *buf = NULL;
+ s.recv(&buf, NN_MSG, 0);
+ cout << "receiving " << buf << endl;
+ }
+}
+
+void
+send(const string url, const string msg)
+{
+
+ nn::socket s(AF_SP, NN_PUSH);
+ s.connect(url.c_str());
+ cout << "sending " << msg << endl;
+ s.send(msg.c_str(), msg.size()+1, 0);
+}
+
+int main(int argc, char const* argv[])
+{
+ string cmd(argv[1]);
+ if (cmd == "send") {
+ ostringstream msg;
+ string url(argv[2]);
+ for (size_t i = 3; i < argc; i++)
+ msg << argv[i];
+ send(url, msg.str());
+ } else {
+ string url(argv[1]);
+ receiver(url);
+ }
+
+ return 0;
+}
+
diff --git a/nanomsg/slave.rb b/nanomsg/slave.rb
new file mode 100644
index 0000000..3183df8
--- /dev/null
+++ b/nanomsg/slave.rb
@@ -0,0 +1,25 @@
+require 'nanomsg'
+
+sock = NanoMsg::PairSocket.new
+sock.connect "tcp://127.0.0.1:#{ARGV[0]}"
+
+sock.recv
+sleep 1
+sock.send "hello there"
+
+i = 0
+while true
+ msg = sock.recv
+ if msg == "shutdown"
+ sock.send "shutting down"
+ break
+ end
+ me, id, input = msg.split
+ sleep 1
+ sock.send "#{me} answers #{id}"
+ sleep 1
+ sock.send "----2#{i}----" if i%2==0
+ sleep 1
+ i += 1
+end
+