summaryrefslogtreecommitdiff
path: root/vest
diff options
context:
space:
mode:
authorgraehl <graehl@ec762483-ff6d-05da-a07a-a48fb63a330f>2010-06-28 01:00:52 +0000
committergraehl <graehl@ec762483-ff6d-05da-a07a-a48fb63a330f>2010-06-28 01:00:52 +0000
commitf08f336a7780511025995e3fa2e6d921c7a5f334 (patch)
treed068a129227feefb90577a252bd68930f8eb50fc /vest
parent875daff93c149ca5ed63ae0cdcf20be06b72d778 (diff)
parallelize avoid port contention
git-svn-id: https://ws10smt.googlecode.com/svn/trunk@31 ec762483-ff6d-05da-a07a-a48fb63a330f
Diffstat (limited to 'vest')
-rwxr-xr-xvest/parallelize.pl32
-rw-r--r--vest/sentserver.c32
2 files changed, 34 insertions, 30 deletions
diff --git a/vest/parallelize.pl b/vest/parallelize.pl
index 8b3f56dd..e6bd8bf9 100755
--- a/vest/parallelize.pl
+++ b/vest/parallelize.pl
@@ -1,17 +1,17 @@
#!/usr/bin/env perl
# Author: Adam Lopez
-#
+#
# This script takes a command that processes input
# from stdin one-line-at-time, and parallelizes it
# on the cluster using David Chiang's sentserver/
-# sentclient architecture.
-#
+# sentclient architecture.
+#
# Prerequisites: the command *must* read each line
# without waiting for subsequent lines of input
# (for instance, a command which must read all lines
-# of input before processing will not work) and
-# return it to the output *without* buffering
+# of input before processing will not work) and
+# return it to the output *without* buffering
# multiple lines.
use Getopt::Long;
@@ -63,12 +63,12 @@ if ($errordir){
if ($verbose){ print STDERR "Parallelizing: $cmd\n\n"; }
-# set cleanup handler
+# set cleanup handler
my @cleanup_cmds;
sub cleanup;
sub cleanup_and_die;
$SIG{INT} = "cleanup_and_die";
-$SIG{TERM} = "cleanup_and_die";
+$SIG{TERM} = "cleanup_and_die";
$SIG{HUP} = "cleanup_and_die";
# other subs:
@@ -83,24 +83,28 @@ my $sentclient = "$mydir/sentclient";
my $host = `hostname`;
chomp $host;
-my $executable = $cmd;
+my $executable = $cmd;
$executable =~ s/^\s*(\S+)($|\s.*)/$1/;
$executable=`basename $executable`;
chomp $executable;
# find open port
-my $port = 50300;
+my $basep=50300;
+my $randp=300;
+my $tryp=50;
+my $port = 50300+int(rand($randp));
+my $endp=$port+$tryp;
if ($verbose){ print STDERR "Testing port $port...";}
-while (`netstat -l | grep $port`){
+while (`netstat -l 2>/dev/null | grep -q $port`){
if ($verbose){ print STDERR "port is busy\n";}
$port++;
- if ($port > 50400){
+ if ($port > $endp){
die "Unable to find open port\n";
}
if ($verbose){ print STDERR "Testing port $port... "; }
}
if ($verbose){
- print STDERR "port is available\n";
+ print STDERR "port $port is available\n";
}
srand;
@@ -175,7 +179,7 @@ sub launch_job_on_node {
$clientname =~ s/^(.{4}).*$/$1/;
$clientname = "$clientname.$node.$node_count";
if ($errordir){
- $errorfile = "$errordir/$clientname.ER";
+ $errorfile = "$errordir/$clientname.ER";
$outfile = "$errordir/$clientname.OU";
}
my $todo = "qsub -l mem_free=9G -N $clientname -o $outfile -e $errorfile";
@@ -226,7 +230,7 @@ options:
-e, --error-dir <dir>
Retain output files from jobs in <dir>, rather
than silently deleting them.
-
+
-m, --multi-line
Expect that command may produce multiple output
lines for a single input line. $name makes a
diff --git a/vest/sentserver.c b/vest/sentserver.c
index f7458b6d..e05fe47d 100644
--- a/vest/sentserver.c
+++ b/vest/sentserver.c
@@ -7,7 +7,7 @@
#include <stdio.h>
#include <sys/socket.h>
#include <sys/types.h>
-#include <sys/time.h>
+#include <sys/time.h>
#include <netinet/in.h>
#include <sched.h>
#include <pthread.h>
@@ -118,7 +118,7 @@ struct line * queue_get(int fid) {
pthread_mutex_unlock(&queue_mutex);
return cur;
}
- }
+ }
if (log_mutex) fprintf(stderr, "Unlocking input mutex (%d)\n", fid);
pthread_mutex_unlock(&input_mutex);
@@ -146,7 +146,7 @@ void queue_panic() {
fflush(stdout);
}
/* Write out blank line for unfinished sentences */
- if (head->status == STATUS_ABORTED) {
+ if (head->status == STATUS_ABORTED) {
fputs("\n", stdout);
fflush(stdout);
}
@@ -192,7 +192,7 @@ void queue_print() {
fprintf(stderr, " %d aborted ", cur->id); break;
case STATUS_FINISHED:
fprintf(stderr, " %d finished ", cur->id); break;
-
+
}
fprintf(stderr, "\n");
//fprintf(stderr, cur->s);
@@ -221,7 +221,7 @@ void queue_finish(struct line *node, char *s, int fid) {
next = head->next;
free(head);
-
+
head = next;
n_flushed++;
@@ -271,9 +271,9 @@ char * read_line(int fd, int multiline) {
break;
} else {
if (s[i] == '\n'){
- /* if we've reached this point,
+ /* if we've reached this point,
then multiline must be 1, and we're
- going to poll the fd for an additional
+ going to poll the fd for an additional
line of data. The basic design is to
run a select on the filedescriptor fd.
Select will return under two conditions:
@@ -283,7 +283,7 @@ char * read_line(int fd, int multiline) {
ready, keep going; else assume there's no
more and return the data we already have.
*/
-
+
fd_set set;
FD_ZERO(&set);
FD_SET(fd, &set);
@@ -298,12 +298,12 @@ char * read_line(int fd, int multiline) {
}
}
i++;
-
+
if (i == size) {
size = size*2;
s = realloc(s, size+2);
}
- }
+ }
result = read(fd, s+i, 1);
}
@@ -312,7 +312,7 @@ char * read_line(int fd, int multiline) {
free(s);
return NULL;
}
-
+
s[i] = '\n';
s[i+1] = '\0';
@@ -335,7 +335,7 @@ void * new_client(void *arg) {
for (;;) {
cur = queue_get(client->s);
-
+
if (cur) {
/* fprintf(stderr, "Sending to client: %s", cur->s); */
fprintf(stderr, "Sending data %d to client (fid %d)\n", cur->id, client->s);
@@ -365,7 +365,7 @@ void * new_client(void *arg) {
fprintf(stderr, "Client dismissed (%d connected)\n", n_clients);
pthread_exit(NULL);
}
-
+
s = read_line(client->s,expect_multiline_output);
if (s) {
/* fprintf(stderr, "Client (fid %d) returned: %s", client->s, s); */
@@ -387,7 +387,7 @@ void * new_client(void *arg) {
}
}
-
+ return 0;
}
void done (int code) {
@@ -456,7 +456,7 @@ int main (int argc, char *argv[]) {
len = sizeof(sin);
getsockname(s, (struct sockaddr *) &sin, &len);
- fprintf(stderr, "Listening on port %hd\n", ntohs(sin.sin_port));
+ fprintf(stderr, "Listening on port %hu\n", ntohs(sin.sin_port));
while (listen(s, MAX_CLIENTS) < 0) {
perror("listen()");
@@ -477,7 +477,7 @@ int main (int argc, char *argv[]) {
client = malloc(sizeof(struct clientinfo));
client->s = g;
bcopy(&from, &client->sin, len);
-
+
if (use_key){
fd_set set;
FD_ZERO(&set);