diff options
author | Chris Dyer <prguest11@taipan.cs> | 2012-04-20 19:39:17 +0100 |
---|---|---|
committer | Chris Dyer <prguest11@taipan.cs> | 2012-04-20 19:39:17 +0100 |
commit | dffea2338257d977b1a7b0289e787576cd3905fa (patch) | |
tree | debc87ae6867d0d41717b5d46ceb5dac8091c910 | |
parent | ef50f21a76e473223f0d7662918e5632d35faaa4 (diff) |
parallel gradient computation
-rw-r--r-- | rst_parser/mst_train.cc | 98 |
1 files changed, 74 insertions, 24 deletions
diff --git a/rst_parser/mst_train.cc b/rst_parser/mst_train.cc index e414f450..b3711aba 100644 --- a/rst_parser/mst_train.cc +++ b/rst_parser/mst_train.cc @@ -4,6 +4,10 @@ #include <iostream> #include <boost/program_options.hpp> #include <boost/program_options/variables_map.hpp> +// #define HAVE_THREAD 1 +#if HAVE_THREAD +#include <boost/thread.hpp> +#endif #include "arc_ff.h" #include "stringlib.h" @@ -24,6 +28,9 @@ void InitCommandLine(int argc, char** argv, po::variables_map* conf) { ("weights,w",po::value<string>(), "Optional starting weights") ("output_every_i_iterations,I",po::value<unsigned>()->default_value(1), "Write weights every I iterations") ("regularization_strength,C",po::value<double>()->default_value(1.0), "Regularization strength") +#if HAVE_THREAD + ("threads,T",po::value<unsigned>()->default_value(1), "Number of threads") +#endif ("correction_buffers,m", po::value<int>()->default_value(10), "LBFGS correction buffers"); po::options_description clo("Command line options"); clo.add_options() @@ -67,6 +74,46 @@ double ApplyRegularizationTerms(const double C, return reg; } +struct GradientWorker { + GradientWorker(int f, + int t, + vector<double>* w, + vector<TrainingInstance>* c, + vector<ArcFactoredForest>* fs) : obj(), weights(*w), from(f), to(t), corpus(*c), forests(*fs), g(w->size()) {} + void operator()() { + int every = (to - from) / 20; + if (!every) every++; + for (int i = from; i < to; ++i) { + if ((from == 0) && (i + 1) % every == 0) cerr << '.' << flush; + const int num_words = corpus[i].ts.words.size(); + forests[i].Reweight(weights); + prob_t z; + forests[i].EdgeMarginals(&z); + obj -= log(z); + //cerr << " O = " << (-corpus[i].features.dot(weights)) << " D=" << -lz << " OO= " << (-corpus[i].features.dot(weights) - lz) << endl; + //cerr << " ZZ = " << zz << endl; + for (int h = -1; h < num_words; ++h) { + for (int m = 0; m < num_words; ++m) { + if (h == m) continue; + const ArcFactoredForest::Edge& edge = forests[i](h,m); + const SparseVector<weight_t>& fmap = edge.features; + double prob = edge.edge_prob.as_float(); + if (prob < -0.000001) { cerr << "Prob < 0: " << prob << endl; prob = 0; } + if (prob > 1.000001) { cerr << "Prob > 1: " << prob << endl; prob = 1; } + AddFeatures(prob, fmap, &g); + //mfm += fmap * prob; // DE + } + } + } + } + double obj; + vector<double>& weights; + const int from, to; + vector<TrainingInstance>& corpus; + vector<ArcFactoredForest>& forests; + vector<double> g; // local gradient +}; + int main(int argc, char** argv) { int rank = 0; int size = 1; @@ -108,8 +155,13 @@ int main(int argc, char** argv) { vector<weight_t> g(FD::NumFeats(), 0.0); cerr << "features initialized\noptimizing...\n"; boost::shared_ptr<BatchOptimizer> o; - int every = corpus.size() / 20; - if (!every) ++every; +#if HAVE_THREAD + unsigned threads = conf["threads"].as<unsigned>(); + if (threads > corpus.size()) threads = corpus.size(); +#else + const unsigned threads = 1; +#endif + int chunk = corpus.size() / threads; o.reset(new LBFGSOptimizer(g.size(), conf["correction_buffers"].as<int>())); int iterations = 1000; for (int iter = 0; iter < iterations; ++iter) { @@ -118,29 +170,27 @@ int main(int argc, char** argv) { for (SparseVector<double>::const_iterator it = empirical.begin(); it != empirical.end(); ++it) g[it->first] = -it->second; double obj = -empirical.dot(weights); - // SparseVector<double> mfm; //DE - for (int i = 0; i < corpus.size(); ++i) { - if ((i + 1) % every == 0) cerr << '.' << flush; - const int num_words = corpus[i].ts.words.size(); - forests[i].Reweight(weights); - prob_t z; - forests[i].EdgeMarginals(&z); - obj -= log(z); - //cerr << " O = " << (-corpus[i].features.dot(weights)) << " D=" << -lz << " OO= " << (-corpus[i].features.dot(weights) - lz) << endl; - //cerr << " ZZ = " << zz << endl; - for (int h = -1; h < num_words; ++h) { - for (int m = 0; m < num_words; ++m) { - if (h == m) continue; - const ArcFactoredForest::Edge& edge = forests[i](h,m); - const SparseVector<weight_t>& fmap = edge.features; - double prob = edge.edge_prob.as_float(); - if (prob < -0.000001) { cerr << "Prob < 0: " << prob << endl; prob = 0; } - if (prob > 1.000001) { cerr << "Prob > 1: " << prob << endl; prob = 1; } - AddFeatures(prob, fmap, &g); - //mfm += fmap * prob; // DE - } - } + vector<boost::shared_ptr<GradientWorker> > jobs; + for (int from = 0; from < corpus.size(); from += chunk) { + int to = from + chunk; + if (to > corpus.size()) to = corpus.size(); + jobs.push_back(boost::shared_ptr<GradientWorker>(new GradientWorker(from, to, &weights, &corpus, &forests))); } +#if HAVE_THREAD + boost::thread_group tg; + for (int i = 0; i < threads; ++i) + tg.create_thread(boost::ref(*jobs[i])); + tg.join_all(); +#else + (*jobs[0])(); +#endif + for (int i = 0; i < threads; ++i) { + obj += jobs[i]->obj; + vector<double>& tg = jobs[i]->g; + for (unsigned j = 0; j < g.size(); ++j) + g[j] += tg[j]; + } + // SparseVector<double> mfm; //DE //cerr << endl << "E: " << empirical << endl; // DE //cerr << "M: " << mfm << endl; // DE double r = ApplyRegularizationTerms(conf["regularization_strength"].as<double>(), weights, &g); |