diff options
Diffstat (limited to 'vest')
-rwxr-xr-x | vest/dist-vest.pl | 6 | ||||
-rwxr-xr-x | vest/parallelize.pl | 288 |
2 files changed, 168 insertions, 126 deletions
diff --git a/vest/dist-vest.pl b/vest/dist-vest.pl index 666509e3..8acec7a9 100755 --- a/vest/dist-vest.pl +++ b/vest/dist-vest.pl @@ -56,6 +56,7 @@ my $maxsim=0; my $oraclen=0; my $oracleb=20; my $dirargs=''; +my $usefork; # Process command-line options Getopt::Long::Configure("no_auto_abbrev"); @@ -63,6 +64,7 @@ if (GetOptions( "decoder=s" => \$decoderOpt, "decode-nodes=i" => \$decode_nodes, "dont-clean" => \$disable_clean, + "use-fork" => \$usefork, "dry-run" => \$dryrun, "epsilon=s" => \$epsilon, "help" => \$help, @@ -89,6 +91,8 @@ if (GetOptions( exit; } +if ($usefork) { $usefork = "--use-fork"; } else { $usefork = ''; } + if ($metric =~ /^(combi|ter)$/i) { $lines_per_mapper = 40; } @@ -230,7 +234,7 @@ while (1){ my $im1 = $iteration - 1; my $weightsFile="$dir/weights.$im1"; my $decoder_cmd = "$decoder -c $iniFile -w $weightsFile -O $dir/hgs"; - my $pcmd = "cat $srcFile | $parallelize -p $pmem -e $logdir -j $decode_nodes -- "; + my $pcmd = "cat $srcFile | $parallelize $usefork -p $pmem -e $logdir -j $decode_nodes -- "; if ($run_local) { $pcmd = "cat $srcFile |"; } my $cmd = $pcmd . "$decoder_cmd 2> $decoderLog 1> $runFile"; print STDERR "COMMAND:\n$cmd\n"; diff --git a/vest/parallelize.pl b/vest/parallelize.pl index a006d924..be12eefb 100755 --- a/vest/parallelize.pl +++ b/vest/parallelize.pl @@ -18,6 +18,7 @@ #ANNOYANCE: if input is shorter than -j n lines, or at the very last few lines, repeatedly sleeps. time cut down to 15s from 60s +use File::Temp qw/ tempfile /; use Getopt::Long; use IPC::Open2; use strict; @@ -25,8 +26,8 @@ use POSIX ":sys_wait_h"; use Cwd qw(getcwd); my $tailn=5; # +0 = concatenate all the client logs. 5 = last 5 lines -my $recycle_clients; # spawn new clients when previous ones terminate -my $stay_alive; # dont let server die when having zero clients +my $recycle_clients; # spawn new clients when previous ones terminate +my $stay_alive; # dont let server die when having zero clients my $joblist = ""; my $errordir=""; my $multiline; @@ -115,23 +116,27 @@ sub extend_path($$;$$) { my $abscwd=abspath(&getcwd); sub print_help; +my $use_fork; +my @pids; + # Process command-line options unless (GetOptions( - "stay-alive" => \$stay_alive, - "recycle-clients" => \$recycle_clients, - "error-dir=s" => \$errordir, - "multi-line" => \$multiline, - "file=s" => \@files_to_stage, - "verbose" => \$verbose, - "jobs=i" => \$numnodes, - "pmem=s" => \$pmem, + "stay-alive" => \$stay_alive, + "recycle-clients" => \$recycle_clients, + "error-dir=s" => \$errordir, + "multi-line" => \$multiline, + "file=s" => \@files_to_stage, + "use-fork" => \$use_fork, + "verbose" => \$verbose, + "jobs=i" => \$numnodes, + "pmem=s" => \$pmem, "baseport=i" => \$basep, # "iport=i" => \$randp, #for short name -i "no-which!" => \$no_which, "no-cd!" => \$no_cd, "tailn=s" => \$tailn, ) && scalar @ARGV){ - print_help(); + print_help(); die "bad options."; } @@ -158,7 +163,7 @@ $executable=`basename $executable`; chomp $executable; -if ($verbose){ print STDERR "Parallelizing ($numnodes ways): $cmd\n\n"; } +print STDERR "Parallelizing ($numnodes ways): $cmd\n\n"; # create -e dir and save .sh use File::Temp qw/tempdir/; @@ -210,15 +215,15 @@ my $netstat=&listening_port_lines; if ($verbose){ print STDERR "Testing port $port...";} while ($netstat=~/$port/ || &listening_port_lines=~/$port/){ - if ($verbose){ print STDERR "port is busy\n";} - $port++; - if ($port > $endp){ - die "Unable to find open port\n"; - } - if ($verbose){ print STDERR "Testing port $port... "; } + if ($verbose){ print STDERR "port is busy\n";} + $port++; + if ($port > $endp){ + die "Unable to find open port\n"; + } + if ($verbose){ print STDERR "Testing port $port... "; } } if ($verbose){ - print STDERR "port $port is available\n"; + print STDERR "port $port is available\n"; } my $key = int(rand()*1000000); @@ -228,151 +233,184 @@ if ($multiline){ $multiflag = "-m"; print STDERR "expecting multiline output.\n" my $stay_alive_flag = ""; if ($stay_alive){ $stay_alive_flag = "--stay-alive"; print STDERR "staying alive while no clients are connected.\n"; } -my %node_count; +my $node_count = 0; my $script = ""; # fork == one thread runs the sentserver, while the # other spawns the sentclient commands. -if (my $pid = fork){ - sleep 4; # give other thread time to start sentserver - $script = - qq{wait +if (my $pid = fork) { + sleep 4; # give other thread time to start sentserver + $script = + qq{wait $cdcmd$sentclient $host:$port:$key $cmd }; - if ($verbose){ - print STDERR "Client script:\n====\n"; - print STDERR $script; - print STDERR "====\n"; - } - for (my $jobn=0; $jobn<$numnodes; $jobn++){ - launch_job_on_node(1); - } - if ($recycle_clients) { - my $ret; - my $livejobs; - while (1) { - $ret = waitpid($pid, WNOHANG); - #print STDERR "waitpid $pid ret = $ret \n"; - last if ($ret != 0); - $livejobs = numof_live_jobs(); - if ($numnodes >= $livejobs ) { # a client terminated, OR # lines of input was less than -j - print STDERR "num of requested nodes = $numnodes; num of currently live jobs = $livejobs; Client terminated - launching another.\n"; - launch_job_on_node(1); # TODO: support named nodes - } else { - sleep 15; - } - } - } - waitpid($pid, 0); - cleanup(); + if ($verbose){ + print STDERR "Client script:\n====\n"; + print STDERR $script; + print STDERR "====\n"; + } + for (my $jobn=0; $jobn<$numnodes; $jobn++){ + launch_job(); + } + if ($recycle_clients) { + my $ret; + my $livejobs; + while (1) { + $ret = waitpid($pid, WNOHANG); + #print STDERR "waitpid $pid ret = $ret \n"; + last if ($ret != 0); + $livejobs = numof_live_jobs(); + if ($numnodes >= $livejobs ) { # a client terminated, OR # lines of input was less than -j + print STDERR "num of requested nodes = $numnodes; num of currently live jobs = $livejobs; Client terminated - launching another.\n"; + launch_job(); + } else { + sleep 15; + } + } + } + waitpid($pid, 0); + cleanup(); } else { -# my $todo = "$sentserver -k $key $multiflag $port "; - my $todo = "$sentserver -k $key $multiflag $port $stay_alive_flag "; - if ($verbose){ print STDERR "Running: $todo\n"; } - my $rc = system($todo); - if ($rc){ - die "Error: sentserver returned code $rc\n"; - } +# my $todo = "$sentserver -k $key $multiflag $port "; + my $todo = "$sentserver -k $key $multiflag $port $stay_alive_flag "; + if ($verbose){ print STDERR "Running: $todo\n"; } + my $rc = system($todo); + if ($rc){ + die "Error: sentserver returned code $rc\n"; + } } sub numof_live_jobs { - my @livejobs = grep(/$joblist/, split(/\n/, `qstat`)); - return ($#livejobs + 1); + if ($use_fork) { + die "not implemented"; + } else { + my @livejobs = grep(/$joblist/, split(/\n/, `qstat`)); + return ($#livejobs + 1); + } } my (@errors,@outs,@cmds); -sub launch_job_on_node { - my $node = $_[0]; - - my $errorfile = "/dev/null"; - my $outfile = "/dev/null"; - unless (exists($node_count{$node})){ - $node_count{$node} = 0; - } - my $node_count = $node_count{$node}; - $node_count{$node}++; - my $clientname = $executable; - $clientname =~ s/^(.{4}).*$/$1/; - $clientname = "$clientname.$node.$node_count"; - if ($errordir){ - $errorfile = "$errordir/$clientname.ER"; - $outfile = "$errordir/$clientname.OU"; - push @errors,$errorfile; - push @outs,$outfile; - } - my $todo = "qsub -l mem_free=$pmem -N $clientname -o $outfile -e $errorfile"; - push @cmds,$todo; - - if ($verbose){ print STDERR "Running: $todo\n"; } - local(*QOUT, *QIN); - open2(\*QOUT, \*QIN, $todo); - print QIN $script; - close QIN; - while (my $jobid=<QOUT>){ - chomp $jobid; - if ($verbose){ print STDERR "Launched client job: $jobid"; } - $jobid =~ s/^(\d+)(.*?)$/\1/g; + +sub launch_job { + if ($use_fork) { return launch_job_fork(); } + my $errorfile = "/dev/null"; + my $outfile = "/dev/null"; + $node_count++; + my $clientname = $executable; + $clientname =~ s/^(.{4}).*$/$1/; + $clientname = "$clientname.$node_count"; + if ($errordir){ + $errorfile = "$errordir/$clientname.ER"; + $outfile = "$errordir/$clientname.OU"; + push @errors,$errorfile; + push @outs,$outfile; + } + my $todo = "qsub -l mem_free=$pmem -N $clientname -o $outfile -e $errorfile"; + push @cmds,$todo; + + print STDERR "Running: $todo\n"; + local(*QOUT, *QIN); + open2(\*QOUT, \*QIN, $todo) or die "Failed to open2: $!"; + print QIN $script; + close QIN; + while (my $jobid=<QOUT>){ + chomp $jobid; + if ($verbose){ print STDERR "Launched client job: $jobid"; } + $jobid =~ s/^(\d+)(.*?)$/\1/g; $jobid =~ s/^Your job (\d+) .*$/\1/; - print STDERR " short job id $jobid\n"; + print STDERR " short job id $jobid\n"; if ($verbose){ print STDERR "cd: $abscwd\n"; print STDERR "cmd: $cmd\n"; } - if ($joblist == "") { $joblist = $jobid; } - else {$joblist = $joblist . "\|" . $jobid; } + if ($joblist == "") { $joblist = $jobid; } + else {$joblist = $joblist . "\|" . $jobid; } my $cleanfn="`qdel $jobid 2> /dev/null`"; - push(@cleanup_cmds, $cleanfn); - } - close QOUT; + push(@cleanup_cmds, $cleanfn); + } + close QOUT; } +sub launch_job_fork { + my $errorfile = "/dev/null"; + my $outfile = "/dev/null"; + $node_count++; + my $clientname = $executable; + $clientname =~ s/^(.{4}).*$/$1/; + $clientname = "$clientname.$node_count"; + if ($errordir){ + $errorfile = "$errordir/$clientname.ER"; + $outfile = "$errordir/$clientname.OU"; + push @errors,$errorfile; + push @outs,$outfile; + } + if (my $pid = fork) { + my ($fh, $scr_name) = get_temp_script(); + print $fh $script; + close $fh; + my $todo = "/bin/sh $scr_name 1> $outfile 2> $errorfile"; + print STDERR "EXEC: $todo\n"; + my $out = `$todo`; + print STDERR "RES: $out\n"; + unlink $scr_name or warn "Failed to remove $scr_name"; + exit 0; + } +} + +sub get_temp_script { + my ($fh, $filename) = tempfile( "workXXXX", SUFFIX => '.sh'); + return ($fh, $filename); +} sub cleanup_and_die { - cleanup(); - die "\n"; + cleanup(); + die "\n"; } sub cleanup { - if ($verbose){ print STDERR "Cleaning up...\n"; } - for $cmd (@cleanup_cmds){ - if ($verbose){ print STDERR " $cmd\n"; } - eval $cmd; - } - print STDERR "outputs:\n",preview_files(\@outs,1),"\n"; - print STDERR "errors:\n",preview_files(\@errors,1),"\n"; - print STDERR "cmd:\n",$cmd,"\n"; - print STDERR " cat $errordir/*.ER\nfor logs.\n"; - if ($verbose){ print STDERR "Cleanup finished.\n"; } + print STDERR "Cleaning up...\n"; + for $cmd (@cleanup_cmds){ + print STDERR " Cleanup command: $cmd\n"; + eval $cmd; + } + print STDERR "outputs:\n",preview_files(\@outs,1),"\n"; + print STDERR "errors:\n",preview_files(\@errors,1),"\n"; + print STDERR "cmd:\n",$cmd,"\n"; + print STDERR " cat $errordir/*.ER\nfor logs.\n"; + print STDERR "Cleanup finished.\n"; } sub print_help { - my $name = `basename $0`; chomp $name; - print << "Help"; + my $name = `basename $0`; chomp $name; + print << "Help"; usage: $name [options] - Automatic black-box parallelization of commands. + Automatic black-box parallelization of commands. options: - -e, --error-dir <dir> - Retain output files from jobs in <dir>, rather - than silently deleting them. + --fork + Instead of using qsub, use fork. + + -e, --error-dir <dir> + Retain output files from jobs in <dir>, rather + than silently deleting them. - -m, --multi-line - Expect that command may produce multiple output - lines for a single input line. $name makes a - reasonable attempt to obtain all output before - processing additional inputs. However, use of this - option is inherently unsafe. + -m, --multi-line + Expect that command may produce multiple output + lines for a single input line. $name makes a + reasonable attempt to obtain all output before + processing additional inputs. However, use of this + option is inherently unsafe. - -v, --verbose - Print diagnostic informatoin on stderr. + -v, --verbose + Print diagnostic informatoin on stderr. - -j, --jobs + -j, --jobs Number of jobs to use. - -p, --pmem - pmem setting for each job. + -p, --pmem + pmem setting for each job. Help } |