diff options
Diffstat (limited to 'nanomsg')
-rw-r--r-- | nanomsg/Makefile | 11 | ||||
-rw-r--r-- | nanomsg/README | 5 | ||||
-rw-r--r-- | nanomsg/bus.rb | 46 | ||||
m--------- | nanomsg/cppnanomsg | 0 | ||||
-rw-r--r-- | nanomsg/feed.cc | 53 | ||||
-rwxr-xr-x | nanomsg/feed.rb | 22 | ||||
-rw-r--r-- | nanomsg/master.rb | 57 | ||||
-rwxr-xr-x | nanomsg/pair | bin | 0 -> 13049 bytes | |||
-rw-r--r-- | nanomsg/pair.c | 73 | ||||
-rwxr-xr-x | nanomsg/pipeline | bin | 0 -> 12805 bytes | |||
-rw-r--r-- | nanomsg/pipeline.c | 52 | ||||
-rw-r--r-- | nanomsg/pipeline.cc | 54 | ||||
-rw-r--r-- | nanomsg/slave.rb | 25 |
13 files changed, 398 insertions, 0 deletions
diff --git a/nanomsg/Makefile b/nanomsg/Makefile new file mode 100644 index 0000000..634c8cc --- /dev/null +++ b/nanomsg/Makefile @@ -0,0 +1,11 @@ +all: pipeline pipelinec pair + +pipeline: + g++ -I cppnanomsg pipeline.cc -o pipeline /usr/lib64/libnanomsg.so + +pipelinec: pipeline.c + gcc pipeline.c -o pipeline /usr/lib64/libnanomsg.so + +pair: pair.c + gcc pair.c -o pair /usr/lib64/libnanomsg.so + diff --git a/nanomsg/README b/nanomsg/README new file mode 100644 index 0000000..31cc054 --- /dev/null +++ b/nanomsg/README @@ -0,0 +1,5 @@ +examples from [1] + + +[1] http://tim.dysinger.net/posts/2013-09-16-getting-started-with-nanomsg.html + diff --git a/nanomsg/bus.rb b/nanomsg/bus.rb new file mode 100644 index 0000000..8a561e3 --- /dev/null +++ b/nanomsg/bus.rb @@ -0,0 +1,46 @@ +require 'nanomsg' + +port = 60000 +srv = NanoMsg::BusSocket.new +srv.bind("tcp://127.0.0.1:#{port}") +port += 1 + +slaves = [] +ports = [] +10.times { + slaves << NanoMsg::BusSocket.new + slaves.last.bind "tcp://127.0.0.1:#{port}" + srv.connect "tcp://127.0.0.1:#{port}" + ports << port + port += 1 +} + +slaves.each_with_index { |n,i| + ports[i+1..ports.size].each { |p| + slaves[i].connect "tcp://127.0.0.1:#{p}" + } +} + +slaves.each_with_index { |n,i| + Thread.new { + me = i + while true + msg = n.recv + to = msg.split.first.split('=')[1].to_i + puts "slave #{i} recv: '#{msg}'" if me==to + sleep 1 + end + } +} + +i = 0 +j = 0 +while line = STDIN.gets + puts "src: sending #{i}" + srv.send("for=#{j} #{i} #{line.strip}") + sleep 1 + i += 1 + j += 1 + j = 0 if j==10 +end + diff --git a/nanomsg/cppnanomsg b/nanomsg/cppnanomsg new file mode 160000 +Subproject a36d44db1827a36bbd3868825c1b82d23f10e49 diff --git a/nanomsg/feed.cc b/nanomsg/feed.cc new file mode 100644 index 0000000..f8923fc --- /dev/null +++ b/nanomsg/feed.cc @@ -0,0 +1,53 @@ +#include <iostream> +#include <string> +#include <sstream> +#include <stdio.h> +#include <unistd.h> + +#include <nanomsg/nn.h> +#include <nanomsg/pair.h> +#include "nn.hpp" + +using namespace std; + +void +recv(nn::socket& sock) +{ + char *buf = NULL; + size_t sz = sock.recv(&buf, NN_MSG, 0); + if (buf) { + string translation(buf, buf+sz); + cout << "got translation '" << translation << "'" << endl << endl; + } +} + +void +send(nn::socket& sock, const string& msg) +{ + cout << "sending source '" << msg << "'" << endl; + sock.send(msg.c_str(), msg.size()+1, 0); +} + +void +loop(nn::socket& sock) +{ + int to = 100; + sock.setsockopt(NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof(to)); + for (string line; getline(cin, line);) { + send(sock, line); + sleep(1); + recv(sock); + } +} + +int main(int argc, char const* argv[]) +{ + nn::socket sock(AF_SP, NN_PAIR); + //string url = "ipc:///tmp/network_decoder.ipc"; + string url = "tcp://127.0.0.1:60666"; + sock.connect(url.c_str()); + loop(sock); + + return 0; +} + diff --git a/nanomsg/feed.rb b/nanomsg/feed.rb new file mode 100755 index 0000000..5ea9737 --- /dev/null +++ b/nanomsg/feed.rb @@ -0,0 +1,22 @@ +#!/usr/bin/env ruby + +require 'nanomsg' + +port = 60666 +sock = NanoMsg::PairSocket.new +addr = "tcp://127.0.0.1:#{port}" +#addr = "ipc:///tmp/network_decoder.ipc" +sock.connect addr + +while true + line = STDIN.gets + if !line + sock.send 'shutdown' + break + end + puts "sending source '#{line.strip}'" + sock.send line.strip + sleep 1 + puts "got translation: #{sock.recv}\n\n" +end + diff --git a/nanomsg/master.rb b/nanomsg/master.rb new file mode 100644 index 0000000..e21f88d --- /dev/null +++ b/nanomsg/master.rb @@ -0,0 +1,57 @@ +require 'nanomsg' + +port = 60000 +socks = [] +m = 1 +m.times { |i| + socks << NanoMsg::PairSocket.new + addr = "tcp://127.0.0.1:#{port}" + socks.last.bind addr + puts "listening on #{addr}" + port += 1 +} + +threads = [] +socks.each_with_index { |n,i| + threads << Thread.new { + puts "sending hello to #{i}" + n.send "hello #{i}" + sleep 1 + n.recv + puts "got hello from #{i}" + } +} + +threads.each { |thr| thr.join } +threads.clear + +socks.each_with_index {|n,i| + threads << Thread.new { + while true + msg = n.recv + puts "message from #{i}: #{msg}" + break if msg == "shutting down" + sleep 1 + end + } +} + +i = 0 +j = 0 +while line = STDIN.gets + puts "sending source #{i} to #{j}" + socks[j].send "#{j} #{i} #{line.strip}" + sleep 1 + i += 1 + j += 1 + j = 0 if j==m +end + +socks.each { |n| + Thread.new { + n.send "shutdown" + } +} + +threads.each { |thr| thr.join } + diff --git a/nanomsg/pair b/nanomsg/pair Binary files differnew file mode 100755 index 0000000..978ab21 --- /dev/null +++ b/nanomsg/pair diff --git a/nanomsg/pair.c b/nanomsg/pair.c new file mode 100644 index 0000000..f252518 --- /dev/null +++ b/nanomsg/pair.c @@ -0,0 +1,73 @@ +#include <assert.h> +#include <unistd.h> +#include <string.h> +#include <nanomsg/nn.h> +#include <nanomsg/pair.h> +#include <stdio.h> + +#define NODE0 "node0" +#define NODE1 "node1" + +int send_name(int sock, const char *name) +{ + printf ("%s: SENDING \"%s\"\n", name, name); + int sz_n = strlen (name) + 1; // '\0' too + return nn_send (sock, name, sz_n, 0); +} + +int recv_name(int sock, const char *name) +{ + char *buf = NULL; + int result = nn_recv (sock, &buf, NN_MSG, 0); + if (result > 0) + { + printf ("%s: RECEIVED \"%s\"\n", name, buf); + nn_freemsg (buf); + } + return result; +} + +int send_recv(int sock, const char *name) +{ + int to = 100; + assert (nn_setsockopt (sock, NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof (to)) >= 0); + while(1) + { + recv_name(sock, name); + sleep(1); + send_name(sock, name); + } +} + +int node0 (const char *url) +{ + int sock = nn_socket (AF_SP, NN_PAIR); + assert (sock >= 0); + assert (nn_bind (sock, url) >= 0); + send_recv(sock, NODE0); + return nn_shutdown (sock, 0); +} + +int node1 (const char *url) +{ + int sock = nn_socket (AF_SP, NN_PAIR); + assert (sock >= 0); + assert (nn_connect (sock, url) >= 0); + send_recv(sock, NODE1); + return nn_shutdown (sock, 0); +} + +int main (const int argc, const char **argv) +{ + if (strncmp (NODE0, argv[1], strlen (NODE0)) == 0 && argc > 1) + return node0 (argv[2]); + else if (strncmp (NODE1, argv[1], strlen (NODE1)) == 0 && argc > 1) + return node1 (argv[2]); + else + { + fprintf (stderr, "Usage: pair %s|%s <URL> <ARG> ...\n", + NODE0, NODE1); + return 1; + } +} + diff --git a/nanomsg/pipeline b/nanomsg/pipeline Binary files differnew file mode 100755 index 0000000..cacc226 --- /dev/null +++ b/nanomsg/pipeline diff --git a/nanomsg/pipeline.c b/nanomsg/pipeline.c new file mode 100644 index 0000000..37340fa --- /dev/null +++ b/nanomsg/pipeline.c @@ -0,0 +1,52 @@ +#include <assert.h> +#include <unistd.h> +#include <string.h> +#include <pthread.h> +#include <stdio.h> +#include <nanomsg/nn.h> +#include <nanomsg/pipeline.h> + +#define NODE0 "node0" +#define NODE1 "node1" + +int node0 (const char *url) +{ + int sock = nn_socket (AF_SP, NN_PULL); + assert (sock >= 0); + assert (nn_bind (sock, url) >= 0); + while (1) + { + char *buf = NULL; + int bytes = nn_recv (sock, &buf, NN_MSG, 0); + assert (bytes >= 0); + printf ("NODE0: RECEIVED \"%s\"\n", buf); + nn_freemsg (buf); + } +} + +int node1 (const char *url, const char *msg) +{ + int sz_msg = strlen (msg) + 1; // '\0' too + int sock = nn_socket (AF_SP, NN_PUSH); + assert (sock >= 0); + assert (nn_connect (sock, url) >= 0); + printf ("NODE1: SENDING \"%s\"\n", msg); + int bytes = nn_send (sock, msg, sz_msg, 0); + assert (bytes == sz_msg); + return nn_shutdown (sock, 0); +} + +int main (const int argc, const char **argv) +{ + if (strncmp (NODE0, argv[1], strlen (NODE0)) == 0 && argc > 1) + return node0 (argv[2]); + else if (strncmp (NODE1, argv[1], strlen (NODE1)) == 0 && argc > 2) + return node1 (argv[2], argv[3]); + else + { + fprintf (stderr, "Usage: pipeline %s|%s <URL> <ARG> ...'\n", + NODE0, NODE1); + return 1; + } +} + diff --git a/nanomsg/pipeline.cc b/nanomsg/pipeline.cc new file mode 100644 index 0000000..704f24c --- /dev/null +++ b/nanomsg/pipeline.cc @@ -0,0 +1,54 @@ +/* + * template.cpp + * + * Patrick Simianer <p@simianer.de> + * YYYY-MM-DD + */ + +#include <iostream> +#include <nanomsg/nn.h> +#include <nanomsg/pipeline.h> +#include <nn.hpp> +#include <sstream> + +using namespace std; + +void +receiver(const string url) +{ + nn::socket s(AF_SP, NN_PULL); + s.bind(url.c_str()); + while (1) { + char *buf = NULL; + s.recv(&buf, NN_MSG, 0); + cout << "receiving " << buf << endl; + } +} + +void +send(const string url, const string msg) +{ + + nn::socket s(AF_SP, NN_PUSH); + s.connect(url.c_str()); + cout << "sending " << msg << endl; + s.send(msg.c_str(), msg.size()+1, 0); +} + +int main(int argc, char const* argv[]) +{ + string cmd(argv[1]); + if (cmd == "send") { + ostringstream msg; + string url(argv[2]); + for (size_t i = 3; i < argc; i++) + msg << argv[i]; + send(url, msg.str()); + } else { + string url(argv[1]); + receiver(url); + } + + return 0; +} + diff --git a/nanomsg/slave.rb b/nanomsg/slave.rb new file mode 100644 index 0000000..3183df8 --- /dev/null +++ b/nanomsg/slave.rb @@ -0,0 +1,25 @@ +require 'nanomsg' + +sock = NanoMsg::PairSocket.new +sock.connect "tcp://127.0.0.1:#{ARGV[0]}" + +sock.recv +sleep 1 +sock.send "hello there" + +i = 0 +while true + msg = sock.recv + if msg == "shutdown" + sock.send "shutting down" + break + end + me, id, input = msg.split + sleep 1 + sock.send "#{me} answers #{id}" + sleep 1 + sock.send "----2#{i}----" if i%2==0 + sleep 1 + i += 1 +end + |