#!/usr/bin/env perl

# Author: Adam Lopez
#
# This script takes a command that processes input
# from stdin one-line-at-time, and parallelizes it
# on the cluster using David Chiang's sentserver/
# sentclient architecture.
#
# Prerequisites: the command *must* read each line
# without waiting for subsequent lines of input
# (for instance, a command which must read all lines
# of input before processing will not work) and
# return it to the output *without* buffering
# multiple lines.

#TODO: if -j 1, run immediately, not via sentserver?  possible differences in environment might make debugging harder

#ANNOYANCE: if input is shorter than -j n lines, or at the very last few lines, repeatedly sleeps.  time cut down to 15s from 60s

use Getopt::Long;
use IPC::Open2;
use strict;
use POSIX ":sys_wait_h";
use Cwd qw(getcwd);

my $tailn=5; # +0 = concatenate all the client logs.  5 = last 5 lines
my $recycle_clients;		# spawn new clients when previous ones terminate
my $stay_alive;			# dont let server die when having zero clients
my $joblist = "";
my $errordir="";
my $multiline;
my @files_to_stage;
my $numnodes = 8;
my $user = $ENV{"USER"};
my $pmem = "9g";
my $basep=50300;
my $randp=300;
my $tryp=50;
my $no_which;
my $no_cd;

