diff options
Diffstat (limited to 'vest')
| -rwxr-xr-x | vest/parallelize.pl | 32 | ||||
| -rw-r--r-- | vest/sentserver.c | 32 | 
2 files changed, 34 insertions, 30 deletions
diff --git a/vest/parallelize.pl b/vest/parallelize.pl index 8b3f56dd..e6bd8bf9 100755 --- a/vest/parallelize.pl +++ b/vest/parallelize.pl @@ -1,17 +1,17 @@  #!/usr/bin/env perl  # Author: Adam Lopez -#  +#  # This script takes a command that processes input  # from stdin one-line-at-time, and parallelizes it  # on the cluster using David Chiang's sentserver/ -# sentclient architecture.   -#  +# sentclient architecture. +#  # Prerequisites: the command *must* read each line  # without waiting for subsequent lines of input  # (for instance, a command which must read all lines -# of input before processing will not work) and  -# return it to the output *without* buffering  +# of input before processing will not work) and +# return it to the output *without* buffering  # multiple lines.  use Getopt::Long; @@ -63,12 +63,12 @@ if ($errordir){  if ($verbose){ print STDERR "Parallelizing: $cmd\n\n"; } -# set cleanup handler  +# set cleanup handler  my @cleanup_cmds;  sub cleanup;  sub cleanup_and_die;  $SIG{INT} = "cleanup_and_die"; -$SIG{TERM} = "cleanup_and_die";  +$SIG{TERM} = "cleanup_and_die";  $SIG{HUP} = "cleanup_and_die";  # other subs: @@ -83,24 +83,28 @@ my $sentclient = "$mydir/sentclient";  my $host = `hostname`;  chomp $host; -my $executable = $cmd;   +my $executable = $cmd;  $executable =~ s/^\s*(\S+)($|\s.*)/$1/;  $executable=`basename $executable`;  chomp $executable;  # find open port -my $port = 50300; +my $basep=50300; +my $randp=300; +my $tryp=50; +my $port = 50300+int(rand($randp)); +my $endp=$port+$tryp;  if ($verbose){ print STDERR "Testing port $port...";} -while (`netstat -l | grep $port`){ +while (`netstat -l 2>/dev/null | grep -q $port`){  	if ($verbose){ print STDERR "port is busy\n";}  	$port++; -	if ($port > 50400){ +	if ($port > $endp){  		die "Unable to find open port\n";  	}  	if ($verbose){ print STDERR "Testing port $port... "; }  }  if ($verbose){ -	print STDERR "port is available\n"; +	print STDERR "port $port is available\n";  }  srand; @@ -175,7 +179,7 @@ sub launch_job_on_node {  		$clientname =~ s/^(.{4}).*$/$1/;  		$clientname = "$clientname.$node.$node_count";  		if ($errordir){ -			$errorfile = "$errordir/$clientname.ER";  +			$errorfile = "$errordir/$clientname.ER";  			$outfile = "$errordir/$clientname.OU";  		}  		my $todo = "qsub -l mem_free=9G -N $clientname -o $outfile -e $errorfile"; @@ -226,7 +230,7 @@ options:  	-e, --error-dir <dir>  		Retain output files from jobs in <dir>, rather  		than silently deleting them. -	 +  	-m, --multi-line  		Expect that command may produce multiple output  		lines for a single input line.  $name makes a diff --git a/vest/sentserver.c b/vest/sentserver.c index f7458b6d..e05fe47d 100644 --- a/vest/sentserver.c +++ b/vest/sentserver.c @@ -7,7 +7,7 @@  #include <stdio.h>  #include <sys/socket.h>  #include <sys/types.h> -#include <sys/time.h>  +#include <sys/time.h>  #include <netinet/in.h>  #include <sched.h>  #include <pthread.h> @@ -118,7 +118,7 @@ struct line * queue_get(int fid) {  			pthread_mutex_unlock(&queue_mutex);  			return cur;  		} -	}  +	}  	if (log_mutex) fprintf(stderr, "Unlocking input mutex (%d)\n", fid);  	pthread_mutex_unlock(&input_mutex); @@ -146,7 +146,7 @@ void queue_panic() {  			fflush(stdout);  		}  		/* Write out blank line for unfinished sentences */ -		if (head->status == STATUS_ABORTED) {  +		if (head->status == STATUS_ABORTED) {  			fputs("\n", stdout);  			fflush(stdout);  		} @@ -192,7 +192,7 @@ void queue_print() {        fprintf(stderr, "    %d aborted  ", cur->id); break;      case STATUS_FINISHED:        fprintf(stderr, "    %d finished ", cur->id); break; -       +      }  	fprintf(stderr, "\n");      //fprintf(stderr, cur->s); @@ -221,7 +221,7 @@ void queue_finish(struct line *node, char *s, int fid) {      next = head->next;      free(head); -     +      head = next;      n_flushed++; @@ -271,9 +271,9 @@ char * read_line(int fd, int multiline) {        break;      } else {        if (s[i] == '\n'){ -	/* if we've reached this point,  +	/* if we've reached this point,  	   then multiline must be 1, and we're -	   going to poll the fd for an additional  +	   going to poll the fd for an additional  	   line of data.  The basic design is to  	   run a select on the filedescriptor fd.  	   Select will return under two conditions: @@ -283,7 +283,7 @@ char * read_line(int fd, int multiline) {  	   ready, keep going; else assume there's no  	   more and return the data we already have.  	*/ -	 +  	fd_set set;  	FD_ZERO(&set);  	FD_SET(fd, &set); @@ -298,12 +298,12 @@ char * read_line(int fd, int multiline) {  	}        }        i++; -       +        if (i == size) {  	size = size*2;  	s = realloc(s, size+2);        } -    }     +    }      result = read(fd, s+i, 1);    } @@ -312,7 +312,7 @@ char * read_line(int fd, int multiline) {      free(s);      return NULL;    } -     +    s[i] = '\n';    s[i+1] = '\0'; @@ -335,7 +335,7 @@ void * new_client(void *arg) {    for (;;) {      cur = queue_get(client->s); -     +      if (cur) {        /* fprintf(stderr, "Sending to client: %s", cur->s); */        fprintf(stderr, "Sending data %d to client (fid %d)\n", cur->id, client->s); @@ -365,7 +365,7 @@ void * new_client(void *arg) {        fprintf(stderr, "Client dismissed (%d connected)\n", n_clients);        pthread_exit(NULL);      } -     +      s = read_line(client->s,expect_multiline_output);      if (s) {        /* fprintf(stderr, "Client (fid %d) returned: %s", client->s, s); */ @@ -387,7 +387,7 @@ void * new_client(void *arg) {      }    } - +  return 0;  }  void done (int code) { @@ -456,7 +456,7 @@ int main (int argc, char *argv[]) {    len = sizeof(sin);    getsockname(s, (struct sockaddr *) &sin, &len); -  fprintf(stderr, "Listening on port %hd\n", ntohs(sin.sin_port)); +  fprintf(stderr, "Listening on port %hu\n", ntohs(sin.sin_port));    while (listen(s, MAX_CLIENTS) < 0) {  	perror("listen()"); @@ -477,7 +477,7 @@ int main (int argc, char *argv[]) {      client = malloc(sizeof(struct clientinfo));      client->s = g;      bcopy(&from, &client->sin, len); -     +  	if (use_key){  		fd_set set;  		FD_ZERO(&set);  | 
