summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gi/pf/pf.h84
-rw-r--r--gi/pf/pfdist.cc18
-rw-r--r--gi/pf/pfnaive.cc24
-rwxr-xr-xvest/parallelize.pl24
4 files changed, 111 insertions, 39 deletions
diff --git a/gi/pf/pf.h b/gi/pf/pf.h
new file mode 100644
index 00000000..ede7cda8
--- /dev/null
+++ b/gi/pf/pf.h
@@ -0,0 +1,84 @@
+#ifndef _PF_H_
+#define _PF_H_
+
+#include <cassert>
+#include <vector>
+#include "sampler.h"
+#include "prob.h"
+
+template <typename ParticleType>
+struct ParticleRenormalizer {
+ void operator()(std::vector<ParticleType>* pv) const {
+ if (pv->empty()) return;
+ prob_t z = prob_t::Zero();
+ for (unsigned i = 0; i < pv->size(); ++i)
+ z += (*pv)[i].weight;
+ assert(z > prob_t::Zero());
+ for (unsigned i = 0; i < pv->size(); ++i)
+ (*pv)[i].weight /= z;
+ }
+};
+
+template <typename ParticleType>
+struct MultinomialResampleFilter {
+ explicit MultinomialResampleFilter(MT19937* rng) : rng_(rng) {}
+
+ void operator()(std::vector<ParticleType>* pv) {
+ if (pv->empty()) return;
+ std::vector<ParticleType>& ps = *pv;
+ SampleSet<prob_t> ss;
+ for (int i = 0; i < ps.size(); ++i)
+ ss.add(ps[i].weight);
+ std::vector<ParticleType> nps; nps.reserve(ps.size());
+ const prob_t uniform_weight(1.0 / ps.size());
+ for (int i = 0; i < ps.size(); ++i) {
+ nps.push_back(ps[rng_->SelectSample(ss)]);
+ nps[i].weight = uniform_weight;
+ }
+ nps.swap(ps);
+ }
+
+ private:
+ MT19937* rng_;
+};
+
+template <typename ParticleType>
+struct SystematicResampleFilter {
+ explicit SystematicResampleFilter(MT19937* rng) : rng_(rng), renorm_() {}
+
+ void operator()(std::vector<ParticleType>* pv) {
+ if (pv->empty()) return;
+ renorm_(pv);
+ std::vector<ParticleType>& ps = *pv;
+ std::vector<ParticleType> nps; nps.reserve(ps.size());
+ double lower = 0, upper = 0;
+ const double skip = 1.0 / ps.size();
+ double u_j = rng_->next() * skip;
+ //std::cerr << "u_0: " << u_j << std::endl;
+ int j = 0;
+ for (unsigned i = 0; i < ps.size(); ++i) {
+ upper += ps[i].weight.as_float();
+ //std::cerr << "lower: " << lower << " upper: " << upper << std::endl;
+ // how many children does ps[i] have?
+ while (u_j < lower) { u_j += skip; ++j; }
+ while (u_j >= lower && u_j <= upper) {
+ assert(j < ps.size());
+ nps.push_back(ps[i]);
+ u_j += skip;
+ //std::cerr << " add u_j=" << u_j << std::endl;
+ ++j;
+ }
+ lower = upper;
+ }
+ //std::cerr << ps.size() << " " << nps.size() << "\n";
+ assert(ps.size() == nps.size());
+ //exit(1);
+ ps.swap(nps);
+ }
+
+ private:
+ MT19937* rng_;
+ ParticleRenormalizer<ParticleType> renorm_;
+};
+
+#endif
diff --git a/gi/pf/pfdist.cc b/gi/pf/pfdist.cc
index 81abd61b..aae5f798 100644
--- a/gi/pf/pfdist.cc
+++ b/gi/pf/pfdist.cc
@@ -6,6 +6,7 @@
#include <boost/program_options.hpp>
#include <boost/program_options/variables_map.hpp>
+#include "pf.h"
#include "base_measures.h"
#include "reachability.h"
#include "viterbi.h"
@@ -413,20 +414,6 @@ ostream& operator<<(ostream& o, const Particle& p) {
return o;
}
-void FilterCrapParticlesAndReweight(vector<Particle>* pps) {
- vector<Particle>& ps = *pps;
- SampleSet<prob_t> ss;
- for (int i = 0; i < ps.size(); ++i)
- ss.add(ps[i].weight);
- vector<Particle> nps; nps.reserve(ps.size());
- const prob_t uniform_weight(1.0 / ps.size());
- for (int i = 0; i < ps.size(); ++i) {
- nps.push_back(ps[prng->SelectSample(ss)]);
- nps[i].weight = uniform_weight;
- }
- nps.swap(ps);
-}
-
int main(int argc, char** argv) {
po::variables_map conf;
InitCommandLine(argc, argv, &conf);
@@ -466,6 +453,7 @@ int main(int argc, char** argv) {
MyJointModel m(lp0);
#endif
+ MultinomialResampleFilter<Particle> filter(&rng);
cerr << "Initializing reachability limits...\n";
vector<Particle> ps(corpusf.size());
vector<Reachability> reaches; reaches.reserve(corpusf.size());
@@ -500,7 +488,7 @@ int main(int argc, char** argv) {
// all particles have now been extended a bit, we will reweight them now
if (lps[0].trg_cov > 0)
- FilterCrapParticlesAndReweight(&lps);
+ filter(&lps);
// loop over all particles and extend them
bool done_nothing = true;
diff --git a/gi/pf/pfnaive.cc b/gi/pf/pfnaive.cc
index d967958c..728ec00d 100644
--- a/gi/pf/pfnaive.cc
+++ b/gi/pf/pfnaive.cc
@@ -6,6 +6,7 @@
#include <boost/program_options.hpp>
#include <boost/program_options/variables_map.hpp>
+#include "pf.h"
#include "base_measures.h"
#include "monotonic_pseg.h"
#include "reachability.h"
@@ -135,20 +136,6 @@ ostream& operator<<(ostream& o, const Particle& p) {
return o;
}
-void FilterCrapParticlesAndReweight(vector<Particle>* pps) {
- vector<Particle>& ps = *pps;
- SampleSet<prob_t> ss;
- for (int i = 0; i < ps.size(); ++i)
- ss.add(ps[i].weight);
- vector<Particle> nps; nps.reserve(ps.size());
- const prob_t uniform_weight(1.0 / ps.size());
- for (int i = 0; i < ps.size(); ++i) {
- nps.push_back(ps[prng->SelectSample(ss)]);
- nps[i].weight = uniform_weight;
- }
- nps.swap(ps);
-}
-
int main(int argc, char** argv) {
po::variables_map conf;
InitCommandLine(argc, argv, &conf);
@@ -204,6 +191,8 @@ int main(int argc, char** argv) {
cerr << "Sampling...\n";
vector<Particle> tmp_p(10000); // work space
SampleSet<prob_t> pfss;
+ SystematicResampleFilter<Particle> filter(&rng);
+ // MultinomialResampleFilter<Particle> filter(&rng);
for (int SS=0; SS < samples; ++SS) {
for (int ci = 0; ci < corpusf.size(); ++ci) {
vector<int>& src = corpusf[ci];
@@ -223,7 +212,7 @@ int main(int argc, char** argv) {
// all particles have now been extended a bit, we will reweight them now
if (lps[0].trg_cov > 0)
- FilterCrapParticlesAndReweight(&lps);
+ filter(&lps);
// loop over all particles and extend them
bool done_nothing = true;
@@ -273,6 +262,11 @@ int main(int argc, char** argv) {
}
} // loop over particles (pi = 0 .. particles)
if (done_nothing) all_complete = true;
+ prob_t wv = prob_t::Zero();
+ for (int pp = 0; pp < lps.size(); ++pp)
+ wv += lps[pp].weight;
+ for (int pp = 0; pp < lps.size(); ++pp)
+ lps[pp].weight /= wv;
}
pfss.clear();
for (int i = 0; i < lps.size(); ++i)
diff --git a/vest/parallelize.pl b/vest/parallelize.pl
index b4783f91..869f430b 100755
--- a/vest/parallelize.pl
+++ b/vest/parallelize.pl
@@ -240,12 +240,11 @@ my $node_count = 0;
my $script = "";
# fork == one thread runs the sentserver, while the
# other spawns the sentclient commands.
-if (my $pid = fork) {
+my $pid = fork;
+if ($pid == 0) { # child
sleep 8; # give other thread time to start sentserver
- $script =
- qq{wait
-$cdcmd$sentclient $host:$port:$key $cmd
-};
+ $script = "$cdcmd$sentclient $host:$port:$key $cmd";
+
if ($verbose){
print STDERR "Client script:\n====\n";
print STDERR $script;
@@ -270,13 +269,18 @@ $cdcmd$sentclient $host:$port:$key $cmd
}
}
}
- waitpid($pid, 0);
- cleanup();
+ print STDERR "CHILD PROCESSES SPAWNED ... WAITING\n";
+ for my $p (@pids) {
+ waitpid($p, 0);
+ }
} else {
# my $todo = "$sentserver -k $key $multiflag $port ";
my $todo = "$sentserver -k $key $multiflag $port $stay_alive_flag ";
if ($verbose){ print STDERR "Running: $todo\n"; }
check_call($todo);
+ print STDERR "Call to $sentserver returned.\n";
+ cleanup();
+ exit(0);
}
sub numof_live_jobs {
@@ -343,16 +347,18 @@ sub launch_job_fork {
push @errors,$errorfile;
push @outs,$outfile;
}
- if (my $pid = fork) {
+ my $pid = fork;
+ if ($pid == 0) {
my ($fh, $scr_name) = get_temp_script();
print $fh $script;
close $fh;
my $todo = "/bin/bash -xeo pipefail $scr_name 1> $outfile 2> $errorfile";
print STDERR "EXEC: $todo\n";
my $out = check_output("$todo");
- print STDERR "RES: $out\n";
unlink $scr_name or warn "Failed to remove $scr_name";
exit 0;
+ } else {
+ push @pids, $pid;
}
}