diff options
author | graehl <graehl@ec762483-ff6d-05da-a07a-a48fb63a330f> | 2010-06-28 01:00:52 +0000 |
---|---|---|
committer | graehl <graehl@ec762483-ff6d-05da-a07a-a48fb63a330f> | 2010-06-28 01:00:52 +0000 |
commit | f08f336a7780511025995e3fa2e6d921c7a5f334 (patch) | |
tree | d068a129227feefb90577a252bd68930f8eb50fc /vest | |
parent | 875daff93c149ca5ed63ae0cdcf20be06b72d778 (diff) |
parallelize avoid port contention
git-svn-id: https://ws10smt.googlecode.com/svn/trunk@31 ec762483-ff6d-05da-a07a-a48fb63a330f
Diffstat (limited to 'vest')
-rwxr-xr-x | vest/parallelize.pl | 32 | ||||
-rw-r--r-- | vest/sentserver.c | 32 |
2 files changed, 34 insertions, 30 deletions
diff --git a/vest/parallelize.pl b/vest/parallelize.pl index 8b3f56dd..e6bd8bf9 100755 --- a/vest/parallelize.pl +++ b/vest/parallelize.pl @@ -1,17 +1,17 @@ #!/usr/bin/env perl # Author: Adam Lopez -# +# # This script takes a command that processes input # from stdin one-line-at-time, and parallelizes it # on the cluster using David Chiang's sentserver/ -# sentclient architecture. -# +# sentclient architecture. +# # Prerequisites: the command *must* read each line # without waiting for subsequent lines of input # (for instance, a command which must read all lines -# of input before processing will not work) and -# return it to the output *without* buffering +# of input before processing will not work) and +# return it to the output *without* buffering # multiple lines. use Getopt::Long; @@ -63,12 +63,12 @@ if ($errordir){ if ($verbose){ print STDERR "Parallelizing: $cmd\n\n"; } -# set cleanup handler +# set cleanup handler my @cleanup_cmds; sub cleanup; sub cleanup_and_die; $SIG{INT} = "cleanup_and_die"; -$SIG{TERM} = "cleanup_and_die"; +$SIG{TERM} = "cleanup_and_die"; $SIG{HUP} = "cleanup_and_die"; # other subs: @@ -83,24 +83,28 @@ my $sentclient = "$mydir/sentclient"; my $host = `hostname`; chomp $host; -my $executable = $cmd; +my $executable = $cmd; $executable =~ s/^\s*(\S+)($|\s.*)/$1/; $executable=`basename $executable`; chomp $executable; # find open port -my $port = 50300; +my $basep=50300; +my $randp=300; +my $tryp=50; +my $port = 50300+int(rand($randp)); +my $endp=$port+$tryp; if ($verbose){ print STDERR "Testing port $port...";} -while (`netstat -l | grep $port`){ +while (`netstat -l 2>/dev/null | grep -q $port`){ if ($verbose){ print STDERR "port is busy\n";} $port++; - if ($port > 50400){ + if ($port > $endp){ die "Unable to find open port\n"; } if ($verbose){ print STDERR "Testing port $port... "; } } if ($verbose){ - print STDERR "port is available\n"; + print STDERR "port $port is available\n"; } srand; @@ -175,7 +179,7 @@ sub launch_job_on_node { $clientname =~ s/^(.{4}).*$/$1/; $clientname = "$clientname.$node.$node_count"; if ($errordir){ - $errorfile = "$errordir/$clientname.ER"; + $errorfile = "$errordir/$clientname.ER"; $outfile = "$errordir/$clientname.OU"; } my $todo = "qsub -l mem_free=9G -N $clientname -o $outfile -e $errorfile"; @@ -226,7 +230,7 @@ options: -e, --error-dir <dir> Retain output files from jobs in <dir>, rather than silently deleting them. - + -m, --multi-line Expect that command may produce multiple output lines for a single input line. $name makes a diff --git a/vest/sentserver.c b/vest/sentserver.c index f7458b6d..e05fe47d 100644 --- a/vest/sentserver.c +++ b/vest/sentserver.c @@ -7,7 +7,7 @@ #include <stdio.h> #include <sys/socket.h> #include <sys/types.h> -#include <sys/time.h> +#include <sys/time.h> #include <netinet/in.h> #include <sched.h> #include <pthread.h> @@ -118,7 +118,7 @@ struct line * queue_get(int fid) { pthread_mutex_unlock(&queue_mutex); return cur; } - } + } if (log_mutex) fprintf(stderr, "Unlocking input mutex (%d)\n", fid); pthread_mutex_unlock(&input_mutex); @@ -146,7 +146,7 @@ void queue_panic() { fflush(stdout); } /* Write out blank line for unfinished sentences */ - if (head->status == STATUS_ABORTED) { + if (head->status == STATUS_ABORTED) { fputs("\n", stdout); fflush(stdout); } @@ -192,7 +192,7 @@ void queue_print() { fprintf(stderr, " %d aborted ", cur->id); break; case STATUS_FINISHED: fprintf(stderr, " %d finished ", cur->id); break; - + } fprintf(stderr, "\n"); //fprintf(stderr, cur->s); @@ -221,7 +221,7 @@ void queue_finish(struct line *node, char *s, int fid) { next = head->next; free(head); - + head = next; n_flushed++; @@ -271,9 +271,9 @@ char * read_line(int fd, int multiline) { break; } else { if (s[i] == '\n'){ - /* if we've reached this point, + /* if we've reached this point, then multiline must be 1, and we're - going to poll the fd for an additional + going to poll the fd for an additional line of data. The basic design is to run a select on the filedescriptor fd. Select will return under two conditions: @@ -283,7 +283,7 @@ char * read_line(int fd, int multiline) { ready, keep going; else assume there's no more and return the data we already have. */ - + fd_set set; FD_ZERO(&set); FD_SET(fd, &set); @@ -298,12 +298,12 @@ char * read_line(int fd, int multiline) { } } i++; - + if (i == size) { size = size*2; s = realloc(s, size+2); } - } + } result = read(fd, s+i, 1); } @@ -312,7 +312,7 @@ char * read_line(int fd, int multiline) { free(s); return NULL; } - + s[i] = '\n'; s[i+1] = '\0'; @@ -335,7 +335,7 @@ void * new_client(void *arg) { for (;;) { cur = queue_get(client->s); - + if (cur) { /* fprintf(stderr, "Sending to client: %s", cur->s); */ fprintf(stderr, "Sending data %d to client (fid %d)\n", cur->id, client->s); @@ -365,7 +365,7 @@ void * new_client(void *arg) { fprintf(stderr, "Client dismissed (%d connected)\n", n_clients); pthread_exit(NULL); } - + s = read_line(client->s,expect_multiline_output); if (s) { /* fprintf(stderr, "Client (fid %d) returned: %s", client->s, s); */ @@ -387,7 +387,7 @@ void * new_client(void *arg) { } } - + return 0; } void done (int code) { @@ -456,7 +456,7 @@ int main (int argc, char *argv[]) { len = sizeof(sin); getsockname(s, (struct sockaddr *) &sin, &len); - fprintf(stderr, "Listening on port %hd\n", ntohs(sin.sin_port)); + fprintf(stderr, "Listening on port %hu\n", ntohs(sin.sin_port)); while (listen(s, MAX_CLIENTS) < 0) { perror("listen()"); @@ -477,7 +477,7 @@ int main (int argc, char *argv[]) { client = malloc(sizeof(struct clientinfo)); client->s = g; bcopy(&from, &client->sin, len); - + if (use_key){ fd_set set; FD_ZERO(&set); |