| field | value | date |
|---|---|---|
| author | redpony <redpony@ec762483-ff6d-05da-a07a-a48fb63a330f> | 2010-10-15 20:13:01 +0000 |
| committer | redpony <redpony@ec762483-ff6d-05da-a07a-a48fb63a330f> | 2010-10-15 20:13:01 +0000 |
| commit | 68a3c1423c4c602a27b0211cf6b0c217135548d3 | |
| tree | f9d9514fa0da81e27ce342130b4547cb3b2bd740 /training | |
| parent | bb70b3e2fc8c0eca56bbed8132a69cf50ad819bc | |
new multi-epoch online optimizer
git-svn-id: https://ws10smt.googlecode.com/svn/trunk@675 ec762483-ff6d-05da-a07a-a48fb63a330f
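
This patch introduces two new input conventions. In mpi_online_optimize.cc, the --training_agenda option (replacing --decoder_config and --maximum_iteration) names a text file listing one decoder configuration per line, followed by a single space and the number of iterations to train under that configuration. Judging from the parsing in LoadAgenda below, an agenda file would look something like this (the .ini file names are illustrative):

```
# blank lines and lines starting with '#' are skipped
cdec.epoch1.ini 100
cdec.epoch2.ini 50
```

Each remaining line must contain exactly one space, and every configuration file listed must exist when the agenda is loaded. In mpi_batch_optimize.cc, the new --sharded_input option points at a directory of per-rank corpus and grammar shards; from the path construction in its main(), a 4-process run would expect a layout like this (directory name illustrative):

```
shards/
  corpus.0_of_4    grammar.0_of_4.gz
  corpus.1_of_4    grammar.1_of_4.gz
  corpus.2_of_4    grammar.2_of_4.gz
  corpus.3_of_4    grammar.3_of_4.gz
```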
Diffstat (limited to 'training')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | training/mpi_batch_optimize.cc | 78 |
| -rw-r--r-- | training/mpi_online_optimize.cc | 209 |
| -rw-r--r-- | training/online_optimizer.cc | 2 |
| -rw-r--r-- | training/online_optimizer.h | 3 |

4 files changed, 185 insertions, 107 deletions
diff --git a/training/mpi_batch_optimize.cc b/training/mpi_batch_optimize.cc
index 7953513e..f1ee9fb4 100644
--- a/training/mpi_batch_optimize.cc
+++ b/training/mpi_batch_optimize.cc
@@ -1,6 +1,5 @@
 #include <sstream>
 #include <iostream>
-#include <fstream>
 #include <vector>
 #include <cassert>
 #include <cmath>
@@ -61,6 +60,7 @@ void InitCommandLine(int argc, char** argv, po::variables_map* conf) {
         ("input_weights,w",po::value<string>(),"Input feature weights file")
         ("training_data,t",po::value<string>(),"Training data")
         ("decoder_config,d",po::value<string>(),"Decoder configuration file")
+        ("sharded_input,s",po::value<string>(), "Corpus and grammar files are 'sharded' so each processor loads its own input and grammar file. Argument is the directory containing the shards.")
         ("output_weights,o",po::value<string>()->default_value("-"),"Output feature weights file")
         ("optimization_method,m", po::value<string>()->default_value("lbfgs"), "Optimization method (sgd, lbfgs, rprop)")
         ("correction_buffers,M", po::value<int>()->default_value(10), "Number of gradients for LBFGS to maintain in memory")
@@ -82,11 +82,16 @@ void InitCommandLine(int argc, char** argv, po::variables_map* conf) {
   }
   po::notify(*conf);
 
-  if (conf->count("help") || !conf->count("input_weights") || !conf->count("training_data") || !conf->count("decoder_config")) {
+  if (conf->count("help") || !conf->count("input_weights") || !(conf->count("training_data") | conf->count("sharded_input")) || !conf->count("decoder_config")) {
     cerr << dcmdline_options << endl;
     MPI::Finalize();
     exit(1);
   }
+  if (conf->count("training_data") && conf->count("sharded_input")) {
+    cerr << "Cannot specify both --training_data and --sharded_input\n";
+    MPI::Finalize();
+    exit(1);
+  }
 }
 
 void ReadTrainingCorpus(const string& fname, int rank, int size, vector<string>* c) {
@@ -183,32 +188,79 @@ struct TrainingObserver : public DecoderObserver {
   int state;
 };
 
+void ReadConfig(const string& ini, vector<string>* out) {
+  ReadFile rf(ini);
+  istream& in = *rf.stream();
+  while(in) {
+    string line;
+    getline(in, line);
+    if (!in) continue;
+    out->push_back(line);
+  }
+}
+
+void StoreConfig(const vector<string>& cfg, istringstream* o) {
+  ostringstream os;
+  for (int i = 0; i < cfg.size(); ++i) { os << cfg[i] << endl; }
+  o->str(os.str());
+}
+
 int main(int argc, char** argv) {
   MPI::Init(argc, argv);
   const int size = MPI::COMM_WORLD.Get_size();
   const int rank = MPI::COMM_WORLD.Get_rank();
   SetSilent(true);  // turn off verbose decoder output
-  cerr << "MPI: I am " << rank << '/' << size << endl;
   register_feature_functions();
 
   po::variables_map conf;
   InitCommandLine(argc, argv, &conf);
 
+  string shard_dir;
+  if (conf.count("sharded_input")) {
+    shard_dir = conf["sharded_input"].as<string>();
+    if (!DirectoryExists(shard_dir)) {
+      if (rank == 0) cerr << "Can't find shard directory: " << shard_dir << endl;
+      MPI::Finalize();
+      return 1;
+    }
+    if (rank == 0)
+      cerr << "Shard directory: " << shard_dir << endl;
+  }
+
   // load initial weights
   Weights weights;
+  if (rank == 0) { cerr << "Loading weights...\n"; }
   weights.InitFromFile(conf["input_weights"].as<string>());
+  if (rank == 0) { cerr << "Done loading weights.\n"; }
 
   // freeze feature set (should be optional?)
   const bool freeze_feature_set = true;
   if (freeze_feature_set) FD::Freeze();
 
   // load cdec.ini and set up decoder
-  ReadFile ini_rf(conf["decoder_config"].as<string>());
-  Decoder decoder(ini_rf.stream());
-  if (decoder.GetConf()["input"].as<string>() != "-") {
+  vector<string> cdec_ini;
+  ReadConfig(conf["decoder_config"].as<string>(), &cdec_ini);
+  if (shard_dir.size()) {
+    if (rank == 0) {
+      for (int i = 0; i < cdec_ini.size(); ++i) {
+        if (cdec_ini[i].find("grammar=") == 0) {
+          cerr << "!!! using sharded input and " << conf["decoder_config"].as<string>() << " contains a grammar specification:\n" << cdec_ini[i] << "\n  VERIFY THAT THIS IS CORRECT!\n";
+        }
+      }
+    }
+    ostringstream g;
+    g << "grammar=" << shard_dir << "/grammar." << rank << "_of_" << size << ".gz";
+    cdec_ini.push_back(g.str());
+  }
+  istringstream ini;
+  StoreConfig(cdec_ini, &ini);
+  if (rank == 0) cerr << "Loading grammar...\n";
+  Decoder* decoder = new Decoder(&ini);
+  if (decoder->GetConf()["input"].as<string>() != "-") {
     cerr << "cdec.ini must not set an input file\n";
     MPI::COMM_WORLD.Abort(1);
   }
+  if (rank == 0) cerr << "Done loading grammar!\n";
 
   const int num_feats = FD::NumFeats();
   if (rank == 0) cerr << "Number of features: " << num_feats << endl;
@@ -247,8 +299,16 @@ int main(int argc, char** argv) {
   vector<double> gradient(num_feats, 0.0);
   vector<double> rcv_grad(num_feats, 0.0);
   bool converged = false;
+
   vector<string> corpus;
-  ReadTrainingCorpus(conf["training_data"].as<string>(), rank, size, &corpus);
+  if (shard_dir.size()) {
+    ostringstream os; os << shard_dir << "/corpus." << rank << "_of_" << size;
+    ReadTrainingCorpus(os.str(), 0, 1, &corpus);
+    cerr << os.str() << " has " << corpus.size() << " training examples. " << endl;
+    if (corpus.size() > 500) { corpus.resize(500); cerr << "  TRUNCATING\n"; }
+  } else {
+    ReadTrainingCorpus(conf["training_data"].as<string>(), rank, size, &corpus);
+  }
   assert(corpus.size() > 0);
 
   TrainingObserver observer;
@@ -257,9 +317,9 @@ int main(int argc, char** argv) {
     if (rank == 0) {
       cerr << "Starting decoding... (~" << corpus.size() << " sentences / proc)\n";
     }
(~" << corpus.size() << " sentences / proc)\n";      } -    decoder.SetWeights(lambdas); +    decoder->SetWeights(lambdas);      for (int i = 0; i < corpus.size(); ++i) -      decoder.Decode(corpus[i], &observer); +      decoder->Decode(corpus[i], &observer);      fill(gradient.begin(), gradient.end(), 0);      fill(rcv_grad.begin(), rcv_grad.end(), 0); diff --git a/training/mpi_online_optimize.cc b/training/mpi_online_optimize.cc index 509fbf15..4c08b181 100644 --- a/training/mpi_online_optimize.cc +++ b/training/mpi_online_optimize.cc @@ -6,6 +6,7 @@  #include <cmath>  #include <tr1/memory> +#include <boost/mpi/timer.hpp>  #include <boost/mpi.hpp>  #include <boost/program_options.hpp>  #include <boost/program_options/variables_map.hpp> @@ -61,13 +62,9 @@ bool InitCommandLine(int argc, char** argv, po::variables_map* conf) {    opts.add_options()          ("input_weights,w",po::value<string>(),"Input feature weights file")          ("training_data,t",po::value<string>(),"Training data corpus") -        ("decoder_config,c",po::value<string>(),"Decoder configuration file") -        ("output_weights,o",po::value<string>()->default_value("-"),"Output feature weights file") -        ("maximum_iteration,i", po::value<unsigned>(), "Maximum number of iterations") +        ("training_agenda,a",po::value<string>(), "Text file listing a series of configuration files and the number of iterations to train using each configuration successively")          ("minibatch_size_per_proc,s", po::value<unsigned>()->default_value(5), "Number of training instances evaluated per processor in each minibatch") -        ("freeze_feature_set,Z", "The feature set specified in the initial weights file is frozen throughout the duration of training")          ("optimization_method,m", po::value<string>()->default_value("sgd"), "Optimization method (sgd)") -        ("fully_random,r", "Fully random draws from the training corpus")          ("random_seed,S", po::value<uint32_t>(), "Random seed (if not specified, /dev/random will be used)")          ("eta_0,e", po::value<double>()->default_value(0.2), "Initial learning rate for SGD (eta_0)")          ("L1,1","Use L1 regularization") @@ -87,21 +84,26 @@ bool InitCommandLine(int argc, char** argv, po::variables_map* conf) {    }    po::notify(*conf); -  if (conf->count("help") || !conf->count("training_data") || !conf->count("decoder_config")) { +  if (conf->count("help") || !conf->count("training_data") || !conf->count("training_agenda")) {      cerr << dcmdline_options << endl;      return false;    }    return true;  } -void ReadTrainingCorpus(const string& fname, vector<string>* c) { +void ReadTrainingCorpus(const string& fname, int rank, int size, vector<string>* c, vector<int>* order) {    ReadFile rf(fname);    istream& in = *rf.stream();    string line; +  int id = 0;    while(in) {      getline(in, line);      if (!in) break; -    c->push_back(line); +    if (id % size == rank) { +      c->push_back(line); +      order->push_back(id); +    } +    ++id;    }  } @@ -192,15 +194,6 @@ struct TrainingObserver : public DecoderObserver {    int state;  }; -template <typename T> -inline void Shuffle(vector<T>* c, MT19937* rng) { -  unsigned size = c->size(); -  for (unsigned i = size - 1; i > 0; --i) { -    const unsigned j = static_cast<unsigned>(rng->next() * i); -    swap((*c)[j], (*c)[i]); -  } -} -  namespace mpi = boost::mpi;  namespace boost { namespace mpi { @@ -209,6 +202,32 @@ namespace boost { namespace mpi {      : mpl::true_ { };  } } // end namespace boost::mpi 
+bool LoadAgenda(const string& file, vector<pair<string, int> >* a) {
+  ReadFile rf(file);
+  istream& in = *rf.stream();
+  string line;
+  while(in) {
+    getline(in, line);
+    if (!in) break;
+    if (line.empty()) continue;
+    if (line[0] == '#') continue;
+    int sc = 0;
+    if (line.size() < 3) return false;
+    for (int i = 0; i < line.size(); ++i) { if (line[i] == ' ') ++sc; }
+    if (sc != 1) { cerr << "Too many spaces in line: " << line << endl; return false; }
+    size_t d = line.find(" ");
+    pair<string, int> x;
+    x.first = line.substr(0,d);
+    x.second = atoi(line.substr(d+1).c_str());
+    a->push_back(x);
+    cerr << "X: " << x.second << " - " << x.first << "'\n";
+    if (!FileExists(x.first)) {
+      cerr << "Can't find file " << x.first << endl;
+      return false;
+    }
+  }
+  return true;
+}
 
 int main(int argc, char** argv) {
   mpi::environment env(argc, argv);
@@ -228,36 +247,22 @@ int main(int argc, char** argv) {
   if (conf.count("input_weights"))
     weights.InitFromFile(conf["input_weights"].as<string>());
 
-  // freeze feature set
-  const bool freeze_feature_set = conf.count("freeze_feature_set");
-  if (freeze_feature_set) FD::Freeze();
-
-  // load cdec.ini and set up decoder
-  ReadFile ini_rf(conf["decoder_config"].as<string>());
-  Decoder decoder(ini_rf.stream());
-  if (decoder.GetConf()["input"].as<string>() != "-") {
-    cerr << "cdec.ini must not set an input file\n";
-    abort();
-  }
-
   vector<string> corpus;
-  ReadTrainingCorpus(conf["training_data"].as<string>(), &corpus);
+  vector<int> ids;
+  ReadTrainingCorpus(conf["training_data"].as<string>(), rank, size, &corpus, &ids);
   assert(corpus.size() > 0);
 
   std::tr1::shared_ptr<OnlineOptimizer> o;
   std::tr1::shared_ptr<LearningRateSchedule> lr;
-  vector<int> order(corpus.size());
 
-  const bool fully_random = conf.count("fully_random");
   const unsigned size_per_proc = conf["minibatch_size_per_proc"].as<unsigned>();
-  const unsigned batch_size = size_per_proc * size;
-  if (rank == 0) {
-    cerr << "Corpus: " << corpus.size() << "  batch size: " << batch_size << endl;
-    if (batch_size > corpus.size()) {
-      cerr << "  Reduce minibatch_size_per_proc!";
-      abort();
-    }
+  if (size_per_proc > corpus.size()) {
+    cerr << "Minibatch size must be smaller than corpus size!\n";
+    return 1;
+  }
+  if (rank == 0) {
+    const unsigned batch_size = size_per_proc * size;
     // TODO config
     lr.reset(new ExponentialDecayLearningRate(batch_size, conf["eta_0"].as<double>()));
@@ -268,75 +273,83 @@ int main(int argc, char** argv) {
     } else {
       assert(!"fail");
     }
-
-    for (unsigned i = 0; i < order.size(); ++i) order[i]=i;
-    // randomize corpus
-    if (conf.count("random_seed"))
-      rng.reset(new MT19937(conf["random_seed"].as<uint32_t>()));
-    else
-      rng.reset(new MT19937);
   }
+  if (conf.count("random_seed"))
+    rng.reset(new MT19937(conf["random_seed"].as<uint32_t>()));
+  else
+    rng.reset(new MT19937);
 
   SparseVector<double> x;
   weights.InitSparseVector(&x);
-  int miter = corpus.size();  // hack to cause initial broadcast of order info
   TrainingObserver observer;
-  double objective = 0;
-  bool converged = false;
 
   int write_weights_every_ith = 100; // TODO configure
-  int iter = -1;
+  int titer = -1;
+
+  vector<pair<string, int> > agenda;
+  if (!LoadAgenda(conf["training_agenda"].as<string>(), &agenda))
+    return 1;
+  if (rank == 0)
+    cerr << "Loaded agenda defining " << agenda.size() << " training epochs\n";
+
training epochs\n"; +    vector<double> lambdas; -  while (!converged) { -    weights.InitFromVector(x); -    weights.InitVector(&lambdas); -    ++miter; ++iter; -    observer.Reset(); -    decoder.SetWeights(lambdas); -    if (rank == 0) { -      if (conf.count("maximum_iteration")) { -        if (iter == conf["maximum_iteration"].as<unsigned>()) -          converged = true; -      } -      SanityCheck(lambdas); -      ShowLargestFeatures(lambdas); -      string fname = "weights.cur.gz"; -      if (converged) { fname = "weights.final.gz"; } -      if (iter % write_weights_every_ith == 0) { -        ostringstream o; o << "weights." << iter << ".gz"; -        fname = o.str(); +  for (int ai = 0; ai < agenda.size(); ++ai) { +    const string& cur_config = agenda[ai].first; +    const unsigned max_iteration = agenda[ai].second; +    if (rank == 0) +      cerr << "STARTING TRAINING EPOCH " << (ai+1) << ". CONFIG=" << cur_config << endl; +    // load cdec.ini and set up decoder +    ReadFile ini_rf(cur_config); +    Decoder decoder(ini_rf.stream()); + +    o->ResetEpoch(); // resets the learning rate-- TODO is this good? + +    int iter = -1; +    bool converged = false; +    while (!converged) { +      mpi::timer timer; +      weights.InitFromVector(x); +      weights.InitVector(&lambdas); +      ++iter; ++titer; +      observer.Reset(); +      decoder.SetWeights(lambdas); +      if (rank == 0) { +        converged = (iter == max_iteration); +        SanityCheck(lambdas); +        ShowLargestFeatures(lambdas); +        string fname = "weights.cur.gz"; +        if (iter % write_weights_every_ith == 0) { +          ostringstream o; o << "weights.epoch_" << (ai+1) << '.' << iter << ".gz"; +          fname = o.str(); +        } +        if (converged && ((ai+1)==agenda.size())) { fname = "weights.final.gz"; } +        ostringstream vv; +        vv << "total iter=" << titer << " (of current config iter=" << iter << ")  minibatch=" << size_per_proc << " sentences/proc x " << size << " procs.   num_feats=" << x.size() << '/' << FD::NumFeats() << "   passes_thru_data=" << (titer * size * size_per_proc / static_cast<double>(corpus.size())) << "   eta=" << lr->eta(titer); +        const string svv = vv.str(); +        cerr << svv << endl; +        weights.WriteToFile(fname, true, &svv);        } -      ostringstream vv; -      vv << "Objective = " << objective; // << "  (eval count=" << o->EvaluationCount() << ")"; -      const string svv = vv.str(); -      weights.WriteToFile(fname, true, &svv); -    } -    if (fully_random || size * size_per_proc * miter > corpus.size()) { -      if (rank == 0) -        Shuffle(&order, rng.get()); -      miter = 0; -      broadcast(world, order, 0); -    } -    if (rank == 0) -      cerr << "iter=" << iter << "   minibatch=" << size_per_proc << " sentences/proc x " << size << " procs.   
num_feats=" << x.size() << '/' << FD::NumFeats() << "   passes_thru_data=" << (iter * batch_size / static_cast<double>(corpus.size())) << "   eta=" << lr->eta(iter) << endl; - -    const int beg = size * miter * size_per_proc + rank * size_per_proc; -    const int end = beg + size_per_proc; -    for (int i = beg; i < end; ++i) { -      int ex_num = order[i % order.size()]; -      if (rank ==0 && size < 3) cerr << rank << ": ex_num=" << ex_num << endl; -      decoder.SetId(ex_num); -      decoder.Decode(corpus[ex_num], &observer); -    } -    SparseVector<double> local_grad, g; -    observer.GetGradient(&local_grad); -    reduce(world, local_grad, g, std::plus<SparseVector<double> >(), 0); -    if (rank == 0) { -      g /= batch_size; -      o->UpdateWeights(g, FD::NumFeats(), &x); +      for (int i = 0; i < size_per_proc; ++i) { +        int ei = corpus.size() * rng->next(); +        int id = ids[ei]; +        decoder.SetId(id); +        decoder.Decode(corpus[ei], &observer); +      } +      SparseVector<double> local_grad, g; +      observer.GetGradient(&local_grad); +      reduce(world, local_grad, g, std::plus<SparseVector<double> >(), 0); +      local_grad.clear(); +      if (rank == 0) { +        g /= (size_per_proc * size); +        o->UpdateWeights(g, FD::NumFeats(), &x); +        cerr << "XX: " << x << endl; +      } +      broadcast(world, x, 0); +      broadcast(world, converged, 0); +      world.barrier(); +      if (rank == 0) { cerr << "  ELAPSED TIME THIS ITERATION=" << timer.elapsed() << endl; }      } -    broadcast(world, x, 0); -    world.barrier();    }    return 0;  } diff --git a/training/online_optimizer.cc b/training/online_optimizer.cc index db55c95e..3ed95452 100644 --- a/training/online_optimizer.cc +++ b/training/online_optimizer.cc @@ -12,3 +12,5 @@ double ExponentialDecayLearningRate::eta(int k) const {  OnlineOptimizer::~OnlineOptimizer() {} +void OnlineOptimizer::ResetEpochImpl() {} + diff --git a/training/online_optimizer.h b/training/online_optimizer.h index 963c0380..312aabae 100644 --- a/training/online_optimizer.h +++ b/training/online_optimizer.h @@ -58,6 +58,7 @@ class OnlineOptimizer {    OnlineOptimizer(const std::tr1::shared_ptr<LearningRateSchedule>& s,                    size_t batch_size)      : N_(batch_size),schedule_(s),k_() {} +  void ResetEpoch() { k_ = 0; ResetEpochImpl(); }    void UpdateWeights(const SparseVector<double>& approx_g, int max_feat, SparseVector<double>* weights) {      ++k_;      const double eta = schedule_->eta(k_); @@ -65,6 +66,7 @@ class OnlineOptimizer {    }   protected: +  virtual void ResetEpochImpl();    virtual void UpdateWeightsImpl(const double& eta, const SparseVector<double>& approx_g, int max_feat, SparseVector<double>* weights) = 0;    const size_t N_; // number of training instances per batch @@ -80,6 +82,7 @@ class CumulativeL1OnlineOptimizer : public OnlineOptimizer {      OnlineOptimizer(s, training_instances), C_(C), u_() {}   protected: +  void ResetEpochImpl() { u_ = 0; }    void UpdateWeightsImpl(const double& eta, const SparseVector<double>& approx_g, int max_feat, SparseVector<double>* weights) {      u_ += eta * C_ / N_;      (*weights) += eta * approx_g; | 
