summaryrefslogtreecommitdiff
path: root/vest
diff options
context:
space:
mode:
Diffstat (limited to 'vest')
-rw-r--r--vest/Makefile.am8
-rwxr-xr-xvest/dist-vest.pl22
-rwxr-xr-xvest/parallelize.pl255
-rw-r--r--vest/sentclient.c76
-rw-r--r--vest/sentserver.c515
-rw-r--r--vest/sentserver.h6
6 files changed, 870 insertions, 12 deletions
diff --git a/vest/Makefile.am b/vest/Makefile.am
index 6815ae75..99bd6430 100644
--- a/vest/Makefile.am
+++ b/vest/Makefile.am
@@ -3,7 +3,9 @@ bin_PROGRAMS = \
mr_vest_map \
mr_vest_reduce \
mr_vest_generate_mapper_input \
- fast_score
+ fast_score \
+ sentserver \
+ sentclient
if HAVE_GTEST
noinst_PROGRAMS = \
@@ -11,6 +13,10 @@ noinst_PROGRAMS = \
lo_test
endif
+sentserver_SOURCES = sentserver.c
+
+sentclient_SOURCES = sentclient.c
+
mbr_kbest_SOURCES = mbr_kbest.cc ter.cc comb_scorer.cc aer_scorer.cc scorer.cc viterbi_envelope.cc
mbr_kbest_LDADD = $(top_srcdir)/decoder/libcdec.a -lz
diff --git a/vest/dist-vest.pl b/vest/dist-vest.pl
index dca2f06b..22d6478a 100755
--- a/vest/dist-vest.pl
+++ b/vest/dist-vest.pl
@@ -6,6 +6,8 @@ use Getopt::Long;
use IPC::Open2;
use strict;
use POSIX ":sys_wait_h";
+#my $QSUB_FLAGS = "-q batch -l pmem=3000mb,walltime=5:00:00";
+my $QSUB_FLAGS = "-l mem_free=9G";
# Default settings
my $srcFile;
@@ -17,12 +19,13 @@ die "Can't execute $FAST_SCORE" unless -x $FAST_SCORE;
my $MAPINPUT = "$bin_dir/mr_vest_generate_mapper_input";
my $MAPPER = "$bin_dir/mr_vest_map";
my $REDUCER = "$bin_dir/mr_vest_reduce";
+my $parallelize = "$bin_dir/parallelize.pl";
my $SCORER = $FAST_SCORE;
die "Can't find $MAPPER" unless -x $MAPPER;
my $cdec = "$bin_dir/../decoder/cdec";
die "Can't find decoder in $cdec" unless -x $cdec;
+die "Can't find $parallelize" unless -x $parallelize;
my $decoder = $cdec;
-my $DISCARD_FORESTS = 0;
my $lines_per_mapper = 400;
my $rand_directions = 15;
my $iteration = 1;
@@ -157,7 +160,6 @@ $SIG{HUP} = "cleanup";
my $decoderBase = `basename $decoder`; chomp $decoderBase;
my $newIniFile = "$dir/$decoderBase.ini";
-my $parallelize = '/chomes/redpony/svn-trunk/sa-utils/parallelize.pl';
my $inputFileName = "$dir/input";
my $user = $ENV{"USER"};
@@ -254,12 +256,8 @@ while (1){
}
# run optimizer
- print LOGFILE "\nUNION FORESTS\n";
print LOGFILE `date`;
my $mergeLog="$logdir/prune-merge.log.$iteration";
- if ($DISCARD_FORESTS) {
- `rm -f $dir/hgs/*gz`;
- }
my $score = 0;
my $icc = 0;
@@ -314,7 +312,7 @@ while (1){
die;
}
} else {
- my $todo = "qsub -q batch -l pmem=3000mb,walltime=5:00:00 -N $client_name -o /dev/null -e $logdir/$client_name.ER";
+ my $todo = "qsub $QSUB_FLAGS -S /bin/bash -N $client_name -o /dev/null -e $logdir/$client_name.ER";
local(*QOUT, *QIN);
open2(\*QOUT, \*QIN, $todo);
print QIN $script;
@@ -323,9 +321,10 @@ while (1){
$nmappers++;
while (my $jobid=<QOUT>){
chomp $jobid;
- push(@cleanupcmds, "`qdel $jobid 2> /dev/null`");
$jobid =~ s/^(\d+)(.*?)$/\1/g;
- print STDERR "short job id $jobid\n";
+ $jobid =~ s/^Your job (\d+) .*$/\1/;
+ push(@cleanupcmds, "`qdel $jobid 2> /dev/null`");
+ print LOGFILE " $jobid";
if ($joblist == "") { $joblist = $jobid; }
else {$joblist = $joblist . "\|" . $jobid; }
}
@@ -334,7 +333,8 @@ while (1){
}
if ($run_local) {
} else {
- print LOGFILE "Launched $nmappers mappers.\n";
+ print LOGFILE "\nLaunched $nmappers mappers.\n";
+ sleep 10;
print LOGFILE "Waiting for mappers to complete...\n";
while ($nmappers > 0) {
sleep 5;
@@ -353,7 +353,7 @@ while (1){
die "$mo: output lines ($olines) doesn't match input lines ($ilines)" unless $olines==$ilines;
}
print LOGFILE "Results for $tol/$til lines\n";
- print LOGFILE "\nSORTING AND RUNNING FMERT REDUCER\n";
+ print LOGFILE "\nSORTING AND RUNNING VEST REDUCER\n";
print LOGFILE `date`;
$cmd="sort -t \$'\\t' -k 1 @mapoutputs | $REDUCER -l $metric > $dir/redoutput.$im1";
print LOGFILE "COMMAND:\n$cmd\n";
diff --git a/vest/parallelize.pl b/vest/parallelize.pl
new file mode 100755
index 00000000..f01232c8
--- /dev/null
+++ b/vest/parallelize.pl
@@ -0,0 +1,255 @@
+#!/usr/bin/env perl
+
+# Author: Adam Lopez
+#
+# This script takes a command that processes input
+# from stdin one-line-at-time, and parallelizes it
+# on the cluster using David Chiang's sentserver/
+# sentclient architecture.
+#
+# Prerequisites: the command *must* read each line
+# without waiting for subsequent lines of input
+# (for instance, a command which must read all lines
+# of input before processing will not work) and
+# return it to the output *without* buffering
+# multiple lines.
+
+use Getopt::Long;
+use IPC::Open2;
+use strict;
+use POSIX ":sys_wait_h";
+
+my $recycle_clients; # spawn new clients when previous ones terminate
+my $stay_alive; # dont let server die when having zero clients
+my $joblist = "";
+my $errordir;
+my $multiline;
+my @files_to_stage;
+my $verbose = 1;
+my $nodelist;
+my $user = $ENV{"USER"};
+my $pmem = "2g";
+
+sub print_help;
+
+# Process command-line options
+if (GetOptions(
+ "stay-alive" => \$stay_alive,
+ "recycle-clients" => \$recycle_clients,
+ "error-dir=s" => \$errordir,
+ "multi-line" => \$multiline,
+ "file=s" => \@files_to_stage,
+ "verbose" => \$verbose,
+ "nodelist=s" => \$nodelist,
+ "pmem=s" => \$pmem
+) == 0 || @ARGV == 0){
+ print_help();
+}
+
+my @nodes = grep(/^[cd]\d\d$/, split(/\n/, `pbsnodes -a`));
+if ($nodelist){
+ @nodes = split(/ /, $nodelist);
+}
+
+if ($verbose){
+ print STDERR "Compute nodes: @nodes\n";
+}
+
+my $cmd = "";
+for my $arg (@ARGV){
+ if ($arg=~ /\s/){
+ $cmd .= "\"$arg\" ";
+ } else {
+ $cmd .= "$arg "
+ }
+}
+
+if ($errordir){
+ `mkdir -p $errordir`;
+}
+
+if ($verbose){ print STDERR "Parallelizing: $cmd\n\n"; }
+
+# set cleanup handler
+my @cleanup_cmds;
+sub cleanup;
+sub cleanup_and_die;
+$SIG{INT} = "cleanup_and_die";
+$SIG{TERM} = "cleanup_and_die";
+$SIG{HUP} = "cleanup_and_die";
+
+# other subs:
+sub numof_live_jobs;
+sub launch_job_on_node;
+
+
+# vars
+my $mydir = `dirname $0`; chomp $mydir;
+my $sentserver = "$mydir/sentserver";
+my $sentclient = "$mydir/sentclient";
+my $host = `hostname`;
+chomp $host;
+
+my $executable = $cmd;
+$executable =~ s/^\s*(\S+)($|\s.*)/$1/;
+$executable=`basename $executable`;
+chomp $executable;
+
+# find open port
+my $port = 50300;
+if ($verbose){ print STDERR "Testing port $port...";}
+while (`netstat -l | grep $port`){
+ if ($verbose){ print STDERR "port is busy\n";}
+ $port++;
+ if ($port > 50400){
+ die "Unable to find open port\n";
+ }
+ if ($verbose){ print STDERR "Testing port $port... "; }
+}
+if ($verbose){
+ print STDERR "port is available\n";
+}
+
+srand;
+my $key = int(rand()*1000000);
+
+my $multiflag = "";
+if ($multiline){ $multiflag = "-m"; print STDERR "expecting multiline output.\n"; }
+my $stay_alive_flag = "";
+if ($stay_alive){ $stay_alive_flag = "--stay-alive"; print STDERR "staying alive while no clients are connected.\n"; }
+
+my %node_count;
+my $script = "";
+
+# fork == one thread runs the sentserver, while the
+# other spawns the sentclient commands.
+if (my $pid = fork){
+ sleep 5; # give other thread time to start sentserver
+ $script .= "wait\n";
+ $script .= "$sentclient $host:$port:$key $cmd\n";
+ if ($verbose){
+ print STDERR "Client script:\n====\n";
+ print STDERR $script;
+ print STDERR "====\n";
+ }
+ for my $node (@nodes){
+ launch_job_on_node($node);
+ }
+ if ($recycle_clients) {
+ my $ret;
+ my $livejobs;
+ while (1) {
+ $ret = waitpid($pid, WNOHANG);
+ #print STDERR "waitpid $pid ret = $ret \n";
+ if ($ret != 0) {last; } # break
+ $livejobs = numof_live_jobs();
+ if ( $#nodes >= $livejobs ) { # a client terminated
+ my $numof_nodes = scalar @nodes;
+ print STDERR "num of requested nodes = $numof_nodes; num of currently live jobs = $livejobs; Client terminated - launching another.\n";
+ launch_job_on_node(1); # TODO: support named nodes
+ } else {
+ sleep (60);
+ }
+ }
+ }
+ waitpid($pid, 0);
+ cleanup();
+} else {
+# my $todo = "$sentserver -k $key $multiflag $port ";
+ my $todo = "$sentserver -k $key $multiflag $port $stay_alive_flag ";
+ if ($verbose){ print STDERR "Running: $todo\n"; }
+ my $rc = system($todo);
+ if ($rc){
+ die "Error: sentserver returned code $rc\n";
+ }
+}
+
+sub numof_live_jobs {
+ my @livejobs = grep(/$joblist/, split(/\n/, `qstat`));
+ return ($#livejobs + 1);
+}
+
+sub launch_job_on_node {
+ my $node = $_[0];
+
+ my $errorfile = "/dev/null";
+ my $outfile = "/dev/null";
+ unless (exists($node_count{$node})){
+ $node_count{$node} = 0;
+ }
+ my $node_count = $node_count{$node};
+ $node_count{$node}++;
+ my $clientname = $executable;
+ $clientname =~ s/^(.{4}).*$/$1/;
+ $clientname = "$clientname.$node.$node_count";
+ if ($errordir){
+ $errorfile = "$errordir/$clientname.ER";
+ $outfile = "$errordir/$clientname.OU";
+ }
+ my $todo = "qsub -l mem_free=9G -N $clientname -o $outfile -e $errorfile";
+ if ($verbose){ print STDERR "Running: $todo\n"; }
+ local(*QOUT, *QIN);
+ open2(\*QOUT, \*QIN, $todo);
+ print QIN $script;
+ close QIN;
+ while (my $jobid=<QOUT>){
+ chomp $jobid;
+ if ($verbose){ print STDERR "Launched client job: $jobid"; }
+ push(@cleanup_cmds, "`qdel $jobid 2> /dev/null`");
+ $jobid =~ s/^(\d+)(.*?)$/\1/g;
+ print STDERR "short job id $jobid\n";
+ if ($joblist == "") { $joblist = $jobid; }
+ else {$joblist = $joblist . "\|" . $jobid; }
+ }
+ close QOUT;
+}
+
+
+sub cleanup_and_die {
+ cleanup();
+ die "\n";
+}
+
+sub cleanup {
+ if ($verbose){ print STDERR "Cleaning up...\n"; }
+ for $cmd (@cleanup_cmds){
+ if ($verbose){ print STDERR " $cmd\n"; }
+ eval $cmd;
+ }
+ if ($verbose){ print STDERR "Cleanup finished.\n"; }
+}
+
+sub print_help
+{
+ my $name = `basename $0`; chomp $name;
+ print << "Help";
+
+usage: $name [options]
+
+ Automatic black-box parallelization of commands.
+
+options:
+
+ -e, --error-dir <dir>
+ Retain output files from jobs in <dir>, rather
+ than silently deleting them.
+
+ -m, --multi-line
+ Expect that command may produce multiple output
+ lines for a single input line. $name makes a
+ reasonable attempt to obtain all output before
+ processing additional inputs. However, use of this
+ option is inherently unsafe.
+
+ -v, --verbose
+ Print diagnostic informatoin on stderr.
+
+ -n, --nodelist
+ Space-delimited list of nodes to request. There is
+ one qsub command per node.
+
+ -p, --pmem
+ pmem setting for each job.
+
+Help
+}
diff --git a/vest/sentclient.c b/vest/sentclient.c
new file mode 100644
index 00000000..91d994ab
--- /dev/null
+++ b/vest/sentclient.c
@@ -0,0 +1,76 @@
+/* Copyright (c) 2001 by David Chiang. All rights reserved.*/
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <string.h>
+
+#include "sentserver.h"
+
+int main (int argc, char *argv[]) {
+ int sock, port;
+ char *s, *key;
+ struct hostent *hp;
+ struct sockaddr_in server;
+ int errors = 0;
+
+ if (argc < 3) {
+ fprintf(stderr, "Usage: sentclient host[:port[:key]] command [args ...]\n");
+ exit(1);
+ }
+
+ s = strchr(argv[1], ':');
+ key = NULL;
+
+ if (s == NULL) {
+ port = DEFAULT_PORT;
+ } else {
+ *s = '\0';
+ s+=1;
+ /* dumb hack */
+ key = strchr(s, ':');
+ if (key != NULL){
+ *key = '\0';
+ key += 1;
+ }
+ port = atoi(s);
+ }
+
+ sock = socket(AF_INET, SOCK_STREAM, 0);
+
+ hp = gethostbyname(argv[1]);
+ if (hp == NULL) {
+ fprintf(stderr, "unknown host %s\n", argv[1]);
+ exit(1);
+ }
+
+ bzero((char *)&server, sizeof(server));
+ bcopy(hp->h_addr, (char *)&server.sin_addr, hp->h_length);
+ server.sin_family = hp->h_addrtype;
+ server.sin_port = htons(port);
+
+ while (connect(sock, (struct sockaddr *)&server, sizeof(server)) < 0) {
+ perror("connect()");
+ sleep(1);
+ errors++;
+ if (errors > 5)
+ exit(1);
+ }
+
+ close(0);
+ close(1);
+ dup2(sock, 0);
+ dup2(sock, 1);
+
+ if (key != NULL){
+ write(1, key, strlen(key));
+ write(1, "\n", 1);
+ }
+
+ execvp(argv[2], argv+2);
+ return 0;
+}
diff --git a/vest/sentserver.c b/vest/sentserver.c
new file mode 100644
index 00000000..f7458b6d
--- /dev/null
+++ b/vest/sentserver.c
@@ -0,0 +1,515 @@
+/* Copyright (c) 2001 by David Chiang. All rights reserved.*/
+
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#include <netinet/in.h>
+#include <sched.h>
+#include <pthread.h>
+#include <errno.h>
+
+#include "sentserver.h"
+
+#define MAX_CLIENTS 32
+
+struct clientinfo {
+ int s;
+ struct sockaddr_in sin;
+};
+
+struct line {
+ int id;
+ char *s;
+ int status;
+ struct line *next;
+} *head, **ptail;
+
+int n_sent = 0, n_received=0, n_flushed=0;
+
+#define STATUS_RUNNING 0
+#define STATUS_ABORTED 1
+#define STATUS_FINISHED 2
+
+pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t input_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+int n_clients = 0;
+int s;
+int expect_multiline_output = 0;
+int log_mutex = 0;
+int stay_alive = 0; /* dont panic and die with zero clients */
+
+void queue_finish(struct line *node, char *s, int fid);
+char * read_line(int fd, int multiline);
+void done (int code);
+
+struct line * queue_get(int fid) {
+ struct line *cur;
+ char *s, *synch;
+
+ if (log_mutex) fprintf(stderr, "Getting for data for fid %d\n", fid);
+ if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid);
+ pthread_mutex_lock(&queue_mutex);
+
+ /* First, check for aborted sentences. */
+
+ if (log_mutex) fprintf(stderr, " Checking queue for aborted jobs (fid %d)\n", fid);
+ for (cur = head; cur != NULL; cur = cur->next) {
+ if (cur->status == STATUS_ABORTED) {
+ cur->status = STATUS_RUNNING;
+
+ if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid);
+ pthread_mutex_unlock(&queue_mutex);
+
+ return cur;
+ }
+ }
+ if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid);
+ pthread_mutex_unlock(&queue_mutex);
+
+ /* Otherwise, read a new one. */
+ if (log_mutex) fprintf(stderr, "Locking input mutex (%d)\n", fid);
+ if (log_mutex) fprintf(stderr, " Reading input for new data (fid %d)\n", fid);
+ pthread_mutex_lock(&input_mutex);
+ s = read_line(0,0);
+
+ while (s) {
+ if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid);
+ pthread_mutex_lock(&queue_mutex);
+ if (log_mutex) fprintf(stderr, "Unlocking input mutex (%d)\n", fid);
+ pthread_mutex_unlock(&input_mutex);
+
+ cur = malloc(sizeof (struct line));
+ cur->id = n_sent;
+ cur->s = s;
+ cur->next = NULL;
+
+ *ptail = cur;
+ ptail = &cur->next;
+
+ n_sent++;
+
+ if (strcmp(s,"===SYNCH===\n")==0){
+ fprintf(stderr, "Received ===SYNCH=== signal (fid %d)\n", fid);
+ // Note: queue_finish calls free(cur->s).
+ // Therefore we need to create a new string here.
+ synch = malloc((strlen("===SYNCH===\n")+2) * sizeof (char));
+ synch = strcpy(synch, s);
+
+ if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid);
+ pthread_mutex_unlock(&queue_mutex);
+ queue_finish(cur, synch, fid); /* handles its own lock */
+
+ if (log_mutex) fprintf(stderr, "Locking input mutex (%d)\n", fid);
+ if (log_mutex) fprintf(stderr, " Reading input for new data (fid %d)\n", fid);
+ pthread_mutex_lock(&input_mutex);
+
+ s = read_line(0,0);
+ } else {
+ if (log_mutex) fprintf(stderr, " Received new data %d (fid %d)\n", cur->id, fid);
+ cur->status = STATUS_RUNNING;
+ if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid);
+ pthread_mutex_unlock(&queue_mutex);
+ return cur;
+ }
+ }
+
+ if (log_mutex) fprintf(stderr, "Unlocking input mutex (%d)\n", fid);
+ pthread_mutex_unlock(&input_mutex);
+ /* Only way to reach this point: no more output */
+
+ if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid);
+ pthread_mutex_lock(&queue_mutex);
+ if (head == NULL) {
+ fprintf(stderr, "Reached end of file. Exiting.\n");
+ done(0);
+ } else
+ ptail = NULL; /* This serves as a signal that there is no more input */
+ if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid);
+ pthread_mutex_unlock(&queue_mutex);
+
+ return NULL;
+}
+
+void queue_panic() {
+ struct line *next;
+ while (head && head->status == STATUS_FINISHED) {
+ /* Write out finished sentences */
+ if (head->status == STATUS_FINISHED) {
+ fputs(head->s, stdout);
+ fflush(stdout);
+ }
+ /* Write out blank line for unfinished sentences */
+ if (head->status == STATUS_ABORTED) {
+ fputs("\n", stdout);
+ fflush(stdout);
+ }
+ /* By defition, there cannot be any RUNNING sentences, since
+ function is only called when n_clients == 0 */
+ free(head->s);
+ next = head->next;
+ free(head);
+ head = next;
+ n_flushed++;
+ }
+ fclose(stdout);
+ fprintf(stderr, "All clients died. Panicking, flushing completed sentences and exiting.\n");
+ done(1);
+}
+
+void queue_abort(struct line *node, int fid) {
+ if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid);
+ pthread_mutex_lock(&queue_mutex);
+ node->status = STATUS_ABORTED;
+ if (n_clients == 0) {
+ if (stay_alive) {
+ fprintf(stderr, "Warning! No live clients detected! Staying alive, will retry soon.\n");
+ } else {
+ queue_panic();
+ }
+ }
+ if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid);
+ pthread_mutex_unlock(&queue_mutex);
+}
+
+
+void queue_print() {
+ struct line *cur;
+
+ fprintf(stderr, " Queue\n");
+
+ for (cur = head; cur != NULL; cur = cur->next) {
+ switch(cur->status) {
+ case STATUS_RUNNING:
+ fprintf(stderr, " %d running ", cur->id); break;
+ case STATUS_ABORTED:
+ fprintf(stderr, " %d aborted ", cur->id); break;
+ case STATUS_FINISHED:
+ fprintf(stderr, " %d finished ", cur->id); break;
+
+ }
+ fprintf(stderr, "\n");
+ //fprintf(stderr, cur->s);
+ }
+}
+
+void queue_finish(struct line *node, char *s, int fid) {
+ struct line *next;
+ if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid);
+ pthread_mutex_lock(&queue_mutex);
+
+ free(node->s);
+ node->s = s;
+ node->status = STATUS_FINISHED;
+ n_received++;
+
+ /* Flush out finished nodes */
+ while (head && head->status == STATUS_FINISHED) {
+
+ if (log_mutex) fprintf(stderr, " Flushing finished node %d\n", head->id);
+
+ fputs(head->s, stdout);
+ fflush(stdout);
+ if (log_mutex) fprintf(stderr, " Flushed node %d\n", head->id);
+ free(head->s);
+
+ next = head->next;
+ free(head);
+
+ head = next;
+
+ n_flushed++;
+
+ if (head == NULL) { /* empty queue */
+ if (ptail == NULL) { /* This can only happen if set in queue_get as signal that there is no more input. */
+ fprintf(stderr, "All sentences finished. Exiting.\n");
+ done(0);
+ } else /* ptail pointed at something which was just popped off the stack -- reset to head*/
+ ptail = &head;
+ }
+ }
+
+ if (log_mutex) fprintf(stderr, " Flushing output %d\n", head->id);
+ fflush(stdout);
+ fprintf(stderr, "%d sentences sent, %d sentences finished, %d sentences flushed\n", n_sent, n_received, n_flushed);
+
+ if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid);
+ pthread_mutex_unlock(&queue_mutex);
+
+}
+
+char * read_line(int fd, int multiline) {
+ int size = 80;
+ char errorbuf[100];
+ char *s = malloc(size+2);
+ int result, errors=0;
+ int i = 0;
+
+ result = read(fd, s+i, 1);
+
+ while (1) {
+ if (result < 0) {
+ perror("read()");
+ sprintf(errorbuf, "Error code: %d\n", errno);
+ fprintf(stderr, errorbuf);
+ errors++;
+ if (errors > 5) {
+ free(s);
+ return NULL;
+ } else {
+ sleep(1); /* retry after delay */
+ }
+ } else if (result == 0) {
+ break;
+ } else if (multiline==0 && s[i] == '\n') {
+ break;
+ } else {
+ if (s[i] == '\n'){
+ /* if we've reached this point,
+ then multiline must be 1, and we're
+ going to poll the fd for an additional
+ line of data. The basic design is to
+ run a select on the filedescriptor fd.
+ Select will return under two conditions:
+ if there is data on the fd, or if a
+ timeout is reached. We'll select on this
+ fd. If select returns because there's data
+ ready, keep going; else assume there's no
+ more and return the data we already have.
+ */
+
+ fd_set set;
+ FD_ZERO(&set);
+ FD_SET(fd, &set);
+
+ struct timeval timeout;
+ timeout.tv_sec = 3; // number of seconds for timeout
+ timeout.tv_usec = 0;
+
+ int ready = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
+ if (ready<1){
+ break; // no more data, stop looping
+ }
+ }
+ i++;
+
+ if (i == size) {
+ size = size*2;
+ s = realloc(s, size+2);
+ }
+ }
+
+ result = read(fd, s+i, 1);
+ }
+
+ if (result == 0 && i == 0) { /* end of file */
+ free(s);
+ return NULL;
+ }
+
+ s[i] = '\n';
+ s[i+1] = '\0';
+
+ return s;
+}
+
+void * new_client(void *arg) {
+ struct clientinfo *client = (struct clientinfo *)arg;
+ struct line *cur;
+ int result;
+ char *s;
+ char errorbuf[100];
+
+ pthread_mutex_lock(&clients_mutex);
+ n_clients++;
+ pthread_mutex_unlock(&clients_mutex);
+
+ fprintf(stderr, "Client connected (%d connected)\n", n_clients);
+
+ for (;;) {
+
+ cur = queue_get(client->s);
+
+ if (cur) {
+ /* fprintf(stderr, "Sending to client: %s", cur->s); */
+ fprintf(stderr, "Sending data %d to client (fid %d)\n", cur->id, client->s);
+ result = write(client->s, cur->s, strlen(cur->s));
+ if (result < strlen(cur->s)){
+ perror("write()");
+ sprintf(errorbuf, "Error code: %d\n", errno);
+ fprintf(stderr, errorbuf);
+
+ pthread_mutex_lock(&clients_mutex);
+ n_clients--;
+ pthread_mutex_unlock(&clients_mutex);
+
+ fprintf(stderr, "Client died (%d connected)\n", n_clients);
+ queue_abort(cur, client->s);
+
+ close(client->s);
+ free(client);
+
+ pthread_exit(NULL);
+ }
+ } else {
+ close(client->s);
+ pthread_mutex_lock(&clients_mutex);
+ n_clients--;
+ pthread_mutex_unlock(&clients_mutex);
+ fprintf(stderr, "Client dismissed (%d connected)\n", n_clients);
+ pthread_exit(NULL);
+ }
+
+ s = read_line(client->s,expect_multiline_output);
+ if (s) {
+ /* fprintf(stderr, "Client (fid %d) returned: %s", client->s, s); */
+ fprintf(stderr, "Client (fid %d) returned data %d\n", client->s, cur->id);
+// queue_print();
+ queue_finish(cur, s, client->s);
+ } else {
+ pthread_mutex_lock(&clients_mutex);
+ n_clients--;
+ pthread_mutex_unlock(&clients_mutex);
+
+ fprintf(stderr, "Client died (%d connected)\n", n_clients);
+ queue_abort(cur, client->s);
+
+ close(client->s);
+ free(client);
+
+ pthread_exit(NULL);
+ }
+
+ }
+
+}
+
+void done (int code) {
+ close(s);
+ exit(code);
+}
+
+
+
+int main (int argc, char *argv[]) {
+ struct sockaddr_in sin, from;
+ int g;
+ socklen_t len;
+ struct clientinfo *client;
+ int port;
+ int opt;
+ int errors = 0;
+ int argi;
+ char *key = NULL, *client_key;
+ int use_key = 0;
+ /* the key stuff here doesn't provide any
+ real measure of security, it's mainly to keep
+ jobs from bumping into each other. */
+
+ pthread_t tid;
+ port = DEFAULT_PORT;
+
+ for (argi=1; argi < argc; argi++){
+ if (strcmp(argv[argi], "-m")==0){
+ expect_multiline_output = 1;
+ } else if (strcmp(argv[argi], "-k")==0){
+ argi++;
+ if (argi == argc){
+ fprintf(stderr, "Key must be specified after -k\n");
+ exit(1);
+ }
+ key = argv[argi];
+ use_key = 1;
+ } else if (strcmp(argv[argi], "--stay-alive")==0){
+ stay_alive = 1; /* dont panic and die with zero clients */
+ } else {
+ port = atoi(argv[argi]);
+ }
+ }
+
+ /* Initialize data structures */
+ head = NULL;
+ ptail = &head;
+
+ /* Set up listener */
+ s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ opt = 1;
+ setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
+
+ sin.sin_family = AF_INET;
+ sin.sin_addr.s_addr = htonl(INADDR_ANY);
+ sin.sin_port = htons(port);
+ while (bind(s, (struct sockaddr *) &sin, sizeof(sin)) < 0) {
+ perror("bind()");
+ sleep(1);
+ errors++;
+ if (errors > 100)
+ exit(1);
+ }
+
+ len = sizeof(sin);
+ getsockname(s, (struct sockaddr *) &sin, &len);
+
+ fprintf(stderr, "Listening on port %hd\n", ntohs(sin.sin_port));
+
+ while (listen(s, MAX_CLIENTS) < 0) {
+ perror("listen()");
+ sleep(1);
+ errors++;
+ if (errors > 100)
+ exit(1);
+ }
+
+ for (;;) {
+ len = sizeof(from);
+ g = accept(s, (struct sockaddr *)&from, &len);
+ if (g < 0) {
+ perror("accept()");
+ sleep(1);
+ continue;
+ }
+ client = malloc(sizeof(struct clientinfo));
+ client->s = g;
+ bcopy(&from, &client->sin, len);
+
+ if (use_key){
+ fd_set set;
+ FD_ZERO(&set);
+ FD_SET(client->s, &set);
+
+ struct timeval timeout;
+ timeout.tv_sec = 3; // number of seconds for timeout
+ timeout.tv_usec = 0;
+
+ int ready = select(FD_SETSIZE, &set, NULL, NULL, &timeout);
+ if (ready<1){
+ fprintf(stderr, "Prospective client failed to respond with correct key.\n");
+ close(client->s);
+ free(client);
+ } else {
+ client_key = read_line(client->s,0);
+ client_key[strlen(client_key)-1]='\0'; /* chop trailing newline */
+ if (strcmp(key, client_key)==0){
+ pthread_create(&tid, NULL, new_client, client);
+ } else {
+ fprintf(stderr, "Prospective client failed to respond with correct key.\n");
+ close(client->s);
+ free(client);
+ }
+ free(client_key);
+ }
+ } else {
+ pthread_create(&tid, NULL, new_client, client);
+ }
+ }
+
+}
+
+
+
diff --git a/vest/sentserver.h b/vest/sentserver.h
new file mode 100644
index 00000000..cd17a546
--- /dev/null
+++ b/vest/sentserver.h
@@ -0,0 +1,6 @@
+#ifndef SENTSERVER_H
+#define SENTSERVER_H
+
+#define DEFAULT_PORT 50000
+
+#endif