From d24aa658436cbae3404146c106b0b7569eac60ed Mon Sep 17 00:00:00 2001 From: redpony Date: Sat, 17 Jul 2010 18:17:34 +0000 Subject: more support for other clusters git-svn-id: https://ws10smt.googlecode.com/svn/trunk@307 ec762483-ff6d-05da-a07a-a48fb63a330f --- gi/pipeline/clsp.config | 1 + gi/pipeline/evaluation-pipeline.pl | 16 ++- gi/pipeline/valhalla.config | 1 + vest/dist-vest.pl | 6 +- vest/parallelize.pl | 288 +++++++++++++++++++++---------------- 5 files changed, 182 insertions(+), 130 deletions(-) diff --git a/gi/pipeline/clsp.config b/gi/pipeline/clsp.config index 27161fab..f7f131a0 100644 --- a/gi/pipeline/clsp.config +++ b/gi/pipeline/clsp.config @@ -1,5 +1,6 @@ # THIS FILE GIVES THE LOCATIONS OF THE CORPORA USED # name path aligned-corpus LM dev dev-refs test1 testt-eval.sh ... +/export/ws10smt/data btec /export/ws10smt/data/btec/ split.zh-en.al lm/en.3gram.lm.gz devtest/devset1_2.zh devtest/devset1_2.lc.en* devtest/devset3.zh eval-devset3.sh fbis /export/ws10smt/data/chinese-english.fbis corpus.zh-en.al zhen /export/ws10smt/data/chinese-english corpus.zh-en.al diff --git a/gi/pipeline/evaluation-pipeline.pl b/gi/pipeline/evaluation-pipeline.pl index c6dcca05..178159b9 100755 --- a/gi/pipeline/evaluation-pipeline.pl +++ b/gi/pipeline/evaluation-pipeline.pl @@ -6,6 +6,8 @@ my $CWD = getcwd; my $SCRIPT_DIR; BEGIN { use Cwd qw/ abs_path /; use File::Basename; $SCRIPT_DIR = dirname(abs_path($0)); push @INC, $SCRIPT_DIR; } +my $JOBS = 15; + # featurize_grammar may add multiple features from a single feature extractor # the key in this map is the extractor name, the value is a list of the extracted features my $feat_map = { @@ -89,6 +91,7 @@ my %devs; my %devrefs; my %tests; my %testevals; +my $datadir; print STDERR " LANGUAGE PAIRS:"; while() { chomp; @@ -96,6 +99,7 @@ while() { next if /^\s*$/; s/^\s+//; s/\s+$//; + if (! defined $datadir) { $datadir = $_; next; } my ($name, $path, $corpus, $lm, $dev, $devref, @xtests) = split /\s+/; $paths{$name} = $path; $corpora{$name} = $corpus; @@ -116,15 +120,19 @@ my $FEATURIZER_OPTS = ''; my $dataDir = '/export/ws10smt/data'; my @features; my $bkoffgram; +my $usefork; if (GetOptions( "backoff_grammar" => \$bkoffgram, "data=s" => \$dataDir, "features=s@" => \@features, + "use-fork" => \$usefork, + "jobs=i" => \$JOBS, "out-dir=s" => \$outdir, ) == 0 || @ARGV!=2 || $help) { print_help(); exit; } +if ($usefork) { $usefork="--use-fork"; } else { $usefork = ''; } my @fkeys = keys %$feat_map; push(@features, "BackoffRule") if $bkoffgram; die "You must specify one or more features with -f. Known features: @fkeys\n" unless scalar @features > 0; @@ -200,7 +208,7 @@ my $tuned_weights = mydircat($outdir, 'weights.tuned'); if (-f $tuned_weights) { print STDERR "TUNED WEIGHTS $tuned_weights EXISTS: REUSING\n"; } else { - my $cmd = "$DISTVEST --ref-files=$drefs --source-file=$dev --weights $weights $devini"; + my $cmd = "$DISTVEST $usefork --decode-nodes $JOBS --ref-files=$drefs --source-file=$dev --weights $weights $devini"; print STDERR "MERT COMMAND: $cmd\n"; `rm -rf $outdir/vest 2> /dev/null`; chdir $outdir or die "Can't chdir to $outdir: $!"; @@ -216,7 +224,7 @@ if (-f $tuned_weights) { print STDERR "\nDECODE TEST SET\n"; my $decolog = mydircat($outdir, "test-decode.log"); my $testtrans = mydircat($outdir, "test.trans"); -my $cmd = "cat $test | $PARALLELIZE -j 20 -e $decolog -- $CDEC -c $testini -w $tuned_weights > $testtrans"; +my $cmd = "cat $test | $PARALLELIZE $usefork -j $JOBS -e $decolog -- $CDEC -c $testini -w $tuned_weights > $testtrans"; safesystem($testtrans, $cmd) or die "Failed to decode test set!"; @@ -292,8 +300,8 @@ sub write_cdec_ini { formalism=scfg cubepruning_pop_limit=100 add_pass_through_rules=true -scfg_extra_glue_grammar=/export/ws10smt/data/glue/glue.scfg.gz -grammar=/export/ws10smt/data/oov.scfg.gz +scfg_extra_glue_grammar=$datadir/glue/glue.scfg.gz +grammar=$datadir/oov.scfg.gz grammar=$grammar_path scfg_default_nt=OOV scfg_no_hiero_glue_grammar=true diff --git a/gi/pipeline/valhalla.config b/gi/pipeline/valhalla.config index 503cbd4a..e00a8485 100644 --- a/gi/pipeline/valhalla.config +++ b/gi/pipeline/valhalla.config @@ -1,5 +1,6 @@ # THIS FILE GIVES THE LOCATIONS OF THE CORPORA USED # name path aligned-corpus LM dev dev-refs test1 testt-eval.sh ... +/home/chris/ws10smt/data btec /home/chris/ws10smt/data/btec/ split.zh-en.al lm/en.3gram.lm.gz devtest/devset1_2.zh devtest/devset1_2.lc.en* devtest/devset3.zh eval-devset3.sh fbis /home/chris/ws10smt/data/chinese-english.fbis corpus.zh-en.al zhen /home/chris/ws10smt/data/chinese-english corpus.zh-en.al diff --git a/vest/dist-vest.pl b/vest/dist-vest.pl index 666509e3..8acec7a9 100755 --- a/vest/dist-vest.pl +++ b/vest/dist-vest.pl @@ -56,6 +56,7 @@ my $maxsim=0; my $oraclen=0; my $oracleb=20; my $dirargs=''; +my $usefork; # Process command-line options Getopt::Long::Configure("no_auto_abbrev"); @@ -63,6 +64,7 @@ if (GetOptions( "decoder=s" => \$decoderOpt, "decode-nodes=i" => \$decode_nodes, "dont-clean" => \$disable_clean, + "use-fork" => \$usefork, "dry-run" => \$dryrun, "epsilon=s" => \$epsilon, "help" => \$help, @@ -89,6 +91,8 @@ if (GetOptions( exit; } +if ($usefork) { $usefork = "--use-fork"; } else { $usefork = ''; } + if ($metric =~ /^(combi|ter)$/i) { $lines_per_mapper = 40; } @@ -230,7 +234,7 @@ while (1){ my $im1 = $iteration - 1; my $weightsFile="$dir/weights.$im1"; my $decoder_cmd = "$decoder -c $iniFile -w $weightsFile -O $dir/hgs"; - my $pcmd = "cat $srcFile | $parallelize -p $pmem -e $logdir -j $decode_nodes -- "; + my $pcmd = "cat $srcFile | $parallelize $usefork -p $pmem -e $logdir -j $decode_nodes -- "; if ($run_local) { $pcmd = "cat $srcFile |"; } my $cmd = $pcmd . "$decoder_cmd 2> $decoderLog 1> $runFile"; print STDERR "COMMAND:\n$cmd\n"; diff --git a/vest/parallelize.pl b/vest/parallelize.pl index a006d924..be12eefb 100755 --- a/vest/parallelize.pl +++ b/vest/parallelize.pl @@ -18,6 +18,7 @@ #ANNOYANCE: if input is shorter than -j n lines, or at the very last few lines, repeatedly sleeps. time cut down to 15s from 60s +use File::Temp qw/ tempfile /; use Getopt::Long; use IPC::Open2; use strict; @@ -25,8 +26,8 @@ use POSIX ":sys_wait_h"; use Cwd qw(getcwd); my $tailn=5; # +0 = concatenate all the client logs. 5 = last 5 lines -my $recycle_clients; # spawn new clients when previous ones terminate -my $stay_alive; # dont let server die when having zero clients +my $recycle_clients; # spawn new clients when previous ones terminate +my $stay_alive; # dont let server die when having zero clients my $joblist = ""; my $errordir=""; my $multiline; @@ -115,23 +116,27 @@ sub extend_path($$;$$) { my $abscwd=abspath(&getcwd); sub print_help; +my $use_fork; +my @pids; + # Process command-line options unless (GetOptions( - "stay-alive" => \$stay_alive, - "recycle-clients" => \$recycle_clients, - "error-dir=s" => \$errordir, - "multi-line" => \$multiline, - "file=s" => \@files_to_stage, - "verbose" => \$verbose, - "jobs=i" => \$numnodes, - "pmem=s" => \$pmem, + "stay-alive" => \$stay_alive, + "recycle-clients" => \$recycle_clients, + "error-dir=s" => \$errordir, + "multi-line" => \$multiline, + "file=s" => \@files_to_stage, + "use-fork" => \$use_fork, + "verbose" => \$verbose, + "jobs=i" => \$numnodes, + "pmem=s" => \$pmem, "baseport=i" => \$basep, # "iport=i" => \$randp, #for short name -i "no-which!" => \$no_which, "no-cd!" => \$no_cd, "tailn=s" => \$tailn, ) && scalar @ARGV){ - print_help(); + print_help(); die "bad options."; } @@ -158,7 +163,7 @@ $executable=`basename $executable`; chomp $executable; -if ($verbose){ print STDERR "Parallelizing ($numnodes ways): $cmd\n\n"; } +print STDERR "Parallelizing ($numnodes ways): $cmd\n\n"; # create -e dir and save .sh use File::Temp qw/tempdir/; @@ -210,15 +215,15 @@ my $netstat=&listening_port_lines; if ($verbose){ print STDERR "Testing port $port...";} while ($netstat=~/$port/ || &listening_port_lines=~/$port/){ - if ($verbose){ print STDERR "port is busy\n";} - $port++; - if ($port > $endp){ - die "Unable to find open port\n"; - } - if ($verbose){ print STDERR "Testing port $port... "; } + if ($verbose){ print STDERR "port is busy\n";} + $port++; + if ($port > $endp){ + die "Unable to find open port\n"; + } + if ($verbose){ print STDERR "Testing port $port... "; } } if ($verbose){ - print STDERR "port $port is available\n"; + print STDERR "port $port is available\n"; } my $key = int(rand()*1000000); @@ -228,151 +233,184 @@ if ($multiline){ $multiflag = "-m"; print STDERR "expecting multiline output.\n" my $stay_alive_flag = ""; if ($stay_alive){ $stay_alive_flag = "--stay-alive"; print STDERR "staying alive while no clients are connected.\n"; } -my %node_count; +my $node_count = 0; my $script = ""; # fork == one thread runs the sentserver, while the # other spawns the sentclient commands. -if (my $pid = fork){ - sleep 4; # give other thread time to start sentserver - $script = - qq{wait +if (my $pid = fork) { + sleep 4; # give other thread time to start sentserver + $script = + qq{wait $cdcmd$sentclient $host:$port:$key $cmd }; - if ($verbose){ - print STDERR "Client script:\n====\n"; - print STDERR $script; - print STDERR "====\n"; - } - for (my $jobn=0; $jobn<$numnodes; $jobn++){ - launch_job_on_node(1); - } - if ($recycle_clients) { - my $ret; - my $livejobs; - while (1) { - $ret = waitpid($pid, WNOHANG); - #print STDERR "waitpid $pid ret = $ret \n"; - last if ($ret != 0); - $livejobs = numof_live_jobs(); - if ($numnodes >= $livejobs ) { # a client terminated, OR # lines of input was less than -j - print STDERR "num of requested nodes = $numnodes; num of currently live jobs = $livejobs; Client terminated - launching another.\n"; - launch_job_on_node(1); # TODO: support named nodes - } else { - sleep 15; - } - } - } - waitpid($pid, 0); - cleanup(); + if ($verbose){ + print STDERR "Client script:\n====\n"; + print STDERR $script; + print STDERR "====\n"; + } + for (my $jobn=0; $jobn<$numnodes; $jobn++){ + launch_job(); + } + if ($recycle_clients) { + my $ret; + my $livejobs; + while (1) { + $ret = waitpid($pid, WNOHANG); + #print STDERR "waitpid $pid ret = $ret \n"; + last if ($ret != 0); + $livejobs = numof_live_jobs(); + if ($numnodes >= $livejobs ) { # a client terminated, OR # lines of input was less than -j + print STDERR "num of requested nodes = $numnodes; num of currently live jobs = $livejobs; Client terminated - launching another.\n"; + launch_job(); + } else { + sleep 15; + } + } + } + waitpid($pid, 0); + cleanup(); } else { -# my $todo = "$sentserver -k $key $multiflag $port "; - my $todo = "$sentserver -k $key $multiflag $port $stay_alive_flag "; - if ($verbose){ print STDERR "Running: $todo\n"; } - my $rc = system($todo); - if ($rc){ - die "Error: sentserver returned code $rc\n"; - } +# my $todo = "$sentserver -k $key $multiflag $port "; + my $todo = "$sentserver -k $key $multiflag $port $stay_alive_flag "; + if ($verbose){ print STDERR "Running: $todo\n"; } + my $rc = system($todo); + if ($rc){ + die "Error: sentserver returned code $rc\n"; + } } sub numof_live_jobs { - my @livejobs = grep(/$joblist/, split(/\n/, `qstat`)); - return ($#livejobs + 1); + if ($use_fork) { + die "not implemented"; + } else { + my @livejobs = grep(/$joblist/, split(/\n/, `qstat`)); + return ($#livejobs + 1); + } } my (@errors,@outs,@cmds); -sub launch_job_on_node { - my $node = $_[0]; - - my $errorfile = "/dev/null"; - my $outfile = "/dev/null"; - unless (exists($node_count{$node})){ - $node_count{$node} = 0; - } - my $node_count = $node_count{$node}; - $node_count{$node}++; - my $clientname = $executable; - $clientname =~ s/^(.{4}).*$/$1/; - $clientname = "$clientname.$node.$node_count"; - if ($errordir){ - $errorfile = "$errordir/$clientname.ER"; - $outfile = "$errordir/$clientname.OU"; - push @errors,$errorfile; - push @outs,$outfile; - } - my $todo = "qsub -l mem_free=$pmem -N $clientname -o $outfile -e $errorfile"; - push @cmds,$todo; - - if ($verbose){ print STDERR "Running: $todo\n"; } - local(*QOUT, *QIN); - open2(\*QOUT, \*QIN, $todo); - print QIN $script; - close QIN; - while (my $jobid=){ - chomp $jobid; - if ($verbose){ print STDERR "Launched client job: $jobid"; } - $jobid =~ s/^(\d+)(.*?)$/\1/g; + +sub launch_job { + if ($use_fork) { return launch_job_fork(); } + my $errorfile = "/dev/null"; + my $outfile = "/dev/null"; + $node_count++; + my $clientname = $executable; + $clientname =~ s/^(.{4}).*$/$1/; + $clientname = "$clientname.$node_count"; + if ($errordir){ + $errorfile = "$errordir/$clientname.ER"; + $outfile = "$errordir/$clientname.OU"; + push @errors,$errorfile; + push @outs,$outfile; + } + my $todo = "qsub -l mem_free=$pmem -N $clientname -o $outfile -e $errorfile"; + push @cmds,$todo; + + print STDERR "Running: $todo\n"; + local(*QOUT, *QIN); + open2(\*QOUT, \*QIN, $todo) or die "Failed to open2: $!"; + print QIN $script; + close QIN; + while (my $jobid=){ + chomp $jobid; + if ($verbose){ print STDERR "Launched client job: $jobid"; } + $jobid =~ s/^(\d+)(.*?)$/\1/g; $jobid =~ s/^Your job (\d+) .*$/\1/; - print STDERR " short job id $jobid\n"; + print STDERR " short job id $jobid\n"; if ($verbose){ print STDERR "cd: $abscwd\n"; print STDERR "cmd: $cmd\n"; } - if ($joblist == "") { $joblist = $jobid; } - else {$joblist = $joblist . "\|" . $jobid; } + if ($joblist == "") { $joblist = $jobid; } + else {$joblist = $joblist . "\|" . $jobid; } my $cleanfn="`qdel $jobid 2> /dev/null`"; - push(@cleanup_cmds, $cleanfn); - } - close QOUT; + push(@cleanup_cmds, $cleanfn); + } + close QOUT; } +sub launch_job_fork { + my $errorfile = "/dev/null"; + my $outfile = "/dev/null"; + $node_count++; + my $clientname = $executable; + $clientname =~ s/^(.{4}).*$/$1/; + $clientname = "$clientname.$node_count"; + if ($errordir){ + $errorfile = "$errordir/$clientname.ER"; + $outfile = "$errordir/$clientname.OU"; + push @errors,$errorfile; + push @outs,$outfile; + } + if (my $pid = fork) { + my ($fh, $scr_name) = get_temp_script(); + print $fh $script; + close $fh; + my $todo = "/bin/sh $scr_name 1> $outfile 2> $errorfile"; + print STDERR "EXEC: $todo\n"; + my $out = `$todo`; + print STDERR "RES: $out\n"; + unlink $scr_name or warn "Failed to remove $scr_name"; + exit 0; + } +} + +sub get_temp_script { + my ($fh, $filename) = tempfile( "workXXXX", SUFFIX => '.sh'); + return ($fh, $filename); +} sub cleanup_and_die { - cleanup(); - die "\n"; + cleanup(); + die "\n"; } sub cleanup { - if ($verbose){ print STDERR "Cleaning up...\n"; } - for $cmd (@cleanup_cmds){ - if ($verbose){ print STDERR " $cmd\n"; } - eval $cmd; - } - print STDERR "outputs:\n",preview_files(\@outs,1),"\n"; - print STDERR "errors:\n",preview_files(\@errors,1),"\n"; - print STDERR "cmd:\n",$cmd,"\n"; - print STDERR " cat $errordir/*.ER\nfor logs.\n"; - if ($verbose){ print STDERR "Cleanup finished.\n"; } + print STDERR "Cleaning up...\n"; + for $cmd (@cleanup_cmds){ + print STDERR " Cleanup command: $cmd\n"; + eval $cmd; + } + print STDERR "outputs:\n",preview_files(\@outs,1),"\n"; + print STDERR "errors:\n",preview_files(\@errors,1),"\n"; + print STDERR "cmd:\n",$cmd,"\n"; + print STDERR " cat $errordir/*.ER\nfor logs.\n"; + print STDERR "Cleanup finished.\n"; } sub print_help { - my $name = `basename $0`; chomp $name; - print << "Help"; + my $name = `basename $0`; chomp $name; + print << "Help"; usage: $name [options] - Automatic black-box parallelization of commands. + Automatic black-box parallelization of commands. options: - -e, --error-dir - Retain output files from jobs in , rather - than silently deleting them. + --fork + Instead of using qsub, use fork. + + -e, --error-dir + Retain output files from jobs in , rather + than silently deleting them. - -m, --multi-line - Expect that command may produce multiple output - lines for a single input line. $name makes a - reasonable attempt to obtain all output before - processing additional inputs. However, use of this - option is inherently unsafe. + -m, --multi-line + Expect that command may produce multiple output + lines for a single input line. $name makes a + reasonable attempt to obtain all output before + processing additional inputs. However, use of this + option is inherently unsafe. - -v, --verbose - Print diagnostic informatoin on stderr. + -v, --verbose + Print diagnostic informatoin on stderr. - -j, --jobs + -j, --jobs Number of jobs to use. - -p, --pmem - pmem setting for each job. + -p, --pmem + pmem setting for each job. Help } -- cgit v1.2.3