summaryrefslogtreecommitdiff
path: root/dpmert/parallelize.pl
diff options
context:
space:
mode:
Diffstat (limited to 'dpmert/parallelize.pl')
-rwxr-xr-xdpmert/parallelize.pl423
1 files changed, 423 insertions, 0 deletions
diff --git a/dpmert/parallelize.pl b/dpmert/parallelize.pl
new file mode 100755
index 00000000..7d0365cc
--- /dev/null
+++ b/dpmert/parallelize.pl
@@ -0,0 +1,423 @@
+#!/usr/bin/env perl
+
+# Author: Adam Lopez
+#
+# This script takes a command that processes input
+# from stdin one-line-at-time, and parallelizes it
+# on the cluster using David Chiang's sentserver/
+# sentclient architecture.
+#
+# Prerequisites: the command *must* read each line
+# without waiting for subsequent lines of input
+# (for instance, a command which must read all lines
+# of input before processing will not work) and
+# return it to the output *without* buffering
+# multiple lines.
+
+#TODO: if -j 1, run immediately, not via sentserver? possible differences in environment might make debugging harder
+
+#ANNOYANCE: if input is shorter than -j n lines, or at the very last few lines, repeatedly sleeps. time cut down to 15s from 60s
+
+my $SCRIPT_DIR; BEGIN { use Cwd qw/ abs_path /; use File::Basename; $SCRIPT_DIR = dirname(abs_path($0)); push @INC, $SCRIPT_DIR, "$SCRIPT_DIR/../environment"; }
+use LocalConfig;
+
+use Cwd qw/ abs_path cwd getcwd /;
+use File::Temp qw/ tempfile /;
+use Getopt::Long;
+use IPC::Open2;
+use strict;
+use POSIX ":sys_wait_h";
+
+use File::Basename;
+my $myDir = dirname(__FILE__);
+print STDERR __FILE__." -> $myDir\n";
+push(@INC, $myDir);
+require "libcall.pl";
+
+my $tailn=5; # +0 = concatenate all the client logs. 5 = last 5 lines
+my $recycle_clients; # spawn new clients when previous ones terminate
+my $stay_alive; # dont let server die when having zero clients
+my $joblist = "";
+my $errordir="";
+my $multiline;
+my @files_to_stage;
+my $numnodes = 8;
+my $user = $ENV{"USER"};
+my $pmem = "9g";
+my $basep=50300;
+my $randp=300;
+my $tryp=50;
+my $no_which;
+my $no_cd;
+
+my $DEBUG=$ENV{DEBUG};
+print STDERR "DEBUG=$DEBUG output enabled.\n" if $DEBUG;
+my $verbose = 1;
+sub verbose {
+ if ($verbose) {
+ print STDERR @_,"\n";
+ }
+}
+sub debug {
+ if ($DEBUG) {
+ my ($package, $filename, $line) = caller;
+ print STDERR "DEBUG: $filename($line): ",join(' ',@_),"\n";
+ }
+}
+my $is_shell_special=qr.[ \t\n\\><|&;"'`~*?{}$!()].;
+my $shell_escape_in_quote=qr.[\\"\$`!].;
+sub escape_shell {
+ my ($arg)=@_;
+ return undef unless defined $arg;
+ return '""' unless $arg;
+ if ($arg =~ /$is_shell_special/) {
+ $arg =~ s/($shell_escape_in_quote)/\\$1/g;
+ return "\"$arg\"";
+ }
+ return $arg;
+}
+sub preview_files {
+ my ($l,$skipempty,$footer,$n)=@_;
+ $n=$tailn unless defined $n;
+ my @f=grep { ! ($skipempty && -z $_) } @$l;
+ my $fn=join(' ',map {escape_shell($_)} @f);
+ my $cmd="tail -n $n $fn";
+ unchecked_output("$cmd").($footer?"\nNONEMPTY FILES:\n$fn\n":"");
+}
+sub prefix_dirname($) {
+ #like `dirname but if ends in / then return the whole thing
+ local ($_)=@_;
+ if (/\/$/) {
+ $_;
+ } else {
+ s#/[^/]$##;
+ $_ ? $_ : '';
+ }
+}
+sub ensure_final_slash($) {
+ local ($_)=@_;
+ m#/$# ? $_ : ($_."/");
+}
+sub extend_path($$;$$) {
+ my ($base,$ext,$mkdir,$baseisdir)=@_;
+ if (-d $base) {
+ $base.="/";
+ } else {
+ my $dir;
+ if ($baseisdir) {
+ $dir=$base;
+ $base.='/' unless $base =~ /\/$/;
+ } else {
+ $dir=prefix_dirname($base);
+ }
+ my @cmd=("/bin/mkdir","-p",$dir);
+ check_call(@cmd) if $mkdir;
+ }
+ return $base.$ext;
+}
+
+my $abscwd=abs_path(&getcwd);
+sub print_help;
+
+my $use_fork;
+my @pids;
+
+# Process command-line options
+unless (GetOptions(
+ "stay-alive" => \$stay_alive,
+ "recycle-clients" => \$recycle_clients,
+ "error-dir=s" => \$errordir,
+ "multi-line" => \$multiline,
+ "file=s" => \@files_to_stage,
+ "use-fork" => \$use_fork,
+ "verbose" => \$verbose,
+ "jobs=i" => \$numnodes,
+ "pmem=s" => \$pmem,
+ "baseport=i" => \$basep,
+# "iport=i" => \$randp, #for short name -i
+ "no-which!" => \$no_which,
+ "no-cd!" => \$no_cd,
+ "tailn=s" => \$tailn,
+) && scalar @ARGV){
+ print_help();
+ die "bad options.";
+}
+
+my $cmd = "";
+my $prog=shift;
+if ($no_which) {
+ $cmd=$prog;
+} else {
+ $cmd=check_output("which $prog");
+ chomp $cmd;
+ die "$prog not found - $cmd" unless $cmd;
+}
+#$cmd=abs_path($cmd);
+for my $arg (@ARGV) {
+ $cmd .= " ".escape_shell($arg);
+}
+die "Please specify a command to parallelize\n" if $cmd eq '';
+
+my $cdcmd=$no_cd ? '' : ("cd ".escape_shell($abscwd)."\n");
+
+my $executable = $cmd;
+$executable =~ s/^\s*(\S+)($|\s.*)/$1/;
+$executable=check_output("basename $executable");
+chomp $executable;
+
+
+print STDERR "Parallelizing ($numnodes ways): $cmd\n\n";
+
+# create -e dir and save .sh
+use File::Temp qw/tempdir/;
+unless ($errordir) {
+ $errordir=tempdir("$executable.XXXXXX",CLEANUP=>1);
+}
+if ($errordir) {
+ my $scriptfile=extend_path("$errordir/","$executable.sh",1,1);
+ -d $errordir || die "should have created -e dir $errordir";
+ open SF,">",$scriptfile || die;
+ print SF "$cdcmd$cmd\n";
+ close SF;
+ chmod 0755,$scriptfile;
+ $errordir=abs_path($errordir);
+ &verbose("-e dir: $errordir");
+}
+
+# set cleanup handler
+my @cleanup_cmds;
+sub cleanup;
+sub cleanup_and_die;
+$SIG{INT} = "cleanup_and_die";
+$SIG{TERM} = "cleanup_and_die";
+$SIG{HUP} = "cleanup_and_die";
+
+# other subs:
+sub numof_live_jobs;
+sub launch_job_on_node;
+
+
+# vars
+my $mydir = check_output("dirname $0"); chomp $mydir;
+my $sentserver = "$mydir/sentserver";
+my $sentclient = "$mydir/sentclient";
+my $host = check_output("hostname");
+chomp $host;
+
+
+# find open port
+srand;
+my $port = 50300+int(rand($randp));
+my $endp=$port+$tryp;
+sub listening_port_lines {
+ my $quiet=$verbose?'':'2>/dev/null';
+ return unchecked_output("netstat -a -n $quiet | grep LISTENING | grep -i tcp");
+}
+my $netstat=&listening_port_lines;
+
+if ($verbose){ print STDERR "Testing port $port...";}
+
+while ($netstat=~/$port/ || &listening_port_lines=~/$port/){
+ if ($verbose){ print STDERR "port is busy\n";}
+ $port++;
+ if ($port > $endp){
+ die "Unable to find open port\n";
+ }
+ if ($verbose){ print STDERR "Testing port $port... "; }
+}
+if ($verbose){
+ print STDERR "port $port is available\n";
+}
+
+my $key = int(rand()*1000000);
+
+my $multiflag = "";
+if ($multiline){ $multiflag = "-m"; print STDERR "expecting multiline output.\n"; }
+my $stay_alive_flag = "";
+if ($stay_alive){ $stay_alive_flag = "--stay-alive"; print STDERR "staying alive while no clients are connected.\n"; }
+
+my $node_count = 0;
+my $script = "";
+# fork == one thread runs the sentserver, while the
+# other spawns the sentclient commands.
+my $pid = fork;
+if ($pid == 0) { # child
+ sleep 8; # give other thread time to start sentserver
+ $script = "$cdcmd$sentclient $host:$port:$key $cmd";
+
+ if ($verbose){
+ print STDERR "Client script:\n====\n";
+ print STDERR $script;
+ print STDERR "====\n";
+ }
+ for (my $jobn=0; $jobn<$numnodes; $jobn++){
+ launch_job();
+ }
+ if ($recycle_clients) {
+ my $ret;
+ my $livejobs;
+ while (1) {
+ $ret = waitpid($pid, WNOHANG);
+ #print STDERR "waitpid $pid ret = $ret \n";
+ last if ($ret != 0);
+ $livejobs = numof_live_jobs();
+ if ($numnodes >= $livejobs ) { # a client terminated, OR # lines of input was less than -j
+ print STDERR "num of requested nodes = $numnodes; num of currently live jobs = $livejobs; Client terminated - launching another.\n";
+ launch_job();
+ } else {
+ sleep 15;
+ }
+ }
+ }
+ print STDERR "CHILD PROCESSES SPAWNED ... WAITING\n";
+ for my $p (@pids) {
+ waitpid($p, 0);
+ }
+} else {
+# my $todo = "$sentserver -k $key $multiflag $port ";
+ my $todo = "$sentserver -k $key $multiflag $port $stay_alive_flag ";
+ if ($verbose){ print STDERR "Running: $todo\n"; }
+ check_call($todo);
+ print STDERR "Call to $sentserver returned.\n";
+ cleanup();
+ exit(0);
+}
+
+sub numof_live_jobs {
+ if ($use_fork) {
+ die "not implemented";
+ } else {
+ # We can probably continue decoding if the qstat error is only temporary
+ my @livejobs = grep(/$joblist/, split(/\n/, unchecked_output("qstat")));
+ return ($#livejobs + 1);
+ }
+}
+my (@errors,@outs,@cmds);
+
+sub launch_job {
+ if ($use_fork) { return launch_job_fork(); }
+ my $errorfile = "/dev/null";
+ my $outfile = "/dev/null";
+ $node_count++;
+ my $clientname = $executable;
+ $clientname =~ s/^(.{4}).*$/$1/;
+ $clientname = "$clientname.$node_count";
+ if ($errordir){
+ $errorfile = "$errordir/$clientname.ER";
+ $outfile = "$errordir/$clientname.OU";
+ push @errors,$errorfile;
+ push @outs,$outfile;
+ }
+ my $todo = qsub_args($pmem) . " -N $clientname -o $outfile -e $errorfile";
+ push @cmds,$todo;
+
+ print STDERR "Running: $todo\n";
+ local(*QOUT, *QIN);
+ open2(\*QOUT, \*QIN, $todo) or die "Failed to open2: $!";
+ print QIN $script;
+ close QIN;
+ while (my $jobid=<QOUT>){
+ chomp $jobid;
+ if ($verbose){ print STDERR "Launched client job: $jobid"; }
+ $jobid =~ s/^(\d+)(.*?)$/\1/g;
+ $jobid =~ s/^Your job (\d+) .*$/\1/;
+ print STDERR " short job id $jobid\n";
+ if ($verbose){
+ print STDERR "cd: $abscwd\n";
+ print STDERR "cmd: $cmd\n";
+ }
+ if ($joblist == "") { $joblist = $jobid; }
+ else {$joblist = $joblist . "\|" . $jobid; }
+ my $cleanfn="qdel $jobid 2> /dev/null";
+ push(@cleanup_cmds, $cleanfn);
+ }
+ close QOUT;
+}
+
+sub launch_job_fork {
+ my $errorfile = "/dev/null";
+ my $outfile = "/dev/null";
+ $node_count++;
+ my $clientname = $executable;
+ $clientname =~ s/^(.{4}).*$/$1/;
+ $clientname = "$clientname.$node_count";
+ if ($errordir){
+ $errorfile = "$errordir/$clientname.ER";
+ $outfile = "$errordir/$clientname.OU";
+ push @errors,$errorfile;
+ push @outs,$outfile;
+ }
+ my $pid = fork;
+ if ($pid == 0) {
+ my ($fh, $scr_name) = get_temp_script();
+ print $fh $script;
+ close $fh;
+ my $todo = "/bin/bash -xeo pipefail $scr_name 1> $outfile 2> $errorfile";
+ print STDERR "EXEC: $todo\n";
+ my $out = check_output("$todo");
+ unlink $scr_name or warn "Failed to remove $scr_name";
+ exit 0;
+ } else {
+ push @pids, $pid;
+ }
+}
+
+sub get_temp_script {
+ my ($fh, $filename) = tempfile( "workXXXX", SUFFIX => '.sh');
+ return ($fh, $filename);
+}
+
+sub cleanup_and_die {
+ cleanup();
+ die "\n";
+}
+
+sub cleanup {
+ print STDERR "Cleaning up...\n";
+ for $cmd (@cleanup_cmds){
+ print STDERR " Cleanup command: $cmd\n";
+ eval $cmd;
+ }
+ print STDERR "outputs:\n",preview_files(\@outs,1),"\n";
+ print STDERR "errors:\n",preview_files(\@errors,1),"\n";
+ print STDERR "cmd:\n",$cmd,"\n";
+ print STDERR " cat $errordir/*.ER\nfor logs.\n";
+ print STDERR "Cleanup finished.\n";
+}
+
+sub print_help
+{
+ my $name = check_output("basename $0"); chomp $name;
+ print << "Help";
+
+usage: $name [options]
+
+ Automatic black-box parallelization of commands.
+
+options:
+
+ --use-fork
+ Instead of using qsub, use fork.
+
+ -e, --error-dir <dir>
+ Retain output files from jobs in <dir>, rather
+ than silently deleting them.
+
+ -m, --multi-line
+ Expect that command may produce multiple output
+ lines for a single input line. $name makes a
+ reasonable attempt to obtain all output before
+ processing additional inputs. However, use of this
+ option is inherently unsafe.
+
+ -v, --verbose
+ Print diagnostic informatoin on stderr.
+
+ -j, --jobs
+ Number of jobs to use.
+
+ -p, --pmem
+ pmem setting for each job.
+
+Help
+}