my $DEBUG=$ENV{DEBUG};
print STDERR "DEBUG=$DEBUG output enabled.\n" if $DEBUG;
my $verbose = 1;
sub verbose {
    if ($verbose) {
        print STDERR @_,"\n";
    }
}
sub debug {
    if ($DEBUG) {
        my ($package, $filename, $line) = caller;
        print STDERR "DEBUG: $filename($line): ",join(' ',@_),"\n";
    }
}
sub abspath($) {
    my $p=shift;
    my $a=`readlink -f $p`;
    chomp $a;
    $a
}
my $is_shell_special=qr.[ \t\n\\><|&;"'`~*?{}$!()].;
my $shell_escape_in_quote=qr.[\\"\$`!].;
sub escape_shell {
    my ($arg)=@_;
    return undef unless defined $arg;
    return '""' unless $arg;
    if ($arg =~ /$is_shell_special/) {
        $arg =~ s/($shell_escape_in_quote)/\\$1/g;
        return "\"$arg\"";
    }
    return $arg;
}
sub preview_files {
    my ($l,$skipempty,$footer,$n)=@_;
    $n=$tailn unless defined $n;
    my @f=grep { ! ($skipempty && -z $_) } @$l;
    my $fn=join(' ',map {escape_shell($_)} @f);
    my $cmd="tail -n $n $fn";
    `$cmd`.($footer?"\nNONEMPTY FILES:\n$fn\n":"");
}
sub prefix_dirname($) {
    #like `dirname but if ends in / then return the whole thing
    local ($_)=@_;
    if (/\/$/) {
        $_;
    } else {
        s#/[^/]$##;
        $_ ? $_ : '';
    }
}
sub ensure_final_slash($) {
    local ($_)=@_;
    m#/$# ? $_ : ($_."/");
}
sub extend_path($$;$$) {
    my ($base,$ext,$mkdir,$baseisdir)=@_;
    if (-d $base) {
        $base.="/";
    } else {
        my $dir;
        if ($baseisdir) {
            $dir=$base;
            $base.='/' unless $base =~ /\/$/;
        } else {
            $dir=prefix_dirname($base);
        }
        my @cmd=("/bin/mkdir","-p",$dir);
        system(@cmd) if $mkdir;
    }
    return $base.$ext;
}

my $abscwd=abspath(&getcwd);
sub print_help;

# Process command-line options
unless (GetOptions(
			"stay-alive" => \$stay_alive,
			"recycle-clients" => \$recycle_clients,
			"error-dir=s" => \$errordir,
			"multi-line" => \$multiline,
			"file=s" => \@files_to_stage,
			"verbose" => \$verbose,
			"jobs=i" => \$numnodes,
			"pmem=s" => \$pmem,
        "baseport=i" => \$basep,
#       "iport=i" => \$randp, #for short name -i
        "no-which!" => \$no_which,
            "no-cd!" => \$no_cd,
            "tailn=s" => \$tailn,
) && scalar @ARGV){
	print_help();
    die "bad options.";
}

my $cmd = "";
my $prog=shift;
if ($no_which) {
    $cmd=$prog;
} else {
    $cmd=`which $prog`;
    chomp $cmd;
    die "$prog not found - $cmd" unless $cmd;
}
#$cmd=abspath($cmd);
for my $arg (@ARGV) {
    $cmd .= " ".escape_shell($arg);
}
die "Please specify a command to parallelize\n" if $cmd eq '';

my $cdcmd=$no_cd ? '' : ("cd ".escape_shell($abscwd)."\n");

my $executable = $cmd;
$executable =~ s/^\s*(\S+)($|\s.*)/$1/;
$executable=`basename $executable`;
chomp $executable;


if ($verbose){ print STDERR "Parallelizing ($numnodes ways): $cmd\n\n"; }

# create -e dir and save .sh
use File::Temp qw/tempdir/;
unless ($errordir) {
    $errordir=tempdir("$executable.XXXXXX",CLEANUP=>1);
}
if ($errordir) {
    my $scriptfile=extend_path("$errordir/","$executable.sh",1,1);
    -d $errordir || die "should have created -e dir $errordir";
    open SF,">",$scriptfile || die;
    print SF "$cdcmd$cmd\n";
    close SF;
    chmod 0755,$scriptfile;
    $errordir=abspath($errordir);
    &verbose("-e dir: $errordir");
}

# set cleanup handler
my @cleanup_cmds;
sub cleanup;
sub cleanup_and_die;
$SIG{INT} = "cleanup_and_die";
$SIG{TERM} = "cleanup_and_die";
$SIG{HUP} = "cleanup_and_die";

# other subs:
sub numof_live_jobs;
sub launch_job_on_node;


# vars
my $mydir = `dirname $0`; chomp $mydir;
my $sentserver = "$mydir/sentserver";
my $sentclient = "$mydir/sentclient";
my $host = `hostname`;
chomp $host;


# find open port
srand;
my $port = 50300+int(rand($randp));
my $endp=$port+$tryp;
sub listening_port_lines {
    my $quiet=$verbose?'':'2>/dev/null';
    `netstat -a -n $quiet | grep LISTENING | grep -i tcp`
}
my $netstat=&listening_port_lines;

if ($verbose){ print STDERR "Testing port $port...";}

while ($netstat=~/$port/ || &listening_port_lines=~/$port/){
	if ($verbose){ print STDERR "port is busy\n";}
	$port++;
	if ($port > $endp){
		die "Unable to find open port\n";
	}
	if ($verbose){ print STDERR "Testing port $port... "; }
}
if ($verbose){
	print STDERR "port $port is available\n";
}

my $key = int(rand()*1000000);

my $multiflag = "";
if ($multiline){ $multiflag = "-m"; print STDERR "expecting multiline output.\n"; }
my $stay_alive_flag = "";
if ($stay_alive){ $stay_alive_flag = "--stay-alive"; print STDERR "staying alive while no clients are connected.\n"; }

my %node_count;
my $script = "";
# fork == one thread runs the sentserver, while the
# other spawns the sentclient commands.
if (my $pid = fork){
	  sleep 4; # give other thread time to start sentserver
    $script =
        qq{wait
$cdcmd$sentclient $host:$port:$key $cmd
};
	if ($verbose){
		print STDERR "Client script:\n====\n";
		print STDERR $script;
		print STDERR "====\n";
	}
	for (my $jobn=0; $jobn<$numnodes; $jobn++){
		launch_job_on_node(1);
	}
	if ($recycle_clients) {
		my $ret;
		my $livejobs;
		while (1) {
			$ret = waitpid($pid, WNOHANG);
			#print STDERR "waitpid $pid ret = $ret \n";
			last if ($ret != 0);
			$livejobs = numof_live_jobs();
			if ($numnodes >= $livejobs ) {	# a client terminated, OR # lines of input was less than -j
				print STDERR "num of requested nodes = $numnodes; num of currently live jobs = $livejobs; Client terminated - launching another.\n";
				launch_job_on_node(1);	# TODO: support named nodes
			} else {
				sleep 15;
			}
		}
	}
	waitpid($pid, 0);
	cleanup();
} else {
#	my $todo = "$sentserver -k $key $multiflag $port ";
	my $todo = "$sentserver -k $key $multiflag $port $stay_alive_flag ";
	if ($verbose){ print STDERR "Running: $todo\n"; }
	my $rc = system($todo);
	if ($rc){
		die "Error: sentserver returned code $rc\n";
	}
}

sub numof_live_jobs {
	my @livejobs = grep(/$joblist/, split(/\n/, `qstat`));
	return ($#livejobs + 1);
}
my (@errors,@outs,@cmds);
sub launch_job_on_node {
		my $node = $_[0];

		my $errorfile = "/dev/null";
		my $outfile = "/dev/null";
		unless (exists($node_count{$node})){
			$node_count{$node} = 0;
		}
		my $node_count = $node_count{$node};
		$node_count{$node}++;
		my $clientname = $executable;
		$clientname =~ s/^(.{4}).*$/$1/;
		$clientname = "$clientname.$node.$node_count";
		if ($errordir){
			$errorfile = "$errordir/$clientname.ER";
			$outfile = "$errordir/$clientname.OU";
            push @errors,$errorfile;
            push @outs,$outfile;
		}
		my $todo = "qsub -l mem_free=$pmem -N $clientname -o $outfile -e $errorfile";
        push @cmds,$todo;

		if ($verbose){ print STDERR "Running: $todo\n"; }
		local(*QOUT, *QIN);
		open2(\*QOUT, \*QIN, $todo);
		print QIN $script;
		close QIN;
		while (my $jobid=<QOUT>){
			chomp $jobid;
			if ($verbose){ print STDERR "Launched client job: $jobid"; }
			$jobid =~ s/^(\d+)(.*?)$/\1/g;
            $jobid =~ s/^Your job (\d+) .*$/\1/;
			print STDERR " short job id $jobid\n";
            if ($verbose){
                print STDERR "cd: $abscwd\n";
                print STDERR "cmd: $cmd\n";
            }
			if ($joblist == "") { $joblist = $jobid; }
			else {$joblist = $joblist . "\|" . $jobid; }
            my $cleanfn="`qdel $jobid 2> /dev/null`";
			push(@cleanup_cmds, $cleanfn);
		}
		close QOUT;
}


sub cleanup_and_die {
	cleanup();
	die "\n";
}

sub cleanup {
	if ($verbose){ print STDERR "Cleaning up...\n"; }
	for $cmd (@cleanup_cmds){
		if ($verbose){ print STDERR "  $cmd\n"; }
		eval $cmd;
	}
    print STDERR "outputs:\n",preview_files(\@outs,1),"\n";
    print STDERR "errors:\n",preview_files(\@errors,1),"\n";
    print STDERR "cmd:\n",$cmd,"\n";
    print STDERR " cat $errordir/*.ER\nfor logs.\n";
	if ($verbose){ print STDERR "Cleanup finished.\n"; }
}

sub print_help
{
	my $name = `basename $0`; chomp $name;
	print << "Help";

usage: $name [options]

	Automatic black-box parallelization of commands.

options:

	-e, --error-dir <dir>
		Retain output files from jobs in <dir>, rather
		than silently deleting them.

	-m, --multi-line
		Expect that command may produce multiple output
		lines for a single input line.  $name makes a
		reasonable attempt to obtain all output before
		processing additional inputs.  However, use of this
		option is inherently unsafe.

	-v, --verbose
		Print diagnostic informatoin on stderr.

	-j, --jobs
    Number of jobs to use.

	-p, --pmem
		pmem setting for each job.

Help
}