summaryrefslogtreecommitdiff
path: root/vest/parallelize.pl
diff options
context:
space:
mode:
authorredpony <redpony@ec762483-ff6d-05da-a07a-a48fb63a330f>2010-06-25 21:36:43 +0000
committerredpony <redpony@ec762483-ff6d-05da-a07a-a48fb63a330f>2010-06-25 21:36:43 +0000
commit15753484522766f059152611a0c7c62a530a89dd (patch)
tree523683290ef67577557806567f2ac33a4a0f6c3a /vest/parallelize.pl
parent8f44c98e72f3a3163a011bfe2313fb85b8325116 (diff)
mert fixes, support scripts
git-svn-id: https://ws10smt.googlecode.com/svn/trunk@29 ec762483-ff6d-05da-a07a-a48fb63a330f
Diffstat (limited to 'vest/parallelize.pl')
-rwxr-xr-xvest/parallelize.pl255
1 files changed, 255 insertions, 0 deletions
diff --git a/vest/parallelize.pl b/vest/parallelize.pl
new file mode 100755
index 00000000..f01232c8
--- /dev/null
+++ b/vest/parallelize.pl
@@ -0,0 +1,255 @@
+#!/usr/bin/env perl
+
+# Author: Adam Lopez
+#
+# This script takes a command that processes input
+# from stdin one-line-at-time, and parallelizes it
+# on the cluster using David Chiang's sentserver/
+# sentclient architecture.
+#
+# Prerequisites: the command *must* read each line
+# without waiting for subsequent lines of input
+# (for instance, a command which must read all lines
+# of input before processing will not work) and
+# return it to the output *without* buffering
+# multiple lines.
+
+use Getopt::Long;
+use IPC::Open2;
+use strict;
+use POSIX ":sys_wait_h";
+
+my $recycle_clients; # spawn new clients when previous ones terminate
+my $stay_alive; # dont let server die when having zero clients
+my $joblist = "";
+my $errordir;
+my $multiline;
+my @files_to_stage;
+my $verbose = 1;
+my $nodelist;
+my $user = $ENV{"USER"};
+my $pmem = "2g";
+
+sub print_help;
+
+# Process command-line options
+if (GetOptions(
+ "stay-alive" => \$stay_alive,
+ "recycle-clients" => \$recycle_clients,
+ "error-dir=s" => \$errordir,
+ "multi-line" => \$multiline,
+ "file=s" => \@files_to_stage,
+ "verbose" => \$verbose,
+ "nodelist=s" => \$nodelist,
+ "pmem=s" => \$pmem
+) == 0 || @ARGV == 0){
+ print_help();
+}
+
+my @nodes = grep(/^[cd]\d\d$/, split(/\n/, `pbsnodes -a`));
+if ($nodelist){
+ @nodes = split(/ /, $nodelist);
+}
+
+if ($verbose){
+ print STDERR "Compute nodes: @nodes\n";
+}
+
+my $cmd = "";
+for my $arg (@ARGV){
+ if ($arg=~ /\s/){
+ $cmd .= "\"$arg\" ";
+ } else {
+ $cmd .= "$arg "
+ }
+}
+
+if ($errordir){
+ `mkdir -p $errordir`;
+}
+
+if ($verbose){ print STDERR "Parallelizing: $cmd\n\n"; }
+
+# set cleanup handler
+my @cleanup_cmds;
+sub cleanup;
+sub cleanup_and_die;
+$SIG{INT} = "cleanup_and_die";
+$SIG{TERM} = "cleanup_and_die";
+$SIG{HUP} = "cleanup_and_die";
+
+# other subs:
+sub numof_live_jobs;
+sub launch_job_on_node;
+
+
+# vars
+my $mydir = `dirname $0`; chomp $mydir;
+my $sentserver = "$mydir/sentserver";
+my $sentclient = "$mydir/sentclient";
+my $host = `hostname`;
+chomp $host;
+
+my $executable = $cmd;
+$executable =~ s/^\s*(\S+)($|\s.*)/$1/;
+$executable=`basename $executable`;
+chomp $executable;
+
+# find open port
+my $port = 50300;
+if ($verbose){ print STDERR "Testing port $port...";}
+while (`netstat -l | grep $port`){
+ if ($verbose){ print STDERR "port is busy\n";}
+ $port++;
+ if ($port > 50400){
+ die "Unable to find open port\n";
+ }
+ if ($verbose){ print STDERR "Testing port $port... "; }
+}
+if ($verbose){
+ print STDERR "port is available\n";
+}
+
+srand;
+my $key = int(rand()*1000000);
+
+my $multiflag = "";
+if ($multiline){ $multiflag = "-m"; print STDERR "expecting multiline output.\n"; }
+my $stay_alive_flag = "";
+if ($stay_alive){ $stay_alive_flag = "--stay-alive"; print STDERR "staying alive while no clients are connected.\n"; }
+
+my %node_count;
+my $script = "";
+
+# fork == one thread runs the sentserver, while the
+# other spawns the sentclient commands.
+if (my $pid = fork){
+ sleep 5; # give other thread time to start sentserver
+ $script .= "wait\n";
+ $script .= "$sentclient $host:$port:$key $cmd\n";
+ if ($verbose){
+ print STDERR "Client script:\n====\n";
+ print STDERR $script;
+ print STDERR "====\n";
+ }
+ for my $node (@nodes){
+ launch_job_on_node($node);
+ }
+ if ($recycle_clients) {
+ my $ret;
+ my $livejobs;
+ while (1) {
+ $ret = waitpid($pid, WNOHANG);
+ #print STDERR "waitpid $pid ret = $ret \n";
+ if ($ret != 0) {last; } # break
+ $livejobs = numof_live_jobs();
+ if ( $#nodes >= $livejobs ) { # a client terminated
+ my $numof_nodes = scalar @nodes;
+ print STDERR "num of requested nodes = $numof_nodes; num of currently live jobs = $livejobs; Client terminated - launching another.\n";
+ launch_job_on_node(1); # TODO: support named nodes
+ } else {
+ sleep (60);
+ }
+ }
+ }
+ waitpid($pid, 0);
+ cleanup();
+} else {
+# my $todo = "$sentserver -k $key $multiflag $port ";
+ my $todo = "$sentserver -k $key $multiflag $port $stay_alive_flag ";
+ if ($verbose){ print STDERR "Running: $todo\n"; }
+ my $rc = system($todo);
+ if ($rc){
+ die "Error: sentserver returned code $rc\n";
+ }
+}
+
+sub numof_live_jobs {
+ my @livejobs = grep(/$joblist/, split(/\n/, `qstat`));
+ return ($#livejobs + 1);
+}
+
+sub launch_job_on_node {
+ my $node = $_[0];
+
+ my $errorfile = "/dev/null";
+ my $outfile = "/dev/null";
+ unless (exists($node_count{$node})){
+ $node_count{$node} = 0;
+ }
+ my $node_count = $node_count{$node};
+ $node_count{$node}++;
+ my $clientname = $executable;
+ $clientname =~ s/^(.{4}).*$/$1/;
+ $clientname = "$clientname.$node.$node_count";
+ if ($errordir){
+ $errorfile = "$errordir/$clientname.ER";
+ $outfile = "$errordir/$clientname.OU";
+ }
+ my $todo = "qsub -l mem_free=9G -N $clientname -o $outfile -e $errorfile";
+ if ($verbose){ print STDERR "Running: $todo\n"; }
+ local(*QOUT, *QIN);
+ open2(\*QOUT, \*QIN, $todo);
+ print QIN $script;
+ close QIN;
+ while (my $jobid=<QOUT>){
+ chomp $jobid;
+ if ($verbose){ print STDERR "Launched client job: $jobid"; }
+ push(@cleanup_cmds, "`qdel $jobid 2> /dev/null`");
+ $jobid =~ s/^(\d+)(.*?)$/\1/g;
+ print STDERR "short job id $jobid\n";
+ if ($joblist == "") { $joblist = $jobid; }
+ else {$joblist = $joblist . "\|" . $jobid; }
+ }
+ close QOUT;
+}
+
+
+sub cleanup_and_die {
+ cleanup();
+ die "\n";
+}
+
+sub cleanup {
+ if ($verbose){ print STDERR "Cleaning up...\n"; }
+ for $cmd (@cleanup_cmds){
+ if ($verbose){ print STDERR " $cmd\n"; }
+ eval $cmd;
+ }
+ if ($verbose){ print STDERR "Cleanup finished.\n"; }
+}
+
+sub print_help
+{
+ my $name = `basename $0`; chomp $name;
+ print << "Help";
+
+usage: $name [options]
+
+ Automatic black-box parallelization of commands.
+
+options:
+
+ -e, --error-dir <dir>
+ Retain output files from jobs in <dir>, rather
+ than silently deleting them.
+
+ -m, --multi-line
+ Expect that command may produce multiple output
+ lines for a single input line. $name makes a
+ reasonable attempt to obtain all output before
+ processing additional inputs. However, use of this
+ option is inherently unsafe.
+
+ -v, --verbose
+ Print diagnostic informatoin on stderr.
+
+ -n, --nodelist
+ Space-delimited list of nodes to request. There is
+ one qsub command per node.
+
+ -p, --pmem
+ pmem setting for each job.
+
+Help
+}