Diffstat (limited to 'training/mpi_online_optimize.cc')
-rw-r--r--  training/mpi_online_optimize.cc | 164
1 file changed, 82 insertions, 82 deletions
diff --git a/training/mpi_online_optimize.cc b/training/mpi_online_optimize.cc
index 62821aa3..6f5988a4 100644
--- a/training/mpi_online_optimize.cc
+++ b/training/mpi_online_optimize.cc
@@ -6,6 +6,7 @@
 #include <cmath>
 
 #include <mpi.h>
+#include <boost/mpi.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/program_options.hpp>
 #include <boost/program_options/variables_map.hpp>
@@ -66,10 +67,11 @@ void InitCommandLine(int argc, char** argv, po::variables_map* conf) {
         ("minibatch_size_per_proc,s", po::value<unsigned>()->default_value(5), "Number of training instances evaluated per processor in each minibatch")
         ("freeze_feature_set,Z", "The feature set specified in the initial weights file is frozen throughout the duration of training")
         ("optimization_method,m", po::value<string>()->default_value("sgd"), "Optimization method (sgd)")
+        ("fully_random,r", "Fully random draws from the training corpus")
+        ("random_seed,S", po::value<uint32_t>(), "Random seed (if not specified, /dev/random will be used)")
         ("eta_0,e", po::value<double>()->default_value(0.2), "Initial learning rate for SGD (eta_0)")
         ("L1,1","Use L1 regularization")
-        ("gaussian_prior,g","Use a Gaussian prior on the weights")
-        ("sigma_squared", po::value<double>()->default_value(1.0), "Sigma squared term for spherical Gaussian prior");
+        ("regularization_strength,C", po::value<double>()->default_value(1.0), "Regularization strength (C)");
   po::options_description clo("Command line options");
   clo.add_options()
         ("config", po::value<string>(), "Configuration file")
@@ -165,7 +167,7 @@ struct TrainingObserver : public DecoderObserver {
     }
     assert(!isnan(log_ref_z));
     ref_exp -= cur_model_exp;
-    acc_grad -= ref_exp;
+    acc_grad += ref_exp;
     acc_obj += (cur_obj - log_ref_z);
   }
 
@@ -176,6 +178,12 @@ struct TrainingObserver : public DecoderObserver {
     }
   }
 
+  void GetGradient(SparseVector<double>* g) const {
+    g->clear();
+    for (SparseVector<prob_t>::const_iterator it = acc_grad.begin(); it != acc_grad.end(); ++it)
+      g->set_value(it->first, it->second);
+  }
+
   int total_complete;
   SparseVector<prob_t> cur_model_exp;
   SparseVector<prob_t> acc_grad;
@@ -193,10 +201,20 @@ inline void Shuffle(vector<T>* c, MT19937* rng) {
   }
 }
 
