| author    | redpony <redpony@ec762483-ff6d-05da-a07a-a48fb63a330f> | 2010-09-28 17:06:08 +0000 |
|-----------|---------------------------------------------------------|---------------------------|
| committer | redpony <redpony@ec762483-ff6d-05da-a07a-a48fb63a330f> | 2010-09-28 17:06:08 +0000 |
| commit    | 3db6b004ae1e2319f52862d428c20be5a1538993 (patch)        |                           |
| tree      | b35c697027afd92324d8d9a63c8e6b27c32d2339 /training/mpi_online_optimize.cc | |
| parent    | 521dc2fdbf7eee7d6a86410f490ba7a76691590b (diff)         |                           |
use boost mpi, fix L1 stochastic optimizer
git-svn-id: https://ws10smt.googlecode.com/svn/trunk@659 ec762483-ff6d-05da-a07a-a48fb63a330f
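The "L1 stochastic optimizer" in the subject line refers to `CumulativeL1OnlineOptimizer` (see the diff below), which by its name appears to implement the cumulative L1 penalty of Tsuruoka et al. (2009). As a reference point only, here is a minimal sketch of that technique; every function and variable name below is illustrative, not this project's API:

```cpp
// Minimal sketch of a cumulative L1 penalty update in the style of
// Tsuruoka et al. (2009). Names here are illustrative assumptions.
#include <algorithm>
#include <cstddef>
#include <vector>

// One SGD step with a cumulative L1 penalty.
//   w:   weights               g: minibatch gradient (already averaged)
//   eta: learning rate         C: regularization strength
//   u:   total L1 penalty each weight could have received so far
//   q:   penalty actually applied to each weight so far
void CumulativeL1Step(std::vector<double>& w, const std::vector<double>& g,
                      double eta, double C, double& u,
                      std::vector<double>& q) {
  u += eta * C;  // the "owed" penalty grows with every step
  for (std::size_t i = 0; i < w.size(); ++i) {
    w[i] -= eta * g[i];  // plain gradient step first
    const double z = w[i];
    if (z > 0.0)  // clip toward zero, but never past it
      w[i] = std::max(0.0, z - (u + q[i]));
    else if (z < 0.0)
      w[i] = std::min(0.0, z + (u - q[i]));
    q[i] += w[i] - z;  // record how much penalty was really applied
  }
}

int main() {
  std::vector<double> w(3, 0.5), g(3, 0.1), q(3, 0.0);
  double u = 0.0;
  for (int step = 0; step < 100; ++step)
    CumulativeL1Step(w, g, /*eta=*/0.1, /*C=*/0.05, u, q);
  // with enough steps the accumulated penalty drives small weights
  // exactly to zero, which is the point of the cumulative trick
  return 0;
}
```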
Diffstat (limited to 'training/mpi_online_optimize.cc')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | training/mpi_online_optimize.cc | 164 |

1 file changed, 82 insertions, 82 deletions
```diff
diff --git a/training/mpi_online_optimize.cc b/training/mpi_online_optimize.cc
index 62821aa3..6f5988a4 100644
--- a/training/mpi_online_optimize.cc
+++ b/training/mpi_online_optimize.cc
@@ -6,6 +6,7 @@
 #include <cmath>
 #include <mpi.h>
+#include <boost/mpi.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/program_options.hpp>
 #include <boost/program_options/variables_map.hpp>
@@ -66,10 +67,11 @@ void InitCommandLine(int argc, char** argv, po::variables_map* conf) {
         ("minibatch_size_per_proc,s", po::value<unsigned>()->default_value(5), "Number of training instances evaluated per processor in each minibatch")
         ("freeze_feature_set,Z", "The feature set specified in the initial weights file is frozen throughout the duration of training")
         ("optimization_method,m", po::value<string>()->default_value("sgd"), "Optimization method (sgd)")
+        ("fully_random,r", "Fully random draws from the training corpus")
+        ("random_seed,S", po::value<uint32_t>(), "Random seed (if not specified, /dev/random will be used)")
         ("eta_0,e", po::value<double>()->default_value(0.2), "Initial learning rate for SGD (eta_0)")
         ("L1,1","Use L1 regularization")
-        ("gaussian_prior,g","Use a Gaussian prior on the weights")
-        ("sigma_squared", po::value<double>()->default_value(1.0), "Sigma squared term for spherical Gaussian prior");
+        ("regularization_strength,C", po::value<double>()->default_value(1.0), "Regularization strength (C)");
   po::options_description clo("Command line options");
   clo.add_options()
         ("config", po::value<string>(), "Configuration file")
@@ -165,7 +167,7 @@ struct TrainingObserver : public DecoderObserver {
     }
     assert(!isnan(log_ref_z));
     ref_exp -= cur_model_exp;
-    acc_grad -= ref_exp;
+    acc_grad += ref_exp;
     acc_obj += (cur_obj - log_ref_z);
   }
@@ -176,6 +178,12 @@ struct TrainingObserver : public DecoderObserver {
     }
   }
+  void GetGradient(SparseVector<double>* g) const {
+    g->clear();
+    for (SparseVector<prob_t>::const_iterator it = acc_grad.begin(); it != acc_grad.end(); ++it)
+      g->set_value(it->first, it->second);
+  }
+
   int total_complete;
   SparseVector<prob_t> cur_model_exp;
   SparseVector<prob_t> acc_grad;
@@ -193,10 +201,20 @@ inline void Shuffle(vector<T>* c, MT19937* rng) {
   }
 }
+namespace mpi = boost::mpi;
+
+namespace boost { namespace mpi {
+  template<>
+  struct is_commutative<std::plus<SparseVector<double> >, SparseVector<double> >
+    : mpl::true_ { };
+} } // end namespace boost::mpi
+
+
 int main(int argc, char** argv) {
-  MPI::Init(argc, argv);
-  const int size = MPI::COMM_WORLD.Get_size();
-  const int rank = MPI::COMM_WORLD.Get_rank();
+  mpi::environment env(argc, argv);
+  mpi::communicator world;
+  const int size = world.size();
+  const int rank = world.rank();
   SetSilent(true);  // turn off verbose decoder output
   cerr << "MPI: I am " << rank << '/' << size << endl;
   register_feature_functions();
@@ -219,7 +237,7 @@ int main(int argc, char** argv) {
   Decoder decoder(ini_rf.stream());
   if (decoder.GetConf()["input"].as<string>() != "-") {
     cerr << "cdec.ini must not set an input file\n";
-    MPI::COMM_WORLD.Abort(1);
+    abort();
   }
 
   vector<string> corpus;
@@ -228,105 +246,87 @@
   std::tr1::shared_ptr<OnlineOptimizer> o;
   std::tr1::shared_ptr<LearningRateSchedule> lr;
-  vector<int> order;
+  vector<int> order(corpus.size());
+
+  const bool fully_random = conf.count("fully_random");
+  const unsigned size_per_proc = conf["minibatch_size_per_proc"].as<unsigned>();
+  const unsigned batch_size = size_per_proc * size;
   if (rank == 0) {
+    cerr << "Corpus: " << corpus.size() << "  batch size: " << batch_size << endl;
+    if (batch_size > corpus.size()) {
+      cerr << "  Reduce minibatch_size_per_proc!";
+      abort();
+    }
+
     // TODO config
-    lr.reset(new ExponentialDecayLearningRate(corpus.size(), conf["eta_0"].as<double>()));
+    lr.reset(new ExponentialDecayLearningRate(batch_size, conf["eta_0"].as<double>()));
     const string omethod = conf["optimization_method"].as<string>();
     if (omethod == "sgd") {
-      const double C = 1.0;
+      const double C = conf["regularization_strength"].as<double>();
       o.reset(new CumulativeL1OnlineOptimizer(lr, corpus.size(), C));
     } else {
       assert(!"fail");
     }
-    // randomize corpus
-    rng = new MT19937;
-    order.resize(corpus.size());
     for (unsigned i = 0; i < order.size(); ++i) order[i]=i;
-    Shuffle(&order, rng);
+    // randomize corpus
+    if (conf.count("random_seed"))
+      rng = new MT19937(conf["random_seed"].as<uint32_t>());
+    else
+      rng = new MT19937;
   }
+  SparseVector<double> x;
+  int miter = corpus.size();  // hack to cause initial broadcast of order info
+  TrainingObserver observer;
   double objective = 0;
-  vector<double> lambdas;
-  weights.InitVector(&lambdas);
   bool converged = false;
-  const unsigned size_per_proc = conf["minibatch_size_per_proc"].as<unsigned>();
-  for (int i = 0; i < size_per_proc; ++i)
-    cerr << "i=" << i << ": " << order[i] << endl;
-  abort();
-  TrainingObserver observer;
+
+  int iter = -1;
+  vector<double> lambdas;
   while (!converged) {
+    weights.InitFromVector(x);
+    weights.InitVector(&lambdas);
+    ++miter; ++iter;
     observer.Reset();
-    if (rank == 0) {
-      cerr << "Starting decoding... (~" << corpus.size() << " sentences / proc)\n";
-    }
     decoder.SetWeights(lambdas);
-#if 0
-    for (int i = 0; i < corpus.size(); ++i)
-      decoder.Decode(corpus[i], &observer);
-
-    fill(gradient.begin(), gradient.end(), 0);
-    fill(rcv_grad.begin(), rcv_grad.end(), 0);
-    observer.SetLocalGradientAndObjective(&gradient, &objective);
-
-    double to = 0;
-    MPI::COMM_WORLD.Reduce(const_cast<double*>(&gradient.data()[0]), &rcv_grad[0], num_feats, MPI::DOUBLE, MPI::SUM, 0);
-    MPI::COMM_WORLD.Reduce(&objective, &to, 1, MPI::DOUBLE, MPI::SUM, 0);
-    swap(gradient, rcv_grad);
-    objective = to;
-
-    if (rank == 0) {  // run optimizer only on rank=0 node
-      if (gaussian_prior) {
-        const double sigsq = conf["sigma_squared"].as<double>();
-        double norm = 0;
-        for (int k = 1; k < lambdas.size(); ++k) {
-          const double& lambda_k = lambdas[k];
-          if (lambda_k) {
-            const double param = (lambda_k - means[k]);
-            norm += param * param;
-            gradient[k] += param / sigsq;
-          }
-        }
-        const double reg = norm / (2.0 * sigsq);
-        cerr << "REGULARIZATION TERM: " << reg << endl;
-        objective += reg;
-      }
-      cerr << "EVALUATION #" << o->EvaluationCount() << " OBJECTIVE: " << objective << endl;
-      double gnorm = 0;
-      for (int i = 0; i < gradient.size(); ++i)
-        gnorm += gradient[i] * gradient[i];
-      cerr << "  GNORM=" << sqrt(gnorm) << endl;
-      vector<double> old = lambdas;
-      int c = 0;
-      while (old == lambdas) {
-        ++c;
-        if (c > 1) { cerr << "Same lambdas, repeating optimization\n"; }
-        o->Optimize(objective, gradient, &lambdas);
-        assert(c < 5);
-      }
-      old.clear();
+    if (rank == 0) {
       SanityCheck(lambdas);
       ShowLargestFeatures(lambdas);
-      weights.InitFromVector(lambdas);
-
-      converged = o->HasConverged();
-      if (converged) { cerr << "OPTIMIZER REPORTS CONVERGENCE!\n"; }
-
       string fname = "weights.cur.gz";
       if (converged) { fname = "weights.final.gz"; }
       ostringstream vv;
-      vv << "Objective = " << objective << "  (eval count=" << o->EvaluationCount() << ")";
+      vv << "Objective = " << objective; // << "  (eval count=" << o->EvaluationCount() << ")";
      const string svv = vv.str();
       weights.WriteToFile(fname, true, &svv);
-    }  // rank == 0
-    int cint = converged;
-    MPI::COMM_WORLD.Bcast(const_cast<double*>(&lambdas.data()[0]), num_feats, MPI::DOUBLE, 0);
-    MPI::COMM_WORLD.Bcast(&cint, 1, MPI::INT, 0);
-    MPI::COMM_WORLD.Barrier();
-    converged = cint;
-#endif
+    }
+
+    if (fully_random || size * size_per_proc * miter > corpus.size()) {
+      if (rank == 0)
+        Shuffle(&order, rng);
+      miter = 0;
+      broadcast(world, order, 0);
+    }
+    if (rank == 0)
+      cerr << "Starting decoding. minibatch=" << size_per_proc << " sentences/proc x " << size << " procs. num_feats=" << x.size() << " training data proc. = " << (iter * batch_size / static_cast<double>(corpus.size())) << "  eta=" << lr->eta(iter) << endl;
+
+    const int beg = size * miter * size_per_proc + rank * size_per_proc;
+    const int end = beg + size_per_proc;
+    for (int i = beg; i < end; ++i) {
+      int ex_num = order[i % order.size()];
+      if (rank ==0 && size < 3) cerr << rank << ": ex_num=" << ex_num << endl;
+      decoder.SetId(ex_num);
+      decoder.Decode(corpus[ex_num], &observer);
+    }
+    SparseVector<double> local_grad, g;
+    observer.GetGradient(&local_grad);
+    reduce(world, local_grad, g, std::plus<SparseVector<double> >(), 0);
+    if (rank == 0) {
+      g /= batch_size;
+      o->UpdateWeights(g, FD::NumFeats(), &x);
+    }
+    broadcast(world, x, 0);
+    world.barrier();
   }
-  MPI::Finalize();
   return 0;
 }
```
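For readers new to the Boost.MPI idioms this commit adopts: `reduce` and `broadcast` accept any serializable type, and specializing `boost::mpi::is_commutative` for the reduction operator (as the commit does for `SparseVector<double>`) permits Boost to reorder the reduction for efficiency. Below is a self-contained sketch of the same reduce-then-broadcast pattern using a made-up `Grad` type; it is an illustration under those assumptions, not code from this repository:

```cpp
// Sketch of the reduce/broadcast gradient-aggregation pattern with a
// tiny serializable gradient type. Grad and its fields are illustrative.
#include <boost/mpi.hpp>
#include <boost/serialization/vector.hpp>
#include <algorithm>
#include <functional>
#include <iostream>
#include <vector>

namespace mpi = boost::mpi;

struct Grad {
  std::vector<double> g;
  template <class Archive>
  void serialize(Archive& ar, const unsigned int /*version*/) { ar & g; }
};

// Element-wise sum so std::plus<Grad> works as the reduction operator.
Grad operator+(const Grad& a, const Grad& b) {
  Grad r = a;
  r.g.resize(std::max(a.g.size(), b.g.size()), 0.0);
  for (std::size_t i = 0; i < b.g.size(); ++i) r.g[i] += b.g[i];
  return r;
}

// Declaring the operator commutative lets Boost.MPI pick a faster
// reduction schedule, as the commit does for SparseVector<double>.
namespace boost { namespace mpi {
  template <>
  struct is_commutative<std::plus<Grad>, Grad> : mpl::true_ {};
} }

int main(int argc, char** argv) {
  mpi::environment env(argc, argv);
  mpi::communicator world;

  Grad local;  // each rank's partial gradient from its minibatch shard
  local.g.assign(4, world.rank() + 1.0);

  Grad total;
  reduce(world, local, total, std::plus<Grad>(), 0);  // sum on rank 0

  if (world.rank() == 0)
    std::cout << "g[0] summed over ranks = " << total.g[0] << std::endl;

  broadcast(world, total, 0);  // push the aggregated state back out
  return 0;
}
```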
