From 1e206220aa506ac0e8eabcfe0cbd0ab851dee262 Mon Sep 17 00:00:00 2001
From: Chris Dyer
Date: Fri, 20 Apr 2012 19:39:17 +0100
Subject: parallel gradient computation

---
 rst_parser/mst_train.cc | 98 +++++++++++++++++++++++++++++++++++++------------
 1 file changed, 74 insertions(+), 24 deletions(-)

diff --git a/rst_parser/mst_train.cc b/rst_parser/mst_train.cc
index e414f450..b3711aba 100644
--- a/rst_parser/mst_train.cc
+++ b/rst_parser/mst_train.cc
@@ -4,6 +4,10 @@
 #include <iostream>
 #include <boost/program_options.hpp>
 #include <boost/program_options/variables_map.hpp>
+// #define HAVE_THREAD 1
+#if HAVE_THREAD
+#include <boost/thread.hpp>
+#endif
 
 #include "arc_ff.h"
 #include "stringlib.h"
@@ -24,6 +28,9 @@ void InitCommandLine(int argc, char** argv, po::variables_map* conf) {
         ("weights,w",po::value<string>(), "Optional starting weights")
         ("output_every_i_iterations,I",po::value<unsigned>()->default_value(1), "Write weights every I iterations")
         ("regularization_strength,C",po::value<double>()->default_value(1.0), "Regularization strength")
+#if HAVE_THREAD
+        ("threads,T",po::value<unsigned>()->default_value(1), "Number of threads")
+#endif
         ("correction_buffers,m", po::value<int>()->default_value(10), "LBFGS correction buffers");
   po::options_description clo("Command line options");
   clo.add_options()
@@ -67,6 +74,46 @@ double ApplyRegularizationTerms(const double C,
   return reg;
 }
 
+struct GradientWorker {
+  GradientWorker(int f,
+                 int t,
+                 vector<double>* w,
+                 vector<TrainingInstance>* c,
+                 vector<ArcFactoredForest>* fs) : obj(), weights(*w), from(f), to(t), corpus(*c), forests(*fs), g(w->size()) {}
+  void operator()() {
+    int every = (to - from) / 20;
+    if (!every) every++;
+    for (int i = from; i < to; ++i) {
+      if ((from == 0) && (i + 1) % every == 0) cerr << '.' << flush;
+      const int num_words = corpus[i].ts.words.size();
+      forests[i].Reweight(weights);
+      prob_t z;
+      forests[i].EdgeMarginals(&z);
+      obj -= log(z);
+      //cerr << " O = " << (-corpus[i].features.dot(weights)) << " D=" << -lz << " OO= " << (-corpus[i].features.dot(weights) - lz) << endl;
+      //cerr << " ZZ = " << zz << endl;
+      for (int h = -1; h < num_words; ++h) {
+        for (int m = 0; m < num_words; ++m) {
+          if (h == m) continue;
+          const ArcFactoredForest::Edge& edge = forests[i](h,m);
+          const SparseVector<weight_t>& fmap = edge.features;
+          double prob = edge.edge_prob.as_float();
+          if (prob < -0.000001) { cerr << "Prob < 0: " << prob << endl; prob = 0; }
+          if (prob > 1.000001) { cerr << "Prob > 1: " << prob << endl; prob = 1; }
+          AddFeatures(prob, fmap, &g);
+          //mfm += fmap * prob; // DE
+        }
+      }
+    }
+  }
+  double obj;
+  vector<double>& weights;
+  const int from, to;
+  vector<TrainingInstance>& corpus;
+  vector<ArcFactoredForest>& forests;
+  vector<double> g; // local gradient
+};
+
 int main(int argc, char** argv) {
   int rank = 0;
   int size = 1;
@@ -108,8 +155,13 @@ int main(int argc, char** argv) {
   vector<weight_t> g(FD::NumFeats(), 0.0);
   cerr << "features initialized\noptimizing...\n";
   boost::shared_ptr<BatchOptimizer> o;
-  int every = corpus.size() / 20;
-  if (!every) ++every;
+#if HAVE_THREAD
+  unsigned threads = conf["threads"].as<unsigned>();
+  if (threads > corpus.size()) threads = corpus.size();
+#else
+  const unsigned threads = 1;
+#endif
+  int chunk = corpus.size() / threads;
   o.reset(new LBFGSOptimizer(g.size(), conf["correction_buffers"].as<int>()));
   int iterations = 1000;
   for (int iter = 0; iter < iterations; ++iter) {
@@ -118,29 +170,27 @@ int main(int argc, char** argv) {
     for (SparseVector<weight_t>::const_iterator it = empirical.begin(); it != empirical.end(); ++it)
       g[it->first] = -it->second;
     double obj = -empirical.dot(weights);
-    // SparseVector<double> mfm; //DE
-    for (int i = 0; i < corpus.size(); ++i) {
-      if ((i + 1) % every == 0) cerr << '.' << flush;
-      const int num_words = corpus[i].ts.words.size();
-      forests[i].Reweight(weights);
-      prob_t z;
-      forests[i].EdgeMarginals(&z);
-      obj -= log(z);
-      //cerr << " O = " << (-corpus[i].features.dot(weights)) << " D=" << -lz << " OO= " << (-corpus[i].features.dot(weights) - lz) << endl;
-      //cerr << " ZZ = " << zz << endl;
-      for (int h = -1; h < num_words; ++h) {
-        for (int m = 0; m < num_words; ++m) {
-          if (h == m) continue;
-          const ArcFactoredForest::Edge& edge = forests[i](h,m);
-          const SparseVector<weight_t>& fmap = edge.features;
-          double prob = edge.edge_prob.as_float();
-          if (prob < -0.000001) { cerr << "Prob < 0: " << prob << endl; prob = 0; }
-          if (prob > 1.000001) { cerr << "Prob > 1: " << prob << endl; prob = 1; }
-          AddFeatures(prob, fmap, &g);
-          //mfm += fmap * prob; // DE
-        }
-      }
+    vector<boost::shared_ptr<GradientWorker> > jobs;
+    for (int from = 0; from < corpus.size(); from += chunk) {
+      int to = from + chunk;
+      if (to > corpus.size()) to = corpus.size();
+      jobs.push_back(boost::shared_ptr<GradientWorker>(new GradientWorker(from, to, &weights, &corpus, &forests)));
     }
+#if HAVE_THREAD
+    boost::thread_group tg;
+    for (unsigned i = 0; i < jobs.size(); ++i)
+      tg.create_thread(boost::ref(*jobs[i]));
+    tg.join_all();
+#else
+    (*jobs[0])();
+#endif
+    for (unsigned i = 0; i < jobs.size(); ++i) {
+      obj += jobs[i]->obj;
+      vector<double>& tg = jobs[i]->g;
+      for (unsigned j = 0; j < g.size(); ++j)
+        g[j] += tg[j];
+    }
+    // SparseVector<double> mfm; //DE
     //cerr << endl << "E: " << empirical << endl; // DE
     //cerr << "M: " << mfm << endl; // DE
     double r = ApplyRegularizationTerms(conf["regularization_strength"].as<double>(), weights, &g);
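
Note on the pattern: each GradientWorker accumulates its objective and its
gradient into thread-private storage (obj and g), so the workers share no
mutable state and need no locking; the main thread joins them and then sums
the per-worker results. The toy program below is a self-contained sketch of
that scatter/gather shape, not cdec code: ToyGradientWorker and its
squared-error loss are made up, std::thread stands in for
boost::thread_group, and chunks are sized with ceiling division so at most
one job per thread is created even when the data size is not a multiple of
the thread count.

// Minimal C++11 sketch of the patch's scatter/gather gradient pattern.
// All names here are hypothetical stand-ins for the parser's machinery.
#include <algorithm>
#include <functional>
#include <iostream>
#include <thread>
#include <vector>

struct ToyGradientWorker {
  ToyGradientWorker(int from, int to,
                    const std::vector<double>& weights,
                    const std::vector<double>& data)
    : obj(0.0), from(from), to(to), weights(weights), data(data),
      g(weights.size(), 0.0) {}

  // Runs on its own thread: writes only this worker's obj and g.
  void operator()() {
    for (int i = from; i < to; ++i) {
      const double x = data[i];
      const double err = weights[0] * x - 1.0;  // toy residual
      obj += err * err;                         // local objective
      g[0] += 2.0 * err * x;                    // local gradient
    }
  }

  double obj;
  const int from, to;
  const std::vector<double>& weights;
  const std::vector<double>& data;
  std::vector<double> g;  // thread-local gradient, summed after join
};

int main() {
  const std::vector<double> weights(1, 0.25);
  const std::vector<double> data(1000, 2.0);

  unsigned threads = std::thread::hardware_concurrency();
  if (threads == 0) threads = 1;
  if (threads > data.size()) threads = data.size();

  // Ceiling division: at most `threads` chunks, and the chunks cover
  // every example even when data.size() % threads != 0.
  const int chunk = (data.size() + threads - 1) / threads;

  std::vector<ToyGradientWorker> jobs;
  for (int from = 0; from < (int)data.size(); from += chunk)
    jobs.push_back(ToyGradientWorker(
        from, std::min(from + chunk, (int)data.size()), weights, data));

  std::vector<std::thread> pool;
  for (size_t i = 0; i < jobs.size(); ++i)
    pool.push_back(std::thread(std::ref(jobs[i])));  // fan out
  for (size_t i = 0; i < pool.size(); ++i)
    pool[i].join();                                  // fan in

  double obj = 0.0;
  std::vector<double> g(weights.size(), 0.0);
  for (size_t i = 0; i < jobs.size(); ++i) {         // reduce
    obj += jobs[i].obj;
    for (size_t j = 0; j < g.size(); ++j) g[j] += jobs[i].g[j];
  }
  std::cout << "obj=" << obj << " g[0]=" << g[0] << std::endl;
  return 0;
}

Built with g++ -std=c++11 -pthread, the sketch computes the same objective
and gradient for any thread count (up to floating-point addition order),
which is the property the patch relies on: reducing per-worker g vectors
after join_all() is exact, unlike unlocked updates into a shared gradient.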