summaryrefslogtreecommitdiff
path: root/vest/parallelize.pl
diff options
context:
space:
mode:
Diffstat (limited to 'vest/parallelize.pl')
-rwxr-xr-xvest/parallelize.pl288
1 files changed, 163 insertions, 125 deletions
diff --git a/vest/parallelize.pl b/vest/parallelize.pl
index a006d924..be12eefb 100755
--- a/vest/parallelize.pl
+++ b/vest/parallelize.pl
@@ -18,6 +18,7 @@
#ANNOYANCE: if input is shorter than -j n lines, or at the very last few lines, repeatedly sleeps. time cut down to 15s from 60s
+use File::Temp qw/ tempfile /;
use Getopt::Long;
use IPC::Open2;
use strict;
@@ -25,8 +26,8 @@ use POSIX ":sys_wait_h";
use Cwd qw(getcwd);
my $tailn=5; # +0 = concatenate all the client logs. 5 = last 5 lines
-my $recycle_clients; # spawn new clients when previous ones terminate
-my $stay_alive; # dont let server die when having zero clients
+my $recycle_clients; # spawn new clients when previous ones terminate
+my $stay_alive; # dont let server die when having zero clients
my $joblist = "";
my $errordir="";
my $multiline;
@@ -115,23 +116,27 @@ sub extend_path($$;$$) {
my $abscwd=abspath(&getcwd);
sub print_help;
+my $use_fork;
+my @pids;
+
# Process command-line options
unless (GetOptions(
- "stay-alive" => \$stay_alive,
- "recycle-clients" => \$recycle_clients,
- "error-dir=s" => \$errordir,
- "multi-line" => \$multiline,
- "file=s" => \@files_to_stage,
- "verbose" => \$verbose,
- "jobs=i" => \$numnodes,
- "pmem=s" => \$pmem,
+ "stay-alive" => \$stay_alive,
+ "recycle-clients" => \$recycle_clients,
+ "error-dir=s" => \$errordir,
+ "multi-line" => \$multiline,
+ "file=s" => \@files_to_stage,
+ "use-fork" => \$use_fork,
+ "verbose" => \$verbose,
+ "jobs=i" => \$numnodes,
+ "pmem=s" => \$pmem,
"baseport=i" => \$basep,
# "iport=i" => \$randp, #for short name -i
"no-which!" => \$no_which,
"no-cd!" => \$no_cd,
"tailn=s" => \$tailn,
) && scalar @ARGV){
- print_help();
+ print_help();
die "bad options.";
}
@@ -158,7 +163,7 @@ $executable=`basename $executable`;
chomp $executable;
-if ($verbose){ print STDERR "Parallelizing ($numnodes ways): $cmd\n\n"; }
+print STDERR "Parallelizing ($numnodes ways): $cmd\n\n";
# create -e dir and save .sh
use File::Temp qw/tempdir/;
@@ -210,15 +215,15 @@ my $netstat=&listening_port_lines;
if ($verbose){ print STDERR "Testing port $port...";}
while ($netstat=~/$port/ || &listening_port_lines=~/$port/){
- if ($verbose){ print STDERR "port is busy\n";}
- $port++;
- if ($port > $endp){
- die "Unable to find open port\n";
- }
- if ($verbose){ print STDERR "Testing port $port... "; }
+ if ($verbose){ print STDERR "port is busy\n";}
+ $port++;
+ if ($port > $endp){
+ die "Unable to find open port\n";
+ }
+ if ($verbose){ print STDERR "Testing port $port... "; }
}
if ($verbose){
- print STDERR "port $port is available\n";
+ print STDERR "port $port is available\n";
}
my $key = int(rand()*1000000);
@@ -228,151 +233,184 @@ if ($multiline){ $multiflag = "-m"; print STDERR "expecting multiline output.\n"
my $stay_alive_flag = "";
if ($stay_alive){ $stay_alive_flag = "--stay-alive"; print STDERR "staying alive while no clients are connected.\n"; }
-my %node_count;
+my $node_count = 0;
my $script = "";
# fork == one thread runs the sentserver, while the
# other spawns the sentclient commands.
-if (my $pid = fork){
- sleep 4; # give other thread time to start sentserver
- $script =
- qq{wait
+if (my $pid = fork) {
+ sleep 4; # give other thread time to start sentserver
+ $script =
+ qq{wait
$cdcmd$sentclient $host:$port:$key $cmd
};
- if ($verbose){
- print STDERR "Client script:\n====\n";
- print STDERR $script;
- print STDERR "====\n";
- }
- for (my $jobn=0; $jobn<$numnodes; $jobn++){
- launch_job_on_node(1);
- }
- if ($recycle_clients) {
- my $ret;
- my $livejobs;
- while (1) {
- $ret = waitpid($pid, WNOHANG);
- #print STDERR "waitpid $pid ret = $ret \n";
- last if ($ret != 0);
- $livejobs = numof_live_jobs();
- if ($numnodes >= $livejobs ) { # a client terminated, OR # lines of input was less than -j
- print STDERR "num of requested nodes = $numnodes; num of currently live jobs = $livejobs; Client terminated - launching another.\n";
- launch_job_on_node(1); # TODO: support named nodes
- } else {
- sleep 15;
- }
- }
- }
- waitpid($pid, 0);
- cleanup();
+ if ($verbose){
+ print STDERR "Client script:\n====\n";
+ print STDERR $script;
+ print STDERR "====\n";
+ }
+ for (my $jobn=0; $jobn<$numnodes; $jobn++){
+ launch_job();
+ }
+ if ($recycle_clients) {
+ my $ret;
+ my $livejobs;
+ while (1) {
+ $ret = waitpid($pid, WNOHANG);
+ #print STDERR "waitpid $pid ret = $ret \n";
+ last if ($ret != 0);
+ $livejobs = numof_live_jobs();
+ if ($numnodes >= $livejobs ) { # a client terminated, OR # lines of input was less than -j
+ print STDERR "num of requested nodes = $numnodes; num of currently live jobs = $livejobs; Client terminated - launching another.\n";
+ launch_job();
+ } else {
+ sleep 15;
+ }
+ }
+ }
+ waitpid($pid, 0);
+ cleanup();
} else {
-# my $todo = "$sentserver -k $key $multiflag $port ";
- my $todo = "$sentserver -k $key $multiflag $port $stay_alive_flag ";
- if ($verbose){ print STDERR "Running: $todo\n"; }
- my $rc = system($todo);
- if ($rc){
- die "Error: sentserver returned code $rc\n";
- }
+# my $todo = "$sentserver -k $key $multiflag $port ";
+ my $todo = "$sentserver -k $key $multiflag $port $stay_alive_flag ";
+ if ($verbose){ print STDERR "Running: $todo\n"; }
+ my $rc = system($todo);
+ if ($rc){
+ die "Error: sentserver returned code $rc\n";
+ }
}
sub numof_live_jobs {
- my @livejobs = grep(/$joblist/, split(/\n/, `qstat`));
- return ($#livejobs + 1);
+ if ($use_fork) {
+ die "not implemented";
+ } else {
+ my @livejobs = grep(/$joblist/, split(/\n/, `qstat`));
+ return ($#livejobs + 1);
+ }
}
my (@errors,@outs,@cmds);
-sub launch_job_on_node {
- my $node = $_[0];
-
- my $errorfile = "/dev/null";
- my $outfile = "/dev/null";
- unless (exists($node_count{$node})){
- $node_count{$node} = 0;
- }
- my $node_count = $node_count{$node};
- $node_count{$node}++;
- my $clientname = $executable;
- $clientname =~ s/^(.{4}).*$/$1/;
- $clientname = "$clientname.$node.$node_count";
- if ($errordir){
- $errorfile = "$errordir/$clientname.ER";
- $outfile = "$errordir/$clientname.OU";
- push @errors,$errorfile;
- push @outs,$outfile;
- }
- my $todo = "qsub -l mem_free=$pmem -N $clientname -o $outfile -e $errorfile";
- push @cmds,$todo;
-
- if ($verbose){ print STDERR "Running: $todo\n"; }
- local(*QOUT, *QIN);
- open2(\*QOUT, \*QIN, $todo);
- print QIN $script;
- close QIN;
- while (my $jobid=<QOUT>){
- chomp $jobid;
- if ($verbose){ print STDERR "Launched client job: $jobid"; }
- $jobid =~ s/^(\d+)(.*?)$/\1/g;
+
+sub launch_job {
+ if ($use_fork) { return launch_job_fork(); }
+ my $errorfile = "/dev/null";
+ my $outfile = "/dev/null";
+ $node_count++;
+ my $clientname = $executable;
+ $clientname =~ s/^(.{4}).*$/$1/;
+ $clientname = "$clientname.$node_count";
+ if ($errordir){
+ $errorfile = "$errordir/$clientname.ER";
+ $outfile = "$errordir/$clientname.OU";
+ push @errors,$errorfile;
+ push @outs,$outfile;
+ }
+ my $todo = "qsub -l mem_free=$pmem -N $clientname -o $outfile -e $errorfile";
+ push @cmds,$todo;
+
+ print STDERR "Running: $todo\n";
+ local(*QOUT, *QIN);
+ open2(\*QOUT, \*QIN, $todo) or die "Failed to open2: $!";
+ print QIN $script;
+ close QIN;
+ while (my $jobid=<QOUT>){
+ chomp $jobid;
+ if ($verbose){ print STDERR "Launched client job: $jobid"; }
+ $jobid =~ s/^(\d+)(.*?)$/\1/g;
$jobid =~ s/^Your job (\d+) .*$/\1/;
- print STDERR " short job id $jobid\n";
+ print STDERR " short job id $jobid\n";
if ($verbose){
print STDERR "cd: $abscwd\n";
print STDERR "cmd: $cmd\n";
}
- if ($joblist == "") { $joblist = $jobid; }
- else {$joblist = $joblist . "\|" . $jobid; }
+ if ($joblist == "") { $joblist = $jobid; }
+ else {$joblist = $joblist . "\|" . $jobid; }
my $cleanfn="`qdel $jobid 2> /dev/null`";
- push(@cleanup_cmds, $cleanfn);
- }
- close QOUT;
+ push(@cleanup_cmds, $cleanfn);
+ }
+ close QOUT;
}
+sub launch_job_fork {
+ my $errorfile = "/dev/null";
+ my $outfile = "/dev/null";
+ $node_count++;
+ my $clientname = $executable;
+ $clientname =~ s/^(.{4}).*$/$1/;
+ $clientname = "$clientname.$node_count";
+ if ($errordir){
+ $errorfile = "$errordir/$clientname.ER";
+ $outfile = "$errordir/$clientname.OU";
+ push @errors,$errorfile;
+ push @outs,$outfile;
+ }
+ if (my $pid = fork) {
+ my ($fh, $scr_name) = get_temp_script();
+ print $fh $script;
+ close $fh;
+ my $todo = "/bin/sh $scr_name 1> $outfile 2> $errorfile";
+ print STDERR "EXEC: $todo\n";
+ my $out = `$todo`;
+ print STDERR "RES: $out\n";
+ unlink $scr_name or warn "Failed to remove $scr_name";
+ exit 0;
+ }
+}
+
+sub get_temp_script {
+ my ($fh, $filename) = tempfile( "workXXXX", SUFFIX => '.sh');
+ return ($fh, $filename);
+}
sub cleanup_and_die {
- cleanup();
- die "\n";
+ cleanup();
+ die "\n";
}
sub cleanup {
- if ($verbose){ print STDERR "Cleaning up...\n"; }
- for $cmd (@cleanup_cmds){
- if ($verbose){ print STDERR " $cmd\n"; }
- eval $cmd;
- }
- print STDERR "outputs:\n",preview_files(\@outs,1),"\n";
- print STDERR "errors:\n",preview_files(\@errors,1),"\n";
- print STDERR "cmd:\n",$cmd,"\n";
- print STDERR " cat $errordir/*.ER\nfor logs.\n";
- if ($verbose){ print STDERR "Cleanup finished.\n"; }
+ print STDERR "Cleaning up...\n";
+ for $cmd (@cleanup_cmds){
+ print STDERR " Cleanup command: $cmd\n";
+ eval $cmd;
+ }
+ print STDERR "outputs:\n",preview_files(\@outs,1),"\n";
+ print STDERR "errors:\n",preview_files(\@errors,1),"\n";
+ print STDERR "cmd:\n",$cmd,"\n";
+ print STDERR " cat $errordir/*.ER\nfor logs.\n";
+ print STDERR "Cleanup finished.\n";
}
sub print_help
{
- my $name = `basename $0`; chomp $name;
- print << "Help";
+ my $name = `basename $0`; chomp $name;
+ print << "Help";
usage: $name [options]
- Automatic black-box parallelization of commands.
+ Automatic black-box parallelization of commands.
options:
- -e, --error-dir <dir>
- Retain output files from jobs in <dir>, rather
- than silently deleting them.
+ --fork
+ Instead of using qsub, use fork.
+
+ -e, --error-dir <dir>
+ Retain output files from jobs in <dir>, rather
+ than silently deleting them.
- -m, --multi-line
- Expect that command may produce multiple output
- lines for a single input line. $name makes a
- reasonable attempt to obtain all output before
- processing additional inputs. However, use of this
- option is inherently unsafe.
+ -m, --multi-line
+ Expect that command may produce multiple output
+ lines for a single input line. $name makes a
+ reasonable attempt to obtain all output before
+ processing additional inputs. However, use of this
+ option is inherently unsafe.
- -v, --verbose
- Print diagnostic informatoin on stderr.
+ -v, --verbose
+ Print diagnostic informatoin on stderr.
- -j, --jobs
+ -j, --jobs
Number of jobs to use.
- -p, --pmem
- pmem setting for each job.
+ -p, --pmem
+ pmem setting for each job.
Help
}