diff options
Diffstat (limited to 'training/crf')
| -rw-r--r-- | training/crf/Makefile.am | 27 | ||||
| -rw-r--r-- | training/crf/cllh_observer.cc | 52 | ||||
| -rw-r--r-- | training/crf/cllh_observer.h | 26 | ||||
| -rw-r--r-- | training/crf/mpi_batch_optimize.cc | 372 | ||||
| -rw-r--r-- | training/crf/mpi_compute_cllh.cc | 134 | ||||
| -rw-r--r-- | training/crf/mpi_extract_features.cc | 151 | ||||
| -rw-r--r-- | training/crf/mpi_extract_reachable.cc | 163 | ||||
| -rw-r--r-- | training/crf/mpi_flex_optimize.cc | 386 | ||||
| -rw-r--r-- | training/crf/mpi_online_optimize.cc | 374 | 
9 files changed, 1685 insertions, 0 deletions
| diff --git a/training/crf/Makefile.am b/training/crf/Makefile.am new file mode 100644 index 00000000..d203df25 --- /dev/null +++ b/training/crf/Makefile.am @@ -0,0 +1,27 @@ +bin_PROGRAMS = \ +  mpi_batch_optimize \ +  mpi_compute_cllh \ +  mpi_extract_features \ +  mpi_extract_reachable \ +  mpi_flex_optimize \ +  mpi_online_optimize + +mpi_online_optimize_SOURCES = mpi_online_optimize.cc +mpi_online_optimize_LDADD = $(top_srcdir)/training/utils/libtraining_utils.a $(top_srcdir)/decoder/libcdec.a $(top_srcdir)/klm/search/libksearch.a $(top_srcdir)/mteval/libmteval.a $(top_srcdir)/utils/libutils.a $(top_srcdir)/klm/lm/libklm.a $(top_srcdir)/klm/util/libklm_util.a -lz + +mpi_flex_optimize_SOURCES = mpi_flex_optimize.cc +mpi_flex_optimize_LDADD = $(top_srcdir)/training/utils/libtraining_utils.a $(top_srcdir)/decoder/libcdec.a $(top_srcdir)/klm/search/libksearch.a $(top_srcdir)/mteval/libmteval.a $(top_srcdir)/utils/libutils.a $(top_srcdir)/klm/lm/libklm.a $(top_srcdir)/klm/util/libklm_util.a -lz + +mpi_extract_reachable_SOURCES = mpi_extract_reachable.cc +mpi_extract_reachable_LDADD = $(top_srcdir)/decoder/libcdec.a $(top_srcdir)/klm/search/libksearch.a $(top_srcdir)/mteval/libmteval.a $(top_srcdir)/utils/libutils.a $(top_srcdir)/klm/lm/libklm.a $(top_srcdir)/klm/util/libklm_util.a -lz + +mpi_extract_features_SOURCES = mpi_extract_features.cc +mpi_extract_features_LDADD = $(top_srcdir)/decoder/libcdec.a $(top_srcdir)/klm/search/libksearch.a $(top_srcdir)/mteval/libmteval.a $(top_srcdir)/utils/libutils.a $(top_srcdir)/klm/lm/libklm.a $(top_srcdir)/klm/util/libklm_util.a -lz + +mpi_batch_optimize_SOURCES = mpi_batch_optimize.cc cllh_observer.cc +mpi_batch_optimize_LDADD = $(top_srcdir)/training/utils/libtraining_utils.a $(top_srcdir)/decoder/libcdec.a $(top_srcdir)/klm/search/libksearch.a $(top_srcdir)/mteval/libmteval.a $(top_srcdir)/utils/libutils.a $(top_srcdir)/klm/lm/libklm.a $(top_srcdir)/klm/util/libklm_util.a -lz + +mpi_compute_cllh_SOURCES = mpi_compute_cllh.cc cllh_observer.cc +mpi_compute_cllh_LDADD = $(top_srcdir)/decoder/libcdec.a $(top_srcdir)/klm/search/libksearch.a $(top_srcdir)/mteval/libmteval.a $(top_srcdir)/utils/libutils.a $(top_srcdir)/klm/lm/libklm.a $(top_srcdir)/klm/util/libklm_util.a -lz + +AM_CPPFLAGS = -DBOOST_TEST_DYN_LINK -W -Wall -Wno-sign-compare -I$(top_srcdir)/training -I$(top_srcdir)/training/utils -I$(top_srcdir)/utils -I$(top_srcdir)/decoder -I$(top_srcdir)/mteval diff --git a/training/crf/cllh_observer.cc b/training/crf/cllh_observer.cc new file mode 100644 index 00000000..4ec2fa65 --- /dev/null +++ b/training/crf/cllh_observer.cc @@ -0,0 +1,52 @@ +#include "cllh_observer.h" + +#include <cmath> +#include <cassert> + +#include "inside_outside.h" +#include "hg.h" +#include "sentence_metadata.h" + +using namespace std; + +static const double kMINUS_EPSILON = -1e-6; + +ConditionalLikelihoodObserver::~ConditionalLikelihoodObserver() {} + +void ConditionalLikelihoodObserver::NotifyDecodingStart(const SentenceMetadata&) { +  cur_obj = 0; +  state = 1; +} + +void ConditionalLikelihoodObserver::NotifyTranslationForest(const SentenceMetadata&, Hypergraph* hg) { +  assert(state == 1); +  state = 2; +  SparseVector<prob_t> cur_model_exp; +  const prob_t z = InsideOutside<prob_t, +                                 EdgeProb, +                                 SparseVector<prob_t>, +                                 EdgeFeaturesAndProbWeightFunction>(*hg, &cur_model_exp); +  cur_obj = log(z); +} + +void ConditionalLikelihoodObserver::NotifyAlignmentForest(const SentenceMetadata& smeta, Hypergraph* hg) { +  assert(state == 2); +  state = 3; +  SparseVector<prob_t> ref_exp; +  const prob_t ref_z = InsideOutside<prob_t, +                                     EdgeProb, +                                     SparseVector<prob_t>, +                                     EdgeFeaturesAndProbWeightFunction>(*hg, &ref_exp); + +  double log_ref_z = log(ref_z); + +  // rounding errors means that <0 is too strict +  if ((cur_obj - log_ref_z) < kMINUS_EPSILON) { +    cerr << "DIFF. ERR! log_model_z < log_ref_z: " << cur_obj << " " << log_ref_z << endl; +    exit(1); +  } +  assert(!std::isnan(log_ref_z)); +  acc_obj += (cur_obj - log_ref_z); +  trg_words += smeta.GetReference().size(); +} + diff --git a/training/crf/cllh_observer.h b/training/crf/cllh_observer.h new file mode 100644 index 00000000..0de47331 --- /dev/null +++ b/training/crf/cllh_observer.h @@ -0,0 +1,26 @@ +#ifndef _CLLH_OBSERVER_H_ +#define _CLLH_OBSERVER_H_ + +#include "decoder.h" + +struct ConditionalLikelihoodObserver : public DecoderObserver { + +  ConditionalLikelihoodObserver() : trg_words(), acc_obj(), cur_obj() {} +  ~ConditionalLikelihoodObserver(); + +  void Reset() { +    acc_obj = 0; +    trg_words = 0; +  } +  +  virtual void NotifyDecodingStart(const SentenceMetadata&); +  virtual void NotifyTranslationForest(const SentenceMetadata&, Hypergraph* hg); +  virtual void NotifyAlignmentForest(const SentenceMetadata& smeta, Hypergraph* hg); + +  unsigned trg_words; +  double acc_obj; +  double cur_obj; +  int state; +}; + +#endif diff --git a/training/crf/mpi_batch_optimize.cc b/training/crf/mpi_batch_optimize.cc new file mode 100644 index 00000000..2eff07e4 --- /dev/null +++ b/training/crf/mpi_batch_optimize.cc @@ -0,0 +1,372 @@ +#include <sstream> +#include <iostream> +#include <vector> +#include <cassert> +#include <cmath> + +#include "config.h" +#ifdef HAVE_MPI +#include <boost/mpi/timer.hpp> +#include <boost/mpi.hpp> +namespace mpi = boost::mpi; +#endif + +#include <boost/shared_ptr.hpp> +#include <boost/program_options.hpp> +#include <boost/program_options/variables_map.hpp> + +#include "sentence_metadata.h" +#include "cllh_observer.h" +#include "verbose.h" +#include "hg.h" +#include "prob.h" +#include "inside_outside.h" +#include "ff_register.h" +#include "decoder.h" +#include "filelib.h" +#include "stringlib.h" +#include "optimize.h" +#include "fdict.h" +#include "weights.h" +#include "sparse_vector.h" + +using namespace std; +namespace po = boost::program_options; + +bool InitCommandLine(int argc, char** argv, po::variables_map* conf) { +  po::options_description opts("Configuration options"); +  opts.add_options() +        ("input_weights,w",po::value<string>(),"Input feature weights file") +        ("training_data,t",po::value<string>(),"Training data") +        ("test_data,T",po::value<string>(),"(optional) test data") +        ("decoder_config,c",po::value<string>(),"Decoder configuration file") +        ("output_weights,o",po::value<string>()->default_value("-"),"Output feature weights file") +        ("optimization_method,m", po::value<string>()->default_value("lbfgs"), "Optimization method (sgd, lbfgs, rprop)") +	("correction_buffers,M", po::value<int>()->default_value(10), "Number of gradients for LBFGS to maintain in memory") +        ("gaussian_prior,p","Use a Gaussian prior on the weights") +        ("sigma_squared", po::value<double>()->default_value(1.0), "Sigma squared term for spherical Gaussian prior") +        ("means,u", po::value<string>(), "(optional) file containing the means for Gaussian prior"); +  po::options_description clo("Command line options"); +  clo.add_options() +        ("config", po::value<string>(), "Configuration file") +        ("help,h", "Print this help message and exit"); +  po::options_description dconfig_options, dcmdline_options; +  dconfig_options.add(opts); +  dcmdline_options.add(opts).add(clo); +   +  po::store(parse_command_line(argc, argv, dcmdline_options), *conf); +  if (conf->count("config")) { +    ifstream config((*conf)["config"].as<string>().c_str()); +    po::store(po::parse_config_file(config, dconfig_options), *conf); +  } +  po::notify(*conf); + +  if (conf->count("help") || !conf->count("input_weights") || !(conf->count("training_data")) || !conf->count("decoder_config")) { +    cerr << dcmdline_options << endl; +    return false; +  } +  return true; +} + +void ReadTrainingCorpus(const string& fname, int rank, int size, vector<string>* c) { +  ReadFile rf(fname); +  istream& in = *rf.stream(); +  string line; +  int lc = 0; +  while(in) { +    getline(in, line); +    if (!in) break; +    if (lc % size == rank) c->push_back(line); +    ++lc; +  } +} + +static const double kMINUS_EPSILON = -1e-6; + +struct TrainingObserver : public DecoderObserver { +  void Reset() { +    acc_grad.clear(); +    acc_obj = 0; +    total_complete = 0; +    trg_words = 0; +  }  + +  void SetLocalGradientAndObjective(vector<double>* g, double* o) const { +    *o = acc_obj; +    for (SparseVector<prob_t>::const_iterator it = acc_grad.begin(); it != acc_grad.end(); ++it) +      (*g)[it->first] = it->second.as_float(); +  } + +  virtual void NotifyDecodingStart(const SentenceMetadata& smeta) { +    cur_model_exp.clear(); +    cur_obj = 0; +    state = 1; +  } + +  // compute model expectations, denominator of objective +  virtual void NotifyTranslationForest(const SentenceMetadata& smeta, Hypergraph* hg) { +    assert(state == 1); +    state = 2; +    const prob_t z = InsideOutside<prob_t, +                                   EdgeProb, +                                   SparseVector<prob_t>, +                                   EdgeFeaturesAndProbWeightFunction>(*hg, &cur_model_exp); +    cur_obj = log(z); +    cur_model_exp /= z; +  } + +  // compute "empirical" expectations, numerator of objective +  virtual void NotifyAlignmentForest(const SentenceMetadata& smeta, Hypergraph* hg) { +    assert(state == 2); +    state = 3; +    SparseVector<prob_t> ref_exp; +    const prob_t ref_z = InsideOutside<prob_t, +                                       EdgeProb, +                                       SparseVector<prob_t>, +                                       EdgeFeaturesAndProbWeightFunction>(*hg, &ref_exp); +    ref_exp /= ref_z; + +    double log_ref_z; +#if 0 +    if (crf_uniform_empirical) { +      log_ref_z = ref_exp.dot(feature_weights); +    } else { +      log_ref_z = log(ref_z); +    } +#else +    log_ref_z = log(ref_z); +#endif + +    // rounding errors means that <0 is too strict +    if ((cur_obj - log_ref_z) < kMINUS_EPSILON) { +      cerr << "DIFF. ERR! log_model_z < log_ref_z: " << cur_obj << " " << log_ref_z << endl; +      exit(1); +    } +    assert(!std::isnan(log_ref_z)); +    ref_exp -= cur_model_exp; +    acc_grad -= ref_exp; +    acc_obj += (cur_obj - log_ref_z); +    trg_words += smeta.GetReference().size(); +  } + +  virtual void NotifyDecodingComplete(const SentenceMetadata& smeta) { +    if (state == 3) { +      ++total_complete; +    } else { +    } +  } + +  int total_complete; +  SparseVector<prob_t> cur_model_exp; +  SparseVector<prob_t> acc_grad; +  double acc_obj; +  double cur_obj; +  unsigned trg_words; +  int state; +}; + +void ReadConfig(const string& ini, vector<string>* out) { +  ReadFile rf(ini); +  istream& in = *rf.stream(); +  while(in) { +    string line; +    getline(in, line); +    if (!in) continue; +    out->push_back(line); +  } +} + +void StoreConfig(const vector<string>& cfg, istringstream* o) { +  ostringstream os; +  for (int i = 0; i < cfg.size(); ++i) { os << cfg[i] << endl; } +  o->str(os.str()); +} + +template <typename T> +struct VectorPlus : public binary_function<vector<T>, vector<T>, vector<T> >  { +  vector<T> operator()(const vector<int>& a, const vector<int>& b) const { +    assert(a.size() == b.size()); +    vector<T> v(a.size()); +    transform(a.begin(), a.end(), b.begin(), v.begin(), plus<T>());  +    return v; +  }  +};  + +int main(int argc, char** argv) { +#ifdef HAVE_MPI +  mpi::environment env(argc, argv); +  mpi::communicator world; +  const int size = world.size();  +  const int rank = world.rank(); +#else +  const int size = 1; +  const int rank = 0; +#endif +  SetSilent(true);  // turn off verbose decoder output +  register_feature_functions(); + +  po::variables_map conf; +  if (!InitCommandLine(argc, argv, &conf)) return 1; + +  // load cdec.ini and set up decoder +  vector<string> cdec_ini; +  ReadConfig(conf["decoder_config"].as<string>(), &cdec_ini); +  istringstream ini; +  StoreConfig(cdec_ini, &ini); +  if (rank == 0) cerr << "Loading grammar...\n"; +  Decoder* decoder = new Decoder(&ini); +  if (decoder->GetConf()["input"].as<string>() != "-") { +    cerr << "cdec.ini must not set an input file\n"; +    return 1; +  } +  if (rank == 0) cerr << "Done loading grammar!\n"; + +  // load initial weights +  if (rank == 0) { cerr << "Loading weights...\n"; } +  vector<weight_t>& lambdas = decoder->CurrentWeightVector(); +  Weights::InitFromFile(conf["input_weights"].as<string>(), &lambdas); +  if (rank == 0) { cerr << "Done loading weights.\n"; } + +  // freeze feature set (should be optional?) +  const bool freeze_feature_set = true; +  if (freeze_feature_set) FD::Freeze(); + +  const int num_feats = FD::NumFeats(); +  if (rank == 0) cerr << "Number of features: " << num_feats << endl; +  lambdas.resize(num_feats); + +  const bool gaussian_prior = conf.count("gaussian_prior"); +  vector<weight_t> means(num_feats, 0); +  if (conf.count("means")) { +    if (!gaussian_prior) { +      cerr << "Don't use --means without --gaussian_prior!\n"; +      exit(1); +    } +    Weights::InitFromFile(conf["means"].as<string>(), &means); +  } +  boost::shared_ptr<BatchOptimizer> o; +  if (rank == 0) { +    const string omethod = conf["optimization_method"].as<string>(); +    if (omethod == "rprop") +      o.reset(new RPropOptimizer(num_feats));  // TODO add configuration +    else +      o.reset(new LBFGSOptimizer(num_feats, conf["correction_buffers"].as<int>())); +    cerr << "Optimizer: " << o->Name() << endl; +  } +  double objective = 0; +  vector<double> gradient(num_feats, 0.0); +  vector<double> rcv_grad; +  rcv_grad.clear(); +  bool converged = false; + +  vector<string> corpus, test_corpus; +  ReadTrainingCorpus(conf["training_data"].as<string>(), rank, size, &corpus); +  assert(corpus.size() > 0); +  if (conf.count("test_data")) +    ReadTrainingCorpus(conf["test_data"].as<string>(), rank, size, &test_corpus); + +  TrainingObserver observer; +  ConditionalLikelihoodObserver cllh_observer; +  while (!converged) { +    observer.Reset(); +    cllh_observer.Reset(); +#ifdef HAVE_MPI +    mpi::timer timer; +    world.barrier(); +#endif +    if (rank == 0) { +      cerr << "Starting decoding... (~" << corpus.size() << " sentences / proc)\n"; +      cerr << "  Testset size: " << test_corpus.size() << " sentences / proc)\n"; +    } +    for (int i = 0; i < corpus.size(); ++i) +      decoder->Decode(corpus[i], &observer); +    cerr << "  process " << rank << '/' << size << " done\n"; +    fill(gradient.begin(), gradient.end(), 0); +    observer.SetLocalGradientAndObjective(&gradient, &objective); + +    unsigned total_words = 0; +#ifdef HAVE_MPI +    double to = 0; +    rcv_grad.resize(num_feats, 0.0); +    mpi::reduce(world, &gradient[0], gradient.size(), &rcv_grad[0], plus<double>(), 0); +    swap(gradient, rcv_grad); +    rcv_grad.clear(); + +    reduce(world, observer.trg_words, total_words, std::plus<unsigned>(), 0); +    mpi::reduce(world, objective, to, plus<double>(), 0); +    objective = to; +#else +    total_words = observer.trg_words; +#endif +    if (rank == 0) +      cerr << "TRAINING CORPUS: ln p(f|e)=" << objective << "\t log_2 p(f|e) = " << (objective/log(2)) << "\t cond. entropy = " << (objective/log(2) / total_words) << "\t ppl = " << pow(2, (objective/log(2) / total_words)) << endl; + +    for (int i = 0; i < test_corpus.size(); ++i) +      decoder->Decode(test_corpus[i], &cllh_observer); + +    double test_objective = 0; +    unsigned test_total_words = 0; +#ifdef HAVE_MPI +    reduce(world, cllh_observer.acc_obj, test_objective, std::plus<double>(), 0); +    reduce(world, cllh_observer.trg_words, test_total_words, std::plus<unsigned>(), 0); +#else +    test_objective = cllh_observer.acc_obj; +    test_total_words = cllh_observer.trg_words; +#endif + +    if (rank == 0) {  // run optimizer only on rank=0 node +      if (test_corpus.size()) +        cerr << "    TEST CORPUS: ln p(f|e)=" << test_objective << "\t log_2 p(f|e) = " << (test_objective/log(2)) << "\t cond. entropy = " << (test_objective/log(2) / test_total_words) << "\t ppl = " << pow(2, (test_objective/log(2) / test_total_words)) << endl; +      if (gaussian_prior) { +        const double sigsq = conf["sigma_squared"].as<double>(); +        double norm = 0; +        for (int k = 1; k < lambdas.size(); ++k) { +          const double& lambda_k = lambdas[k]; +          if (lambda_k) { +            const double param = (lambda_k - means[k]); +            norm += param * param; +            gradient[k] += param / sigsq; +          } +        } +        const double reg = norm / (2.0 * sigsq); +        cerr << "REGULARIZATION TERM: " << reg << endl; +        objective += reg; +      } +      cerr << "EVALUATION #" << o->EvaluationCount() << " OBJECTIVE: " << objective << endl; +      double gnorm = 0; +      for (int i = 0; i < gradient.size(); ++i) +        gnorm += gradient[i] * gradient[i]; +      cerr << "  GNORM=" << sqrt(gnorm) << endl; +      vector<weight_t> old = lambdas; +      int c = 0; +      while (old == lambdas) { +        ++c; +        if (c > 1) { cerr << "Same lambdas, repeating optimization\n"; } +        o->Optimize(objective, gradient, &lambdas); +        assert(c < 5); +      } +      old.clear(); +      Weights::SanityCheck(lambdas); +      Weights::ShowLargestFeatures(lambdas); + +      converged = o->HasConverged(); +      if (converged) { cerr << "OPTIMIZER REPORTS CONVERGENCE!\n"; } + +      string fname = "weights.cur.gz"; +      if (converged) { fname = "weights.final.gz"; } +      ostringstream vv; +      vv << "Objective = " << objective << "  (eval count=" << o->EvaluationCount() << ")"; +      const string svv = vv.str(); +      Weights::WriteToFile(fname, lambdas, true, &svv); +    }  // rank == 0 +    int cint = converged; +#ifdef HAVE_MPI +    mpi::broadcast(world, &lambdas[0], lambdas.size(), 0); +    mpi::broadcast(world, cint, 0); +    if (rank == 0) { cerr << "  ELAPSED TIME THIS ITERATION=" << timer.elapsed() << endl; } +#endif +    converged = cint; +  } +  return 0; +} + diff --git a/training/crf/mpi_compute_cllh.cc b/training/crf/mpi_compute_cllh.cc new file mode 100644 index 00000000..066389d0 --- /dev/null +++ b/training/crf/mpi_compute_cllh.cc @@ -0,0 +1,134 @@ +#include <iostream> +#include <vector> +#include <cassert> +#include <cmath> + +#include "config.h" +#ifdef HAVE_MPI +#include <boost/mpi.hpp> +#endif +#include <boost/program_options.hpp> +#include <boost/program_options/variables_map.hpp> + +#include "cllh_observer.h" +#include "sentence_metadata.h" +#include "verbose.h" +#include "hg.h" +#include "prob.h" +#include "inside_outside.h" +#include "ff_register.h" +#include "decoder.h" +#include "filelib.h" +#include "weights.h" + +using namespace std; +namespace po = boost::program_options; + +bool InitCommandLine(int argc, char** argv, po::variables_map* conf) { +  po::options_description opts("Configuration options"); +  opts.add_options() +        ("weights,w",po::value<string>(),"Input feature weights file") +        ("training_data,t",po::value<string>(),"Training data corpus") +        ("decoder_config,c",po::value<string>(),"Decoder configuration file"); +  po::options_description clo("Command line options"); +  clo.add_options() +        ("config", po::value<string>(), "Configuration file") +        ("help,h", "Print this help message and exit"); +  po::options_description dconfig_options, dcmdline_options; +  dconfig_options.add(opts); +  dcmdline_options.add(opts).add(clo); +   +  po::store(parse_command_line(argc, argv, dcmdline_options), *conf); +  if (conf->count("config")) { +    ifstream config((*conf)["config"].as<string>().c_str()); +    po::store(po::parse_config_file(config, dconfig_options), *conf); +  } +  po::notify(*conf); + +  if (conf->count("help") || !conf->count("training_data") || !conf->count("decoder_config")) { +    cerr << dcmdline_options << endl; +    return false; +  } +  return true; +} + +void ReadInstances(const string& fname, int rank, int size, vector<string>* c) { +  assert(fname != "-"); +  ReadFile rf(fname); +  istream& in = *rf.stream(); +  string line; +  int lc = 0; +  while(in) { +    getline(in, line); +    if (!in) break; +    if (lc % size == rank) c->push_back(line); +    ++lc; +  } +} + +static const double kMINUS_EPSILON = -1e-6; + +#ifdef HAVE_MPI +namespace mpi = boost::mpi; +#endif + +int main(int argc, char** argv) { +#ifdef HAVE_MPI +  mpi::environment env(argc, argv); +  mpi::communicator world; +  const int size = world.size();  +  const int rank = world.rank(); +#else +  const int size = 1; +  const int rank = 0; +#endif +  if (size > 1) SetSilent(true);  // turn off verbose decoder output +  register_feature_functions(); + +  po::variables_map conf; +  if (!InitCommandLine(argc, argv, &conf)) +    return false; + +  // load cdec.ini and set up decoder +  ReadFile ini_rf(conf["decoder_config"].as<string>()); +  Decoder decoder(ini_rf.stream()); +  if (decoder.GetConf()["input"].as<string>() != "-") { +    cerr << "cdec.ini must not set an input file\n"; +    abort(); +  } + +  // load weights +  vector<weight_t>& weights = decoder.CurrentWeightVector(); +  if (conf.count("weights")) +    Weights::InitFromFile(conf["weights"].as<string>(), &weights); + +  vector<string> corpus; +  ReadInstances(conf["training_data"].as<string>(), rank, size, &corpus); +  assert(corpus.size() > 0); + +  if (rank == 0) +    cerr << "Each processor is decoding ~" << corpus.size() << " training examples...\n"; + +  ConditionalLikelihoodObserver observer; +  for (int i = 0; i < corpus.size(); ++i) +    decoder.Decode(corpus[i], &observer); + +  double objective = 0; +  unsigned total_words = 0; +#ifdef HAVE_MPI +  reduce(world, observer.acc_obj, objective, std::plus<double>(), 0); +  reduce(world, observer.trg_words, total_words, std::plus<unsigned>(), 0); +#else +  objective = observer.acc_obj; +#endif + +  if (rank == 0) { +    cout << "CONDITIONAL LOG_e LIKELIHOOD: " << objective << endl; +    cout << "CONDITIONAL LOG_2 LIKELIHOOD: " << (objective/log(2)) << endl; +    cout << "         CONDITIONAL ENTROPY: " << (objective/log(2) / total_words) << endl; +    cout << "                  PERPLEXITY: " << pow(2, (objective/log(2) / total_words)) << endl; +  } + +  return 0; +} + diff --git a/training/crf/mpi_extract_features.cc b/training/crf/mpi_extract_features.cc new file mode 100644 index 00000000..6750aa15 --- /dev/null +++ b/training/crf/mpi_extract_features.cc @@ -0,0 +1,151 @@ +#include <iostream> +#include <sstream> +#include <vector> +#include <cassert> + +#include "config.h" +#ifdef HAVE_MPI +#include <boost/mpi.hpp> +#endif +#include <boost/program_options.hpp> +#include <boost/program_options/variables_map.hpp> + +#include "ff_register.h" +#include "verbose.h" +#include "filelib.h" +#include "fdict.h" +#include "decoder.h" +#include "weights.h" + +using namespace std; +namespace po = boost::program_options; + +bool InitCommandLine(int argc, char** argv, po::variables_map* conf) { +  po::options_description opts("Configuration options"); +  opts.add_options() +        ("training_data,t",po::value<string>(),"Training data corpus") +        ("decoder_config,c",po::value<string>(),"Decoder configuration file") +        ("weights,w", po::value<string>(), "(Optional) weights file; weights may affect what features are encountered in pruning configurations") +        ("output_prefix,o",po::value<string>()->default_value("features"),"Output path prefix"); +  po::options_description clo("Command line options"); +  clo.add_options() +        ("config", po::value<string>(), "Configuration file") +        ("help,h", "Print this help message and exit"); +  po::options_description dconfig_options, dcmdline_options; +  dconfig_options.add(opts); +  dcmdline_options.add(opts).add(clo); +   +  po::store(parse_command_line(argc, argv, dcmdline_options), *conf); +  if (conf->count("config")) { +    ifstream config((*conf)["config"].as<string>().c_str()); +    po::store(po::parse_config_file(config, dconfig_options), *conf); +  } +  po::notify(*conf); + +  if (conf->count("help") || !conf->count("training_data") || !conf->count("decoder_config")) { +    cerr << "Decode an input set (optionally in parallel using MPI) and write\nout the feature strings encountered.\n"; +    cerr << dcmdline_options << endl; +    return false; +  } +  return true; +} + +void ReadTrainingCorpus(const string& fname, int rank, int size, vector<string>* c) { +  ReadFile rf(fname); +  istream& in = *rf.stream(); +  string line; +  int lc = 0; +  while(in) { +    getline(in, line); +    if (!in) break; +    if (lc % size == rank) c->push_back(line); +    ++lc; +  } +} + +static const double kMINUS_EPSILON = -1e-6; + +struct TrainingObserver : public DecoderObserver { + +  virtual void NotifyDecodingStart(const SentenceMetadata&) { +  } + +  // compute model expectations, denominator of objective +  virtual void NotifyTranslationForest(const SentenceMetadata&, Hypergraph* hg) { +  } + +  // compute "empirical" expectations, numerator of objective +  virtual void NotifyAlignmentForest(const SentenceMetadata& smeta, Hypergraph* hg) { +  } +}; + +#ifdef HAVE_MPI +namespace mpi = boost::mpi; +#endif + +int main(int argc, char** argv) { +#ifdef HAVE_MPI +  mpi::environment env(argc, argv); +  mpi::communicator world; +  const int size = world.size();  +  const int rank = world.rank(); +#else +  const int size = 1; +  const int rank = 0; +#endif +  if (size > 1) SetSilent(true);  // turn off verbose decoder output +  register_feature_functions(); + +  po::variables_map conf; +  if (!InitCommandLine(argc, argv, &conf)) +    return false; + +  // load cdec.ini and set up decoder +  ReadFile ini_rf(conf["decoder_config"].as<string>()); +  Decoder decoder(ini_rf.stream()); +  if (decoder.GetConf()["input"].as<string>() != "-") { +    cerr << "cdec.ini must not set an input file\n"; +    abort(); +  } + +  if (FD::UsingPerfectHashFunction()) { +    cerr << "Your configuration file has enabled a cmph hash function. Please disable.\n"; +    return 1; +  } + +  // load optional weights +  if (conf.count("weights")) +    Weights::InitFromFile(conf["weights"].as<string>(), &decoder.CurrentWeightVector()); + +  vector<string> corpus; +  ReadTrainingCorpus(conf["training_data"].as<string>(), rank, size, &corpus); +  assert(corpus.size() > 0); + +  TrainingObserver observer; + +  if (rank == 0) +    cerr << "Each processor is decoding ~" << corpus.size() << " training examples...\n"; + +  for (int i = 0; i < corpus.size(); ++i) +    decoder.Decode(corpus[i], &observer); + +  { +    ostringstream os; +    os << conf["output_prefix"].as<string>() << '.' << rank << "_of_" << size; +    WriteFile wf(os.str()); +    ostream& out = *wf.stream(); +    const unsigned num_feats = FD::NumFeats(); +    for (unsigned i = 1; i < num_feats; ++i) { +      out << FD::Convert(i) << endl; +    } +    cerr << "Wrote " << os.str() << endl; +  } + +#ifdef HAVE_MPI +  world.barrier(); +#else +#endif + +  return 0; +} + diff --git a/training/crf/mpi_extract_reachable.cc b/training/crf/mpi_extract_reachable.cc new file mode 100644 index 00000000..2a7c2b9d --- /dev/null +++ b/training/crf/mpi_extract_reachable.cc @@ -0,0 +1,163 @@ +#include <iostream> +#include <sstream> +#include <vector> +#include <cassert> + +#include "config.h" +#ifdef HAVE_MPI +#include <boost/mpi.hpp> +#endif +#include <boost/program_options.hpp> +#include <boost/program_options/variables_map.hpp> + +#include "ff_register.h" +#include "verbose.h" +#include "filelib.h" +#include "fdict.h" +#include "decoder.h" +#include "weights.h" + +using namespace std; +namespace po = boost::program_options; + +bool InitCommandLine(int argc, char** argv, po::variables_map* conf) { +  po::options_description opts("Configuration options"); +  opts.add_options() +        ("training_data,t",po::value<string>(),"Training data corpus") +        ("decoder_config,c",po::value<string>(),"Decoder configuration file") +        ("weights,w", po::value<string>(), "(Optional) weights file; weights may affect what features are encountered in pruning configurations") +        ("output_prefix,o",po::value<string>()->default_value("reachable"),"Output path prefix"); +  po::options_description clo("Command line options"); +  clo.add_options() +        ("config", po::value<string>(), "Configuration file") +        ("help,h", "Print this help message and exit"); +  po::options_description dconfig_options, dcmdline_options; +  dconfig_options.add(opts); +  dcmdline_options.add(opts).add(clo); +   +  po::store(parse_command_line(argc, argv, dcmdline_options), *conf); +  if (conf->count("config")) { +    ifstream config((*conf)["config"].as<string>().c_str()); +    po::store(po::parse_config_file(config, dconfig_options), *conf); +  } +  po::notify(*conf); + +  if (conf->count("help") || !conf->count("training_data") || !conf->count("decoder_config")) { +    cerr << "Decode an input set (optionally in parallel using MPI) and write\nout the inputs that produce reachable parallel parses.\n"; +    cerr << dcmdline_options << endl; +    return false; +  } +  return true; +} + +void ReadTrainingCorpus(const string& fname, int rank, int size, vector<string>* c) { +  ReadFile rf(fname); +  istream& in = *rf.stream(); +  string line; +  int lc = 0; +  while(in) { +    getline(in, line); +    if (!in) break; +    if (lc % size == rank) c->push_back(line); +    ++lc; +  } +} + +static const double kMINUS_EPSILON = -1e-6; + +struct ReachabilityObserver : public DecoderObserver { + +  virtual void NotifyDecodingStart(const SentenceMetadata&) { +    reachable = false; +  } + +  // compute model expectations, denominator of objective +  virtual void NotifyTranslationForest(const SentenceMetadata&, Hypergraph* hg) { +  } + +  // compute "empirical" expectations, numerator of objective +  virtual void NotifyAlignmentForest(const SentenceMetadata& smeta, Hypergraph* hg) { +    reachable = true; +  } + +  bool reachable; +}; + +#ifdef HAVE_MPI +namespace mpi = boost::mpi; +#endif + +int main(int argc, char** argv) { +#ifdef HAVE_MPI +  mpi::environment env(argc, argv); +  mpi::communicator world; +  const int size = world.size();  +  const int rank = world.rank(); +#else +  const int size = 1; +  const int rank = 0; +#endif +  if (size > 1) SetSilent(true);  // turn off verbose decoder output +  register_feature_functions(); + +  po::variables_map conf; +  if (!InitCommandLine(argc, argv, &conf)) +    return false; + +  // load cdec.ini and set up decoder +  ReadFile ini_rf(conf["decoder_config"].as<string>()); +  Decoder decoder(ini_rf.stream()); +  if (decoder.GetConf()["input"].as<string>() != "-") { +    cerr << "cdec.ini must not set an input file\n"; +    abort(); +  } + +  if (FD::UsingPerfectHashFunction()) { +    cerr << "Your configuration file has enabled a cmph hash function. Please disable.\n"; +    return 1; +  } + +  // load optional weights +  if (conf.count("weights")) +    Weights::InitFromFile(conf["weights"].as<string>(), &decoder.CurrentWeightVector()); + +  vector<string> corpus; +  ReadTrainingCorpus(conf["training_data"].as<string>(), rank, size, &corpus); +  assert(corpus.size() > 0); + + +  if (rank == 0) +    cerr << "Each processor is decoding ~" << corpus.size() << " training examples...\n"; + +  size_t num_reached = 0; +  { +    ostringstream os; +    os << conf["output_prefix"].as<string>() << '.' << rank << "_of_" << size; +    WriteFile wf(os.str()); +    ostream& out = *wf.stream(); +    ReachabilityObserver observer; +    for (int i = 0; i < corpus.size(); ++i) { +      decoder.Decode(corpus[i], &observer); +      if (observer.reachable) { +         out << corpus[i] << endl; +         ++num_reached; +      } +      corpus[i].clear(); +    } +    cerr << "Shard " << rank << '/' << size << " finished, wrote " +         << num_reached << " instances to " << os.str() << endl; +  } + +  size_t total = 0; +#ifdef HAVE_MPI +  reduce(world, num_reached, total, std::plus<double>(), 0); +#else +  total = num_reached; +#endif +  if (rank == 0) { +    cerr << "-----------------------------------------\n"; +    cerr << "TOTAL = " << total << " instances\n"; +  } +  return 0; +} + diff --git a/training/crf/mpi_flex_optimize.cc b/training/crf/mpi_flex_optimize.cc new file mode 100644 index 00000000..b52decdc --- /dev/null +++ b/training/crf/mpi_flex_optimize.cc @@ -0,0 +1,386 @@ +#include <sstream> +#include <iostream> +#include <fstream> +#include <vector> +#include <cassert> +#include <cmath> + +#include <boost/shared_ptr.hpp> +#include <boost/program_options.hpp> +#include <boost/program_options/variables_map.hpp> + +#include "stringlib.h" +#include "verbose.h" +#include "hg.h" +#include "prob.h" +#include "inside_outside.h" +#include "ff_register.h" +#include "decoder.h" +#include "filelib.h" +#include "optimize.h" +#include "fdict.h" +#include "weights.h" +#include "sparse_vector.h" +#include "sampler.h" + +#ifdef HAVE_MPI +#include <boost/mpi/timer.hpp> +#include <boost/mpi.hpp> +namespace mpi = boost::mpi; +#endif + +using namespace std; +namespace po = boost::program_options; + +bool InitCommandLine(int argc, char** argv, po::variables_map* conf) { +  po::options_description opts("Configuration options"); +  opts.add_options() +        ("cdec_config,c",po::value<string>(),"Decoder configuration file") +        ("weights,w",po::value<string>(),"Initial feature weights") +        ("training_data,d",po::value<string>(),"Training data") +        ("minibatch_size_per_proc,s", po::value<unsigned>()->default_value(6), "Number of training instances evaluated per processor in each minibatch") +        ("minibatch_iterations,i", po::value<unsigned>()->default_value(10), "Number of optimization iterations per minibatch") +        ("iterations,I", po::value<unsigned>()->default_value(50), "Number of passes through the training data before termination") +        ("regularization_strength,C", po::value<double>()->default_value(0.2), "Regularization strength") +        ("time_series_strength,T", po::value<double>()->default_value(0.0), "Time series regularization strength") +        ("random_seed,S", po::value<uint32_t>(), "Random seed (if not specified, /dev/random will be used)") +        ("lbfgs_memory_buffers,M", po::value<unsigned>()->default_value(10), "Number of memory buffers for LBFGS history"); +  po::options_description clo("Command line options"); +  clo.add_options() +        ("config", po::value<string>(), "Configuration file") +        ("help,h", "Print this help message and exit"); +  po::options_description dconfig_options, dcmdline_options; +  dconfig_options.add(opts); +  dcmdline_options.add(opts).add(clo); +   +  po::store(parse_command_line(argc, argv, dcmdline_options), *conf); +  if (conf->count("config")) { +    ifstream config((*conf)["config"].as<string>().c_str()); +    po::store(po::parse_config_file(config, dconfig_options), *conf); +  } +  po::notify(*conf); + +  if (conf->count("help") || !conf->count("training_data") || !conf->count("cdec_config")) { +    cerr << "LBFGS minibatch online optimizer (MPI support " +#if HAVE_MPI +         << "enabled" +#else +         << "not enabled" +#endif +         << ")\n" << dcmdline_options << endl; +    return false; +  } +  return true; +} + +void ReadTrainingCorpus(const string& fname, int rank, int size, vector<string>* c, vector<int>* order) { +  ReadFile rf(fname); +  istream& in = *rf.stream(); +  string line; +  int id = 0; +  while(in) { +    getline(in, line); +    if (!in) break; +    if (id % size == rank) { +      c->push_back(line); +      order->push_back(id); +    } +    ++id; +  } +} + +static const double kMINUS_EPSILON = -1e-6; + +struct CopyHGsObserver : public DecoderObserver { +  Hypergraph* hg_; +  Hypergraph* gold_hg_; + +  // this can free up some memory +  void RemoveRules(Hypergraph* h) { +    for (unsigned i = 0; i < h->edges_.size(); ++i) +      h->edges_[i].rule_.reset(); +  } + +  void SetCurrentHypergraphs(Hypergraph* h, Hypergraph* gold_h) { +    hg_ = h; +    gold_hg_ = gold_h; +  } + +  virtual void NotifyDecodingStart(const SentenceMetadata&) { +    state = 1; +  } + +  // compute model expectations, denominator of objective +  virtual void NotifyTranslationForest(const SentenceMetadata&, Hypergraph* hg) { +    *hg_ = *hg; +    RemoveRules(hg_); +    assert(state == 1); +    state = 2; +  } + +  // compute "empirical" expectations, numerator of objective +  virtual void NotifyAlignmentForest(const SentenceMetadata&, Hypergraph* hg) { +    assert(state == 2); +    state = 3; +    *gold_hg_ = *hg; +    RemoveRules(gold_hg_); +  } + +  virtual void NotifyDecodingComplete(const SentenceMetadata&) { +    if (state == 3) { +    } else { +      hg_->clear(); +      gold_hg_->clear(); +    } +  } + +  int state; +}; + +void ReadConfig(const string& ini, istringstream* out) { +  ReadFile rf(ini); +  istream& in = *rf.stream(); +  ostringstream os; +  while(in) { +    string line; +    getline(in, line); +    if (!in) continue; +    os << line << endl; +  } +  out->str(os.str()); +} + +#ifdef HAVE_MPI +namespace boost { namespace mpi { +  template<> +  struct is_commutative<std::plus<SparseVector<double> >, SparseVector<double> >  +    : mpl::true_ { }; +} } // end namespace boost::mpi +#endif + +void AddGrad(const SparseVector<prob_t> x, double s, SparseVector<double>* acc) { +  for (SparseVector<prob_t>::const_iterator it = x.begin(); it != x.end(); ++it) +    acc->add_value(it->first, it->second.as_float() * s); +} + +double PNorm(const vector<double>& v, const double p) { +  double acc = 0; +  for (int i = 0; i < v.size(); ++i) +    acc += pow(v[i], p); +  return pow(acc, 1.0 / p); +} + +void VV(ostream&os, const vector<double>& v) { +  for (int i = 1; i < v.size(); ++i) +    if (v[i]) os << FD::Convert(i) << "=" << v[i] << " "; +} + +double ApplyRegularizationTerms(const double C, +                                const double T, +                                const vector<double>& weights, +                                const vector<double>& prev_weights, +                                double* g) { +  double reg = 0; +  for (size_t i = 0; i < weights.size(); ++i) { +    const double prev_w_i = (i < prev_weights.size() ? prev_weights[i] : 0.0); +    const double& w_i = weights[i]; +    reg += C * w_i * w_i; +    g[i] += 2 * C * w_i; + +    reg += T * (w_i - prev_w_i) * (w_i - prev_w_i); +    g[i] += 2 * T * (w_i - prev_w_i); +  } +  return reg; +} + +int main(int argc, char** argv) { +#ifdef HAVE_MPI +  mpi::environment env(argc, argv); +  mpi::communicator world; +  const int size = world.size();  +  const int rank = world.rank(); +#else +  const int size = 1; +  const int rank = 0; +#endif +  if (size > 1) SetSilent(true);  // turn off verbose decoder output +  register_feature_functions(); +  MT19937* rng = NULL; + +  po::variables_map conf; +  if (!InitCommandLine(argc, argv, &conf)) +    return 1; + +  boost::shared_ptr<BatchOptimizer> o; +  const unsigned lbfgs_memory_buffers = conf["lbfgs_memory_buffers"].as<unsigned>(); +  const unsigned size_per_proc = conf["minibatch_size_per_proc"].as<unsigned>(); +  const unsigned minibatch_iterations = conf["minibatch_iterations"].as<unsigned>(); +  const double regularization_strength = conf["regularization_strength"].as<double>(); +  const double time_series_strength = conf["time_series_strength"].as<double>(); +  const bool use_time_series_reg = time_series_strength > 0.0; +  const unsigned max_iteration = conf["iterations"].as<unsigned>(); + +  vector<string> corpus; +  vector<int> ids; +  ReadTrainingCorpus(conf["training_data"].as<string>(), rank, size, &corpus, &ids); +  assert(corpus.size() > 0); + +  if (size_per_proc > corpus.size()) { +    cerr << "Minibatch size (per processor) must be smaller or equal to the local corpus size!\n"; +    return 1; +  } + +  // initialize decoder (loads hash functions if necessary) +  istringstream ins; +  ReadConfig(conf["cdec_config"].as<string>(), &ins); +  Decoder decoder(&ins); + +  // load initial weights +  vector<weight_t> prev_weights; +  if (conf.count("weights")) +    Weights::InitFromFile(conf["weights"].as<string>(), &prev_weights); + +  if (conf.count("random_seed")) +    rng = new MT19937(conf["random_seed"].as<uint32_t>()); +  else +    rng = new MT19937; + +  size_t total_corpus_size = 0; +#ifdef HAVE_MPI +  reduce(world, corpus.size(), total_corpus_size, std::plus<size_t>(), 0); +#else +  total_corpus_size = corpus.size(); +#endif + +  if (rank == 0) +    cerr << "Total corpus size: " << total_corpus_size << endl; + +  CopyHGsObserver observer; + +  int write_weights_every_ith = 100; // TODO configure +  int titer = -1; + +  vector<weight_t>& cur_weights = decoder.CurrentWeightVector(); +  if (use_time_series_reg) { +    cur_weights = prev_weights; +  } else { +    cur_weights.swap(prev_weights); +    prev_weights.clear(); +  } + +  int iter = -1; +  bool converged = false; +  vector<double> gg; +  while (!converged) { +#ifdef HAVE_MPI +    mpi::timer timer; +#endif +    ++iter; ++titer; +    if (rank == 0) { +      converged = (iter == max_iteration); +        string fname = "weights.cur.gz"; +        if (iter % write_weights_every_ith == 0) { +          ostringstream o; o << "weights.epoch_" << iter << ".gz"; +          fname = o.str(); +        } +        if (converged) { fname = "weights.final.gz"; } +        ostringstream vv; +        vv << "total iter=" << titer << " (of current config iter=" << iter << ")  minibatch=" << size_per_proc << " sentences/proc x " << size << " procs.   num_feats=" << FD::NumFeats() << "   passes_thru_data=" << (titer * size_per_proc / static_cast<double>(corpus.size())); +        const string svv = vv.str(); +        Weights::WriteToFile(fname, cur_weights, true, &svv); +      } + +      vector<Hypergraph> hgs(size_per_proc); +      vector<Hypergraph> gold_hgs(size_per_proc); +      for (int i = 0; i < size_per_proc; ++i) { +        int ei = corpus.size() * rng->next(); +        int id = ids[ei]; +        observer.SetCurrentHypergraphs(&hgs[i], &gold_hgs[i]); +        decoder.SetId(id); +        decoder.Decode(corpus[ei], &observer); +      } + +      SparseVector<double> local_grad, g; +      double local_obj = 0; +      o.reset(); +      for (unsigned mi = 0; mi < minibatch_iterations; ++mi) { +        local_grad.clear(); +        g.clear(); +        local_obj = 0; + +        for (unsigned i = 0; i < size_per_proc; ++i) { +          Hypergraph& hg = hgs[i]; +          Hypergraph& hg_gold = gold_hgs[i]; +          if (hg.edges_.size() < 2) continue; + +          hg.Reweight(cur_weights); +          hg_gold.Reweight(cur_weights); +          SparseVector<prob_t> model_exp, gold_exp; +          const prob_t z = InsideOutside<prob_t, +                                         EdgeProb, +                                         SparseVector<prob_t>, +                                         EdgeFeaturesAndProbWeightFunction>(hg, &model_exp); +          local_obj += log(z); +          model_exp /= z; +          AddGrad(model_exp, 1.0, &local_grad); +          model_exp.clear(); + +          const prob_t goldz = InsideOutside<prob_t, +                                         EdgeProb, +                                         SparseVector<prob_t>, +                                         EdgeFeaturesAndProbWeightFunction>(hg_gold, &gold_exp); +          local_obj -= log(goldz); + +          if (log(z) - log(goldz) < kMINUS_EPSILON) { +            cerr << "DIFF. ERR! log_model_z < log_gold_z: " << log(z) << " " << log(goldz) << endl; +            return 1; +          } + +          gold_exp /= goldz; +          AddGrad(gold_exp, -1.0, &local_grad); +        } + +        double obj = 0; +#ifdef HAVE_MPI +        reduce(world, local_obj, obj, std::plus<double>(), 0); +        reduce(world, local_grad, g, std::plus<SparseVector<double> >(), 0); +#else +        obj = local_obj; +        g.swap(local_grad); +#endif +        local_grad.clear(); +        if (rank == 0) { +          // g /= (size_per_proc * size); +          if (!o) +            o.reset(new LBFGSOptimizer(FD::NumFeats(), lbfgs_memory_buffers)); +          gg.clear(); +          gg.resize(FD::NumFeats()); +          if (gg.size() != cur_weights.size()) { cur_weights.resize(gg.size()); } +          for (SparseVector<double>::iterator it = g.begin(); it != g.end(); ++it) +            if (it->first) { gg[it->first] = it->second; } +          g.clear(); +          double r = ApplyRegularizationTerms(regularization_strength, +                                time_series_strength, // * (iter == 0 ? 0.0 : 1.0), +                                cur_weights, +                                prev_weights, +                                &gg[0]); +          obj += r; +          if (mi == 0 || mi == (minibatch_iterations - 1)) { +            if (!mi) cerr << iter << ' '; else cerr << ' '; +            cerr << "OBJ=" << obj << " (REG=" << r << ")" << " |g|=" << PNorm(gg, 2) << " |w|=" << PNorm(cur_weights, 2);  +            if (mi > 0) cerr << endl << flush; else cerr << ' '; +          } else { cerr << '.' << flush; } +          // cerr << "w = "; VV(cerr, cur_weights); cerr << endl; +          // cerr << "g = "; VV(cerr, gg); cerr << endl; +          o->Optimize(obj, gg, &cur_weights); +        } +#ifdef HAVE_MPI +        broadcast(world, cur_weights, 0); +        broadcast(world, converged, 0); +        world.barrier(); +#endif +    } +    prev_weights = cur_weights; +  } +  return 0; +} diff --git a/training/crf/mpi_online_optimize.cc b/training/crf/mpi_online_optimize.cc new file mode 100644 index 00000000..d6968848 --- /dev/null +++ b/training/crf/mpi_online_optimize.cc @@ -0,0 +1,374 @@ +#include <sstream> +#include <iostream> +#include <fstream> +#include <vector> +#include <cassert> +#include <cmath> +#include <tr1/memory> + +#include <boost/program_options.hpp> +#include <boost/program_options/variables_map.hpp> + +#include "stringlib.h" +#include "verbose.h" +#include "hg.h" +#include "prob.h" +#include "inside_outside.h" +#include "ff_register.h" +#include "decoder.h" +#include "filelib.h" +#include "online_optimizer.h" +#include "fdict.h" +#include "weights.h" +#include "sparse_vector.h" +#include "sampler.h" + +#ifdef HAVE_MPI +#include <boost/mpi/timer.hpp> +#include <boost/mpi.hpp> +namespace mpi = boost::mpi; +#endif + +using namespace std; +namespace po = boost::program_options; + +bool InitCommandLine(int argc, char** argv, po::variables_map* conf) { +  po::options_description opts("Configuration options"); +  opts.add_options() +        ("input_weights,w",po::value<string>(),"Input feature weights file") +        ("frozen_features,z",po::value<string>(), "List of features not to optimize") +        ("training_data,t",po::value<string>(),"Training data corpus") +        ("training_agenda,a",po::value<string>(), "Text file listing a series of configuration files and the number of iterations to train using each configuration successively") +        ("minibatch_size_per_proc,s", po::value<unsigned>()->default_value(5), "Number of training instances evaluated per processor in each minibatch") +        ("optimization_method,m", po::value<string>()->default_value("sgd"), "Optimization method (sgd)") +        ("random_seed,S", po::value<uint32_t>(), "Random seed (if not specified, /dev/random will be used)") +        ("eta_0,e", po::value<double>()->default_value(0.2), "Initial learning rate for SGD (eta_0)") +        ("L1,1","Use L1 regularization") +        ("regularization_strength,C", po::value<double>()->default_value(1.0), "Regularization strength (C)"); +  po::options_description clo("Command line options"); +  clo.add_options() +        ("config", po::value<string>(), "Configuration file") +        ("help,h", "Print this help message and exit"); +  po::options_description dconfig_options, dcmdline_options; +  dconfig_options.add(opts); +  dcmdline_options.add(opts).add(clo); +   +  po::store(parse_command_line(argc, argv, dcmdline_options), *conf); +  if (conf->count("config")) { +    ifstream config((*conf)["config"].as<string>().c_str()); +    po::store(po::parse_config_file(config, dconfig_options), *conf); +  } +  po::notify(*conf); + +  if (conf->count("help") || !conf->count("training_data") || !conf->count("training_agenda")) { +    cerr << dcmdline_options << endl; +    return false; +  } +  return true; +} + +void ReadTrainingCorpus(const string& fname, int rank, int size, vector<string>* c, vector<int>* order) { +  ReadFile rf(fname); +  istream& in = *rf.stream(); +  string line; +  int id = 0; +  while(in) { +    getline(in, line); +    if (!in) break; +    if (id % size == rank) { +      c->push_back(line); +      order->push_back(id); +    } +    ++id; +  } +} + +static const double kMINUS_EPSILON = -1e-6; + +struct TrainingObserver : public DecoderObserver { +  void Reset() { +    acc_grad.clear(); +    acc_obj = 0; +    total_complete = 0; +  }  + +  void SetLocalGradientAndObjective(vector<double>* g, double* o) const { +    *o = acc_obj; +    for (SparseVector<prob_t>::const_iterator it = acc_grad.begin(); it != acc_grad.end(); ++it) +      (*g)[it->first] = it->second.as_float(); +  } + +  virtual void NotifyDecodingStart(const SentenceMetadata& smeta) { +    cur_model_exp.clear(); +    cur_obj = 0; +    state = 1; +  } + +  // compute model expectations, denominator of objective +  virtual void NotifyTranslationForest(const SentenceMetadata& smeta, Hypergraph* hg) { +    assert(state == 1); +    state = 2; +    const prob_t z = InsideOutside<prob_t, +                                   EdgeProb, +                                   SparseVector<prob_t>, +                                   EdgeFeaturesAndProbWeightFunction>(*hg, &cur_model_exp); +    cur_obj = log(z); +    cur_model_exp /= z; +  } + +  // compute "empirical" expectations, numerator of objective +  virtual void NotifyAlignmentForest(const SentenceMetadata& smeta, Hypergraph* hg) { +    assert(state == 2); +    state = 3; +    SparseVector<prob_t> ref_exp; +    const prob_t ref_z = InsideOutside<prob_t, +                                       EdgeProb, +                                       SparseVector<prob_t>, +                                       EdgeFeaturesAndProbWeightFunction>(*hg, &ref_exp); +    ref_exp /= ref_z; + +    double log_ref_z; +#if 0 +    if (crf_uniform_empirical) { +      log_ref_z = ref_exp.dot(feature_weights); +    } else { +      log_ref_z = log(ref_z); +    } +#else +    log_ref_z = log(ref_z); +#endif + +    // rounding errors means that <0 is too strict +    if ((cur_obj - log_ref_z) < kMINUS_EPSILON) { +      cerr << "DIFF. ERR! log_model_z < log_ref_z: " << cur_obj << " " << log_ref_z << endl; +      exit(1); +    } +    assert(!std::isnan(log_ref_z)); +    ref_exp -= cur_model_exp; +    acc_grad += ref_exp; +    acc_obj += (cur_obj - log_ref_z); +  } + +  virtual void NotifyDecodingComplete(const SentenceMetadata& smeta) { +    if (state == 3) { +      ++total_complete; +    } else { +    } +  } + +  void GetGradient(SparseVector<double>* g) const { +    g->clear(); +    for (SparseVector<prob_t>::const_iterator it = acc_grad.begin(); it != acc_grad.end(); ++it) +      g->set_value(it->first, it->second.as_float()); +  } + +  int total_complete; +  SparseVector<prob_t> cur_model_exp; +  SparseVector<prob_t> acc_grad; +  double acc_obj; +  double cur_obj; +  int state; +}; + +#ifdef HAVE_MPI +namespace boost { namespace mpi { +  template<> +  struct is_commutative<std::plus<SparseVector<double> >, SparseVector<double> >  +    : mpl::true_ { }; +} } // end namespace boost::mpi +#endif + +bool LoadAgenda(const string& file, vector<pair<string, int> >* a) { +  ReadFile rf(file); +  istream& in = *rf.stream(); +  string line; +  while(in) { +    getline(in, line); +    if (!in) break; +    if (line.empty()) continue; +    if (line[0] == '#') continue; +    int sc = 0; +    if (line.size() < 3) return false; +    for (int i = 0; i < line.size(); ++i) { if (line[i] == ' ') ++sc; } +    if (sc != 1) { cerr << "Too many spaces in line: " << line << endl; return false; } +    size_t d = line.find(" "); +    pair<string, int> x; +    x.first = line.substr(0,d); +    x.second = atoi(line.substr(d+1).c_str()); +    a->push_back(x); +    if (!FileExists(x.first)) { +      cerr << "Can't find file " << x.first << endl; +      return false; +    } +  } +  return true; +} + +int main(int argc, char** argv) { +  cerr << "THIS SOFTWARE IS DEPRECATED YOU SHOULD USE mpi_flex_optimize\n"; +#ifdef HAVE_MPI +  mpi::environment env(argc, argv); +  mpi::communicator world; +  const int size = world.size();  +  const int rank = world.rank(); +#else +  const int size = 1; +  const int rank = 0; +#endif +  if (size > 1) SetSilent(true);  // turn off verbose decoder output +  register_feature_functions(); +  std::tr1::shared_ptr<MT19937> rng; + +  po::variables_map conf; +  if (!InitCommandLine(argc, argv, &conf)) +    return 1; + +  vector<pair<string, int> > agenda; +  if (!LoadAgenda(conf["training_agenda"].as<string>(), &agenda)) +    return 1; +  if (rank == 0) +    cerr << "Loaded agenda defining " << agenda.size() << " training epochs\n"; + +  assert(agenda.size() > 0); + +  if (1) {  // hack to load the feature hash functions -- TODO this should not be in cdec.ini +    const string& cur_config = agenda[0].first; +    const unsigned max_iteration = agenda[0].second; +    ReadFile ini_rf(cur_config); +    Decoder decoder(ini_rf.stream()); +  } + +  // load initial weights +  vector<weight_t> init_weights; +  if (conf.count("input_weights")) +    Weights::InitFromFile(conf["input_weights"].as<string>(), &init_weights); + +  vector<int> frozen_fids; +  if (conf.count("frozen_features")) { +    ReadFile rf(conf["frozen_features"].as<string>()); +    istream& in = *rf.stream(); +    string line; +    while(in) { +      getline(in, line); +      if (line.empty()) continue; +      if (line[0] == ' ' || line[line.size() - 1] == ' ') { line = Trim(line); } +      frozen_fids.push_back(FD::Convert(line)); +    } +    if (rank == 0) cerr << "Freezing " << frozen_fids.size() << " features.\n"; +  } + +  vector<string> corpus; +  vector<int> ids; +  ReadTrainingCorpus(conf["training_data"].as<string>(), rank, size, &corpus, &ids); +  assert(corpus.size() > 0); + +  std::tr1::shared_ptr<OnlineOptimizer> o; +  std::tr1::shared_ptr<LearningRateSchedule> lr; + +  const unsigned size_per_proc = conf["minibatch_size_per_proc"].as<unsigned>(); +  if (size_per_proc > corpus.size()) { +    cerr << "Minibatch size must be smaller than corpus size!\n"; +    return 1; +  } + +  size_t total_corpus_size = 0; +#ifdef HAVE_MPI +  reduce(world, corpus.size(), total_corpus_size, std::plus<size_t>(), 0); +#else +  total_corpus_size = corpus.size(); +#endif + +  if (rank == 0) { +    cerr << "Total corpus size: " << total_corpus_size << endl; +    const unsigned batch_size = size_per_proc * size; +    // TODO config +    lr.reset(new ExponentialDecayLearningRate(batch_size, conf["eta_0"].as<double>())); + +    const string omethod = conf["optimization_method"].as<string>(); +    if (omethod == "sgd") { +      const double C = conf["regularization_strength"].as<double>(); +      o.reset(new CumulativeL1OnlineOptimizer(lr, total_corpus_size, C, frozen_fids)); +    } else { +      assert(!"fail"); +    } +  } +  if (conf.count("random_seed")) +    rng.reset(new MT19937(conf["random_seed"].as<uint32_t>())); +  else +    rng.reset(new MT19937); + +  SparseVector<double> x; +  Weights::InitSparseVector(init_weights, &x); +  TrainingObserver observer; + +  int write_weights_every_ith = 100; // TODO configure +  int titer = -1; + +  for (int ai = 0; ai < agenda.size(); ++ai) { +    const string& cur_config = agenda[ai].first; +    const unsigned max_iteration = agenda[ai].second; +    if (rank == 0) +      cerr << "STARTING TRAINING EPOCH " << (ai+1) << ". CONFIG=" << cur_config << endl; +    // load cdec.ini and set up decoder +    ReadFile ini_rf(cur_config); +    Decoder decoder(ini_rf.stream()); +    vector<weight_t>& lambdas = decoder.CurrentWeightVector(); +    if (ai == 0) { lambdas.swap(init_weights); init_weights.clear(); } + +    if (rank == 0) +      o->ResetEpoch(); // resets the learning rate-- TODO is this good? + +    int iter = -1; +    bool converged = false; +    while (!converged) { +#ifdef HAVE_MPI +      mpi::timer timer; +#endif +      x.init_vector(&lambdas); +      ++iter; ++titer; +      observer.Reset(); +      if (rank == 0) { +        converged = (iter == max_iteration); +        Weights::SanityCheck(lambdas); +        static int cc = 0; ++cc; if (cc > 1) { Weights::ShowLargestFeatures(lambdas); } +        string fname = "weights.cur.gz"; +        if (iter % write_weights_every_ith == 0) { +          ostringstream o; o << "weights.epoch_" << (ai+1) << '.' << iter << ".gz"; +          fname = o.str(); +        } +        if (converged && ((ai+1)==agenda.size())) { fname = "weights.final.gz"; } +        ostringstream vv; +        vv << "total iter=" << titer << " (of current config iter=" << iter << ")  minibatch=" << size_per_proc << " sentences/proc x " << size << " procs.   num_feats=" << x.size() << '/' << FD::NumFeats() << "   passes_thru_data=" << (titer * size_per_proc / static_cast<double>(corpus.size())) << "   eta=" << lr->eta(titer); +        const string svv = vv.str(); +        cerr << svv << endl; +        Weights::WriteToFile(fname, lambdas, true, &svv); +      } + +      for (int i = 0; i < size_per_proc; ++i) { +        int ei = corpus.size() * rng->next(); +        int id = ids[ei]; +        decoder.SetId(id); +        decoder.Decode(corpus[ei], &observer); +      } +      SparseVector<double> local_grad, g; +      observer.GetGradient(&local_grad); +#ifdef HAVE_MPI +      reduce(world, local_grad, g, std::plus<SparseVector<double> >(), 0); +#else +      g.swap(local_grad); +#endif +      local_grad.clear(); +      if (rank == 0) { +        g /= (size_per_proc * size); +        o->UpdateWeights(g, FD::NumFeats(), &x); +      } +#ifdef HAVE_MPI +      broadcast(world, x, 0); +      broadcast(world, converged, 0); +      world.barrier(); +      if (rank == 0) { cerr << "  ELAPSED TIME THIS ITERATION=" << timer.elapsed() << endl; } +#endif +    } +  } +  return 0; +} | 
