From a5a91210fe32fa49a6efb6dfc71774ae03fc87db Mon Sep 17 00:00:00 2001 From: Guest_account Guest_account prguest11 Date: Thu, 20 Oct 2011 22:08:35 +0100 Subject: crazy bugs in parallelize --- vest/parallelize.pl | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/vest/parallelize.pl b/vest/parallelize.pl index b4783f91..869f430b 100755 --- a/vest/parallelize.pl +++ b/vest/parallelize.pl @@ -240,12 +240,11 @@ my $node_count = 0; my $script = ""; # fork == one thread runs the sentserver, while the # other spawns the sentclient commands. -if (my $pid = fork) { +my $pid = fork; +if ($pid == 0) { # child sleep 8; # give other thread time to start sentserver - $script = - qq{wait -$cdcmd$sentclient $host:$port:$key $cmd -}; + $script = "$cdcmd$sentclient $host:$port:$key $cmd"; + if ($verbose){ print STDERR "Client script:\n====\n"; print STDERR $script; @@ -270,13 +269,18 @@ $cdcmd$sentclient $host:$port:$key $cmd } } } - waitpid($pid, 0); - cleanup(); + print STDERR "CHILD PROCESSES SPAWNED ... WAITING\n"; + for my $p (@pids) { + waitpid($p, 0); + } } else { # my $todo = "$sentserver -k $key $multiflag $port "; my $todo = "$sentserver -k $key $multiflag $port $stay_alive_flag "; if ($verbose){ print STDERR "Running: $todo\n"; } check_call($todo); + print STDERR "Call to $sentserver returned.\n"; + cleanup(); + exit(0); } sub numof_live_jobs { @@ -343,16 +347,18 @@ sub launch_job_fork { push @errors,$errorfile; push @outs,$outfile; } - if (my $pid = fork) { + my $pid = fork; + if ($pid == 0) { my ($fh, $scr_name) = get_temp_script(); print $fh $script; close $fh; my $todo = "/bin/bash -xeo pipefail $scr_name 1> $outfile 2> $errorfile"; print STDERR "EXEC: $todo\n"; my $out = check_output("$todo"); - print STDERR "RES: $out\n"; unlink $scr_name or warn "Failed to remove $scr_name"; exit 0; + } else { + push @pids, $pid; } } -- cgit v1.2.3 From 230ef6e8f417bf3f7e573a712c79388721e70188 Mon Sep 17 00:00:00 2001 From: Guest_account Guest_account prguest11 Date: Fri, 21 Oct 2011 15:24:32 +0100 Subject: more particle filter stuff --- gi/pf/pf.h | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ gi/pf/pfdist.cc | 18 ++---------- gi/pf/pfnaive.cc | 24 ++++++---------- 3 files changed, 96 insertions(+), 30 deletions(-) create mode 100644 gi/pf/pf.h diff --git a/gi/pf/pf.h b/gi/pf/pf.h new file mode 100644 index 00000000..ede7cda8 --- /dev/null +++ b/gi/pf/pf.h @@ -0,0 +1,84 @@ +#ifndef _PF_H_ +#define _PF_H_ + +#include +#include +#include "sampler.h" +#include "prob.h" + +template +struct ParticleRenormalizer { + void operator()(std::vector* pv) const { + if (pv->empty()) return; + prob_t z = prob_t::Zero(); + for (unsigned i = 0; i < pv->size(); ++i) + z += (*pv)[i].weight; + assert(z > prob_t::Zero()); + for (unsigned i = 0; i < pv->size(); ++i) + (*pv)[i].weight /= z; + } +}; + +template +struct MultinomialResampleFilter { + explicit MultinomialResampleFilter(MT19937* rng) : rng_(rng) {} + + void operator()(std::vector* pv) { + if (pv->empty()) return; + std::vector& ps = *pv; + SampleSet ss; + for (int i = 0; i < ps.size(); ++i) + ss.add(ps[i].weight); + std::vector nps; nps.reserve(ps.size()); + const prob_t uniform_weight(1.0 / ps.size()); + for (int i = 0; i < ps.size(); ++i) { + nps.push_back(ps[rng_->SelectSample(ss)]); + nps[i].weight = uniform_weight; + } + nps.swap(ps); + } + + private: + MT19937* rng_; +}; + +template +struct SystematicResampleFilter { + explicit SystematicResampleFilter(MT19937* rng) : rng_(rng), renorm_() {} + + void operator()(std::vector* pv) { + if (pv->empty()) return; + renorm_(pv); + std::vector& ps = *pv; + std::vector nps; nps.reserve(ps.size()); + double lower = 0, upper = 0; + const double skip = 1.0 / ps.size(); + double u_j = rng_->next() * skip; + //std::cerr << "u_0: " << u_j << std::endl; + int j = 0; + for (unsigned i = 0; i < ps.size(); ++i) { + upper += ps[i].weight.as_float(); + //std::cerr << "lower: " << lower << " upper: " << upper << std::endl; + // how many children does ps[i] have? + while (u_j < lower) { u_j += skip; ++j; } + while (u_j >= lower && u_j <= upper) { + assert(j < ps.size()); + nps.push_back(ps[i]); + u_j += skip; + //std::cerr << " add u_j=" << u_j << std::endl; + ++j; + } + lower = upper; + } + //std::cerr << ps.size() << " " << nps.size() << "\n"; + assert(ps.size() == nps.size()); + //exit(1); + ps.swap(nps); + } + + private: + MT19937* rng_; + ParticleRenormalizer renorm_; +}; + +#endif diff --git a/gi/pf/pfdist.cc b/gi/pf/pfdist.cc index 81abd61b..aae5f798 100644 --- a/gi/pf/pfdist.cc +++ b/gi/pf/pfdist.cc @@ -6,6 +6,7 @@ #include #include +#include "pf.h" #include "base_measures.h" #include "reachability.h" #include "viterbi.h" @@ -413,20 +414,6 @@ ostream& operator<<(ostream& o, const Particle& p) { return o; } -void FilterCrapParticlesAndReweight(vector* pps) { - vector& ps = *pps; - SampleSet ss; - for (int i = 0; i < ps.size(); ++i) - ss.add(ps[i].weight); - vector nps; nps.reserve(ps.size()); - const prob_t uniform_weight(1.0 / ps.size()); - for (int i = 0; i < ps.size(); ++i) { - nps.push_back(ps[prng->SelectSample(ss)]); - nps[i].weight = uniform_weight; - } - nps.swap(ps); -} - int main(int argc, char** argv) { po::variables_map conf; InitCommandLine(argc, argv, &conf); @@ -466,6 +453,7 @@ int main(int argc, char** argv) { MyJointModel m(lp0); #endif + MultinomialResampleFilter filter(&rng); cerr << "Initializing reachability limits...\n"; vector ps(corpusf.size()); vector reaches; reaches.reserve(corpusf.size()); @@ -500,7 +488,7 @@ int main(int argc, char** argv) { // all particles have now been extended a bit, we will reweight them now if (lps[0].trg_cov > 0) - FilterCrapParticlesAndReweight(&lps); + filter(&lps); // loop over all particles and extend them bool done_nothing = true; diff --git a/gi/pf/pfnaive.cc b/gi/pf/pfnaive.cc index d967958c..728ec00d 100644 --- a/gi/pf/pfnaive.cc +++ b/gi/pf/pfnaive.cc @@ -6,6 +6,7 @@ #include #include +#include "pf.h" #include "base_measures.h" #include "monotonic_pseg.h" #include "reachability.h" @@ -135,20 +136,6 @@ ostream& operator<<(ostream& o, const Particle& p) { return o; } -void FilterCrapParticlesAndReweight(vector* pps) { - vector& ps = *pps; - SampleSet ss; - for (int i = 0; i < ps.size(); ++i) - ss.add(ps[i].weight); - vector nps; nps.reserve(ps.size()); - const prob_t uniform_weight(1.0 / ps.size()); - for (int i = 0; i < ps.size(); ++i) { - nps.push_back(ps[prng->SelectSample(ss)]); - nps[i].weight = uniform_weight; - } - nps.swap(ps); -} - int main(int argc, char** argv) { po::variables_map conf; InitCommandLine(argc, argv, &conf); @@ -204,6 +191,8 @@ int main(int argc, char** argv) { cerr << "Sampling...\n"; vector tmp_p(10000); // work space SampleSet pfss; + SystematicResampleFilter filter(&rng); + // MultinomialResampleFilter filter(&rng); for (int SS=0; SS < samples; ++SS) { for (int ci = 0; ci < corpusf.size(); ++ci) { vector& src = corpusf[ci]; @@ -223,7 +212,7 @@ int main(int argc, char** argv) { // all particles have now been extended a bit, we will reweight them now if (lps[0].trg_cov > 0) - FilterCrapParticlesAndReweight(&lps); + filter(&lps); // loop over all particles and extend them bool done_nothing = true; @@ -273,6 +262,11 @@ int main(int argc, char** argv) { } } // loop over particles (pi = 0 .. particles) if (done_nothing) all_complete = true; + prob_t wv = prob_t::Zero(); + for (int pp = 0; pp < lps.size(); ++pp) + wv += lps[pp].weight; + for (int pp = 0; pp < lps.size(); ++pp) + lps[pp].weight /= wv; } pfss.clear(); for (int i = 0; i < lps.size(); ++i) -- cgit v1.2.3