summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Dyer <prguest11@taipan.cs>2012-04-20 19:39:17 +0100
committerChris Dyer <prguest11@taipan.cs>2012-04-20 19:39:17 +0100
commit1e206220aa506ac0e8eabcfe0cbd0ab851dee262 (patch)
tree9708851d8d3d951770c6e1d953d8f911d6d22987
parent4dfe96758c5190d2550434d3a0857853c4ef2612 (diff)
parallel gradient computation
-rw-r--r--rst_parser/mst_train.cc98
1 files changed, 74 insertions, 24 deletions
diff --git a/rst_parser/mst_train.cc b/rst_parser/mst_train.cc
index e414f450..b3711aba 100644
--- a/rst_parser/mst_train.cc
+++ b/rst_parser/mst_train.cc
@@ -4,6 +4,10 @@
#include <iostream>
#include <boost/program_options.hpp>
#include <boost/program_options/variables_map.hpp>
+// #define HAVE_THREAD 1
+#if HAVE_THREAD
+#include <boost/thread.hpp>
+#endif
#include "arc_ff.h"
#include "stringlib.h"
@@ -24,6 +28,9 @@ void InitCommandLine(int argc, char** argv, po::variables_map* conf) {
("weights,w",po::value<string>(), "Optional starting weights")
("output_every_i_iterations,I",po::value<unsigned>()->default_value(1), "Write weights every I iterations")
("regularization_strength,C",po::value<double>()->default_value(1.0), "Regularization strength")
+#if HAVE_THREAD
+ ("threads,T",po::value<unsigned>()->default_value(1), "Number of threads")
+#endif
("correction_buffers,m", po::value<int>()->default_value(10), "LBFGS correction buffers");
po::options_description clo("Command line options");
clo.add_options()
@@ -67,6 +74,46 @@ double ApplyRegularizationTerms(const double C,
return reg;
}
+struct GradientWorker {
+ GradientWorker(int f,
+ int t,
+ vector<double>* w,
+ vector<TrainingInstance>* c,
+ vector<ArcFactoredForest>* fs) : obj(), weights(*w), from(f), to(t), corpus(*c), forests(*fs), g(w->size()) {}
+ void operator()() {
+ int every = (to - from) / 20;
+ if (!every) every++;
+ for (int i = from; i < to; ++i) {
+ if ((from == 0) && (i + 1) % every == 0) cerr << '.' << flush;
+ const int num_words = corpus[i].ts.words.size();
+ forests[i].Reweight(weights);
+ prob_t z;
+ forests[i].EdgeMarginals(&z);
+ obj -= log(z);
+ //cerr << " O = " << (-corpus[i].features.dot(weights)) << " D=" << -lz << " OO= " << (-corpus[i].features.dot(weights) - lz) << endl;
+ //cerr << " ZZ = " << zz << endl;
+ for (int h = -1; h < num_words; ++h) {
+ for (int m = 0; m < num_words; ++m) {
+ if (h == m) continue;
+ const ArcFactoredForest::Edge& edge = forests[i](h,m);
+ const SparseVector<weight_t>& fmap = edge.features;
+ double prob = edge.edge_prob.as_float();
+ if (prob < -0.000001) { cerr << "Prob < 0: " << prob << endl; prob = 0; }
+ if (prob > 1.000001) { cerr << "Prob > 1: " << prob << endl; prob = 1; }
+ AddFeatures(prob, fmap, &g);
+ //mfm += fmap * prob; // DE
+ }
+ }
+ }
+ }
+ double obj;
+ vector<double>& weights;
+ const int from, to;
+ vector<TrainingInstance>& corpus;
+ vector<ArcFactoredForest>& forests;
+ vector<double> g; // local gradient
+};
+
int main(int argc, char** argv) {
int rank = 0;
int size = 1;
@@ -108,8 +155,13 @@ int main(int argc, char** argv) {
vector<weight_t> g(FD::NumFeats(), 0.0);
cerr << "features initialized\noptimizing...\n";
boost::shared_ptr<BatchOptimizer> o;
- int every = corpus.size() / 20;
- if (!every) ++every;
+#if HAVE_THREAD
+ unsigned threads = conf["threads"].as<unsigned>();
+ if (threads > corpus.size()) threads = corpus.size();
+#else
+ const unsigned threads = 1;
+#endif
+ int chunk = corpus.size() / threads;
o.reset(new LBFGSOptimizer(g.size(), conf["correction_buffers"].as<int>()));
int iterations = 1000;
for (int iter = 0; iter < iterations; ++iter) {
@@ -118,29 +170,27 @@ int main(int argc, char** argv) {
for (SparseVector<double>::const_iterator it = empirical.begin(); it != empirical.end(); ++it)
g[it->first] = -it->second;
double obj = -empirical.dot(weights);
- // SparseVector<double> mfm; //DE
- for (int i = 0; i < corpus.size(); ++i) {
- if ((i + 1) % every == 0) cerr << '.' << flush;
- const int num_words = corpus[i].ts.words.size();
- forests[i].Reweight(weights);
- prob_t z;
- forests[i].EdgeMarginals(&z);
- obj -= log(z);
- //cerr << " O = " << (-corpus[i].features.dot(weights)) << " D=" << -lz << " OO= " << (-corpus[i].features.dot(weights) - lz) << endl;
- //cerr << " ZZ = " << zz << endl;
- for (int h = -1; h < num_words; ++h) {
- for (int m = 0; m < num_words; ++m) {
- if (h == m) continue;
- const ArcFactoredForest::Edge& edge = forests[i](h,m);
- const SparseVector<weight_t>& fmap = edge.features;
- double prob = edge.edge_prob.as_float();
- if (prob < -0.000001) { cerr << "Prob < 0: " << prob << endl; prob = 0; }
- if (prob > 1.000001) { cerr << "Prob > 1: " << prob << endl; prob = 1; }
- AddFeatures(prob, fmap, &g);
- //mfm += fmap * prob; // DE
- }
- }
+ vector<boost::shared_ptr<GradientWorker> > jobs;
+ for (int from = 0; from < corpus.size(); from += chunk) {
+ int to = from + chunk;
+ if (to > corpus.size()) to = corpus.size();
+ jobs.push_back(boost::shared_ptr<GradientWorker>(new GradientWorker(from, to, &weights, &corpus, &forests)));
}
+#if HAVE_THREAD
+ boost::thread_group tg;
+ for (int i = 0; i < threads; ++i)
+ tg.create_thread(boost::ref(*jobs[i]));
+ tg.join_all();
+#else
+ (*jobs[0])();
+#endif
+ for (int i = 0; i < threads; ++i) {
+ obj += jobs[i]->obj;
+ vector<double>& tg = jobs[i]->g;
+ for (unsigned j = 0; j < g.size(); ++j)
+ g[j] += tg[j];
+ }
+ // SparseVector<double> mfm; //DE
//cerr << endl << "E: " << empirical << endl; // DE
//cerr << "M: " << mfm << endl; // DE
double r = ApplyRegularizationTerms(conf["regularization_strength"].as<double>(), weights, &g);