summaryrefslogtreecommitdiff
path: root/training/utils/sentserver.cc
diff options
context:
space:
mode:
authorPaul Baltescu <pauldb89@gmail.com>2013-02-21 14:13:55 +0000
committerPaul Baltescu <pauldb89@gmail.com>2013-02-21 14:13:55 +0000
commitbca26d953a774b8efca12f30407390b3f5eef9d0 (patch)
treefe922de5c89b1844f677d550dcc24e87edd67a55 /training/utils/sentserver.cc
parent54a1c0e2bde259e3acc9c0a8ec8da3c7704e80ca (diff)
parent95c364f2cb002241c4a62bedb1c5ef6f1e9a7f22 (diff)
Merge branch 'master' of https://github.com/pauldb89/cdec
Diffstat (limited to 'training/utils/sentserver.cc')
-rw-r--r--training/utils/sentserver.cc515
1 files changed, 515 insertions, 0 deletions
diff --git a/training/utils/sentserver.cc b/training/utils/sentserver.cc
new file mode 100644
index 00000000..b425955f
--- /dev/null
+++ b/training/utils/sentserver.cc
@@ -0,0 +1,515 @@
+/* Copyright (c) 2001 by David Chiang. All rights reserved.*/
+
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#include <netinet/in.h>
+#include <sched.h>
+#include <pthread.h>
+#include <errno.h>
+
+#include "sentserver.h"
+
+#define MAX_CLIENTS 64
+
+struct clientinfo {
+ int s;
+ struct sockaddr_in sin;
+};
+
+struct line {
+ int id;
+ char *s;
+ int status;
+ struct line *next;
+} *head, **ptail;
+
+int n_sent = 0, n_received=0, n_flushed=0;
+
+#define STATUS_RUNNING 0
+#define STATUS_ABORTED 1
+#define STATUS_FINISHED 2
+
+pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t input_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+int n_clients = 0;
+int s;
+int expect_multiline_output = 0;
+int log_mutex = 0;
+int stay_alive = 0; /* dont panic and die with zero clients */
+
+void queue_finish(struct line *node, char *s, int fid);
+char * read_line(int fd, int multiline);
+void done (int code);
+
+struct line * queue_get(int fid) {
+ struct line *cur;
+ char *s, *synch;
+
+ if (log_mutex) fprintf(stderr, "Getting for data for fid %d\n", fid);
+ if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid);
+ pthread_mutex_lock(&queue_mutex);
+
+ /* First, check for aborted sentences. */
+
+ if (log_mutex) fprintf(stderr, " Checking queue for aborted jobs (fid %d)\n", fid);
+ for (cur = head; cur != NULL; cur = cur->next) {
+ if (cur->status == STATUS_ABORTED) {
+ cur->status = STATUS_RUNNING;
+
+ if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid);
+ pthread_mutex_unlock(&queue_mutex);
+
+ return cur;
+ }
+ }
+ if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid);
+ pthread_mutex_unlock(&queue_mutex);
+
+ /* Otherwise, read a new one. */
+ if (log_mutex) fprintf(stderr, "Locking input mutex (%d)\n", fid);
+ if (log_mutex) fprintf(stderr, " Reading input for new data (fid %d)\n", fid);
+ pthread_mutex_lock(&input_mutex);
+ s = read_line(0,0);
+
+ while (s) {
+ if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid);
+ pthread_mutex_lock(&queue_mutex);
+ if (log_mutex) fprintf(stderr, "Unlocking input mutex (%d)\n", fid);
+ pthread_mutex_unlock(&input_mutex);
+
+ cur = (line*)malloc(sizeof (struct line));
+ cur->id = n_sent;
+ cur->s = s;
+ cur->next = NULL;
+
+ *ptail = cur;
+ ptail = &cur->next;
+
+ n_sent++;
+
+ if (strcmp(s,"===SYNCH===\n")==0){
+ fprintf(stderr, "Received ===SYNCH=== signal (fid %d)\n", fid);
+ // Note: queue_finish calls free(cur->s).
+ // Therefore we need to create a new string here.
+ synch = (char*)malloc((strlen("===SYNCH===\n")+2) * sizeof (char));
+ synch = strcpy(synch, s);
+
+ if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid);
+ pthread_mutex_unlock(&queue_mutex);
+ queue_finish(cur, synch, fid); /* handles its own lock */
+
+ if (log_mutex) fprintf(stderr, "Locking input mutex (%d)\n", fid);
+ if (log_mutex) fprintf(stderr, " Reading input for new data (fid %d)\n", fid);
+ pthread_mutex_lock(&input_mutex);
+
+ s = read_line(0,0);
+ } else {
+ if (log_mutex) fprintf(stderr, " Received new data %d (fid %d)\n", cur->id, fid);
+ cur->status = STATUS_RUNNING;
+ if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid);
+ pthread_mutex_unlock(&queue_mutex);
+ return cur;
+ }
+ }
+
+ if (log_mutex) fprintf(stderr, "Unlocking input mutex (%d)\n", fid);
+ pthread_mutex_unlock(&input_mutex);
+ /* Only way to reach this point: no more output */
+
+ if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid);
+ pthread_mutex_lock(&queue_mutex);
+ if (head == NULL) {
+ fprintf(stderr, "Reached end of file. Exiting.\n");
+ done(0);
+ } else
+ ptail = NULL; /* This serves as a signal that there is no more input */
+ if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid);
+ pthread_mutex_unlock(&queue_mutex);
+
+ return NULL;
+}
+
+void queue_panic() {
+ struct line *next;
+ while (head && head->status == STATUS_FINISHED) {
+ /* Write out finished sentences */
+ if (head->status == STATUS_FINISHED) {
+ fputs(head->s, stdout);
+ fflush(stdout);
+ }
+ /* Write out blank line for unfinished sentences */
+ if (head->status == STATUS_ABORTED) {
+ fputs("\n", stdout);
+ fflush(stdout);
+ }
+ /* By defition, there cannot be any RUNNING sentences, since
+ function is only called when n_clients == 0 */
+ free(head->s);
+ next = head->next;
+ free(head);
+ head = next;
+ n_flushed++;
+ }
+ fclose(stdout);
+ fprintf(stderr, "All clients died. Panicking, flushing completed sentences and exiting.\n");
+ done(1);
+}
+
+void queue_abort(struct line *node, int fid) {
+ if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid);
+ pthread_mutex_lock(&queue_mutex);
+ node->status = STATUS_ABORTED;
+ if (n_clients == 0) {
+ if (stay_alive) {
+ fprintf(stderr, "Warning! No live clients detected! Staying alive, will retry soon.\n");
+ } else {
+ queue_panic();
+ }
+ }
+ if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid);
+ pthread_mutex_unlock(&queue_mutex);
+}
+
+
+void queue_print() {
+ struct line *cur;
+
+ fprintf(stderr, " Queue\n");
+
+ for (cur = head; cur != NULL; cur = cur->next) {
+ switch(cur->status) {
+ case STATUS_RUNNING:
+ fprintf(stderr, " %d running ", cur->id); break;
+ case STATUS_ABORTED:
+ fprintf(stderr, " %d aborted ", cur->id); break;
+ case STATUS_FINISHED:
+ fprintf(stderr, " %d finished ", cur->id); break;
+
+ }
+ fprintf(stderr, "\n");
+ //fprintf(stderr, cur->s);
+ }
+}
+
+void queue_finish(struct line *node, char *s, int fid) {
+ struct line *next;
+ if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid);
+ pthread_mutex_lock(&queue_mutex);
+
+ free(node->s);
+ node->s = s;
+ node->status = STATUS_FINISHED;
+ n_received++;
+
+ /* Flush out finished nodes */
+ while (head && head->status == STATUS_FINISHED) {
+
+ if (log_mutex) fprintf(stderr, " Flushing finished node %d\n", head->id);
+
+ fputs(head->s, stdout);
+ fflush(stdout);
+ if (log_mutex) fprintf(stderr, " Flushed node %d\n", head->id);
+ free(head->s);
+
+ next = head->next;
+ free(head);
+
+ head = next;
+
+ n_flushed++;
+
+ if (head == NULL) { /* empty queue */
+ if (ptail == NULL) { /* This can only happen if set in queue_get as signal that there is no more input. */
+ fprintf(stderr, "All sentences finished. Exiting.\n");
+ done(0);
+ } else /* ptail pointed at something which was just popped off the stack -- reset to head*/
+ ptail = &head;
+ }
+ }
+
+ if (log_mutex) fprintf(stderr, " Flushing output %d\n", head->id);
+ fflush(stdout);
+ fprintf(stderr, "%d sentences sent, %d sentences finished, %d sentences flushed\n", n_sent, n_received, n_flushed);
+
+ if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid);
+ pthread_mutex_unlock(&queue_mutex);
+
+}
+
+char * read_line(int fd, int multiline) {
+ int size = 80;
+ char errorbuf[100];
+ char *s = (char*)malloc(size+2);
+ int result, errors=0;
+ int i = 0;
+
+ result = read(fd, s+i, 1);
+
+ while (1) {
+ if (result < 0) {
+ perror("read()");
+ sprintf(errorbuf, "Error code: %d\n", errno);
+ fputs(errorbuf, stderr);
+ errors++;
+ if (errors > 5) {
+ free(s);
+ return NULL;
+ } else {
+ sleep(1); /* retry after delay */
+ }
+ } else if (result == 0) {
+ break;
+ } else if (multiline==0 && s[i] == '\n') {
+ break;
+ } else {
+ if (s[i] == '\n'){
+ /* if we've reached this point,
+ then multiline must be 1, and we're
+ going to poll the fd for an additional
+ line of data. The basic design is to
+ run a select on the filedescriptor fd.
+ Select will return under two conditions:
+ if there is data on the fd, or if a
+ timeout is reached. We'll select on this
+ fd. If select returns because there's data
+ ready, keep going; else assume there's no
+ more and return the data we already have.
+ */
+
+ fd_set set;
+ FD_ZERO(&set);
+ FD_SET(fd, &set);
+
+ struct timeval timeout;
+ timeout.tv_sec = 3; // number of seconds for timeout
+ timeout.tv_usec = 0;
+
+ int ready = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
+ if (ready<1){
+ break; // no more data, stop looping
+ }
+ }
+ i++;
+
+ if (i == size) {
+ size = size*2;
+ s = (char*)realloc(s, size+2);
+ }
+ }
+
+ result = read(fd, s+i, 1);
+ }
+
+ if (result == 0 && i == 0) { /* end of file */
+ free(s);
+ return NULL;
+ }
+
+ s[i] = '\n';
+ s[i+1] = '\0';
+
+ return s;
+}
+
+void * new_client(void *arg) {
+ struct clientinfo *client = (struct clientinfo *)arg;
+ struct line *cur;
+ int result;
+ char *s;
+ char errorbuf[100];
+
+ pthread_mutex_lock(&clients_mutex);
+ n_clients++;
+ pthread_mutex_unlock(&clients_mutex);
+
+ fprintf(stderr, "Client connected (%d connected)\n", n_clients);
+
+ for (;;) {
+
+ cur = queue_get(client->s);
+
+ if (cur) {
+ /* fprintf(stderr, "Sending to client: %s", cur->s); */
+ fprintf(stderr, "Sending data %d to client (fid %d)\n", cur->id, client->s);
+ result = write(client->s, cur->s, strlen(cur->s));
+ if (result < strlen(cur->s)){
+ perror("write()");
+ sprintf(errorbuf, "Error code: %d\n", errno);
+ fputs(errorbuf, stderr);
+
+ pthread_mutex_lock(&clients_mutex);
+ n_clients--;
+ pthread_mutex_unlock(&clients_mutex);
+
+ fprintf(stderr, "Client died (%d connected)\n", n_clients);
+ queue_abort(cur, client->s);
+
+ close(client->s);
+ free(client);
+
+ pthread_exit(NULL);
+ }
+ } else {
+ close(client->s);
+ pthread_mutex_lock(&clients_mutex);
+ n_clients--;
+ pthread_mutex_unlock(&clients_mutex);
+ fprintf(stderr, "Client dismissed (%d connected)\n", n_clients);
+ pthread_exit(NULL);
+ }
+
+ s = read_line(client->s,expect_multiline_output);
+ if (s) {
+ /* fprintf(stderr, "Client (fid %d) returned: %s", client->s, s); */
+ fprintf(stderr, "Client (fid %d) returned data %d\n", client->s, cur->id);
+// queue_print();
+ queue_finish(cur, s, client->s);
+ } else {
+ pthread_mutex_lock(&clients_mutex);
+ n_clients--;
+ pthread_mutex_unlock(&clients_mutex);
+
+ fprintf(stderr, "Client died (%d connected)\n", n_clients);
+ queue_abort(cur, client->s);
+
+ close(client->s);
+ free(client);
+
+ pthread_exit(NULL);
+ }
+
+ }
+ return 0;
+}
+
+void done (int code) {
+ close(s);
+ exit(code);
+}
+
+
+
+int main (int argc, char *argv[]) {
+ struct sockaddr_in sin, from;
+ int g;
+ socklen_t len;
+ struct clientinfo *client;
+ int port;
+ int opt;
+ int errors = 0;
+ int argi;
+ char *key = NULL, *client_key;
+ int use_key = 0;
+ /* the key stuff here doesn't provide any
+ real measure of security, it's mainly to keep
+ jobs from bumping into each other. */
+
+ pthread_t tid;
+ port = DEFAULT_PORT;
+
+ for (argi=1; argi < argc; argi++){
+ if (strcmp(argv[argi], "-m")==0){
+ expect_multiline_output = 1;
+ } else if (strcmp(argv[argi], "-k")==0){
+ argi++;
+ if (argi == argc){
+ fprintf(stderr, "Key must be specified after -k\n");
+ exit(1);
+ }
+ key = argv[argi];
+ use_key = 1;
+ } else if (strcmp(argv[argi], "--stay-alive")==0){
+ stay_alive = 1; /* dont panic and die with zero clients */
+ } else {
+ port = atoi(argv[argi]);
+ }
+ }
+
+ /* Initialize data structures */
+ head = NULL;
+ ptail = &head;
+
+ /* Set up listener */
+ s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ opt = 1;
+ setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+ sin.sin_family = AF_INET;
+ sin.sin_addr.s_addr = htonl(INADDR_ANY);
+ sin.sin_port = htons(port);
+ while (bind(s, (struct sockaddr *) &sin, sizeof(sin)) < 0) {
+ perror("bind()");
+ sleep(1);
+ errors++;
+ if (errors > 100)
+ exit(1);
+ }
+
+ len = sizeof(sin);
+ getsockname(s, (struct sockaddr *) &sin, &len);
+
+ fprintf(stderr, "Listening on port %hu\n", ntohs(sin.sin_port));
+
+ while (listen(s, MAX_CLIENTS) < 0) {
+ perror("listen()");
+ sleep(1);
+ errors++;
+ if (errors > 100)
+ exit(1);
+ }
+
+ for (;;) {
+ len = sizeof(from);
+ g = accept(s, (struct sockaddr *)&from, &len);
+ if (g < 0) {
+ perror("accept()");
+ sleep(1);
+ continue;
+ }
+ client = (clientinfo*)malloc(sizeof(struct clientinfo));
+ client->s = g;
+ bcopy(&from, &client->sin, len);
+
+ if (use_key){
+ fd_set set;
+ FD_ZERO(&set);
+ FD_SET(client->s, &set);
+
+ struct timeval timeout;
+ timeout.tv_sec = 3; // number of seconds for timeout
+ timeout.tv_usec = 0;
+
+ int ready = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
+ if (ready<1){
+ fprintf(stderr, "Prospective client failed to respond with correct key.\n");
+ close(client->s);
+ free(client);
+ } else {
+ client_key = read_line(client->s,0);
+ client_key[strlen(client_key)-1]='\0'; /* chop trailing newline */
+ if (strcmp(key, client_key)==0){
+ pthread_create(&tid, NULL, new_client, client);
+ } else {
+ fprintf(stderr, "Prospective client failed to respond with correct key.\n");
+ close(client->s);
+ free(client);
+ }
+ free(client_key);
+ }
+ } else {
+ pthread_create(&tid, NULL, new_client, client);
+ }
+ }
+
+}
+
+
+