+namespace mpi = boost::mpi;
+
+namespace boost { namespace mpi {
+  template<>
+  struct is_commutative<std::plus<SparseVector<double> >, SparseVector<double> >
+    : mpl::true_ { };
+} } // end namespace boost::mpi
+
+
 int main(int argc, char** argv) {
-  MPI::Init(argc, argv);
-  const int size = MPI::COMM_WORLD.Get_size();
-  const int rank = MPI::COMM_WORLD.Get_rank();
+  mpi::environment env(argc, argv);
+  mpi::communicator world;
+  const int size = world.size();
+  const int rank = world.rank();
   SetSilent(true);  // turn off verbose decoder output
   cerr << "MPI: I am " << rank << '/' << size << endl;
   register_feature_functions();
@@ -219,7 +237,7 @@ int main(int argc, char** argv) {
   Decoder decoder(ini_rf.stream());
   if (decoder.GetConf()["input"].as<string>() != "-") {
     cerr << "cdec.ini must not set an input file\n";
-    MPI::COMM_WORLD.Abort(1);
+    abort();
   }
 
   vector<string> corpus;
@@ -228,105 +246,87 @@ int main(int argc, char** argv) {
 
   std::tr1::shared_ptr<OnlineOptimizer> o;
   std::tr1::shared_ptr<LearningRateSchedule> lr;
-  vector<int> order;
+  vector<int> order(corpus.size());
+
+  const bool fully_random = conf.count("fully_random");
+  const unsigned size_per_proc = conf["minibatch_size_per_proc"].as<unsigned>();
+  const unsigned batch_size = size_per_proc * size;
   if (rank == 0) {
+    cerr << "Corpus: " << corpus.size() << "  batch size: " << batch_size << endl;
+    if (batch_size > corpus.size()) {
+      cerr << "  Reduce minibatch_size_per_proc!";
+      abort();
+    }
+
     // TODO config
-    lr.reset(new ExponentialDecayLearningRate(corpus.size(), conf["eta_0"].as<double>()));
+    lr.reset(new ExponentialDecayLearningRate(batch_size, conf["eta_0"].as<double>()));
 
     const string omethod = conf["optimization_method"].as<string>();
     if (omethod == "sgd") {
-      const double C = 1.0;
+      const double C = conf["regularization_strength"].as<double>();
       o.reset(new CumulativeL1OnlineOptimizer(lr, corpus.size(), C));
     } else {
      assert(!"fail");
    }
 
-    // randomize corpus
-    rng = new MT19937;
-    order.resize(corpus.size());
     for (unsigned i = 0; i < order.size(); ++i) order[i]=i;
-    Shuffle(&order, rng);
+    // randomize corpus
+    if (conf.count("random_seed"))
+      rng = new MT19937(conf["random_seed"].as<uint32_t>());
+    else
+      rng = new MT19937;
   }
+  SparseVector<double> x;
+  int miter = corpus.size();  // hack to cause initial broadcast of order info
+  TrainingObserver observer;
   double objective = 0;
-  vector<double> lambdas;
-  weights.InitVector(&lambdas);
   bool converged = false;
-  const unsigned size_per_proc = conf["minibatch_size_per_proc"].as<unsigned>();
-  for (int i = 0; i < size_per_proc; ++i)
-    cerr << "i=" << i << ": " << order[i] << endl;
-  abort();
-  TrainingObserver observer;
+
+  int iter = -1;
+  vector<double> lambdas;
   while (!converged) {
+    weights.InitFromVector(x);
+    weights.InitVector(&lambdas);
+    ++miter; ++iter;
     observer.Reset();
-    if (rank == 0) {
-      cerr << "Starting decoding... (~" << corpus.size() << " sentences / proc)\n";
-    }
     decoder.SetWeights(lambdas);
-#if 0
-    for (int i = 0; i < corpus.size(); ++i)
-      decoder.Decode(corpus[i], &observer);
-
-    fill(gradient.begin(), gradient.end(), 0);
-    fill(rcv_grad.begin(), rcv_grad.end(), 0);
-    observer.SetLocalGradientAndObjective(&gradient, &objective);
-
-    double to = 0;
-    MPI::COMM_WORLD.Reduce(const_cast<double*>(&gradient.data()[0]), &rcv_grad[0], num_feats, MPI::DOUBLE, MPI::SUM, 0);
-    MPI::COMM_WORLD.Reduce(&objective, &to, 1, MPI::DOUBLE, MPI::SUM, 0);
-    swap(gradient, rcv_grad);
-    objective = to;
-
-    if (rank == 0) {  // run optimizer only on rank=0 node
-      if (gaussian_prior) {
-        const double sigsq = conf["sigma_squared"].as<double>();
-        double norm = 0;
-        for (int k = 1; k < lambdas.size(); ++k) {
-          const double& lambda_k = lambdas[k];
-          if (lambda_k) {
-            const double param = (lambda_k - means[k]);
-            norm += param * param;
-            gradient[k] += param / sigsq;
-          }
-        }
-        const double reg = norm / (2.0 * sigsq);
-        cerr << "REGULARIZATION TERM: " << reg << endl;
-        objective += reg;
-      }
-      cerr << "EVALUATION #" << o->EvaluationCount() << " OBJECTIVE: " << objective << endl;
-      double gnorm = 0;
-      for (int i = 0; i < gradient.size(); ++i)
-        gnorm += gradient[i] * gradient[i];
-      cerr << "  GNORM=" << sqrt(gnorm) << endl;
-      vector<double> old = lambdas;
-      int c = 0;
-      while (old == lambdas) {
-        ++c;
-        if (c > 1) { cerr << "Same lambdas, repeating optimization\n"; }
-        o->Optimize(objective, gradient, &lambdas);
-        assert(c < 5);
-      }
-      old.clear();
+    if (rank == 0) {
       SanityCheck(lambdas);
       ShowLargestFeatures(lambdas);
-      weights.InitFromVector(lambdas);
-
-      converged = o->HasConverged();
-      if (converged) { cerr << "OPTIMIZER REPORTS CONVERGENCE!\n"; }
-
      string fname = "weights.cur.gz";
      if (converged) { fname = "weights.final.gz"; }
      ostringstream vv;
-      vv << "Objective = " << objective << " (eval count=" << o->EvaluationCount() << ")";
+      vv << "Objective = " << objective; // << " (eval count=" << o->EvaluationCount() << ")";
      const string svv = vv.str();
      weights.WriteToFile(fname, true, &svv);
-    }  // rank == 0
-    int cint = converged;
-    MPI::COMM_WORLD.Bcast(const_cast<double*>(&lambdas.data()[0]), num_feats, MPI::DOUBLE, 0);
-    MPI::COMM_WORLD.Bcast(&cint, 1, MPI::INT, 0);
-    MPI::COMM_WORLD.Barrier();
-    converged = cint;
-#endif
+    }
+
+    if (fully_random || size * size_per_proc * miter > corpus.size()) {
+      if (rank == 0)
+        Shuffle(&order, rng);
+      miter = 0;
+      broadcast(world, order, 0);
+    }
+    if (rank == 0)
+      cerr << "Starting decoding. minibatch=" << size_per_proc << " sentences/proc x " << size << " procs. num_feats=" << x.size() << " training data proc. = " << (iter * batch_size / static_cast<double>(corpus.size())) << " eta=" << lr->eta(iter) << endl;
+
+    const int beg = size * miter * size_per_proc + rank * size_per_proc;
+    const int end = beg + size_per_proc;
+    for (int i = beg; i < end; ++i) {
+      int ex_num = order[i % order.size()];
+      if (rank == 0 && size < 3) cerr << rank << ": ex_num=" << ex_num << endl;
+      decoder.SetId(ex_num);
+      decoder.Decode(corpus[ex_num], &observer);
+    }
+    SparseVector<double> local_grad, g;
+    observer.GetGradient(&local_grad);
+    reduce(world, local_grad, g, std::plus<SparseVector<double> >(), 0);
+    if (rank == 0) {
+      g /= batch_size;
+      o->UpdateWeights(g, FD::NumFeats(), &x);
+    }
+    broadcast(world, x, 0);
+    world.barrier();
  }
-  MPI::Finalize();
  return 0;
 }