From 597db20b3b38fd0cbb3e3d9a7105b0c3c5c37e84 Mon Sep 17 00:00:00 2001 From: redpony Date: Thu, 30 Sep 2010 23:21:44 +0000 Subject: filtering program, currently crashes with SEGV of unknown origin when run on many processors git-svn-id: https://ws10smt.googlecode.com/svn/trunk@667 ec762483-ff6d-05da-a07a-a48fb63a330f --- training/cllh_filter_grammar.cc | 94 +++++++++++++++++++++++++++++------------ 1 file changed, 67 insertions(+), 27 deletions(-) (limited to 'training') diff --git a/training/cllh_filter_grammar.cc b/training/cllh_filter_grammar.cc index b5a4c35d..90fe9fba 100644 --- a/training/cllh_filter_grammar.cc +++ b/training/cllh_filter_grammar.cc @@ -1,6 +1,8 @@ #include #include #include +#include // fork +#include // waitpid #include #include @@ -19,7 +21,8 @@ void InitCommandLine(int argc, char** argv, po::variables_map* conf) { po::options_description opts("Configuration options"); opts.add_options() ("training_data,t",po::value(),"Training data corpus") - ("decoder_config,c",po::value(),"Decoder configuration file"); + ("decoder_config,c",po::value(),"Decoder configuration file") + ("ncpus,n",po::value()->default_value(1),"Number of CPUs to use"); po::options_description clo("Command line options"); clo.add_options() ("config", po::value(), "Configuration file") @@ -41,14 +44,19 @@ void InitCommandLine(int argc, char** argv, po::variables_map* conf) { } } -void ReadTrainingCorpus(const string& fname, vector* c) { +void ReadTrainingCorpus(const string& fname, int rank, int size, vector* c, vector* ids) { ReadFile rf(fname); istream& in = *rf.stream(); string line; + int lc = 0; while(in) { getline(in, line); if (!in) break; - c->push_back(line); + if (lc % size == rank) { + c->push_back(line); + ids->push_back(lc); + } + ++lc; } } @@ -68,11 +76,10 @@ struct TrainingObserver : public DecoderObserver { virtual void NotifyTranslationForest(const SentenceMetadata& smeta, Hypergraph* hg) { assert(state == 1); for (int i = 0; i < hg->edges_.size(); ++i) { - const TRulePtr& rule = hg->edges_[i].rule_; + const TRule* rule = hg->edges_[i].rule_.get(); if (rule->lhs_ == s_lhs || rule->lhs_ == goal_lhs) // fragile hack to filter out glue rules continue; - if (rule->prev_i == -1) - used.insert(rule); + used.insert(rule); } state = 2; } @@ -90,7 +97,7 @@ struct TrainingObserver : public DecoderObserver { } } - set used; + set used; const int s_lhs; const int goal_lhs; @@ -99,39 +106,72 @@ struct TrainingObserver : public DecoderObserver { int state; }; +void work(const string& fname, int rank, int size, Decoder* decoder) { + cerr << "Worker " << rank << '/' << size << " starting.\n"; + vector corpus; + vector ids; + ReadTrainingCorpus(fname, rank, size, &corpus, &ids); + assert(corpus.size() > 0); + cerr << " " << rank << '/' << size << ": has " << corpus.size() << " sentences to process\n"; + ostringstream oc; oc << "corpus." << rank << "_of_" << size; + WriteFile foc(oc.str()); + ostringstream og; og << "grammar." << rank << "_of_" << size << ".gz"; + WriteFile fog(og.str()); + + set all_used; + TrainingObserver observer; + for (int i = 0; i < corpus.size(); ++i) { + int ex_num = ids[i]; + decoder->SetId(ex_num); + decoder->Decode(corpus[ex_num], &observer); + if (observer.failed) { + (*foc.stream()) << "*** id=" << ex_num << " is unreachable\n"; + } else { + (*foc.stream()) << corpus[ex_num] << endl; + for (set::iterator it = observer.used.begin(); it != observer.used.end(); ++it) { + if (all_used.insert(*it).second) + (*fog.stream()) << **it << endl; + } + } + } +} + int main(int argc, char** argv) { - SetSilent(true); // turn off verbose decoder output register_feature_functions(); po::variables_map conf; InitCommandLine(argc, argv, &conf); - + const string fname = conf["training_data"].as(); + const unsigned ncpus = conf["ncpus"].as(); + assert(ncpus > 0); ReadFile ini_rf(conf["decoder_config"].as()); Decoder decoder(ini_rf.stream()); if (decoder.GetConf()["input"].as() != "-") { cerr << "cdec.ini must not set an input file\n"; abort(); } - - vector corpus; - ReadTrainingCorpus(conf["training_data"].as(), &corpus); - assert(corpus.size() > 0); - - TrainingObserver observer; - for (int i = 0; i < corpus.size(); ++i) { - int ex_num = i; - decoder.SetId(ex_num); - decoder.Decode(corpus[ex_num], &observer); - if (observer.failed) { - cerr << "*** id=" << ex_num << " is unreachable\n"; - observer.used.clear(); + SetSilent(true); // turn off verbose decoder output + cerr << "Forking " << ncpus << " time(s)\n"; + vector children; + for (int i = 0; i < ncpus; ++i) { + pid_t pid = fork(); + if (pid < 0) { + cerr << "Fork failed!\n"; + exit(1); + } + if (pid > 0) { + children.push_back(pid); } else { - cerr << corpus[ex_num] << endl; - for (set::iterator it = observer.used.begin(); it != observer.used.end(); ++it) { - cout << **it << endl; - (*it)->prev_i = 0; - } + work(fname, i, ncpus, &decoder); + cerr << " " << i << "/" << ncpus << " finished.\n"; + _exit(0); } } + for (int i = 0; i < children.size(); ++i) { + int status; + int w = waitpid(children[i], &status, 0); + if (w < 0) { cerr << "Error while waiting for children!"; return 1; } + cerr << "Child " << i << ": status=" << status << " sig?=" << WIFSIGNALED(status) << " sig=" << WTERMSIG(status) << endl; + } return 0; } -- cgit v1.2.3