diff options
author | Michael Denkowski <michael.j.denkowski@gmail.com> | 2012-12-22 16:01:23 -0500 |
---|---|---|
committer | Michael Denkowski <michael.j.denkowski@gmail.com> | 2012-12-22 16:01:23 -0500 |
commit | 597d89c11db53e91bc011eab70fd613bbe6453e8 (patch) | |
tree | 83c87c07d1ff6d3ee4e3b1626f7eddd49c61095b /training/crf | |
parent | 65e958ff2678a41c22be7171456a63f002ef370b (diff) | |
parent | 201af2acd394415a05072fbd53d42584875aa4b4 (diff) |
Merge branch 'master' of git://github.com/redpony/cdec
Diffstat (limited to 'training/crf')
-rw-r--r-- | training/crf/Makefile.am | 27 | ||||
-rw-r--r-- | training/crf/cllh_observer.cc | 52 | ||||
-rw-r--r-- | training/crf/cllh_observer.h | 26 | ||||
-rw-r--r-- | training/crf/mpi_batch_optimize.cc | 372 | ||||
-rw-r--r-- | training/crf/mpi_compute_cllh.cc | 134 | ||||
-rw-r--r-- | training/crf/mpi_extract_features.cc | 151 | ||||
-rw-r--r-- | training/crf/mpi_extract_reachable.cc | 163 | ||||
-rw-r--r-- | training/crf/mpi_flex_optimize.cc | 386 | ||||
-rw-r--r-- | training/crf/mpi_online_optimize.cc | 384 |
9 files changed, 1695 insertions, 0 deletions
diff --git a/training/crf/Makefile.am b/training/crf/Makefile.am new file mode 100644 index 00000000..d203df25 --- /dev/null +++ b/training/crf/Makefile.am @@ -0,0 +1,27 @@ +bin_PROGRAMS = \ + mpi_batch_optimize \ + mpi_compute_cllh \ + mpi_extract_features \ + mpi_extract_reachable \ + mpi_flex_optimize \ + mpi_online_optimize + +mpi_online_optimize_SOURCES = mpi_online_optimize.cc +mpi_online_optimize_LDADD = $(top_srcdir)/training/utils/libtraining_utils.a $(top_srcdir)/decoder/libcdec.a $(top_srcdir)/klm/search/libksearch.a $(top_srcdir)/mteval/libmteval.a $(top_srcdir)/utils/libutils.a $(top_srcdir)/klm/lm/libklm.a $(top_srcdir)/klm/util/libklm_util.a -lz + +mpi_flex_optimize_SOURCES = mpi_flex_optimize.cc +mpi_flex_optimize_LDADD = $(top_srcdir)/training/utils/libtraining_utils.a $(top_srcdir)/decoder/libcdec.a $(top_srcdir)/klm/search/libksearch.a $(top_srcdir)/mteval/libmteval.a $(top_srcdir)/utils/libutils.a $(top_srcdir)/klm/lm/libklm.a $(top_srcdir)/klm/util/libklm_util.a -lz + +mpi_extract_reachable_SOURCES = mpi_extract_reachable.cc +mpi_extract_reachable_LDADD = $(top_srcdir)/decoder/libcdec.a $(top_srcdir)/klm/search/libksearch.a $(top_srcdir)/mteval/libmteval.a $(top_srcdir)/utils/libutils.a $(top_srcdir)/klm/lm/libklm.a $(top_srcdir)/klm/util/libklm_util.a -lz + +mpi_extract_features_SOURCES = mpi_extract_features.cc +mpi_extract_features_LDADD = $(top_srcdir)/decoder/libcdec.a $(top_srcdir)/klm/search/libksearch.a $(top_srcdir)/mteval/libmteval.a $(top_srcdir)/utils/libutils.a $(top_srcdir)/klm/lm/libklm.a $(top_srcdir)/klm/util/libklm_util.a -lz + +mpi_batch_optimize_SOURCES = mpi_batch_optimize.cc cllh_observer.cc +mpi_batch_optimize_LDADD = $(top_srcdir)/training/utils/libtraining_utils.a $(top_srcdir)/decoder/libcdec.a $(top_srcdir)/klm/search/libksearch.a $(top_srcdir)/mteval/libmteval.a $(top_srcdir)/utils/libutils.a $(top_srcdir)/klm/lm/libklm.a $(top_srcdir)/klm/util/libklm_util.a -lz + +mpi_compute_cllh_SOURCES = mpi_compute_cllh.cc cllh_observer.cc +mpi_compute_cllh_LDADD = $(top_srcdir)/decoder/libcdec.a $(top_srcdir)/klm/search/libksearch.a $(top_srcdir)/mteval/libmteval.a $(top_srcdir)/utils/libutils.a $(top_srcdir)/klm/lm/libklm.a $(top_srcdir)/klm/util/libklm_util.a -lz + +AM_CPPFLAGS = -DBOOST_TEST_DYN_LINK -W -Wall -Wno-sign-compare -I$(top_srcdir)/training -I$(top_srcdir)/training/utils -I$(top_srcdir)/utils -I$(top_srcdir)/decoder -I$(top_srcdir)/mteval diff --git a/training/crf/cllh_observer.cc b/training/crf/cllh_observer.cc new file mode 100644 index 00000000..4ec2fa65 --- /dev/null +++ b/training/crf/cllh_observer.cc @@ -0,0 +1,52 @@ +#include "cllh_observer.h" + +#include <cmath> +#include <cassert> + +#include "inside_outside.h" +#include "hg.h" +#include "sentence_metadata.h" + +using namespace std; + +static const double kMINUS_EPSILON = -1e-6; + +ConditionalLikelihoodObserver::~ConditionalLikelihoodObserver() {} + +void ConditionalLikelihoodObserver::NotifyDecodingStart(const SentenceMetadata&) { + cur_obj = 0; + state = 1; +} + +void ConditionalLikelihoodObserver::NotifyTranslationForest(const SentenceMetadata&, Hypergraph* hg) { + assert(state == 1); + state = 2; + SparseVector<prob_t> cur_model_exp; + const prob_t z = InsideOutside<prob_t, + EdgeProb, + SparseVector<prob_t>, + EdgeFeaturesAndProbWeightFunction>(*hg, &cur_model_exp); + cur_obj = log(z); +} + +void ConditionalLikelihoodObserver::NotifyAlignmentForest(const SentenceMetadata& smeta, Hypergraph* hg) { + assert(state == 2); + state = 3; + SparseVector<prob_t> ref_exp; + const prob_t ref_z = InsideOutside<prob_t, + EdgeProb, + SparseVector<prob_t>, + EdgeFeaturesAndProbWeightFunction>(*hg, &ref_exp); + + double log_ref_z = log(ref_z); + + // rounding errors means that <0 is too strict + if ((cur_obj - log_ref_z) < kMINUS_EPSILON) { + cerr << "DIFF. ERR! log_model_z < log_ref_z: " << cur_obj << " " << log_ref_z << endl; + exit(1); + } + assert(!std::isnan(log_ref_z)); + acc_obj += (cur_obj - log_ref_z); + trg_words += smeta.GetReference().size(); +} + diff --git a/training/crf/cllh_observer.h b/training/crf/cllh_observer.h new file mode 100644 index 00000000..0de47331 --- /dev/null +++ b/training/crf/cllh_observer.h @@ -0,0 +1,26 @@ +#ifndef _CLLH_OBSERVER_H_ +#define _CLLH_OBSERVER_H_ + +#include "decoder.h" + +struct ConditionalLikelihoodObserver : public DecoderObserver { + + ConditionalLikelihoodObserver() : trg_words(), acc_obj(), cur_obj() {} + ~ConditionalLikelihoodObserver(); + + void Reset() { + acc_obj = 0; + trg_words = 0; + } + + virtual void NotifyDecodingStart(const SentenceMetadata&); + virtual void NotifyTranslationForest(const SentenceMetadata&, Hypergraph* hg); + virtual void NotifyAlignmentForest(const SentenceMetadata& smeta, Hypergraph* hg); + + unsigned trg_words; + double acc_obj; + double cur_obj; + int state; +}; + +#endif diff --git a/training/crf/mpi_batch_optimize.cc b/training/crf/mpi_batch_optimize.cc new file mode 100644 index 00000000..2eff07e4 --- /dev/null +++ b/training/crf/mpi_batch_optimize.cc @@ -0,0 +1,372 @@ +#include <sstream> +#include <iostream> +#include <vector> +#include <cassert> +#include <cmath> + +#include "config.h" +#ifdef HAVE_MPI +#include <boost/mpi/timer.hpp> +#include <boost/mpi.hpp> +namespace mpi = boost::mpi; +#endif + +#include <boost/shared_ptr.hpp> +#include <boost/program_options.hpp> +#include <boost/program_options/variables_map.hpp> + +#include "sentence_metadata.h" +#include "cllh_observer.h" +#include "verbose.h" +#include "hg.h" +#include "prob.h" +#include "inside_outside.h" +#include "ff_register.h" +#include "decoder.h" +#include "filelib.h" +#include "stringlib.h" +#include "optimize.h" +#include "fdict.h" +#include "weights.h" +#include "sparse_vector.h" + +using namespace std; +namespace po = boost::program_options; + +bool InitCommandLine(int argc, char** argv, po::variables_map* conf) { + po::options_description opts("Configuration options"); + opts.add_options() + ("input_weights,w",po::value<string>(),"Input feature weights file") + ("training_data,t",po::value<string>(),"Training data") + ("test_data,T",po::value<string>(),"(optional) test data") + ("decoder_config,c",po::value<string>(),"Decoder configuration file") + ("output_weights,o",po::value<string>()->default_value("-"),"Output feature weights file") + ("optimization_method,m", po::value<string>()->default_value("lbfgs"), "Optimization method (sgd, lbfgs, rprop)") + ("correction_buffers,M", po::value<int>()->default_value(10), "Number of gradients for LBFGS to maintain in memory") + ("gaussian_prior,p","Use a Gaussian prior on the weights") + ("sigma_squared", po::value<double>()->default_value(1.0), "Sigma squared term for spherical Gaussian prior") + ("means,u", po::value<string>(), "(optional) file containing the means for Gaussian prior"); + po::options_description clo("Command line options"); + clo.add_options() + ("config", po::value<string>(), "Configuration file") + ("help,h", "Print this help message and exit"); + po::options_description dconfig_options, dcmdline_options; + dconfig_options.add(opts); + dcmdline_options.add(opts).add(clo); + + po::store(parse_command_line(argc, argv, dcmdline_options), *conf); + if (conf->count("config")) { + ifstream config((*conf)["config"].as<string>().c_str()); + po::store(po::parse_config_file(config, dconfig_options), *conf); + } + po::notify(*conf); + + if (conf->count("help") || !conf->count("input_weights") || !(conf->count("training_data")) || !conf->count("decoder_config")) { + cerr << dcmdline_options << endl; + return false; + } + return true; +} + +void ReadTrainingCorpus(const string& fname, int rank, int size, vector<string>* c) { + ReadFile rf(fname); + istream& in = *rf.stream(); + string line; + int lc = 0; + while(in) { + getline(in, line); + if (!in) break; + if (lc % size == rank) c->push_back(line); + ++lc; + } +} + +static const double kMINUS_EPSILON = -1e-6; + +struct TrainingObserver : public DecoderObserver { + void Reset() { + acc_grad.clear(); + acc_obj = 0; + total_complete = 0; + trg_words = 0; + } + + void SetLocalGradientAndObjective(vector<double>* g, double* o) const { + *o = acc_obj; + for (SparseVector<prob_t>::const_iterator it = acc_grad.begin(); it != acc_grad.end(); ++it) + (*g)[it->first] = it->second.as_float(); + } + + virtual void NotifyDecodingStart(const SentenceMetadata& smeta) { + cur_model_exp.clear(); + cur_obj = 0; + state = 1; + } + + // compute model expectations, denominator of objective + virtual void NotifyTranslationForest(const SentenceMetadata& smeta, Hypergraph* hg) { + assert(state == 1); + state = 2; + const prob_t z = InsideOutside<prob_t, + EdgeProb, + SparseVector<prob_t>, + EdgeFeaturesAndProbWeightFunction>(*hg, &cur_model_exp); + cur_obj = log(z); + cur_model_exp /= z; + } + + // compute "empirical" expectations, numerator of objective + virtual void NotifyAlignmentForest(const SentenceMetadata& smeta, Hypergraph* hg) { + assert(state == 2); + state = 3; + SparseVector<prob_t> ref_exp; + const prob_t ref_z = InsideOutside<prob_t, + EdgeProb, + SparseVector<prob_t>, + EdgeFeaturesAndProbWeightFunction>(*hg, &ref_exp); + ref_exp /= ref_z; + + double log_ref_z; +#if 0 + if (crf_uniform_empirical) { + log_ref_z = ref_exp.dot(feature_weights); + } else { + log_ref_z = log(ref_z); + } +#else + log_ref_z = log(ref_z); +#endif + + // rounding errors means that <0 is too strict + if ((cur_obj - log_ref_z) < kMINUS_EPSILON) { + cerr << "DIFF. ERR! log_model_z < log_ref_z: " << cur_obj << " " << log_ref_z << endl; + exit(1); + } + assert(!std::isnan(log_ref_z)); + ref_exp -= cur_model_exp; + acc_grad -= ref_exp; + acc_obj += (cur_obj - log_ref_z); + trg_words += smeta.GetReference().size(); + } + + virtual void NotifyDecodingComplete(const SentenceMetadata& smeta) { + if (state == 3) { + ++total_complete; + } else { + } + } + + int total_complete; + SparseVector<prob_t> cur_model_exp; + SparseVector<prob_t> acc_grad; + double acc_obj; + double cur_obj; + unsigned trg_words; + int state; +}; + +void ReadConfig(const string& ini, vector<string>* out) { + ReadFile rf(ini); + istream& in = *rf.stream(); + while(in) { + string line; + getline(in, line); + if (!in) continue; + out->push_back(line); + } +} + +void StoreConfig(const vector<string>& cfg, istringstream* o) { + ostringstream os; + for (int i = 0; i < cfg.size(); ++i) { os << cfg[i] << endl; } + o->str(os.str()); +} + +template <typename T> +struct VectorPlus : public binary_function<vector<T>, vector<T>, vector<T> > { + vector<T> operator()(const vector<int>& a, const vector<int>& b) const { + assert(a.size() == b.size()); + vector<T> v(a.size()); + transform(a.begin(), a.end(), b.begin(), v.begin(), plus<T>()); + return v; + } +}; + +int main(int argc, char** argv) { +#ifdef HAVE_MPI + mpi::environment env(argc, argv); + mpi::communicator world; + const int size = world.size(); + const int rank = world.rank(); +#else + const int size = 1; + const int rank = 0; +#endif + SetSilent(true); // turn off verbose decoder output + register_feature_functions(); + + po::variables_map conf; + if (!InitCommandLine(argc, argv, &conf)) return 1; + + // load cdec.ini and set up decoder + vector<string> cdec_ini; + ReadConfig(conf["decoder_config"].as<string>(), &cdec_ini); + istringstream ini; + StoreConfig(cdec_ini, &ini); + if (rank == 0) cerr << "Loading grammar...\n"; + Decoder* decoder = new Decoder(&ini); + if (decoder->GetConf()["input"].as<string>() != "-") { + cerr << "cdec.ini must not set an input file\n"; + return 1; + } + if (rank == 0) cerr << "Done loading grammar!\n"; + + // load initial weights + if (rank == 0) { cerr << "Loading weights...\n"; } + vector<weight_t>& lambdas = decoder->CurrentWeightVector(); + Weights::InitFromFile(conf["input_weights"].as<string>(), &lambdas); + if (rank == 0) { cerr << "Done loading weights.\n"; } + + // freeze feature set (should be optional?) + const bool freeze_feature_set = true; + if (freeze_feature_set) FD::Freeze(); + + const int num_feats = FD::NumFeats(); + if (rank == 0) cerr << "Number of features: " << num_feats << endl; + lambdas.resize(num_feats); + + const bool gaussian_prior = conf.count("gaussian_prior"); + vector<weight_t> means(num_feats, 0); + if (conf.count("means")) { + if (!gaussian_prior) { + cerr << "Don't use --means without --gaussian_prior!\n"; + exit(1); + } + Weights::InitFromFile(conf["means"].as<string>(), &means); + } + boost::shared_ptr<BatchOptimizer> o; + if (rank == 0) { + const string omethod = conf["optimization_method"].as<string>(); + if (omethod == "rprop") + o.reset(new RPropOptimizer(num_feats)); // TODO add configuration + else + o.reset(new LBFGSOptimizer(num_feats, conf["correction_buffers"].as<int>())); + cerr << "Optimizer: " << o->Name() << endl; + } + double objective = 0; + vector<double> gradient(num_feats, 0.0); + vector<double> rcv_grad; + rcv_grad.clear(); + bool converged = false; + + vector<string> corpus, test_corpus; + ReadTrainingCorpus(conf["training_data"].as<string>(), rank, size, &corpus); + assert(corpus.size() > 0); + if (conf.count("test_data")) + ReadTrainingCorpus(conf["test_data"].as<string>(), rank, size, &test_corpus); + + TrainingObserver observer; + ConditionalLikelihoodObserver cllh_observer; + while (!converged) { + observer.Reset(); + cllh_observer.Reset(); +#ifdef HAVE_MPI + mpi::timer timer; + world.barrier(); +#endif + if (rank == 0) { + cerr << "Starting decoding... (~" << corpus.size() << " sentences / proc)\n"; + cerr << " Testset size: " << test_corpus.size() << " sentences / proc)\n"; + } + for (int i = 0; i < corpus.size(); ++i) + decoder->Decode(corpus[i], &observer); + cerr << " process " << rank << '/' << size << " done\n"; + fill(gradient.begin(), gradient.end(), 0); + observer.SetLocalGradientAndObjective(&gradient, &objective); + + unsigned total_words = 0; +#ifdef HAVE_MPI + double to = 0; + rcv_grad.resize(num_feats, 0.0); + mpi::reduce(world, &gradient[0], gradient.size(), &rcv_grad[0], plus<double>(), 0); + swap(gradient, rcv_grad); + rcv_grad.clear(); + + reduce(world, observer.trg_words, total_words, std::plus<unsigned>(), 0); + mpi::reduce(world, objective, to, plus<double>(), 0); + objective = to; +#else + total_words = observer.trg_words; +#endif + if (rank == 0) + cerr << "TRAINING CORPUS: ln p(f|e)=" << objective << "\t log_2 p(f|e) = " << (objective/log(2)) << "\t cond. entropy = " << (objective/log(2) / total_words) << "\t ppl = " << pow(2, (objective/log(2) / total_words)) << endl; + + for (int i = 0; i < test_corpus.size(); ++i) + decoder->Decode(test_corpus[i], &cllh_observer); + + double test_objective = 0; + unsigned test_total_words = 0; +#ifdef HAVE_MPI + reduce(world, cllh_observer.acc_obj, test_objective, std::plus<double>(), 0); + reduce(world, cllh_observer.trg_words, test_total_words, std::plus<unsigned>(), 0); +#else + test_objective = cllh_observer.acc_obj; + test_total_words = cllh_observer.trg_words; +#endif + + if (rank == 0) { // run optimizer only on rank=0 node + if (test_corpus.size()) + cerr << " TEST CORPUS: ln p(f|e)=" << test_objective << "\t log_2 p(f|e) = " << (test_objective/log(2)) << "\t cond. entropy = " << (test_objective/log(2) / test_total_words) << "\t ppl = " << pow(2, (test_objective/log(2) / test_total_words)) << endl; + if (gaussian_prior) { + const double sigsq = conf["sigma_squared"].as<double>(); + double norm = 0; + for (int k = 1; k < lambdas.size(); ++k) { + const double& lambda_k = lambdas[k]; + if (lambda_k) { + const double param = (lambda_k - means[k]); + norm += param * param; + gradient[k] += param / sigsq; + } + } + const double reg = norm / (2.0 * sigsq); + cerr << "REGULARIZATION TERM: " << reg << endl; + objective += reg; + } + cerr << "EVALUATION #" << o->EvaluationCount() << " OBJECTIVE: " << objective << endl; + double gnorm = 0; + for (int i = 0; i < gradient.size(); ++i) + gnorm += gradient[i] * gradient[i]; + cerr << " GNORM=" << sqrt(gnorm) << endl; + vector<weight_t> old = lambdas; + int c = 0; + while (old == lambdas) { + ++c; + if (c > 1) { cerr << "Same lambdas, repeating optimization\n"; } + o->Optimize(objective, gradient, &lambdas); + assert(c < 5); + } + old.clear(); + Weights::SanityCheck(lambdas); + Weights::ShowLargestFeatures(lambdas); + + converged = o->HasConverged(); + if (converged) { cerr << "OPTIMIZER REPORTS CONVERGENCE!\n"; } + + string fname = "weights.cur.gz"; + if (converged) { fname = "weights.final.gz"; } + ostringstream vv; + vv << "Objective = " << objective << " (eval count=" << o->EvaluationCount() << ")"; + const string svv = vv.str(); + Weights::WriteToFile(fname, lambdas, true, &svv); + } // rank == 0 + int cint = converged; +#ifdef HAVE_MPI + mpi::broadcast(world, &lambdas[0], lambdas.size(), 0); + mpi::broadcast(world, cint, 0); + if (rank == 0) { cerr << " ELAPSED TIME THIS ITERATION=" << timer.elapsed() << endl; } +#endif + converged = cint; + } + return 0; +} + diff --git a/training/crf/mpi_compute_cllh.cc b/training/crf/mpi_compute_cllh.cc new file mode 100644 index 00000000..066389d0 --- /dev/null +++ b/training/crf/mpi_compute_cllh.cc @@ -0,0 +1,134 @@ +#include <iostream> +#include <vector> +#include <cassert> +#include <cmath> + +#include "config.h" +#ifdef HAVE_MPI +#include <boost/mpi.hpp> +#endif +#include <boost/program_options.hpp> +#include <boost/program_options/variables_map.hpp> + +#include "cllh_observer.h" +#include "sentence_metadata.h" +#include "verbose.h" +#include "hg.h" +#include "prob.h" +#include "inside_outside.h" +#include "ff_register.h" +#include "decoder.h" +#include "filelib.h" +#include "weights.h" + +using namespace std; +namespace po = boost::program_options; + +bool InitCommandLine(int argc, char** argv, po::variables_map* conf) { + po::options_description opts("Configuration options"); + opts.add_options() + ("weights,w",po::value<string>(),"Input feature weights file") + ("training_data,t",po::value<string>(),"Training data corpus") + ("decoder_config,c",po::value<string>(),"Decoder configuration file"); + po::options_description clo("Command line options"); + clo.add_options() + ("config", po::value<string>(), "Configuration file") + ("help,h", "Print this help message and exit"); + po::options_description dconfig_options, dcmdline_options; + dconfig_options.add(opts); + dcmdline_options.add(opts).add(clo); + + po::store(parse_command_line(argc, argv, dcmdline_options), *conf); + if (conf->count("config")) { + ifstream config((*conf)["config"].as<string>().c_str()); + po::store(po::parse_config_file(config, dconfig_options), *conf); + } + po::notify(*conf); + + if (conf->count("help") || !conf->count("training_data") || !conf->count("decoder_config")) { + cerr << dcmdline_options << endl; + return false; + } + return true; +} + +void ReadInstances(const string& fname, int rank, int size, vector<string>* c) { + assert(fname != "-"); + ReadFile rf(fname); + istream& in = *rf.stream(); + string line; + int lc = 0; + while(in) { + getline(in, line); + if (!in) break; + if (lc % size == rank) c->push_back(line); + ++lc; + } +} + +static const double kMINUS_EPSILON = -1e-6; + +#ifdef HAVE_MPI +namespace mpi = boost::mpi; +#endif + +int main(int argc, char** argv) { +#ifdef HAVE_MPI + mpi::environment env(argc, argv); + mpi::communicator world; + const int size = world.size(); + const int rank = world.rank(); +#else + const int size = 1; + const int rank = 0; +#endif + if (size > 1) SetSilent(true); // turn off verbose decoder output + register_feature_functions(); + + po::variables_map conf; + if (!InitCommandLine(argc, argv, &conf)) + return false; + + // load cdec.ini and set up decoder + ReadFile ini_rf(conf["decoder_config"].as<string>()); + Decoder decoder(ini_rf.stream()); + if (decoder.GetConf()["input"].as<string>() != "-") { + cerr << "cdec.ini must not set an input file\n"; + abort(); + } + + // load weights + vector<weight_t>& weights = decoder.CurrentWeightVector(); + if (conf.count("weights")) + Weights::InitFromFile(conf["weights"].as<string>(), &weights); + + vector<string> corpus; + ReadInstances(conf["training_data"].as<string>(), rank, size, &corpus); + assert(corpus.size() > 0); + + if (rank == 0) + cerr << "Each processor is decoding ~" << corpus.size() << " training examples...\n"; + + ConditionalLikelihoodObserver observer; + for (int i = 0; i < corpus.size(); ++i) + decoder.Decode(corpus[i], &observer); + + double objective = 0; + unsigned total_words = 0; +#ifdef HAVE_MPI + reduce(world, observer.acc_obj, objective, std::plus<double>(), 0); + reduce(world, observer.trg_words, total_words, std::plus<unsigned>(), 0); +#else + objective = observer.acc_obj; +#endif + + if (rank == 0) { + cout << "CONDITIONAL LOG_e LIKELIHOOD: " << objective << endl; + cout << "CONDITIONAL LOG_2 LIKELIHOOD: " << (objective/log(2)) << endl; + cout << " CONDITIONAL ENTROPY: " << (objective/log(2) / total_words) << endl; + cout << " PERPLEXITY: " << pow(2, (objective/log(2) / total_words)) << endl; + } + + return 0; +} + diff --git a/training/crf/mpi_extract_features.cc b/training/crf/mpi_extract_features.cc new file mode 100644 index 00000000..6750aa15 --- /dev/null +++ b/training/crf/mpi_extract_features.cc @@ -0,0 +1,151 @@ +#include <iostream> +#include <sstream> +#include <vector> +#include <cassert> + +#include "config.h" +#ifdef HAVE_MPI +#include <boost/mpi.hpp> +#endif +#include <boost/program_options.hpp> +#include <boost/program_options/variables_map.hpp> + +#include "ff_register.h" +#include "verbose.h" +#include "filelib.h" +#include "fdict.h" +#include "decoder.h" +#include "weights.h" + +using namespace std; +namespace po = boost::program_options; + +bool InitCommandLine(int argc, char** argv, po::variables_map* conf) { + po::options_description opts("Configuration options"); + opts.add_options() + ("training_data,t",po::value<string>(),"Training data corpus") + ("decoder_config,c",po::value<string>(),"Decoder configuration file") + ("weights,w", po::value<string>(), "(Optional) weights file; weights may affect what features are encountered in pruning configurations") + ("output_prefix,o",po::value<string>()->default_value("features"),"Output path prefix"); + po::options_description clo("Command line options"); + clo.add_options() + ("config", po::value<string>(), "Configuration file") + ("help,h", "Print this help message and exit"); + po::options_description dconfig_options, dcmdline_options; + dconfig_options.add(opts); + dcmdline_options.add(opts).add(clo); + + po::store(parse_command_line(argc, argv, dcmdline_options), *conf); + if (conf->count("config")) { + ifstream config((*conf)["config"].as<string>().c_str()); + po::store(po::parse_config_file(config, dconfig_options), *conf); + } + po::notify(*conf); + + if (conf->count("help") || !conf->count("training_data") || !conf->count("decoder_config")) { + cerr << "Decode an input set (optionally in parallel using MPI) and write\nout the feature strings encountered.\n"; + cerr << dcmdline_options << endl; + return false; + } + return true; +} + +void ReadTrainingCorpus(const string& fname, int rank, int size, vector<string>* c) { + ReadFile rf(fname); + istream& in = *rf.stream(); + string line; + int lc = 0; + while(in) { + getline(in, line); + if (!in) break; + if (lc % size == rank) c->push_back(line); + ++lc; + } +} + +static const double kMINUS_EPSILON = -1e-6; + +struct TrainingObserver : public DecoderObserver { + + virtual void NotifyDecodingStart(const SentenceMetadata&) { + } + + // compute model expectations, denominator of objective + virtual void NotifyTranslationForest(const SentenceMetadata&, Hypergraph* hg) { + } + + // compute "empirical" expectations, numerator of objective + virtual void NotifyAlignmentForest(const SentenceMetadata& smeta, Hypergraph* hg) { + } +}; + +#ifdef HAVE_MPI +namespace mpi = boost::mpi; +#endif + +int main(int argc, char** argv) { +#ifdef HAVE_MPI + mpi::environment env(argc, argv); + mpi::communicator world; + const int size = world.size(); + const int rank = world.rank(); +#else + const int size = 1; + const int rank = 0; +#endif + if (size > 1) SetSilent(true); // turn off verbose decoder output + register_feature_functions(); + + po::variables_map conf; + if (!InitCommandLine(argc, argv, &conf)) + return false; + + // load cdec.ini and set up decoder + ReadFile ini_rf(conf["decoder_config"].as<string>()); + Decoder decoder(ini_rf.stream()); + if (decoder.GetConf()["input"].as<string>() != "-") { + cerr << "cdec.ini must not set an input file\n"; + abort(); + } + + if (FD::UsingPerfectHashFunction()) { + cerr << "Your configuration file has enabled a cmph hash function. Please disable.\n"; + return 1; + } + + // load optional weights + if (conf.count("weights")) + Weights::InitFromFile(conf["weights"].as<string>(), &decoder.CurrentWeightVector()); + + vector<string> corpus; + ReadTrainingCorpus(conf["training_data"].as<string>(), rank, size, &corpus); + assert(corpus.size() > 0); + + TrainingObserver observer; + + if (rank == 0) + cerr << "Each processor is decoding ~" << corpus.size() << " training examples...\n"; + + for (int i = 0; i < corpus.size(); ++i) + decoder.Decode(corpus[i], &observer); + + { + ostringstream os; + os << conf["output_prefix"].as<string>() << '.' << rank << "_of_" << size; + WriteFile wf(os.str()); + ostream& out = *wf.stream(); + const unsigned num_feats = FD::NumFeats(); + for (unsigned i = 1; i < num_feats; ++i) { + out << FD::Convert(i) << endl; + } + cerr << "Wrote " << os.str() << endl; + } + +#ifdef HAVE_MPI + world.barrier(); +#else +#endif + + return 0; +} + diff --git a/training/crf/mpi_extract_reachable.cc b/training/crf/mpi_extract_reachable.cc new file mode 100644 index 00000000..2a7c2b9d --- /dev/null +++ b/training/crf/mpi_extract_reachable.cc @@ -0,0 +1,163 @@ +#include <iostream> +#include <sstream> +#include <vector> +#include <cassert> + +#include "config.h" +#ifdef HAVE_MPI +#include <boost/mpi.hpp> +#endif +#include <boost/program_options.hpp> +#include <boost/program_options/variables_map.hpp> + +#include "ff_register.h" +#include "verbose.h" +#include "filelib.h" +#include "fdict.h" +#include "decoder.h" +#include "weights.h" + +using namespace std; +namespace po = boost::program_options; + +bool InitCommandLine(int argc, char** argv, po::variables_map* conf) { + po::options_description opts("Configuration options"); + opts.add_options() + ("training_data,t",po::value<string>(),"Training data corpus") + ("decoder_config,c",po::value<string>(),"Decoder configuration file") + ("weights,w", po::value<string>(), "(Optional) weights file; weights may affect what features are encountered in pruning configurations") + ("output_prefix,o",po::value<string>()->default_value("reachable"),"Output path prefix"); + po::options_description clo("Command line options"); + clo.add_options() + ("config", po::value<string>(), "Configuration file") + ("help,h", "Print this help message and exit"); + po::options_description dconfig_options, dcmdline_options; + dconfig_options.add(opts); + dcmdline_options.add(opts).add(clo); + + po::store(parse_command_line(argc, argv, dcmdline_options), *conf); + if (conf->count("config")) { + ifstream config((*conf)["config"].as<string>().c_str()); + po::store(po::parse_config_file(config, dconfig_options), *conf); + } + po::notify(*conf); + + if (conf->count("help") || !conf->count("training_data") || !conf->count("decoder_config")) { + cerr << "Decode an input set (optionally in parallel using MPI) and write\nout the inputs that produce reachable parallel parses.\n"; + cerr << dcmdline_options << endl; + return false; + } + return true; +} + +void ReadTrainingCorpus(const string& fname, int rank, int size, vector<string>* c) { + ReadFile rf(fname); + istream& in = *rf.stream(); + string line; + int lc = 0; + while(in) { + getline(in, line); + if (!in) break; + if (lc % size == rank) c->push_back(line); + ++lc; + } +} + +static const double kMINUS_EPSILON = -1e-6; + +struct ReachabilityObserver : public DecoderObserver { + + virtual void NotifyDecodingStart(const SentenceMetadata&) { + reachable = false; + } + + // compute model expectations, denominator of objective + virtual void NotifyTranslationForest(const SentenceMetadata&, Hypergraph* hg) { + } + + // compute "empirical" expectations, numerator of objective + virtual void NotifyAlignmentForest(const SentenceMetadata& smeta, Hypergraph* hg) { + reachable = true; + } + + bool reachable; +}; + +#ifdef HAVE_MPI +namespace mpi = boost::mpi; +#endif + +int main(int argc, char** argv) { +#ifdef HAVE_MPI + mpi::environment env(argc, argv); + mpi::communicator world; + const int size = world.size(); + const int rank = world.rank(); +#else + const int size = 1; + const int rank = 0; +#endif + if (size > 1) SetSilent(true); // turn off verbose decoder output + register_feature_functions(); + + po::variables_map conf; + if (!InitCommandLine(argc, argv, &conf)) + return false; + + // load cdec.ini and set up decoder + ReadFile ini_rf(conf["decoder_config"].as<string>()); + Decoder decoder(ini_rf.stream()); + if (decoder.GetConf()["input"].as<string>() != "-") { + cerr << "cdec.ini must not set an input file\n"; + abort(); + } + + if (FD::UsingPerfectHashFunction()) { + cerr << "Your configuration file has enabled a cmph hash function. Please disable.\n"; + return 1; + } + + // load optional weights + if (conf.count("weights")) + Weights::InitFromFile(conf["weights"].as<string>(), &decoder.CurrentWeightVector()); + + vector<string> corpus; + ReadTrainingCorpus(conf["training_data"].as<string>(), rank, size, &corpus); + assert(corpus.size() > 0); + + + if (rank == 0) + cerr << "Each processor is decoding ~" << corpus.size() << " training examples...\n"; + + size_t num_reached = 0; + { + ostringstream os; + os << conf["output_prefix"].as<string>() << '.' << rank << "_of_" << size; + WriteFile wf(os.str()); + ostream& out = *wf.stream(); + ReachabilityObserver observer; + for (int i = 0; i < corpus.size(); ++i) { + decoder.Decode(corpus[i], &observer); + if (observer.reachable) { + out << corpus[i] << endl; + ++num_reached; + } + corpus[i].clear(); + } + cerr << "Shard " << rank << '/' << size << " finished, wrote " + << num_reached << " instances to " << os.str() << endl; + } + + size_t total = 0; +#ifdef HAVE_MPI + reduce(world, num_reached, total, std::plus<double>(), 0); +#else + total = num_reached; +#endif + if (rank == 0) { + cerr << "-----------------------------------------\n"; + cerr << "TOTAL = " << total << " instances\n"; + } + return 0; +} + diff --git a/training/crf/mpi_flex_optimize.cc b/training/crf/mpi_flex_optimize.cc new file mode 100644 index 00000000..b52decdc --- /dev/null +++ b/training/crf/mpi_flex_optimize.cc @@ -0,0 +1,386 @@ +#include <sstream> +#include <iostream> +#include <fstream> +#include <vector> +#include <cassert> +#include <cmath> + +#include <boost/shared_ptr.hpp> +#include <boost/program_options.hpp> +#include <boost/program_options/variables_map.hpp> + +#include "stringlib.h" +#include "verbose.h" +#include "hg.h" +#include "prob.h" +#include "inside_outside.h" +#include "ff_register.h" +#include "decoder.h" +#include "filelib.h" +#include "optimize.h" +#include "fdict.h" +#include "weights.h" +#include "sparse_vector.h" +#include "sampler.h" + +#ifdef HAVE_MPI +#include <boost/mpi/timer.hpp> +#include <boost/mpi.hpp> +namespace mpi = boost::mpi; +#endif + +using namespace std; +namespace po = boost::program_options; + +bool InitCommandLine(int argc, char** argv, po::variables_map* conf) { + po::options_description opts("Configuration options"); + opts.add_options() + ("cdec_config,c",po::value<string>(),"Decoder configuration file") + ("weights,w",po::value<string>(),"Initial feature weights") + ("training_data,d",po::value<string>(),"Training data") + ("minibatch_size_per_proc,s", po::value<unsigned>()->default_value(6), "Number of training instances evaluated per processor in each minibatch") + ("minibatch_iterations,i", po::value<unsigned>()->default_value(10), "Number of optimization iterations per minibatch") + ("iterations,I", po::value<unsigned>()->default_value(50), "Number of passes through the training data before termination") + ("regularization_strength,C", po::value<double>()->default_value(0.2), "Regularization strength") + ("time_series_strength,T", po::value<double>()->default_value(0.0), "Time series regularization strength") + ("random_seed,S", po::value<uint32_t>(), "Random seed (if not specified, /dev/random will be used)") + ("lbfgs_memory_buffers,M", po::value<unsigned>()->default_value(10), "Number of memory buffers for LBFGS history"); + po::options_description clo("Command line options"); + clo.add_options() + ("config", po::value<string>(), "Configuration file") + ("help,h", "Print this help message and exit"); + po::options_description dconfig_options, dcmdline_options; + dconfig_options.add(opts); + dcmdline_options.add(opts).add(clo); + + po::store(parse_command_line(argc, argv, dcmdline_options), *conf); + if (conf->count("config")) { + ifstream config((*conf)["config"].as<string>().c_str()); + po::store(po::parse_config_file(config, dconfig_options), *conf); + } + po::notify(*conf); + + if (conf->count("help") || !conf->count("training_data") || !conf->count("cdec_config")) { + cerr << "LBFGS minibatch online optimizer (MPI support " +#if HAVE_MPI + << "enabled" +#else + << "not enabled" +#endif + << ")\n" << dcmdline_options << endl; + return false; + } + return true; +} + +void ReadTrainingCorpus(const string& fname, int rank, int size, vector<string>* c, vector<int>* order) { + ReadFile rf(fname); + istream& in = *rf.stream(); + string line; + int id = 0; + while(in) { + getline(in, line); + if (!in) break; + if (id % size == rank) { + c->push_back(line); + order->push_back(id); + } + ++id; + } +} + +static const double kMINUS_EPSILON = -1e-6; + +struct CopyHGsObserver : public DecoderObserver { + Hypergraph* hg_; + Hypergraph* gold_hg_; + + // this can free up some memory + void RemoveRules(Hypergraph* h) { + for (unsigned i = 0; i < h->edges_.size(); ++i) + h->edges_[i].rule_.reset(); + } + + void SetCurrentHypergraphs(Hypergraph* h, Hypergraph* gold_h) { + hg_ = h; + gold_hg_ = gold_h; + } + + virtual void NotifyDecodingStart(const SentenceMetadata&) { + state = 1; + } + + // compute model expectations, denominator of objective + virtual void NotifyTranslationForest(const SentenceMetadata&, Hypergraph* hg) { + *hg_ = *hg; + RemoveRules(hg_); + assert(state == 1); + state = 2; + } + + // compute "empirical" expectations, numerator of objective + virtual void NotifyAlignmentForest(const SentenceMetadata&, Hypergraph* hg) { + assert(state == 2); + state = 3; + *gold_hg_ = *hg; + RemoveRules(gold_hg_); + } + + virtual void NotifyDecodingComplete(const SentenceMetadata&) { + if (state == 3) { + } else { + hg_->clear(); + gold_hg_->clear(); + } + } + + int state; +}; + +void ReadConfig(const string& ini, istringstream* out) { + ReadFile rf(ini); + istream& in = *rf.stream(); + ostringstream os; + while(in) { + string line; + getline(in, line); + if (!in) continue; + os << line << endl; + } + out->str(os.str()); +} + +#ifdef HAVE_MPI +namespace boost { namespace mpi { + template<> + struct is_commutative<std::plus<SparseVector<double> >, SparseVector<double> > + : mpl::true_ { }; +} } // end namespace boost::mpi +#endif + +void AddGrad(const SparseVector<prob_t> x, double s, SparseVector<double>* acc) { + for (SparseVector<prob_t>::const_iterator it = x.begin(); it != x.end(); ++it) + acc->add_value(it->first, it->second.as_float() * s); +} + +double PNorm(const vector<double>& v, const double p) { + double acc = 0; + for (int i = 0; i < v.size(); ++i) + acc += pow(v[i], p); + return pow(acc, 1.0 / p); +} + +void VV(ostream&os, const vector<double>& v) { + for (int i = 1; i < v.size(); ++i) + if (v[i]) os << FD::Convert(i) << "=" << v[i] << " "; +} + +double ApplyRegularizationTerms(const double C, + const double T, + const vector<double>& weights, + const vector<double>& prev_weights, + double* g) { + double reg = 0; + for (size_t i = 0; i < weights.size(); ++i) { + const double prev_w_i = (i < prev_weights.size() ? prev_weights[i] : 0.0); + const double& w_i = weights[i]; + reg += C * w_i * w_i; + g[i] += 2 * C * w_i; + + reg += T * (w_i - prev_w_i) * (w_i - prev_w_i); + g[i] += 2 * T * (w_i - prev_w_i); + } + return reg; +} + +int main(int argc, char** argv) { +#ifdef HAVE_MPI + mpi::environment env(argc, argv); + mpi::communicator world; + const int size = world.size(); + const int rank = world.rank(); +#else + const int size = 1; + const int rank = 0; +#endif + if (size > 1) SetSilent(true); // turn off verbose decoder output + register_feature_functions(); + MT19937* rng = NULL; + + po::variables_map conf; + if (!InitCommandLine(argc, argv, &conf)) + return 1; + + boost::shared_ptr<BatchOptimizer> o; + const unsigned lbfgs_memory_buffers = conf["lbfgs_memory_buffers"].as<unsigned>(); + const unsigned size_per_proc = conf["minibatch_size_per_proc"].as<unsigned>(); + const unsigned minibatch_iterations = conf["minibatch_iterations"].as<unsigned>(); + const double regularization_strength = conf["regularization_strength"].as<double>(); + const double time_series_strength = conf["time_series_strength"].as<double>(); + const bool use_time_series_reg = time_series_strength > 0.0; + const unsigned max_iteration = conf["iterations"].as<unsigned>(); + + vector<string> corpus; + vector<int> ids; + ReadTrainingCorpus(conf["training_data"].as<string>(), rank, size, &corpus, &ids); + assert(corpus.size() > 0); + + if (size_per_proc > corpus.size()) { + cerr << "Minibatch size (per processor) must be smaller or equal to the local corpus size!\n"; + return 1; + } + + // initialize decoder (loads hash functions if necessary) + istringstream ins; + ReadConfig(conf["cdec_config"].as<string>(), &ins); + Decoder decoder(&ins); + + // load initial weights + vector<weight_t> prev_weights; + if (conf.count("weights")) + Weights::InitFromFile(conf["weights"].as<string>(), &prev_weights); + + if (conf.count("random_seed")) + rng = new MT19937(conf["random_seed"].as<uint32_t>()); + else + rng = new MT19937; + + size_t total_corpus_size = 0; +#ifdef HAVE_MPI + reduce(world, corpus.size(), total_corpus_size, std::plus<size_t>(), 0); +#else + total_corpus_size = corpus.size(); +#endif + + if (rank == 0) + cerr << "Total corpus size: " << total_corpus_size << endl; + + CopyHGsObserver observer; + + int write_weights_every_ith = 100; // TODO configure + int titer = -1; + + vector<weight_t>& cur_weights = decoder.CurrentWeightVector(); + if (use_time_series_reg) { + cur_weights = prev_weights; + } else { + cur_weights.swap(prev_weights); + prev_weights.clear(); + } + + int iter = -1; + bool converged = false; + vector<double> gg; + while (!converged) { +#ifdef HAVE_MPI + mpi::timer timer; +#endif + ++iter; ++titer; + if (rank == 0) { + converged = (iter == max_iteration); + string fname = "weights.cur.gz"; + if (iter % write_weights_every_ith == 0) { + ostringstream o; o << "weights.epoch_" << iter << ".gz"; + fname = o.str(); + } + if (converged) { fname = "weights.final.gz"; } + ostringstream vv; + vv << "total iter=" << titer << " (of current config iter=" << iter << ") minibatch=" << size_per_proc << " sentences/proc x " << size << " procs. num_feats=" << FD::NumFeats() << " passes_thru_data=" << (titer * size_per_proc / static_cast<double>(corpus.size())); + const string svv = vv.str(); + Weights::WriteToFile(fname, cur_weights, true, &svv); + } + + vector<Hypergraph> hgs(size_per_proc); + vector<Hypergraph> gold_hgs(size_per_proc); + for (int i = 0; i < size_per_proc; ++i) { + int ei = corpus.size() * rng->next(); + int id = ids[ei]; + observer.SetCurrentHypergraphs(&hgs[i], &gold_hgs[i]); + decoder.SetId(id); + decoder.Decode(corpus[ei], &observer); + } + + SparseVector<double> local_grad, g; + double local_obj = 0; + o.reset(); + for (unsigned mi = 0; mi < minibatch_iterations; ++mi) { + local_grad.clear(); + g.clear(); + local_obj = 0; + + for (unsigned i = 0; i < size_per_proc; ++i) { + Hypergraph& hg = hgs[i]; + Hypergraph& hg_gold = gold_hgs[i]; + if (hg.edges_.size() < 2) continue; + + hg.Reweight(cur_weights); + hg_gold.Reweight(cur_weights); + SparseVector<prob_t> model_exp, gold_exp; + const prob_t z = InsideOutside<prob_t, + EdgeProb, + SparseVector<prob_t>, + EdgeFeaturesAndProbWeightFunction>(hg, &model_exp); + local_obj += log(z); + model_exp /= z; + AddGrad(model_exp, 1.0, &local_grad); + model_exp.clear(); + + const prob_t goldz = InsideOutside<prob_t, + EdgeProb, + SparseVector<prob_t>, + EdgeFeaturesAndProbWeightFunction>(hg_gold, &gold_exp); + local_obj -= log(goldz); + + if (log(z) - log(goldz) < kMINUS_EPSILON) { + cerr << "DIFF. ERR! log_model_z < log_gold_z: " << log(z) << " " << log(goldz) << endl; + return 1; + } + + gold_exp /= goldz; + AddGrad(gold_exp, -1.0, &local_grad); + } + + double obj = 0; +#ifdef HAVE_MPI + reduce(world, local_obj, obj, std::plus<double>(), 0); + reduce(world, local_grad, g, std::plus<SparseVector<double> >(), 0); +#else + obj = local_obj; + g.swap(local_grad); +#endif + local_grad.clear(); + if (rank == 0) { + // g /= (size_per_proc * size); + if (!o) + o.reset(new LBFGSOptimizer(FD::NumFeats(), lbfgs_memory_buffers)); + gg.clear(); + gg.resize(FD::NumFeats()); + if (gg.size() != cur_weights.size()) { cur_weights.resize(gg.size()); } + for (SparseVector<double>::iterator it = g.begin(); it != g.end(); ++it) + if (it->first) { gg[it->first] = it->second; } + g.clear(); + double r = ApplyRegularizationTerms(regularization_strength, + time_series_strength, // * (iter == 0 ? 0.0 : 1.0), + cur_weights, + prev_weights, + &gg[0]); + obj += r; + if (mi == 0 || mi == (minibatch_iterations - 1)) { + if (!mi) cerr << iter << ' '; else cerr << ' '; + cerr << "OBJ=" << obj << " (REG=" << r << ")" << " |g|=" << PNorm(gg, 2) << " |w|=" << PNorm(cur_weights, 2); + if (mi > 0) cerr << endl << flush; else cerr << ' '; + } else { cerr << '.' << flush; } + // cerr << "w = "; VV(cerr, cur_weights); cerr << endl; + // cerr << "g = "; VV(cerr, gg); cerr << endl; + o->Optimize(obj, gg, &cur_weights); + } +#ifdef HAVE_MPI + broadcast(world, cur_weights, 0); + broadcast(world, converged, 0); + world.barrier(); +#endif + } + prev_weights = cur_weights; + } + return 0; +} diff --git a/training/crf/mpi_online_optimize.cc b/training/crf/mpi_online_optimize.cc new file mode 100644 index 00000000..9e1ae34c --- /dev/null +++ b/training/crf/mpi_online_optimize.cc @@ -0,0 +1,384 @@ +#include <sstream> +#include <iostream> +#include <fstream> +#include <vector> +#include <cassert> +#include <cmath> +#include <tr1/memory> +#include <ctime> + +#include <boost/program_options.hpp> +#include <boost/program_options/variables_map.hpp> + +#include "stringlib.h" +#include "verbose.h" +#include "hg.h" +#include "prob.h" +#include "inside_outside.h" +#include "ff_register.h" +#include "decoder.h" +#include "filelib.h" +#include "online_optimizer.h" +#include "fdict.h" +#include "weights.h" +#include "sparse_vector.h" +#include "sampler.h" + +#ifdef HAVE_MPI +#include <boost/mpi/timer.hpp> +#include <boost/mpi.hpp> +namespace mpi = boost::mpi; +#endif + +using namespace std; +namespace po = boost::program_options; + +bool InitCommandLine(int argc, char** argv, po::variables_map* conf) { + po::options_description opts("Configuration options"); + opts.add_options() + ("input_weights,w",po::value<string>(),"Input feature weights file") + ("frozen_features,z",po::value<string>(), "List of features not to optimize") + ("training_data,t",po::value<string>(),"Training data corpus") + ("training_agenda,a",po::value<string>(), "Text file listing a series of configuration files and the number of iterations to train using each configuration successively") + ("minibatch_size_per_proc,s", po::value<unsigned>()->default_value(5), "Number of training instances evaluated per processor in each minibatch") + ("optimization_method,m", po::value<string>()->default_value("sgd"), "Optimization method (sgd)") + ("max_walltime", po::value<unsigned>(), "Maximum walltime to run (in minutes)") + ("random_seed,S", po::value<uint32_t>(), "Random seed (if not specified, /dev/random will be used)") + ("eta_0,e", po::value<double>()->default_value(0.2), "Initial learning rate for SGD (eta_0)") + ("L1,1","Use L1 regularization") + ("regularization_strength,C", po::value<double>()->default_value(1.0), "Regularization strength (C)"); + po::options_description clo("Command line options"); + clo.add_options() + ("config", po::value<string>(), "Configuration file") + ("help,h", "Print this help message and exit"); + po::options_description dconfig_options, dcmdline_options; + dconfig_options.add(opts); + dcmdline_options.add(opts).add(clo); + + po::store(parse_command_line(argc, argv, dcmdline_options), *conf); + if (conf->count("config")) { + ifstream config((*conf)["config"].as<string>().c_str()); + po::store(po::parse_config_file(config, dconfig_options), *conf); + } + po::notify(*conf); + + if (conf->count("help") || !conf->count("training_data") || !conf->count("training_agenda")) { + cerr << dcmdline_options << endl; + return false; + } + return true; +} + +void ReadTrainingCorpus(const string& fname, int rank, int size, vector<string>* c, vector<int>* order) { + ReadFile rf(fname); + istream& in = *rf.stream(); + string line; + int id = 0; + while(in) { + getline(in, line); + if (!in) break; + if (id % size == rank) { + c->push_back(line); + order->push_back(id); + } + ++id; + } +} + +static const double kMINUS_EPSILON = -1e-6; + +struct TrainingObserver : public DecoderObserver { + void Reset() { + acc_grad.clear(); + acc_obj = 0; + total_complete = 0; + } + + void SetLocalGradientAndObjective(vector<double>* g, double* o) const { + *o = acc_obj; + for (SparseVector<prob_t>::const_iterator it = acc_grad.begin(); it != acc_grad.end(); ++it) + (*g)[it->first] = it->second.as_float(); + } + + virtual void NotifyDecodingStart(const SentenceMetadata& smeta) { + cur_model_exp.clear(); + cur_obj = 0; + state = 1; + } + + // compute model expectations, denominator of objective + virtual void NotifyTranslationForest(const SentenceMetadata& smeta, Hypergraph* hg) { + assert(state == 1); + state = 2; + const prob_t z = InsideOutside<prob_t, + EdgeProb, + SparseVector<prob_t>, + EdgeFeaturesAndProbWeightFunction>(*hg, &cur_model_exp); + cur_obj = log(z); + cur_model_exp /= z; + } + + // compute "empirical" expectations, numerator of objective + virtual void NotifyAlignmentForest(const SentenceMetadata& smeta, Hypergraph* hg) { + assert(state == 2); + state = 3; + SparseVector<prob_t> ref_exp; + const prob_t ref_z = InsideOutside<prob_t, + EdgeProb, + SparseVector<prob_t>, + EdgeFeaturesAndProbWeightFunction>(*hg, &ref_exp); + ref_exp /= ref_z; + + double log_ref_z; +#if 0 + if (crf_uniform_empirical) { + log_ref_z = ref_exp.dot(feature_weights); + } else { + log_ref_z = log(ref_z); + } +#else + log_ref_z = log(ref_z); +#endif + + // rounding errors means that <0 is too strict + if ((cur_obj - log_ref_z) < kMINUS_EPSILON) { + cerr << "DIFF. ERR! log_model_z < log_ref_z: " << cur_obj << " " << log_ref_z << endl; + exit(1); + } + assert(!std::isnan(log_ref_z)); + ref_exp -= cur_model_exp; + acc_grad += ref_exp; + acc_obj += (cur_obj - log_ref_z); + } + + virtual void NotifyDecodingComplete(const SentenceMetadata& smeta) { + if (state == 3) { + ++total_complete; + } else { + } + } + + void GetGradient(SparseVector<double>* g) const { + g->clear(); + for (SparseVector<prob_t>::const_iterator it = acc_grad.begin(); it != acc_grad.end(); ++it) + g->set_value(it->first, it->second.as_float()); + } + + int total_complete; + SparseVector<prob_t> cur_model_exp; + SparseVector<prob_t> acc_grad; + double acc_obj; + double cur_obj; + int state; +}; + +#ifdef HAVE_MPI +namespace boost { namespace mpi { + template<> + struct is_commutative<std::plus<SparseVector<double> >, SparseVector<double> > + : mpl::true_ { }; +} } // end namespace boost::mpi +#endif + +bool LoadAgenda(const string& file, vector<pair<string, int> >* a) { + ReadFile rf(file); + istream& in = *rf.stream(); + string line; + while(in) { + getline(in, line); + if (!in) break; + if (line.empty()) continue; + if (line[0] == '#') continue; + int sc = 0; + if (line.size() < 3) return false; + for (int i = 0; i < line.size(); ++i) { if (line[i] == ' ') ++sc; } + if (sc != 1) { cerr << "Too many spaces in line: " << line << endl; return false; } + size_t d = line.find(" "); + pair<string, int> x; + x.first = line.substr(0,d); + x.second = atoi(line.substr(d+1).c_str()); + a->push_back(x); + if (!FileExists(x.first)) { + cerr << "Can't find file " << x.first << endl; + return false; + } + } + return true; +} + +int main(int argc, char** argv) { + cerr << "THIS SOFTWARE IS DEPRECATED YOU SHOULD USE mpi_flex_optimize\n"; +#ifdef HAVE_MPI + mpi::environment env(argc, argv); + mpi::communicator world; + const int size = world.size(); + const int rank = world.rank(); +#else + const int size = 1; + const int rank = 0; +#endif + if (size > 1) SetSilent(true); // turn off verbose decoder output + register_feature_functions(); + std::tr1::shared_ptr<MT19937> rng; + + po::variables_map conf; + if (!InitCommandLine(argc, argv, &conf)) + return 1; + + vector<pair<string, int> > agenda; + if (!LoadAgenda(conf["training_agenda"].as<string>(), &agenda)) + return 1; + if (rank == 0) + cerr << "Loaded agenda defining " << agenda.size() << " training epochs\n"; + + assert(agenda.size() > 0); + + if (1) { // hack to load the feature hash functions -- TODO this should not be in cdec.ini + const string& cur_config = agenda[0].first; + const unsigned max_iteration = agenda[0].second; + ReadFile ini_rf(cur_config); + Decoder decoder(ini_rf.stream()); + } + + // load initial weights + vector<weight_t> init_weights; + if (conf.count("input_weights")) + Weights::InitFromFile(conf["input_weights"].as<string>(), &init_weights); + + vector<int> frozen_fids; + if (conf.count("frozen_features")) { + ReadFile rf(conf["frozen_features"].as<string>()); + istream& in = *rf.stream(); + string line; + while(in) { + getline(in, line); + if (line.empty()) continue; + if (line[0] == ' ' || line[line.size() - 1] == ' ') { line = Trim(line); } + frozen_fids.push_back(FD::Convert(line)); + } + if (rank == 0) cerr << "Freezing " << frozen_fids.size() << " features.\n"; + } + + vector<string> corpus; + vector<int> ids; + ReadTrainingCorpus(conf["training_data"].as<string>(), rank, size, &corpus, &ids); + assert(corpus.size() > 0); + + std::tr1::shared_ptr<OnlineOptimizer> o; + std::tr1::shared_ptr<LearningRateSchedule> lr; + + const unsigned size_per_proc = conf["minibatch_size_per_proc"].as<unsigned>(); + if (size_per_proc > corpus.size()) { + cerr << "Minibatch size must be smaller than corpus size!\n"; + return 1; + } + + size_t total_corpus_size = 0; +#ifdef HAVE_MPI + reduce(world, corpus.size(), total_corpus_size, std::plus<size_t>(), 0); +#else + total_corpus_size = corpus.size(); +#endif + + if (rank == 0) { + cerr << "Total corpus size: " << total_corpus_size << endl; + const unsigned batch_size = size_per_proc * size; + // TODO config + lr.reset(new ExponentialDecayLearningRate(batch_size, conf["eta_0"].as<double>())); + + const string omethod = conf["optimization_method"].as<string>(); + if (omethod == "sgd") { + const double C = conf["regularization_strength"].as<double>(); + o.reset(new CumulativeL1OnlineOptimizer(lr, total_corpus_size, C, frozen_fids)); + } else { + assert(!"fail"); + } + } + if (conf.count("random_seed")) + rng.reset(new MT19937(conf["random_seed"].as<uint32_t>())); + else + rng.reset(new MT19937); + + SparseVector<double> x; + Weights::InitSparseVector(init_weights, &x); + TrainingObserver observer; + + int write_weights_every_ith = 100; // TODO configure + int titer = -1; + + unsigned timeout = 0; + if (conf.count("max_walltime")) timeout = 60 * conf["max_walltime"].as<unsigned>(); + const time_t start_time = time(NULL); + for (int ai = 0; ai < agenda.size(); ++ai) { + const string& cur_config = agenda[ai].first; + const unsigned max_iteration = agenda[ai].second; + if (rank == 0) + cerr << "STARTING TRAINING EPOCH " << (ai+1) << ". CONFIG=" << cur_config << endl; + // load cdec.ini and set up decoder + ReadFile ini_rf(cur_config); + Decoder decoder(ini_rf.stream()); + vector<weight_t>& lambdas = decoder.CurrentWeightVector(); + if (ai == 0) { lambdas.swap(init_weights); init_weights.clear(); } + + if (rank == 0) + o->ResetEpoch(); // resets the learning rate-- TODO is this good? + + int iter = -1; + bool converged = false; + while (!converged) { +#ifdef HAVE_MPI + mpi::timer timer; +#endif + x.init_vector(&lambdas); + ++iter; ++titer; + observer.Reset(); + if (rank == 0) { + converged = (iter == max_iteration); + Weights::SanityCheck(lambdas); + static int cc = 0; ++cc; if (cc > 1) { Weights::ShowLargestFeatures(lambdas); } + string fname = "weights.cur.gz"; + if (iter % write_weights_every_ith == 0) { + ostringstream o; o << "weights.epoch_" << (ai+1) << '.' << iter << ".gz"; + fname = o.str(); + } + const time_t cur_time = time(NULL); + if (timeout) { + if ((cur_time - start_time) > timeout) converged = true; + } + if (converged && ((ai+1)==agenda.size())) { fname = "weights.final.gz"; } + ostringstream vv; + double minutes = (cur_time - start_time) / 60.0; + vv << "total walltime=" << minutes << "min iter=" << titer << " (of current config iter=" << iter << ") minibatch=" << size_per_proc << " sentences/proc x " << size << " procs. num_feats=" << x.size() << '/' << FD::NumFeats() << " passes_thru_data=" << (titer * size_per_proc / static_cast<double>(corpus.size())) << " eta=" << lr->eta(titer); + const string svv = vv.str(); + cerr << svv << endl; + Weights::WriteToFile(fname, lambdas, true, &svv); + } + + for (int i = 0; i < size_per_proc; ++i) { + int ei = corpus.size() * rng->next(); + int id = ids[ei]; + decoder.SetId(id); + decoder.Decode(corpus[ei], &observer); + } + SparseVector<double> local_grad, g; + observer.GetGradient(&local_grad); +#ifdef HAVE_MPI + reduce(world, local_grad, g, std::plus<SparseVector<double> >(), 0); +#else + g.swap(local_grad); +#endif + local_grad.clear(); + if (rank == 0) { + g /= (size_per_proc * size); + o->UpdateWeights(g, FD::NumFeats(), &x); + } +#ifdef HAVE_MPI + broadcast(world, x, 0); + broadcast(world, converged, 0); + world.barrier(); + if (rank == 0) { cerr << " ELAPSED TIME THIS ITERATION=" << timer.elapsed() << endl; } +#endif + } + } + return 0; +} |