summaryrefslogtreecommitdiff
path: root/dpmert/parallelize.pl
diff options
context:
space:
mode:
authorMichael Denkowski <michael.j.denkowski@gmail.com>2012-12-22 16:01:23 -0500
committerMichael Denkowski <michael.j.denkowski@gmail.com>2012-12-22 16:01:23 -0500
commit597d89c11db53e91bc011eab70fd613bbe6453e8 (patch)
tree83c87c07d1ff6d3ee4e3b1626f7eddd49c61095b /dpmert/parallelize.pl
parent65e958ff2678a41c22be7171456a63f002ef370b (diff)
parent201af2acd394415a05072fbd53d42584875aa4b4 (diff)
Merge branch 'master' of git://github.com/redpony/cdec
Diffstat (limited to 'dpmert/parallelize.pl')
-rwxr-xr-xdpmert/parallelize.pl423
1 files changed, 0 insertions, 423 deletions
diff --git a/dpmert/parallelize.pl b/dpmert/parallelize.pl
deleted file mode 100755
index d2ebaeea..00000000
--- a/dpmert/parallelize.pl
+++ /dev/null
@@ -1,423 +0,0 @@
-#!/usr/bin/env perl
-
-# Author: Adam Lopez
-#
-# This script takes a command that processes input
-# from stdin one-line-at-time, and parallelizes it
-# on the cluster using David Chiang's sentserver/
-# sentclient architecture.
-#
-# Prerequisites: the command *must* read each line
-# without waiting for subsequent lines of input
-# (for instance, a command which must read all lines
-# of input before processing will not work) and
-# return it to the output *without* buffering
-# multiple lines.
-
-#TODO: if -j 1, run immediately, not via sentserver? possible differences in environment might make debugging harder
-
-#ANNOYANCE: if input is shorter than -j n lines, or at the very last few lines, repeatedly sleeps. time cut down to 15s from 60s
-
-my $SCRIPT_DIR; BEGIN { use Cwd qw/ abs_path /; use File::Basename; $SCRIPT_DIR = dirname(abs_path($0)); push @INC, $SCRIPT_DIR, "$SCRIPT_DIR/../environment"; }
-use LocalConfig;
-
-use Cwd qw/ abs_path cwd getcwd /;
-use File::Temp qw/ tempfile /;
-use Getopt::Long;
-use IPC::Open2;
-use strict;
-use POSIX ":sys_wait_h";
-
-use File::Basename;
-my $myDir = dirname(__FILE__);
-print STDERR __FILE__." -> $myDir\n";
-push(@INC, $myDir);
-require "libcall.pl";
-
-my $tailn=5; # +0 = concatenate all the client logs. 5 = last 5 lines
-my $recycle_clients; # spawn new clients when previous ones terminate
-my $stay_alive; # dont let server die when having zero clients
-my $joblist = "";
-my $errordir="";
-my $multiline;
-my $workdir = '.';
-my $numnodes = 8;
-my $user = $ENV{"USER"};
-my $pmem = "9g";
-my $basep=50300;
-my $randp=300;
-my $tryp=50;
-my $no_which;
-my $no_cd;
-
-my $DEBUG=$ENV{DEBUG};
-print STDERR "DEBUG=$DEBUG output enabled.\n" if $DEBUG;
-my $verbose = 1;
-sub verbose {
- if ($verbose) {
- print STDERR @_,"\n";
- }
-}
-sub debug {
- if ($DEBUG) {
- my ($package, $filename, $line) = caller;
- print STDERR "DEBUG: $filename($line): ",join(' ',@_),"\n";
- }
-}
-my $is_shell_special=qr.[ \t\n\\><|&;"'`~*?{}$!()].;
-my $shell_escape_in_quote=qr.[\\"\$`!].;
-sub escape_shell {
- my ($arg)=@_;
- return undef unless defined $arg;
- return '""' unless $arg;
- if ($arg =~ /$is_shell_special/) {
- $arg =~ s/($shell_escape_in_quote)/\\$1/g;
- return "\"$arg\"";
- }
- return $arg;
-}
-sub preview_files {
- my ($l,$skipempty,$footer,$n)=@_;
- $n=$tailn unless defined $n;
- my @f=grep { ! ($skipempty && -z $_) } @$l;
- my $fn=join(' ',map {escape_shell($_)} @f);
- my $cmd="tail -n $n $fn";
- unchecked_output("$cmd").($footer?"\nNONEMPTY FILES:\n$fn\n":"");
-}
-sub prefix_dirname($) {
- #like `dirname but if ends in / then return the whole thing
- local ($_)=@_;
- if (/\/$/) {
- $_;
- } else {
- s#/[^/]$##;
- $_ ? $_ : '';
- }
-}
-sub ensure_final_slash($) {
- local ($_)=@_;
- m#/$# ? $_ : ($_."/");
-}
-sub extend_path($$;$$) {
- my ($base,$ext,$mkdir,$baseisdir)=@_;
- if (-d $base) {
- $base.="/";
- } else {
- my $dir;
- if ($baseisdir) {
- $dir=$base;
- $base.='/' unless $base =~ /\/$/;
- } else {
- $dir=prefix_dirname($base);
- }
- my @cmd=("/bin/mkdir","-p",$dir);
- check_call(@cmd) if $mkdir;
- }
- return $base.$ext;
-}
-
-my $abscwd=abs_path(&getcwd);
-sub print_help;
-
-my $use_fork;
-my @pids;
-
-# Process command-line options
-unless (GetOptions(
- "stay-alive" => \$stay_alive,
- "recycle-clients" => \$recycle_clients,
- "error-dir=s" => \$errordir,
- "multi-line" => \$multiline,
- "workdir=s" => \$workdir,
- "use-fork" => \$use_fork,
- "verbose" => \$verbose,
- "jobs=i" => \$numnodes,
- "pmem=s" => \$pmem,
- "baseport=i" => \$basep,
-# "iport=i" => \$randp, #for short name -i
- "no-which!" => \$no_which,
- "no-cd!" => \$no_cd,
- "tailn=s" => \$tailn,
-) && scalar @ARGV){
- print_help();
- die "bad options.";
-}
-
-my $cmd = "";
-my $prog=shift;
-if ($no_which) {
- $cmd=$prog;
-} else {
- $cmd=check_output("which $prog");
- chomp $cmd;
- die "$prog not found - $cmd" unless $cmd;
-}
-#$cmd=abs_path($cmd);
-for my $arg (@ARGV) {
- $cmd .= " ".escape_shell($arg);
-}
-die "Please specify a command to parallelize\n" if $cmd eq '';
-
-my $cdcmd=$no_cd ? '' : ("cd ".escape_shell($abscwd)."\n");
-
-my $executable = $cmd;
-$executable =~ s/^\s*(\S+)($|\s.*)/$1/;
-$executable=check_output("basename $executable");
-chomp $executable;
-
-
-print STDERR "Parallelizing ($numnodes ways): $cmd\n\n";
-
-# create -e dir and save .sh
-use File::Temp qw/tempdir/;
-unless ($errordir) {
- $errordir=tempdir("$executable.XXXXXX",CLEANUP=>1);
-}
-if ($errordir) {
- my $scriptfile=extend_path("$errordir/","$executable.sh",1,1);
- -d $errordir || die "should have created -e dir $errordir";
- open SF,">",$scriptfile || die;
- print SF "$cdcmd$cmd\n";
- close SF;
- chmod 0755,$scriptfile;
- $errordir=abs_path($errordir);
- &verbose("-e dir: $errordir");
-}
-
-# set cleanup handler
-my @cleanup_cmds;
-sub cleanup;
-sub cleanup_and_die;
-$SIG{INT} = "cleanup_and_die";
-$SIG{TERM} = "cleanup_and_die";
-$SIG{HUP} = "cleanup_and_die";
-
-# other subs:
-sub numof_live_jobs;
-sub launch_job_on_node;
-
-
-# vars
-my $mydir = check_output("dirname $0"); chomp $mydir;
-my $sentserver = "$mydir/sentserver";
-my $sentclient = "$mydir/sentclient";
-my $host = check_output("hostname");
-chomp $host;
-
-
-# find open port
-srand;
-my $port = 50300+int(rand($randp));
-my $endp=$port+$tryp;
-sub listening_port_lines {
- my $quiet=$verbose?'':'2>/dev/null';
- return unchecked_output("netstat -a -n $quiet | grep LISTENING | grep -i tcp");
-}
-my $netstat=&listening_port_lines;
-
-if ($verbose){ print STDERR "Testing port $port...";}
-
-while ($netstat=~/$port/ || &listening_port_lines=~/$port/){
- if ($verbose){ print STDERR "port is busy\n";}
- $port++;
- if ($port > $endp){
- die "Unable to find open port\n";
- }
- if ($verbose){ print STDERR "Testing port $port... "; }
-}
-if ($verbose){
- print STDERR "port $port is available\n";
-}
-
-my $key = int(rand()*1000000);
-
-my $multiflag = "";
-if ($multiline){ $multiflag = "-m"; print STDERR "expecting multiline output.\n"; }
-my $stay_alive_flag = "";
-if ($stay_alive){ $stay_alive_flag = "--stay-alive"; print STDERR "staying alive while no clients are connected.\n"; }
-
-my $node_count = 0;
-my $script = "";
-# fork == one thread runs the sentserver, while the
-# other spawns the sentclient commands.
-my $pid = fork;
-if ($pid == 0) { # child
- sleep 8; # give other thread time to start sentserver
- $script = "$cdcmd$sentclient $host:$port:$key $cmd";
-
- if ($verbose){
- print STDERR "Client script:\n====\n";
- print STDERR $script;
- print STDERR "====\n";
- }
- for (my $jobn=0; $jobn<$numnodes; $jobn++){
- launch_job();
- }
- if ($recycle_clients) {
- my $ret;
- my $livejobs;
- while (1) {
- $ret = waitpid($pid, WNOHANG);
- #print STDERR "waitpid $pid ret = $ret \n";
- last if ($ret != 0);
- $livejobs = numof_live_jobs();
- if ($numnodes >= $livejobs ) { # a client terminated, OR # lines of input was less than -j
- print STDERR "num of requested nodes = $numnodes; num of currently live jobs = $livejobs; Client terminated - launching another.\n";
- launch_job();
- } else {
- sleep 15;
- }
- }
- }
- print STDERR "CHILD PROCESSES SPAWNED ... WAITING\n";
- for my $p (@pids) {
- waitpid($p, 0);
- }
-} else {
-# my $todo = "$sentserver -k $key $multiflag $port ";
- my $todo = "$sentserver -k $key $multiflag $port $stay_alive_flag ";
- if ($verbose){ print STDERR "Running: $todo\n"; }
- check_call($todo);
- print STDERR "Call to $sentserver returned.\n";
- cleanup();
- exit(0);
-}
-
-sub numof_live_jobs {
- if ($use_fork) {
- die "not implemented";
- } else {
- # We can probably continue decoding if the qstat error is only temporary
- my @livejobs = grep(/$joblist/, split(/\n/, unchecked_output("qstat")));
- return ($#livejobs + 1);
- }
-}
-my (@errors,@outs,@cmds);
-
-sub launch_job {
- if ($use_fork) { return launch_job_fork(); }
- my $errorfile = "/dev/null";
- my $outfile = "/dev/null";
- $node_count++;
- my $clientname = $executable;
- $clientname =~ s/^(.{4}).*$/$1/;
- $clientname = "$clientname.$node_count";
- if ($errordir){
- $errorfile = "$errordir/$clientname.ER";
- $outfile = "$errordir/$clientname.OU";
- push @errors,$errorfile;
- push @outs,$outfile;
- }
- my $todo = qsub_args($pmem) . " -N $clientname -o $outfile -e $errorfile";
- push @cmds,$todo;
-
- print STDERR "Running: $todo\n";
- local(*QOUT, *QIN);
- open2(\*QOUT, \*QIN, $todo) or die "Failed to open2: $!";
- print QIN $script;
- close QIN;
- while (my $jobid=<QOUT>){
- chomp $jobid;
- if ($verbose){ print STDERR "Launched client job: $jobid"; }
- $jobid =~ s/^(\d+)(.*?)$/\1/g;
- $jobid =~ s/^Your job (\d+) .*$/\1/;
- print STDERR " short job id $jobid\n";
- if ($verbose){
- print STDERR "cd: $abscwd\n";
- print STDERR "cmd: $cmd\n";
- }
- if ($joblist == "") { $joblist = $jobid; }
- else {$joblist = $joblist . "\|" . $jobid; }
- my $cleanfn="qdel $jobid 2> /dev/null";
- push(@cleanup_cmds, $cleanfn);
- }
- close QOUT;
-}
-
-sub launch_job_fork {
- my $errorfile = "/dev/null";
- my $outfile = "/dev/null";
- $node_count++;
- my $clientname = $executable;
- $clientname =~ s/^(.{4}).*$/$1/;
- $clientname = "$clientname.$node_count";
- if ($errordir){
- $errorfile = "$errordir/$clientname.ER";
- $outfile = "$errordir/$clientname.OU";
- push @errors,$errorfile;
- push @outs,$outfile;
- }
- my $pid = fork;
- if ($pid == 0) {
- my ($fh, $scr_name) = get_temp_script();
- print $fh $script;
- close $fh;
- my $todo = "/bin/bash -xeo pipefail $scr_name 1> $outfile 2> $errorfile";
- print STDERR "EXEC: $todo\n";
- my $out = check_output("$todo");
- unlink $scr_name or warn "Failed to remove $scr_name";
- exit 0;
- } else {
- push @pids, $pid;
- }
-}
-
-sub get_temp_script {
- my ($fh, $filename) = tempfile( "$workdir/workXXXX", SUFFIX => '.sh');
- return ($fh, $filename);
-}
-
-sub cleanup_and_die {
- cleanup();
- die "\n";
-}
-
-sub cleanup {
- print STDERR "Cleaning up...\n";
- for $cmd (@cleanup_cmds){
- print STDERR " Cleanup command: $cmd\n";
- eval $cmd;
- }
- print STDERR "outputs:\n",preview_files(\@outs,1),"\n";
- print STDERR "errors:\n",preview_files(\@errors,1),"\n";
- print STDERR "cmd:\n",$cmd,"\n";
- print STDERR " cat $errordir/*.ER\nfor logs.\n";
- print STDERR "Cleanup finished.\n";
-}
-
-sub print_help
-{
- my $name = check_output("basename $0"); chomp $name;
- print << "Help";
-
-usage: $name [options]
-
- Automatic black-box parallelization of commands.
-
-options:
-
- --use-fork
- Instead of using qsub, use fork.
-
- -e, --error-dir <dir>
- Retain output files from jobs in <dir>, rather
- than silently deleting them.
-
- -m, --multi-line
- Expect that command may produce multiple output
- lines for a single input line. $name makes a
- reasonable attempt to obtain all output before
- processing additional inputs. However, use of this
- option is inherently unsafe.
-
- -v, --verbose
- Print diagnostic informatoin on stderr.
-
- -j, --jobs
- Number of jobs to use.
-
- -p, --pmem
- pmem setting for each job.
-
-Help
-}