summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gi/pipeline/clsp.config1
-rwxr-xr-xgi/pipeline/evaluation-pipeline.pl16
-rw-r--r--gi/pipeline/valhalla.config1
-rwxr-xr-xvest/dist-vest.pl6
-rwxr-xr-xvest/parallelize.pl288
5 files changed, 182 insertions, 130 deletions
diff --git a/gi/pipeline/clsp.config b/gi/pipeline/clsp.config
index 27161fab..f7f131a0 100644
--- a/gi/pipeline/clsp.config
+++ b/gi/pipeline/clsp.config
@@ -1,5 +1,6 @@
# THIS FILE GIVES THE LOCATIONS OF THE CORPORA USED
# name path aligned-corpus LM dev dev-refs test1 testt-eval.sh ...
+/export/ws10smt/data
btec /export/ws10smt/data/btec/ split.zh-en.al lm/en.3gram.lm.gz devtest/devset1_2.zh devtest/devset1_2.lc.en* devtest/devset3.zh eval-devset3.sh
fbis /export/ws10smt/data/chinese-english.fbis corpus.zh-en.al
zhen /export/ws10smt/data/chinese-english corpus.zh-en.al
diff --git a/gi/pipeline/evaluation-pipeline.pl b/gi/pipeline/evaluation-pipeline.pl
index c6dcca05..178159b9 100755
--- a/gi/pipeline/evaluation-pipeline.pl
+++ b/gi/pipeline/evaluation-pipeline.pl
@@ -6,6 +6,8 @@ my $CWD = getcwd;
my $SCRIPT_DIR; BEGIN { use Cwd qw/ abs_path /; use File::Basename; $SCRIPT_DIR = dirname(abs_path($0)); push @INC, $SCRIPT_DIR; }
+my $JOBS = 15;
+
# featurize_grammar may add multiple features from a single feature extractor
# the key in this map is the extractor name, the value is a list of the extracted features
my $feat_map = {
@@ -89,6 +91,7 @@ my %devs;
my %devrefs;
my %tests;
my %testevals;
+my $datadir;
print STDERR " LANGUAGE PAIRS:";
while(<CONF>) {
chomp;
@@ -96,6 +99,7 @@ while(<CONF>) {
next if /^\s*$/;
s/^\s+//;
s/\s+$//;
+ if (! defined $datadir) { $datadir = $_; next; }
my ($name, $path, $corpus, $lm, $dev, $devref, @xtests) = split /\s+/;
$paths{$name} = $path;
$corpora{$name} = $corpus;
@@ -116,15 +120,19 @@ my $FEATURIZER_OPTS = '';
my $dataDir = '/export/ws10smt/data';
my @features;
my $bkoffgram;
+my $usefork;
if (GetOptions(
"backoff_grammar" => \$bkoffgram,
"data=s" => \$dataDir,
"features=s@" => \@features,
+ "use-fork" => \$usefork,
+ "jobs=i" => \$JOBS,
"out-dir=s" => \$outdir,
) == 0 || @ARGV!=2 || $help) {
print_help();
exit;
}
+if ($usefork) { $usefork="--use-fork"; } else { $usefork = ''; }
my @fkeys = keys %$feat_map;
push(@features, "BackoffRule") if $bkoffgram;
die "You must specify one or more features with -f. Known features: @fkeys\n" unless scalar @features > 0;
@@ -200,7 +208,7 @@ my $tuned_weights = mydircat($outdir, 'weights.tuned');
if (-f $tuned_weights) {
print STDERR "TUNED WEIGHTS $tuned_weights EXISTS: REUSING\n";
} else {
- my $cmd = "$DISTVEST --ref-files=$drefs --source-file=$dev --weights $weights $devini";
+ my $cmd = "$DISTVEST $usefork --decode-nodes $JOBS --ref-files=$drefs --source-file=$dev --weights $weights $devini";
print STDERR "MERT COMMAND: $cmd\n";
`rm -rf $outdir/vest 2> /dev/null`;
chdir $outdir or die "Can't chdir to $outdir: $!";
@@ -216,7 +224,7 @@ if (-f $tuned_weights) {
print STDERR "\nDECODE TEST SET\n";
my $decolog = mydircat($outdir, "test-decode.log");
my $testtrans = mydircat($outdir, "test.trans");
-my $cmd = "cat $test | $PARALLELIZE -j 20 -e $decolog -- $CDEC -c $testini -w $tuned_weights > $testtrans";
+my $cmd = "cat $test | $PARALLELIZE $usefork -j $JOBS -e $decolog -- $CDEC -c $testini -w $tuned_weights > $testtrans";
safesystem($testtrans, $cmd) or die "Failed to decode test set!";
@@ -292,8 +300,8 @@ sub write_cdec_ini {
formalism=scfg
cubepruning_pop_limit=100
add_pass_through_rules=true
-scfg_extra_glue_grammar=/export/ws10smt/data/glue/glue.scfg.gz
-grammar=/export/ws10smt/data/oov.scfg.gz
+scfg_extra_glue_grammar=$datadir/glue/glue.scfg.gz
+grammar=$datadir/oov.scfg.gz
grammar=$grammar_path
scfg_default_nt=OOV
scfg_no_hiero_glue_grammar=true
diff --git a/gi/pipeline/valhalla.config b/gi/pipeline/valhalla.config
index 503cbd4a..e00a8485 100644
--- a/gi/pipeline/valhalla.config
+++ b/gi/pipeline/valhalla.config
@@ -1,5 +1,6 @@
# THIS FILE GIVES THE LOCATIONS OF THE CORPORA USED
# name path aligned-corpus LM dev dev-refs test1 testt-eval.sh ...
+/home/chris/ws10smt/data
btec /home/chris/ws10smt/data/btec/ split.zh-en.al lm/en.3gram.lm.gz devtest/devset1_2.zh devtest/devset1_2.lc.en* devtest/devset3.zh eval-devset3.sh
fbis /home/chris/ws10smt/data/chinese-english.fbis corpus.zh-en.al
zhen /home/chris/ws10smt/data/chinese-english corpus.zh-en.al
diff --git a/vest/dist-vest.pl b/vest/dist-vest.pl
index 666509e3..8acec7a9 100755
--- a/vest/dist-vest.pl
+++ b/vest/dist-vest.pl
@@ -56,6 +56,7 @@ my $maxsim=0;
my $oraclen=0;
my $oracleb=20;
my $dirargs='';
+my $usefork;
# Process command-line options
Getopt::Long::Configure("no_auto_abbrev");
@@ -63,6 +64,7 @@ if (GetOptions(
"decoder=s" => \$decoderOpt,
"decode-nodes=i" => \$decode_nodes,
"dont-clean" => \$disable_clean,
+ "use-fork" => \$usefork,
"dry-run" => \$dryrun,
"epsilon=s" => \$epsilon,
"help" => \$help,
@@ -89,6 +91,8 @@ if (GetOptions(
exit;
}
+if ($usefork) { $usefork = "--use-fork"; } else { $usefork = ''; }
+
if ($metric =~ /^(combi|ter)$/i) {
$lines_per_mapper = 40;
}
@@ -230,7 +234,7 @@ while (1){
my $im1 = $iteration - 1;
my $weightsFile="$dir/weights.$im1";
my $decoder_cmd = "$decoder -c $iniFile -w $weightsFile -O $dir/hgs";
- my $pcmd = "cat $srcFile | $parallelize -p $pmem -e $logdir -j $decode_nodes -- ";
+ my $pcmd = "cat $srcFile | $parallelize $usefork -p $pmem -e $logdir -j $decode_nodes -- ";
if ($run_local) { $pcmd = "cat $srcFile |"; }
my $cmd = $pcmd . "$decoder_cmd 2> $decoderLog 1> $runFile";
print STDERR "COMMAND:\n$cmd\n";
diff --git a/vest/parallelize.pl b/vest/parallelize.pl
index a006d924..be12eefb 100755
--- a/vest/parallelize.pl
+++ b/vest/parallelize.pl
@@ -18,6 +18,7 @@
#ANNOYANCE: if input is shorter than -j n lines, or at the very last few lines, repeatedly sleeps. time cut down to 15s from 60s
+use File::Temp qw/ tempfile /;
use Getopt::Long;
use IPC::Open2;
use strict;
@@ -25,8 +26,8 @@ use POSIX ":sys_wait_h";
use Cwd qw(getcwd);
my $tailn=5; # +0 = concatenate all the client logs. 5 = last 5 lines
-my $recycle_clients; # spawn new clients when previous ones terminate
-my $stay_alive; # dont let server die when having zero clients
+my $recycle_clients; # spawn new clients when previous ones terminate
+my $stay_alive; # dont let server die when having zero clients
my $joblist = "";
my $errordir="";
my $multiline;
@@ -115,23 +116,27 @@ sub extend_path($$;$$) {
my $abscwd=abspath(&getcwd);
sub print_help;
+my $use_fork;
+my @pids;
+
# Process command-line options
unless (GetOptions(
- "stay-alive" => \$stay_alive,
- "recycle-clients" => \$recycle_clients,
- "error-dir=s" => \$errordir,
- "multi-line" => \$multiline,
- "file=s" => \@files_to_stage,
- "verbose" => \$verbose,
- "jobs=i" => \$numnodes,
- "pmem=s" => \$pmem,
+ "stay-alive" => \$stay_alive,
+ "recycle-clients" => \$recycle_clients,
+ "error-dir=s" => \$errordir,
+ "multi-line" => \$multiline,
+ "file=s" => \@files_to_stage,
+ "use-fork" => \$use_fork,
+ "verbose" => \$verbose,
+ "jobs=i" => \$numnodes,
+ "pmem=s" => \$pmem,
"baseport=i" => \$basep,
# "iport=i" => \$randp, #for short name -i
"no-which!" => \$no_which,
"no-cd!" => \$no_cd,
"tailn=s" => \$tailn,
) && scalar @ARGV){
- print_help();
+ print_help();
die "bad options.";
}
@@ -158,7 +163,7 @@ $executable=`basename $executable`;
chomp $executable;
-if ($verbose){ print STDERR "Parallelizing ($numnodes ways): $cmd\n\n"; }
+print STDERR "Parallelizing ($numnodes ways): $cmd\n\n";
# create -e dir and save .sh
use File::Temp qw/tempdir/;
@@ -210,15 +215,15 @@ my $netstat=&listening_port_lines;
if ($verbose){ print STDERR "Testing port $port...";}
while ($netstat=~/$port/ || &listening_port_lines=~/$port/){
- if ($verbose){ print STDERR "port is busy\n";}
- $port++;
- if ($port > $endp){
- die "Unable to find open port\n";
- }
- if ($verbose){ print STDERR "Testing port $port... "; }
+ if ($verbose){ print STDERR "port is busy\n";}
+ $port++;
+ if ($port > $endp){
+ die "Unable to find open port\n";
+ }
+ if ($verbose){ print STDERR "Testing port $port... "; }
}
if ($verbose){
- print STDERR "port $port is available\n";
+ print STDERR "port $port is available\n";
}
my $key = int(rand()*1000000);
@@ -228,151 +233,184 @@ if ($multiline){ $multiflag = "-m"; print STDERR "expecting multiline output.\n"
my $stay_alive_flag = "";
if ($stay_alive){ $stay_alive_flag = "--stay-alive"; print STDERR "staying alive while no clients are connected.\n"; }
-my %node_count;
+my $node_count = 0;
my $script = "";
# fork == one thread runs the sentserver, while the
# other spawns the sentclient commands.
-if (my $pid = fork){
- sleep 4; # give other thread time to start sentserver
- $script =
- qq{wait
+if (my $pid = fork) {
+ sleep 4; # give other thread time to start sentserver
+ $script =
+ qq{wait
$cdcmd$sentclient $host:$port:$key $cmd
};
- if ($verbose){
- print STDERR "Client script:\n====\n";
- print STDERR $script;
- print STDERR "====\n";
- }
- for (my $jobn=0; $jobn<$numnodes; $jobn++){
- launch_job_on_node(1);
- }
- if ($recycle_clients) {
- my $ret;
- my $livejobs;
- while (1) {
- $ret = waitpid($pid, WNOHANG);
- #print STDERR "waitpid $pid ret = $ret \n";
- last if ($ret != 0);
- $livejobs = numof_live_jobs();
- if ($numnodes >= $livejobs ) { # a client terminated, OR # lines of input was less than -j
- print STDERR "num of requested nodes = $numnodes; num of currently live jobs = $livejobs; Client terminated - launching another.\n";
- launch_job_on_node(1); # TODO: support named nodes
- } else {
- sleep 15;
- }
- }
- }
- waitpid($pid, 0);
- cleanup();
+ if ($verbose){
+ print STDERR "Client script:\n====\n";
+ print STDERR $script;
+ print STDERR "====\n";
+ }
+ for (my $jobn=0; $jobn<$numnodes; $jobn++){
+ launch_job();
+ }
+ if ($recycle_clients) {
+ my $ret;
+ my $livejobs;
+ while (1) {
+ $ret = waitpid($pid, WNOHANG);
+ #print STDERR "waitpid $pid ret = $ret \n";
+ last if ($ret != 0);
+ $livejobs = numof_live_jobs();
+ if ($numnodes >= $livejobs ) { # a client terminated, OR # lines of input was less than -j
+ print STDERR "num of requested nodes = $numnodes; num of currently live jobs = $livejobs; Client terminated - launching another.\n";
+ launch_job();
+ } else {
+ sleep 15;
+ }
+ }
+ }
+ waitpid($pid, 0);
+ cleanup();
} else {
-# my $todo = "$sentserver -k $key $multiflag $port ";
- my $todo = "$sentserver -k $key $multiflag $port $stay_alive_flag ";
- if ($verbose){ print STDERR "Running: $todo\n"; }
- my $rc = system($todo);
- if ($rc){
- die "Error: sentserver returned code $rc\n";
- }
+# my $todo = "$sentserver -k $key $multiflag $port ";
+ my $todo = "$sentserver -k $key $multiflag $port $stay_alive_flag ";
+ if ($verbose){ print STDERR "Running: $todo\n"; }
+ my $rc = system($todo);
+ if ($rc){
+ die "Error: sentserver returned code $rc\n";
+ }
}
sub numof_live_jobs {
- my @livejobs = grep(/$joblist/, split(/\n/, `qstat`));
- return ($#livejobs + 1);
+ if ($use_fork) {
+ die "not implemented";
+ } else {
+ my @livejobs = grep(/$joblist/, split(/\n/, `qstat`));
+ return ($#livejobs + 1);
+ }
}
my (@errors,@outs,@cmds);
-sub launch_job_on_node {
- my $node = $_[0];
-
- my $errorfile = "/dev/null";
- my $outfile = "/dev/null";
- unless (exists($node_count{$node})){
- $node_count{$node} = 0;
- }
- my $node_count = $node_count{$node};
- $node_count{$node}++;
- my $clientname = $executable;
- $clientname =~ s/^(.{4}).*$/$1/;
- $clientname = "$clientname.$node.$node_count";
- if ($errordir){
- $errorfile = "$errordir/$clientname.ER";
- $outfile = "$errordir/$clientname.OU";
- push @errors,$errorfile;
- push @outs,$outfile;
- }
- my $todo = "qsub -l mem_free=$pmem -N $clientname -o $outfile -e $errorfile";
- push @cmds,$todo;
-
- if ($verbose){ print STDERR "Running: $todo\n"; }
- local(*QOUT, *QIN);
- open2(\*QOUT, \*QIN, $todo);
- print QIN $script;
- close QIN;
- while (my $jobid=<QOUT>){
- chomp $jobid;
- if ($verbose){ print STDERR "Launched client job: $jobid"; }
- $jobid =~ s/^(\d+)(.*?)$/\1/g;
+
+sub launch_job {
+ if ($use_fork) { return launch_job_fork(); }
+ my $errorfile = "/dev/null";
+ my $outfile = "/dev/null";
+ $node_count++;
+ my $clientname = $executable;
+ $clientname =~ s/^(.{4}).*$/$1/;
+ $clientname = "$clientname.$node_count";
+ if ($errordir){
+ $errorfile = "$errordir/$clientname.ER";
+ $outfile = "$errordir/$clientname.OU";
+ push @errors,$errorfile;
+ push @outs,$outfile;
+ }
+ my $todo = "qsub -l mem_free=$pmem -N $clientname -o $outfile -e $errorfile";
+ push @cmds,$todo;
+
+ print STDERR "Running: $todo\n";
+ local(*QOUT, *QIN);
+ open2(\*QOUT, \*QIN, $todo) or die "Failed to open2: $!";
+ print QIN $script;
+ close QIN;
+ while (my $jobid=<QOUT>){
+ chomp $jobid;
+ if ($verbose){ print STDERR "Launched client job: $jobid"; }
+ $jobid =~ s/^(\d+)(.*?)$/\1/g;
$jobid =~ s/^Your job (\d+) .*$/\1/;
- print STDERR " short job id $jobid\n";
+ print STDERR " short job id $jobid\n";
if ($verbose){
print STDERR "cd: $abscwd\n";
print STDERR "cmd: $cmd\n";
}
- if ($joblist == "") { $joblist = $jobid; }
- else {$joblist = $joblist . "\|" . $jobid; }
+ if ($joblist == "") { $joblist = $jobid; }
+ else {$joblist = $joblist . "\|" . $jobid; }
my $cleanfn="`qdel $jobid 2> /dev/null`";
- push(@cleanup_cmds, $cleanfn);
- }
- close QOUT;
+ push(@cleanup_cmds, $cleanfn);
+ }
+ close QOUT;
}
+sub launch_job_fork {
+ my $errorfile = "/dev/null";
+ my $outfile = "/dev/null";
+ $node_count++;
+ my $clientname = $executable;
+ $clientname =~ s/^(.{4}).*$/$1/;
+ $clientname = "$clientname.$node_count";
+ if ($errordir){
+ $errorfile = "$errordir/$clientname.ER";
+ $outfile = "$errordir/$clientname.OU";
+ push @errors,$errorfile;
+ push @outs,$outfile;
+ }
+ if (my $pid = fork) {
+ my ($fh, $scr_name) = get_temp_script();
+ print $fh $script;
+ close $fh;
+ my $todo = "/bin/sh $scr_name 1> $outfile 2> $errorfile";
+ print STDERR "EXEC: $todo\n";
+ my $out = `$todo`;
+ print STDERR "RES: $out\n";
+ unlink $scr_name or warn "Failed to remove $scr_name";
+ exit 0;
+ }
+}
+
+sub get_temp_script {
+ my ($fh, $filename) = tempfile( "workXXXX", SUFFIX => '.sh');
+ return ($fh, $filename);
+}
sub cleanup_and_die {
- cleanup();
- die "\n";
+ cleanup();
+ die "\n";
}
sub cleanup {
- if ($verbose){ print STDERR "Cleaning up...\n"; }
- for $cmd (@cleanup_cmds){
- if ($verbose){ print STDERR " $cmd\n"; }
- eval $cmd;
- }
- print STDERR "outputs:\n",preview_files(\@outs,1),"\n";
- print STDERR "errors:\n",preview_files(\@errors,1),"\n";
- print STDERR "cmd:\n",$cmd,"\n";
- print STDERR " cat $errordir/*.ER\nfor logs.\n";
- if ($verbose){ print STDERR "Cleanup finished.\n"; }
+ print STDERR "Cleaning up...\n";
+ for $cmd (@cleanup_cmds){
+ print STDERR " Cleanup command: $cmd\n";
+ eval $cmd;
+ }
+ print STDERR "outputs:\n",preview_files(\@outs,1),"\n";
+ print STDERR "errors:\n",preview_files(\@errors,1),"\n";
+ print STDERR "cmd:\n",$cmd,"\n";
+ print STDERR " cat $errordir/*.ER\nfor logs.\n";
+ print STDERR "Cleanup finished.\n";
}
sub print_help
{
- my $name = `basename $0`; chomp $name;
- print << "Help";
+ my $name = `basename $0`; chomp $name;
+ print << "Help";
usage: $name [options]
- Automatic black-box parallelization of commands.
+ Automatic black-box parallelization of commands.
options:
- -e, --error-dir <dir>
- Retain output files from jobs in <dir>, rather
- than silently deleting them.
+ --fork
+ Instead of using qsub, use fork.
+
+ -e, --error-dir <dir>
+ Retain output files from jobs in <dir>, rather
+ than silently deleting them.
- -m, --multi-line
- Expect that command may produce multiple output
- lines for a single input line. $name makes a
- reasonable attempt to obtain all output before
- processing additional inputs. However, use of this
- option is inherently unsafe.
+ -m, --multi-line
+ Expect that command may produce multiple output
+ lines for a single input line. $name makes a
+ reasonable attempt to obtain all output before
+ processing additional inputs. However, use of this
+ option is inherently unsafe.
- -v, --verbose
- Print diagnostic informatoin on stderr.
+ -v, --verbose
+ Print diagnostic informatoin on stderr.
- -j, --jobs
+ -j, --jobs
Number of jobs to use.
- -p, --pmem
- pmem setting for each job.
+ -p, --pmem
+ pmem setting for each job.
Help
}