diff options
author | redpony <redpony@ec762483-ff6d-05da-a07a-a48fb63a330f> | 2010-06-25 21:36:43 +0000 |
---|---|---|
committer | redpony <redpony@ec762483-ff6d-05da-a07a-a48fb63a330f> | 2010-06-25 21:36:43 +0000 |
commit | 15753484522766f059152611a0c7c62a530a89dd (patch) | |
tree | 523683290ef67577557806567f2ac33a4a0f6c3a | |
parent | 8f44c98e72f3a3163a011bfe2313fb85b8325116 (diff) |
mert fixes, support scripts
git-svn-id: https://ws10smt.googlecode.com/svn/trunk@29 ec762483-ff6d-05da-a07a-a48fb63a330f
-rw-r--r-- | vest/Makefile.am | 8 | ||||
-rwxr-xr-x | vest/dist-vest.pl | 22 | ||||
-rwxr-xr-x | vest/parallelize.pl | 255 | ||||
-rw-r--r-- | vest/sentclient.c | 76 | ||||
-rw-r--r-- | vest/sentserver.c | 515 | ||||
-rw-r--r-- | vest/sentserver.h | 6 |
6 files changed, 870 insertions, 12 deletions
diff --git a/vest/Makefile.am b/vest/Makefile.am index 6815ae75..99bd6430 100644 --- a/vest/Makefile.am +++ b/vest/Makefile.am @@ -3,7 +3,9 @@ bin_PROGRAMS = \ mr_vest_map \ mr_vest_reduce \ mr_vest_generate_mapper_input \ - fast_score + fast_score \ + sentserver \ + sentclient if HAVE_GTEST noinst_PROGRAMS = \ @@ -11,6 +13,10 @@ noinst_PROGRAMS = \ lo_test endif +sentserver_SOURCES = sentserver.c + +sentclient_SOURCES = sentclient.c + mbr_kbest_SOURCES = mbr_kbest.cc ter.cc comb_scorer.cc aer_scorer.cc scorer.cc viterbi_envelope.cc mbr_kbest_LDADD = $(top_srcdir)/decoder/libcdec.a -lz diff --git a/vest/dist-vest.pl b/vest/dist-vest.pl index dca2f06b..22d6478a 100755 --- a/vest/dist-vest.pl +++ b/vest/dist-vest.pl @@ -6,6 +6,8 @@ use Getopt::Long; use IPC::Open2; use strict; use POSIX ":sys_wait_h"; +#my $QSUB_FLAGS = "-q batch -l pmem=3000mb,walltime=5:00:00"; +my $QSUB_FLAGS = "-l mem_free=9G"; # Default settings my $srcFile; @@ -17,12 +19,13 @@ die "Can't execute $FAST_SCORE" unless -x $FAST_SCORE; my $MAPINPUT = "$bin_dir/mr_vest_generate_mapper_input"; my $MAPPER = "$bin_dir/mr_vest_map"; my $REDUCER = "$bin_dir/mr_vest_reduce"; +my $parallelize = "$bin_dir/parallelize.pl"; my $SCORER = $FAST_SCORE; die "Can't find $MAPPER" unless -x $MAPPER; my $cdec = "$bin_dir/../decoder/cdec"; die "Can't find decoder in $cdec" unless -x $cdec; +die "Can't find $parallelize" unless -x $parallelize; my $decoder = $cdec; -my $DISCARD_FORESTS = 0; my $lines_per_mapper = 400; my $rand_directions = 15; my $iteration = 1; @@ -157,7 +160,6 @@ $SIG{HUP} = "cleanup"; my $decoderBase = `basename $decoder`; chomp $decoderBase; my $newIniFile = "$dir/$decoderBase.ini"; -my $parallelize = '/chomes/redpony/svn-trunk/sa-utils/parallelize.pl'; my $inputFileName = "$dir/input"; my $user = $ENV{"USER"}; @@ -254,12 +256,8 @@ while (1){ } # run optimizer - print LOGFILE "\nUNION FORESTS\n"; print LOGFILE `date`; my $mergeLog="$logdir/prune-merge.log.$iteration"; - if ($DISCARD_FORESTS) { - `rm -f $dir/hgs/*gz`; - } my $score = 0; my $icc = 0; @@ -314,7 +312,7 @@ while (1){ die; } } else { - my $todo = "qsub -q batch -l pmem=3000mb,walltime=5:00:00 -N $client_name -o /dev/null -e $logdir/$client_name.ER"; + my $todo = "qsub $QSUB_FLAGS -S /bin/bash -N $client_name -o /dev/null -e $logdir/$client_name.ER"; local(*QOUT, *QIN); open2(\*QOUT, \*QIN, $todo); print QIN $script; @@ -323,9 +321,10 @@ while (1){ $nmappers++; while (my $jobid=<QOUT>){ chomp $jobid; - push(@cleanupcmds, "`qdel $jobid 2> /dev/null`"); $jobid =~ s/^(\d+)(.*?)$/\1/g; - print STDERR "short job id $jobid\n"; + $jobid =~ s/^Your job (\d+) .*$/\1/; + push(@cleanupcmds, "`qdel $jobid 2> /dev/null`"); + print LOGFILE " $jobid"; if ($joblist == "") { $joblist = $jobid; } else {$joblist = $joblist . "\|" . $jobid; } } @@ -334,7 +333,8 @@ while (1){ } if ($run_local) { } else { - print LOGFILE "Launched $nmappers mappers.\n"; + print LOGFILE "\nLaunched $nmappers mappers.\n"; + sleep 10; print LOGFILE "Waiting for mappers to complete...\n"; while ($nmappers > 0) { sleep 5; @@ -353,7 +353,7 @@ while (1){ die "$mo: output lines ($olines) doesn't match input lines ($ilines)" unless $olines==$ilines; } print LOGFILE "Results for $tol/$til lines\n"; - print LOGFILE "\nSORTING AND RUNNING FMERT REDUCER\n"; + print LOGFILE "\nSORTING AND RUNNING VEST REDUCER\n"; print LOGFILE `date`; $cmd="sort -t \$'\\t' -k 1 @mapoutputs | $REDUCER -l $metric > $dir/redoutput.$im1"; print LOGFILE "COMMAND:\n$cmd\n"; diff --git a/vest/parallelize.pl b/vest/parallelize.pl new file mode 100755 index 00000000..f01232c8 --- /dev/null +++ b/vest/parallelize.pl @@ -0,0 +1,255 @@ +#!/usr/bin/env perl + +# Author: Adam Lopez +# +# This script takes a command that processes input +# from stdin one-line-at-time, and parallelizes it +# on the cluster using David Chiang's sentserver/ +# sentclient architecture. +# +# Prerequisites: the command *must* read each line +# without waiting for subsequent lines of input +# (for instance, a command which must read all lines +# of input before processing will not work) and +# return it to the output *without* buffering +# multiple lines. + +use Getopt::Long; +use IPC::Open2; +use strict; +use POSIX ":sys_wait_h"; + +my $recycle_clients; # spawn new clients when previous ones terminate +my $stay_alive; # dont let server die when having zero clients +my $joblist = ""; +my $errordir; +my $multiline; +my @files_to_stage; +my $verbose = 1; +my $nodelist; +my $user = $ENV{"USER"}; +my $pmem = "2g"; + +sub print_help; + +# Process command-line options +if (GetOptions( + "stay-alive" => \$stay_alive, + "recycle-clients" => \$recycle_clients, + "error-dir=s" => \$errordir, + "multi-line" => \$multiline, + "file=s" => \@files_to_stage, + "verbose" => \$verbose, + "nodelist=s" => \$nodelist, + "pmem=s" => \$pmem +) == 0 || @ARGV == 0){ + print_help(); +} + +my @nodes = grep(/^[cd]\d\d$/, split(/\n/, `pbsnodes -a`)); +if ($nodelist){ + @nodes = split(/ /, $nodelist); +} + +if ($verbose){ + print STDERR "Compute nodes: @nodes\n"; +} + +my $cmd = ""; +for my $arg (@ARGV){ + if ($arg=~ /\s/){ + $cmd .= "\"$arg\" "; + } else { + $cmd .= "$arg " + } +} + +if ($errordir){ + `mkdir -p $errordir`; +} + +if ($verbose){ print STDERR "Parallelizing: $cmd\n\n"; } + +# set cleanup handler +my @cleanup_cmds; +sub cleanup; +sub cleanup_and_die; +$SIG{INT} = "cleanup_and_die"; +$SIG{TERM} = "cleanup_and_die"; +$SIG{HUP} = "cleanup_and_die"; + +# other subs: +sub numof_live_jobs; +sub launch_job_on_node; + + +# vars +my $mydir = `dirname $0`; chomp $mydir; +my $sentserver = "$mydir/sentserver"; +my $sentclient = "$mydir/sentclient"; +my $host = `hostname`; +chomp $host; + +my $executable = $cmd; +$executable =~ s/^\s*(\S+)($|\s.*)/$1/; +$executable=`basename $executable`; +chomp $executable; + +# find open port +my $port = 50300; +if ($verbose){ print STDERR "Testing port $port...";} +while (`netstat -l | grep $port`){ + if ($verbose){ print STDERR "port is busy\n";} + $port++; + if ($port > 50400){ + die "Unable to find open port\n"; + } + if ($verbose){ print STDERR "Testing port $port... "; } +} +if ($verbose){ + print STDERR "port is available\n"; +} + +srand; +my $key = int(rand()*1000000); + +my $multiflag = ""; +if ($multiline){ $multiflag = "-m"; print STDERR "expecting multiline output.\n"; } +my $stay_alive_flag = ""; +if ($stay_alive){ $stay_alive_flag = "--stay-alive"; print STDERR "staying alive while no clients are connected.\n"; } + +my %node_count; +my $script = ""; + +# fork == one thread runs the sentserver, while the +# other spawns the sentclient commands. +if (my $pid = fork){ + sleep 5; # give other thread time to start sentserver + $script .= "wait\n"; + $script .= "$sentclient $host:$port:$key $cmd\n"; + if ($verbose){ + print STDERR "Client script:\n====\n"; + print STDERR $script; + print STDERR "====\n"; + } + for my $node (@nodes){ + launch_job_on_node($node); + } + if ($recycle_clients) { + my $ret; + my $livejobs; + while (1) { + $ret = waitpid($pid, WNOHANG); + #print STDERR "waitpid $pid ret = $ret \n"; + if ($ret != 0) {last; } # break + $livejobs = numof_live_jobs(); + if ( $#nodes >= $livejobs ) { # a client terminated + my $numof_nodes = scalar @nodes; + print STDERR "num of requested nodes = $numof_nodes; num of currently live jobs = $livejobs; Client terminated - launching another.\n"; + launch_job_on_node(1); # TODO: support named nodes + } else { + sleep (60); + } + } + } + waitpid($pid, 0); + cleanup(); +} else { +# my $todo = "$sentserver -k $key $multiflag $port "; + my $todo = "$sentserver -k $key $multiflag $port $stay_alive_flag "; + if ($verbose){ print STDERR "Running: $todo\n"; } + my $rc = system($todo); + if ($rc){ + die "Error: sentserver returned code $rc\n"; + } +} + +sub numof_live_jobs { + my @livejobs = grep(/$joblist/, split(/\n/, `qstat`)); + return ($#livejobs + 1); +} + +sub launch_job_on_node { + my $node = $_[0]; + + my $errorfile = "/dev/null"; + my $outfile = "/dev/null"; + unless (exists($node_count{$node})){ + $node_count{$node} = 0; + } + my $node_count = $node_count{$node}; + $node_count{$node}++; + my $clientname = $executable; + $clientname =~ s/^(.{4}).*$/$1/; + $clientname = "$clientname.$node.$node_count"; + if ($errordir){ + $errorfile = "$errordir/$clientname.ER"; + $outfile = "$errordir/$clientname.OU"; + } + my $todo = "qsub -l mem_free=9G -N $clientname -o $outfile -e $errorfile"; + if ($verbose){ print STDERR "Running: $todo\n"; } + local(*QOUT, *QIN); + open2(\*QOUT, \*QIN, $todo); + print QIN $script; + close QIN; + while (my $jobid=<QOUT>){ + chomp $jobid; + if ($verbose){ print STDERR "Launched client job: $jobid"; } + push(@cleanup_cmds, "`qdel $jobid 2> /dev/null`"); + $jobid =~ s/^(\d+)(.*?)$/\1/g; + print STDERR "short job id $jobid\n"; + if ($joblist == "") { $joblist = $jobid; } + else {$joblist = $joblist . "\|" . $jobid; } + } + close QOUT; +} + + +sub cleanup_and_die { + cleanup(); + die "\n"; +} + +sub cleanup { + if ($verbose){ print STDERR "Cleaning up...\n"; } + for $cmd (@cleanup_cmds){ + if ($verbose){ print STDERR " $cmd\n"; } + eval $cmd; + } + if ($verbose){ print STDERR "Cleanup finished.\n"; } +} + +sub print_help +{ + my $name = `basename $0`; chomp $name; + print << "Help"; + +usage: $name [options] + + Automatic black-box parallelization of commands. + +options: + + -e, --error-dir <dir> + Retain output files from jobs in <dir>, rather + than silently deleting them. + + -m, --multi-line + Expect that command may produce multiple output + lines for a single input line. $name makes a + reasonable attempt to obtain all output before + processing additional inputs. However, use of this + option is inherently unsafe. + + -v, --verbose + Print diagnostic informatoin on stderr. + + -n, --nodelist + Space-delimited list of nodes to request. There is + one qsub command per node. + + -p, --pmem + pmem setting for each job. + +Help +} diff --git a/vest/sentclient.c b/vest/sentclient.c new file mode 100644 index 00000000..91d994ab --- /dev/null +++ b/vest/sentclient.c @@ -0,0 +1,76 @@ +/* Copyright (c) 2001 by David Chiang. All rights reserved.*/ + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <netinet/in.h> +#include <netdb.h> +#include <string.h> + +#include "sentserver.h" + +int main (int argc, char *argv[]) { + int sock, port; + char *s, *key; + struct hostent *hp; + struct sockaddr_in server; + int errors = 0; + + if (argc < 3) { + fprintf(stderr, "Usage: sentclient host[:port[:key]] command [args ...]\n"); + exit(1); + } + + s = strchr(argv[1], ':'); + key = NULL; + + if (s == NULL) { + port = DEFAULT_PORT; + } else { + *s = '\0'; + s+=1; + /* dumb hack */ + key = strchr(s, ':'); + if (key != NULL){ + *key = '\0'; + key += 1; + } + port = atoi(s); + } + + sock = socket(AF_INET, SOCK_STREAM, 0); + + hp = gethostbyname(argv[1]); + if (hp == NULL) { + fprintf(stderr, "unknown host %s\n", argv[1]); + exit(1); + } + + bzero((char *)&server, sizeof(server)); + bcopy(hp->h_addr, (char *)&server.sin_addr, hp->h_length); + server.sin_family = hp->h_addrtype; + server.sin_port = htons(port); + + while (connect(sock, (struct sockaddr *)&server, sizeof(server)) < 0) { + perror("connect()"); + sleep(1); + errors++; + if (errors > 5) + exit(1); + } + + close(0); + close(1); + dup2(sock, 0); + dup2(sock, 1); + + if (key != NULL){ + write(1, key, strlen(key)); + write(1, "\n", 1); + } + + execvp(argv[2], argv+2); + return 0; +} diff --git a/vest/sentserver.c b/vest/sentserver.c new file mode 100644 index 00000000..f7458b6d --- /dev/null +++ b/vest/sentserver.c @@ -0,0 +1,515 @@ +/* Copyright (c) 2001 by David Chiang. All rights reserved.*/ + +#include <string.h> +#include <stdlib.h> +#include <unistd.h> +#include <fcntl.h> +#include <stdio.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/time.h> +#include <netinet/in.h> +#include <sched.h> +#include <pthread.h> +#include <errno.h> + +#include "sentserver.h" + +#define MAX_CLIENTS 32 + +struct clientinfo { + int s; + struct sockaddr_in sin; +}; + +struct line { + int id; + char *s; + int status; + struct line *next; +} *head, **ptail; + +int n_sent = 0, n_received=0, n_flushed=0; + +#define STATUS_RUNNING 0 +#define STATUS_ABORTED 1 +#define STATUS_FINISHED 2 + +pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t input_mutex = PTHREAD_MUTEX_INITIALIZER; + +int n_clients = 0; +int s; +int expect_multiline_output = 0; +int log_mutex = 0; +int stay_alive = 0; /* dont panic and die with zero clients */ + +void queue_finish(struct line *node, char *s, int fid); +char * read_line(int fd, int multiline); +void done (int code); + +struct line * queue_get(int fid) { + struct line *cur; + char *s, *synch; + + if (log_mutex) fprintf(stderr, "Getting for data for fid %d\n", fid); + if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid); + pthread_mutex_lock(&queue_mutex); + + /* First, check for aborted sentences. */ + + if (log_mutex) fprintf(stderr, " Checking queue for aborted jobs (fid %d)\n", fid); + for (cur = head; cur != NULL; cur = cur->next) { + if (cur->status == STATUS_ABORTED) { + cur->status = STATUS_RUNNING; + + if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid); + pthread_mutex_unlock(&queue_mutex); + + return cur; + } + } + if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid); + pthread_mutex_unlock(&queue_mutex); + + /* Otherwise, read a new one. */ + if (log_mutex) fprintf(stderr, "Locking input mutex (%d)\n", fid); + if (log_mutex) fprintf(stderr, " Reading input for new data (fid %d)\n", fid); + pthread_mutex_lock(&input_mutex); + s = read_line(0,0); + + while (s) { + if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid); + pthread_mutex_lock(&queue_mutex); + if (log_mutex) fprintf(stderr, "Unlocking input mutex (%d)\n", fid); + pthread_mutex_unlock(&input_mutex); + + cur = malloc(sizeof (struct line)); + cur->id = n_sent; + cur->s = s; + cur->next = NULL; + + *ptail = cur; + ptail = &cur->next; + + n_sent++; + + if (strcmp(s,"===SYNCH===\n")==0){ + fprintf(stderr, "Received ===SYNCH=== signal (fid %d)\n", fid); + // Note: queue_finish calls free(cur->s). + // Therefore we need to create a new string here. + synch = malloc((strlen("===SYNCH===\n")+2) * sizeof (char)); + synch = strcpy(synch, s); + + if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid); + pthread_mutex_unlock(&queue_mutex); + queue_finish(cur, synch, fid); /* handles its own lock */ + + if (log_mutex) fprintf(stderr, "Locking input mutex (%d)\n", fid); + if (log_mutex) fprintf(stderr, " Reading input for new data (fid %d)\n", fid); + pthread_mutex_lock(&input_mutex); + + s = read_line(0,0); + } else { + if (log_mutex) fprintf(stderr, " Received new data %d (fid %d)\n", cur->id, fid); + cur->status = STATUS_RUNNING; + if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid); + pthread_mutex_unlock(&queue_mutex); + return cur; + } + } + + if (log_mutex) fprintf(stderr, "Unlocking input mutex (%d)\n", fid); + pthread_mutex_unlock(&input_mutex); + /* Only way to reach this point: no more output */ + + if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid); + pthread_mutex_lock(&queue_mutex); + if (head == NULL) { + fprintf(stderr, "Reached end of file. Exiting.\n"); + done(0); + } else + ptail = NULL; /* This serves as a signal that there is no more input */ + if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid); + pthread_mutex_unlock(&queue_mutex); + + return NULL; +} + +void queue_panic() { + struct line *next; + while (head && head->status == STATUS_FINISHED) { + /* Write out finished sentences */ + if (head->status == STATUS_FINISHED) { + fputs(head->s, stdout); + fflush(stdout); + } + /* Write out blank line for unfinished sentences */ + if (head->status == STATUS_ABORTED) { + fputs("\n", stdout); + fflush(stdout); + } + /* By defition, there cannot be any RUNNING sentences, since + function is only called when n_clients == 0 */ + free(head->s); + next = head->next; + free(head); + head = next; + n_flushed++; + } + fclose(stdout); + fprintf(stderr, "All clients died. Panicking, flushing completed sentences and exiting.\n"); + done(1); +} + +void queue_abort(struct line *node, int fid) { + if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid); + pthread_mutex_lock(&queue_mutex); + node->status = STATUS_ABORTED; + if (n_clients == 0) { + if (stay_alive) { + fprintf(stderr, "Warning! No live clients detected! Staying alive, will retry soon.\n"); + } else { + queue_panic(); + } + } + if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid); + pthread_mutex_unlock(&queue_mutex); +} + + +void queue_print() { + struct line *cur; + + fprintf(stderr, " Queue\n"); + + for (cur = head; cur != NULL; cur = cur->next) { + switch(cur->status) { + case STATUS_RUNNING: + fprintf(stderr, " %d running ", cur->id); break; + case STATUS_ABORTED: + fprintf(stderr, " %d aborted ", cur->id); break; + case STATUS_FINISHED: + fprintf(stderr, " %d finished ", cur->id); break; + + } + fprintf(stderr, "\n"); + //fprintf(stderr, cur->s); + } +} + +void queue_finish(struct line *node, char *s, int fid) { + struct line *next; + if (log_mutex) fprintf(stderr, "Locking queue mutex (%d)\n", fid); + pthread_mutex_lock(&queue_mutex); + + free(node->s); + node->s = s; + node->status = STATUS_FINISHED; + n_received++; + + /* Flush out finished nodes */ + while (head && head->status == STATUS_FINISHED) { + + if (log_mutex) fprintf(stderr, " Flushing finished node %d\n", head->id); + + fputs(head->s, stdout); + fflush(stdout); + if (log_mutex) fprintf(stderr, " Flushed node %d\n", head->id); + free(head->s); + + next = head->next; + free(head); + + head = next; + + n_flushed++; + + if (head == NULL) { /* empty queue */ + if (ptail == NULL) { /* This can only happen if set in queue_get as signal that there is no more input. */ + fprintf(stderr, "All sentences finished. Exiting.\n"); + done(0); + } else /* ptail pointed at something which was just popped off the stack -- reset to head*/ + ptail = &head; + } + } + + if (log_mutex) fprintf(stderr, " Flushing output %d\n", head->id); + fflush(stdout); + fprintf(stderr, "%d sentences sent, %d sentences finished, %d sentences flushed\n", n_sent, n_received, n_flushed); + + if (log_mutex) fprintf(stderr, "Unlocking queue mutex (%d)\n", fid); + pthread_mutex_unlock(&queue_mutex); + +} + +char * read_line(int fd, int multiline) { + int size = 80; + char errorbuf[100]; + char *s = malloc(size+2); + int result, errors=0; + int i = 0; + + result = read(fd, s+i, 1); + + while (1) { + if (result < 0) { + perror("read()"); + sprintf(errorbuf, "Error code: %d\n", errno); + fprintf(stderr, errorbuf); + errors++; + if (errors > 5) { + free(s); + return NULL; + } else { + sleep(1); /* retry after delay */ + } + } else if (result == 0) { + break; + } else if (multiline==0 && s[i] == '\n') { + break; + } else { + if (s[i] == '\n'){ + /* if we've reached this point, + then multiline must be 1, and we're + going to poll the fd for an additional + line of data. The basic design is to + run a select on the filedescriptor fd. + Select will return under two conditions: + if there is data on the fd, or if a + timeout is reached. We'll select on this + fd. If select returns because there's data + ready, keep going; else assume there's no + more and return the data we already have. + */ + + fd_set set; + FD_ZERO(&set); + FD_SET(fd, &set); + + struct timeval timeout; + timeout.tv_sec = 3; // number of seconds for timeout + timeout.tv_usec = 0; + + int ready = select(FD_SETSIZE, &set, NULL, NULL, &timeout); + if (ready<1){ + break; // no more data, stop looping + } + } + i++; + + if (i == size) { + size = size*2; + s = realloc(s, size+2); + } + } + + result = read(fd, s+i, 1); + } + + if (result == 0 && i == 0) { /* end of file */ + free(s); + return NULL; + } + + s[i] = '\n'; + s[i+1] = '\0'; + + return s; +} + +void * new_client(void *arg) { + struct clientinfo *client = (struct clientinfo *)arg; + struct line *cur; + int result; + char *s; + char errorbuf[100]; + + pthread_mutex_lock(&clients_mutex); + n_clients++; + pthread_mutex_unlock(&clients_mutex); + + fprintf(stderr, "Client connected (%d connected)\n", n_clients); + + for (;;) { + + cur = queue_get(client->s); + + if (cur) { + /* fprintf(stderr, "Sending to client: %s", cur->s); */ + fprintf(stderr, "Sending data %d to client (fid %d)\n", cur->id, client->s); + result = write(client->s, cur->s, strlen(cur->s)); + if (result < strlen(cur->s)){ + perror("write()"); + sprintf(errorbuf, "Error code: %d\n", errno); + fprintf(stderr, errorbuf); + + pthread_mutex_lock(&clients_mutex); + n_clients--; + pthread_mutex_unlock(&clients_mutex); + + fprintf(stderr, "Client died (%d connected)\n", n_clients); + queue_abort(cur, client->s); + + close(client->s); + free(client); + + pthread_exit(NULL); + } + } else { + close(client->s); + pthread_mutex_lock(&clients_mutex); + n_clients--; + pthread_mutex_unlock(&clients_mutex); + fprintf(stderr, "Client dismissed (%d connected)\n", n_clients); + pthread_exit(NULL); + } + + s = read_line(client->s,expect_multiline_output); + if (s) { + /* fprintf(stderr, "Client (fid %d) returned: %s", client->s, s); */ + fprintf(stderr, "Client (fid %d) returned data %d\n", client->s, cur->id); +// queue_print(); + queue_finish(cur, s, client->s); + } else { + pthread_mutex_lock(&clients_mutex); + n_clients--; + pthread_mutex_unlock(&clients_mutex); + + fprintf(stderr, "Client died (%d connected)\n", n_clients); + queue_abort(cur, client->s); + + close(client->s); + free(client); + + pthread_exit(NULL); + } + + } + +} + +void done (int code) { + close(s); + exit(code); +} + + + +int main (int argc, char *argv[]) { + struct sockaddr_in sin, from; + int g; + socklen_t len; + struct clientinfo *client; + int port; + int opt; + int errors = 0; + int argi; + char *key = NULL, *client_key; + int use_key = 0; + /* the key stuff here doesn't provide any + real measure of security, it's mainly to keep + jobs from bumping into each other. */ + + pthread_t tid; + port = DEFAULT_PORT; + + for (argi=1; argi < argc; argi++){ + if (strcmp(argv[argi], "-m")==0){ + expect_multiline_output = 1; + } else if (strcmp(argv[argi], "-k")==0){ + argi++; + if (argi == argc){ + fprintf(stderr, "Key must be specified after -k\n"); + exit(1); + } + key = argv[argi]; + use_key = 1; + } else if (strcmp(argv[argi], "--stay-alive")==0){ + stay_alive = 1; /* dont panic and die with zero clients */ + } else { + port = atoi(argv[argi]); + } + } + + /* Initialize data structures */ + head = NULL; + ptail = &head; + + /* Set up listener */ + s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + opt = 1; + setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + + sin.sin_family = AF_INET; + sin.sin_addr.s_addr = htonl(INADDR_ANY); + sin.sin_port = htons(port); + while (bind(s, (struct sockaddr *) &sin, sizeof(sin)) < 0) { + perror("bind()"); + sleep(1); + errors++; + if (errors > 100) + exit(1); + } + + len = sizeof(sin); + getsockname(s, (struct sockaddr *) &sin, &len); + + fprintf(stderr, "Listening on port %hd\n", ntohs(sin.sin_port)); + + while (listen(s, MAX_CLIENTS) < 0) { + perror("listen()"); + sleep(1); + errors++; + if (errors > 100) + exit(1); + } + + for (;;) { + len = sizeof(from); + g = accept(s, (struct sockaddr *)&from, &len); + if (g < 0) { + perror("accept()"); + sleep(1); + continue; + } + client = malloc(sizeof(struct clientinfo)); + client->s = g; + bcopy(&from, &client->sin, len); + + if (use_key){ + fd_set set; + FD_ZERO(&set); + FD_SET(client->s, &set); + + struct timeval timeout; + timeout.tv_sec = 3; // number of seconds for timeout + timeout.tv_usec = 0; + + int ready = select(FD_SETSIZE, &set, NULL, NULL, &timeout); + if (ready<1){ + fprintf(stderr, "Prospective client failed to respond with correct key.\n"); + close(client->s); + free(client); + } else { + client_key = read_line(client->s,0); + client_key[strlen(client_key)-1]='\0'; /* chop trailing newline */ + if (strcmp(key, client_key)==0){ + pthread_create(&tid, NULL, new_client, client); + } else { + fprintf(stderr, "Prospective client failed to respond with correct key.\n"); + close(client->s); + free(client); + } + free(client_key); + } + } else { + pthread_create(&tid, NULL, new_client, client); + } + } + +} + + + diff --git a/vest/sentserver.h b/vest/sentserver.h new file mode 100644 index 00000000..cd17a546 --- /dev/null +++ b/vest/sentserver.h @@ -0,0 +1,6 @@ +#ifndef SENTSERVER_H +#define SENTSERVER_H + +#define DEFAULT_PORT 50000 + +#endif |