From c0eee37401d3731555346cb260330049b5dc99e7 Mon Sep 17 00:00:00 2001
From: philblunsom
Date: Thu, 15 Jul 2010 03:39:32 +0000
Subject: working on the mpi version

git-svn-id: https://ws10smt.googlecode.com/svn/trunk@257 ec762483-ff6d-05da-a07a-a48fb63a330f
---
 gi/pyp-topics/src/Makefile.am           |    6 +-
 gi/pyp-topics/src/Makefile.mpi          |   25 ++++++++
 gi/pyp-topics/src/macros.Linux          |   20 ++++++
 gi/pyp-topics/src/mpi-pyp-topics.cc     |  122 ++++++++++++--------------------
 gi/pyp-topics/src/mpi-pyp-topics.hh     |   32 ++++-----
 gi/pyp-topics/src/mpi-pyp.hh            |    4 +-
 gi/pyp-topics/src/mpi-train-contexts.cc |  105 ++++++++++++++-------------
 7 files changed, 166 insertions(+), 148 deletions(-)
 create mode 100644 gi/pyp-topics/src/Makefile.mpi
 create mode 100644 gi/pyp-topics/src/macros.Linux

diff --git a/gi/pyp-topics/src/Makefile.am b/gi/pyp-topics/src/Makefile.am
index a3a30acd..c22819db 100644
--- a/gi/pyp-topics/src/Makefile.am
+++ b/gi/pyp-topics/src/Makefile.am
@@ -1,4 +1,4 @@
-bin_PROGRAMS = pyp-topics-train pyp-contexts-train mpi-pyp-contexts-train
+bin_PROGRAMS = pyp-topics-train pyp-contexts-train #mpi-pyp-contexts-train
 
 contexts_lexer.cc: contexts_lexer.l
 	$(LEX) -s -CF -8 -o$@ $<
@@ -9,8 +9,8 @@ pyp_topics_train_LDADD = $(top_srcdir)/decoder/libcdec.a -lz
 pyp_contexts_train_SOURCES = mt19937ar.c corpus.cc gzstream.cc pyp-topics.cc contexts_lexer.cc contexts_corpus.cc train-contexts.cc
 pyp_contexts_train_LDADD = $(top_srcdir)/decoder/libcdec.a -lz
 
-mpi_pyp_contexts_train_SOURCES = mt19937ar.c corpus.cc gzstream.cc mpi-pyp-topics.cc contexts_lexer.cc contexts_corpus.cc mpi-train-contexts.cc
-mpi_pyp_contexts_train_LDADD = $(top_srcdir)/decoder/libcdec.a -lz
+#mpi_pyp_contexts_train_SOURCES = mt19937ar.c corpus.cc gzstream.cc mpi-pyp-topics.cc contexts_lexer.cc contexts_corpus.cc mpi-train-contexts.cc
+#mpi_pyp_contexts_train_LDADD = $(top_srcdir)/decoder/libcdec.a -lz
 
 AM_CPPFLAGS = -W -Wall -Wno-sign-compare -funroll-loops

diff --git a/gi/pyp-topics/src/Makefile.mpi b/gi/pyp-topics/src/Makefile.mpi
new file mode 100644
index 00000000..b22cc7e7
--- /dev/null
+++ b/gi/pyp-topics/src/Makefile.mpi
@@ -0,0 +1,25 @@
+BLD_ARCH=$(shell uname -s)
+-include macros.${BLD_ARCH}
+
+local_objs = mt19937ar.o corpus.o gzstream.o mpi-pyp-topics.o contexts_lexer.o contexts_corpus.o mpi-train-contexts.o
+
+all: mpi-pyp-contexts-train
+
+-include makefile.depend
+
+#-----------------------#
+# Local stuff
+#-----------------------#
+
+mpi-pyp-contexts-train: mpi-train-contexts.o $(local_objs)
+	$(CXX) -o $@ $^ $(LDFLAGS)
+
+.PHONY: depend echo
+depend:
+	$(CXX) -MM $(CXXFLAGS) *.cc *.c | sed 's/^\(.*\.o:\)/obj\/\1/' > makefile.depend
+
+clean:
+	rm -f obj/*.o
+
+#clobber: clean
+#	rm makefile.depend ../bin/${ARCH}/*

diff --git a/gi/pyp-topics/src/macros.Linux b/gi/pyp-topics/src/macros.Linux
new file mode 100644
index 00000000..ade6d92d
--- /dev/null
+++ b/gi/pyp-topics/src/macros.Linux
@@ -0,0 +1,20 @@
+#CC=gcc-4.1
+#CXX=g++-4.1
+#LD=g++-4.1
+#FC=gfortran-4.1
+CC = mpicc
+CXX = mpicxx
+LD = mpicxx
+FC = mpif77
+
+CXXFLAGS = -Wall -I/home/pblunsom/packages/include
+CFLAGS = -Wall -I/home/pblunsom/packages/include
+FFLAGS = -Wall
+LDFLAGS = -lm -lz -L/home/pblunsom/packages/lib \
+          -lboost_program_options -lboost_mpi -lboost_serialization \
+          -lboost_regex -L../../../decoder -lcdec
+
+FFLAGS += -g -O6 -march=native
+CFLAGS += -g -O6 -march=native
+CXXFLAGS += -g -O6 -march=native
+LDFLAGS += -g -O6 -march=native
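Note: the build above links -lboost_mpi and compiles with the mpicxx wrapper. Everything in the sources below assumes the Boost.MPI process model: one process per rank, with a default-constructed communicator spanning MPI_COMM_WORLD. A minimal standalone sketch of that boilerplate (illustrative only; the file name and output text are not part of this patch):

    // mpi-hello.cc (hypothetical) -- build: mpicxx mpi-hello.cc -lboost_mpi -lboost_serialization
    #include <iostream>
    #include <boost/mpi/environment.hpp>
    #include <boost/mpi/communicator.hpp>

    int main(int argc, char **argv) {
      boost::mpi::environment env(argc, argv);  // calls MPI_Init; MPI_Finalize on destruction
      boost::mpi::communicator world;           // default-constructs to MPI_COMM_WORLD
      std::cout << "I am process " << world.rank()
                << " of " << world.size() << "." << std::endl;
      return 0;
    }

Launched with, e.g., mpirun -np 4 ./a.out, each rank prints its own line; the same rank/size pair drives the per-process document partition in mpi-pyp-topics.cc below.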
diff --git a/gi/pyp-topics/src/mpi-pyp-topics.cc b/gi/pyp-topics/src/mpi-pyp-topics.cc
index d2daad4f..2ad28278 100644
--- a/gi/pyp-topics/src/mpi-pyp-topics.cc
+++ b/gi/pyp-topics/src/mpi-pyp-topics.cc
@@ -1,3 +1,5 @@
+#include <limits>
+
 #include "timing.h"
 #include "mpi-pyp-topics.hh"
 
@@ -6,37 +8,51 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
                               int freq_cutoff_start, int freq_cutoff_end,
                               int freq_cutoff_interval,
                               int max_contexts_per_document) {
+  std::cout << "I am process " << m_rank << " of " << m_size << "." << std::endl;
   Timer timer;
+  std::cout << m_am_root << std::endl;
+
+  int documents = corpus.num_documents();
+  m_mpi_start = 0;
+  m_mpi_end = documents;
+  if (m_size != 1) {
+    assert(documents < std::numeric_limits<int>::max());
+    m_mpi_start = (documents / m_size) * m_rank;
+    if (m_rank == m_size-1) m_mpi_end = documents;
+    else m_mpi_end = (documents / m_size)*(m_rank+1);
+  }
+  int local_documents = m_mpi_end - m_mpi_start;
+
   if (!m_backoff.get()) {
     m_word_pyps.clear();
     m_word_pyps.push_back(PYPs());
   }
 
-  std::cerr << "\n Training with " << m_word_pyps.size()-1 << " backoff level"
+  if (m_am_root) std::cerr << "\n Training with " << m_word_pyps.size()-1 << " backoff level"
     << (m_word_pyps.size()==2 ? ":" : "s:") << std::endl;
 
   for (int i=0; i<(int)m_word_pyps.size(); ++i)
   {
     m_word_pyps.at(i).reserve(m_num_topics);
     for (int j=0; j<m_num_topics; ++j)
-      m_word_pyps.at(i).push_back(new PYP<int>(0.5, 1.0, m_seed));
+      m_word_pyps.at(i).push_back(new PYP<int>(0.5, 1.0));
   }
-  std::cerr << std::endl;
+  if (m_am_root) std::cerr << std::endl;
 
   m_document_pyps.reserve(corpus.num_documents());
   for (int j=0; j<corpus.num_documents(); ++j)
-    m_document_pyps.push_back(new PYP<int>(0.5, 1.0, m_seed));
+    m_document_pyps.push_back(new PYP<int>(0.5, 1.0));
 
   m_topic_p0 = 1.0/m_num_topics;
   m_term_p0 = 1.0/corpus.num_types();
   m_backoff_p0 = 1.0/corpus.num_documents();
 
-  std::cerr << " Documents: " << corpus.num_documents() << " Terms: "
+  if (m_am_root) std::cerr << " Documents: " << corpus.num_documents() << " Terms: "
     << corpus.num_types() << std::endl;
 
   int frequency_cutoff = freq_cutoff_start;
-  std::cerr << " Context frequency cutoff set to " << frequency_cutoff << std::endl;
+  if (m_am_root) std::cerr << " Context frequency cutoff set to " << frequency_cutoff << std::endl;
 
   timer.Reset();
   // Initialisation pass
@@ -74,11 +90,11 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
       m_corpus_topics[document_id][term_index] = new_topic;
     }
   }
-  std::cerr << "  Initialized in " << timer.Elapsed() << " seconds\n";
+  if (m_am_root) std::cerr << "  Initialized in " << timer.Elapsed() << " seconds\n";
 
-  int* randomDocIndices = new int[corpus.num_documents()];
-  for (int i = 0; i < corpus.num_documents(); ++i)
-    randomDocIndices[i] = i;
+  int* randomDocIndices = new int[local_documents];
+  for (int i = 0; i < local_documents; ++i)
+    randomDocIndices[i] = i+m_mpi_start;
 
   // Sampling phase
   for (int curr_sample=0; curr_sample < samples; ++curr_sample) {
@@ -86,16 +102,15 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
         && curr_sample % freq_cutoff_interval == 1
         && frequency_cutoff > freq_cutoff_end) {
       frequency_cutoff--;
-      std::cerr << "\n Context frequency cutoff set to " << frequency_cutoff << std::endl;
+      if (m_am_root) std::cerr << "\n Context frequency cutoff set to " << frequency_cutoff << std::endl;
     }
 
-    std::cerr << "\n  -- Sample " << curr_sample << " "; std::cerr.flush();
+    if (m_am_root) std::cerr << "\n  -- Sample " << curr_sample << " "; std::cerr.flush();
 
     // Randomize the corpus indexing array
    int tmp;
    int processed_terms=0;
-    for (int i = corpus.num_documents()-1; i > 0; --i)
-    {
+    for (int i = local_documents-1; i > 0; --i) {
      //i+1 since j \in [0,i] but rnd() \in [0,1)
      int j = (int)(rnd() * (i+1));
      assert(j >= 0 && j <= i);
@@ -106,7 +121,7 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
 
     // for each document in the corpus
     int document_id;
-    for (int i=0; i<corpus.num_documents(); ++i) {
+    for (int i=0; i<local_documents; ++i) {
       document_id = randomDocIndices[i];
 
       // for each term in the document
@@ -158,20 +173,10 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
 
       // resample the hyperparamters
       F log_p=0.0;
-      WorkerPtrVect workers;
-      for (int i = 0; i < max_threads; ++i)
-      {
-        JobReturnsF job = boost::bind(&PYPTopics::hresample_docs, this, max_threads, i);
-        workers.push_back(new SimpleResampleWorker(job));
-      }
-
-      WorkerPtrVect::iterator workerIt;
-      for (workerIt = workers.begin(); workerIt != workers.end(); ++workerIt)
-      {
-        F wresult = workerIt->getResult(); //blocks until worker done
-        log_p += wresult;
-        //std::cerr << ".. got " << wresult << std::endl; std::cerr.flush();
-
+      for (PYPs::iterator pypIt=m_document_pyps.begin();
+           pypIt != m_document_pyps.end(); ++pypIt) {
+        pypIt->resample_prior();
+        log_p += pypIt->log_restaurant_prob();
       }
 
       if (m_use_topic_pyp) {
@@ -195,64 +200,25 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
       }
 
       std::cerr.precision(10);
-      std::cerr << " ||| LLH=" << log_p << " ||| resampling time=" << timer.Elapsed() << " sec" << std::endl;
+      if (m_am_root) std::cerr << " ||| LLH=" << log_p << " ||| resampling time=" << timer.Elapsed() << " sec" << std::endl;
       timer.Reset();
 
       int k=0;
-      std::cerr << "Topics distribution: ";
+      if (m_am_root) std::cerr << "Topics distribution: ";
       std::cerr.precision(2);
       for (PYPs::iterator pypIt=m_word_pyps.front().begin();
            pypIt != m_word_pyps.front().end(); ++pypIt, ++k) {
-        if (k % 5 == 0) std::cerr << std::endl << '\t';
-        std::cerr << "<" << k << ":" << pypIt->num_customers() << ","
+        if (m_am_root && k % 5 == 0) std::cerr << std::endl << '\t';
+        if (m_am_root) std::cerr << "<" << k << ":" << pypIt->num_customers() << ","
           << pypIt->num_types() << "," << m_topic_pyp.prob(k, m_topic_p0) << "> ";
       }
       std::cerr.precision(4);
-      std::cerr << std::endl;
+      if (m_am_root) std::cerr << std::endl;
     }
   }
   delete [] randomDocIndices;
 }
 
-PYPTopics::F PYPTopics::hresample_docs(int num_threads, int thread_id)
-{
-  int resample_counter=0;
-  F log_p = 0.0;
-  PYPs::iterator pypIt = m_document_pyps.begin();
-  PYPs::iterator end = m_document_pyps.end();
-  pypIt += thread_id;
-//  std::cerr << thread_id << " started " << std::endl; std::cerr.flush();
-
-  while (pypIt < end)
-  {
-    pypIt->resample_prior();
-    log_p += pypIt->log_restaurant_prob();
-    if (resample_counter++ % 5000 == 0) {
-      std::cerr << "."; std::cerr.flush();
-    }
-    pypIt += num_threads;
-  }
-//  std::cerr << thread_id << " did " << resample_counter << " with answer " << log_p << std::endl; std::cerr.flush();
-
-  return log_p;
-}
-
-//PYPTopics::F PYPTopics::hresample_topics()
-//{
-//  F log_p = 0.0;
-//  for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin();
-//       levelIt != m_word_pyps.end(); ++levelIt) {
-//    for (PYPs::iterator pypIt=levelIt->begin();
-//         pypIt != levelIt->end(); ++pypIt) {
-//
-//      pypIt->resample_prior();
-//      log_p += pypIt->log_restaurant_prob();
-//    }
-//  }
-//  //std::cerr << "topicworker has answer " << log_p << std::endl; std::cerr.flush();
-//
-//  return log_p;
-//}
 
 void PYPTopics::decrement(const Term& term, int topic, int level) {
   //std::cerr << "PYPTopics::decrement(" << term << "," << topic << "," << level << ")" << std::endl;
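Note: the new document range in sample_corpus above is a plain block partition: each rank owns documents/size consecutive documents, the last rank absorbs the remainder, and each rank shuffles only its own index window (randomDocIndices[i] = i+m_mpi_start). A self-contained sketch of the same arithmetic, with illustrative names:

    #include <cassert>

    // Mirror of the m_mpi_start/m_mpi_end logic above: compute the half-open
    // interval [start, end) of documents owned by `rank` out of `size` ranks.
    void block_partition(int documents, int rank, int size, int* start, int* end) {
      *start = 0;
      *end = documents;
      if (size != 1) {
        *start = (documents / size) * rank;
        // every rank gets floor(documents/size) documents;
        // the last rank also takes the remainder
        *end = (rank == size - 1) ? documents : (documents / size) * (rank + 1);
      }
      assert(0 <= *start && *start <= *end && *end <= documents);
    }

With 10 documents over 4 ranks this yields [0,2), [2,4), [4,6), [6,10): the whole remainder lands on the last rank, so the split is uneven in the worst case. Because the Fisher-Yates pass and the sampling loop only index randomDocIndices, no rank ever touches another rank's documents.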
diff --git a/gi/pyp-topics/src/mpi-pyp-topics.hh b/gi/pyp-topics/src/mpi-pyp-topics.hh
index d978c7a1..5da35d82 100644
--- a/gi/pyp-topics/src/mpi-pyp-topics.hh
+++ b/gi/pyp-topics/src/mpi-pyp-topics.hh
@@ -3,15 +3,16 @@
 
 #include <vector>
 #include <iostream>
 
-#include <boost/function.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
 #include <boost/random/uniform_real.hpp>
 #include <boost/random/variate_generator.hpp>
 #include <boost/random/mersenne_twister.hpp>
+#include <boost/mpi/environment.hpp>
+#include <boost/mpi/communicator.hpp>
 
 #include "mpi-pyp.hh"
 #include "corpus.hh"
-#include "workers.hh"
 
 class PYPTopics {
 public:
@@ -20,13 +21,17 @@ public:
   typedef double F;
 
 public:
-  PYPTopics(int num_topics, bool use_topic_pyp=false, unsigned long seed = 0,
-            int max_threads = 1)
+  PYPTopics(int num_topics, bool use_topic_pyp=false, unsigned long seed = 0)
     : m_num_topics(num_topics), m_word_pyps(1),
-    m_topic_pyp(0.5,1.0,seed), m_use_topic_pyp(use_topic_pyp),
+    m_topic_pyp(0.5,1.0), m_use_topic_pyp(use_topic_pyp),
     m_seed(seed),
     uni_dist(0,1), rng(seed == 0 ? (unsigned long)this : seed),
-    rnd(rng, uni_dist), max_threads(max_threads) {}
+    rnd(rng, uni_dist), m_mpi_start(-1), m_mpi_end(-1) {
+      boost::mpi::communicator m_world;
+      m_rank = m_world.rank();
+      m_size = m_world.size();
+      m_am_root = (m_rank == 0);
+    }
 
   void sample_corpus(const Corpus& corpus, int samples,
                      int freq_cutoff_start=0, int freq_cutoff_end=0,
@@ -81,17 +86,12 @@ private:
   gen_type rnd; //instantiate: rnd(rng, uni_dist)
                 //call: rnd() generates uniform on [0,1)
 
-  typedef boost::function<F()> JobReturnsF;
-  typedef SimpleWorker<JobReturnsF, F> SimpleResampleWorker;
-  typedef boost::ptr_vector<SimpleResampleWorker> WorkerPtrVect;
-
-  F hresample_docs(int num_threads, int thread_id);
-
-//  F hresample_topics();
-
-  int max_threads;
-
   TermBackoffPtr m_backoff;
+
+  boost::mpi::communicator m_world;
+  bool m_am_root;
+  int m_rank, m_size;
+  int m_mpi_start, m_mpi_end;
 };
 
 #endif // PYP_TOPICS_HH

diff --git a/gi/pyp-topics/src/mpi-pyp.hh b/gi/pyp-topics/src/mpi-pyp.hh
index dc47244b..3396f92b 100644
--- a/gi/pyp-topics/src/mpi-pyp.hh
+++ b/gi/pyp-topics/src/mpi-pyp.hh
@@ -32,7 +32,7 @@ public:
 //  using google::sparse_hash_map<Dish,int>::begin;
 //  using google::sparse_hash_map<Dish,int>::end;
 
-  PYP(double a, double b, unsigned long seed = 0, Hash hash=Hash());
+  PYP(double a, double b, Hash hash=Hash());
 
   int increment(Dish d, double p0);
   int decrement(Dish d);
@@ -153,7 +153,7 @@ private:
 };
 
 template <typename Dish, typename Hash>
-PYP<Dish,Hash>::PYP(double a, double b, unsigned long seed, Hash)
+PYP<Dish,Hash>::PYP(double a, double b, Hash)
 : std::tr1::unordered_map<Dish, int, Hash>(10), _a(a), _b(b),
 //: google::sparse_hash_map<Dish, int, Hash>(10), _a(a), _b(b),
   _a_beta_a(1), _a_beta_b(1), _b_gamma_s(1), _b_gamma_c(1),
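Note: as committed, every process still walks the full m_document_pyps when resampling hyperparameters, and the LLH the root prints is its own local sum; nothing is communicated between ranks yet. If the intent is to combine per-rank log-likelihood contributions, Boost.MPI's reduction collectives would be the natural tool. A sketch under that assumption (global_llh is a hypothetical helper, not code in this patch):

    #include <functional>  // std::plus
    #include <boost/mpi/communicator.hpp>
    #include <boost/mpi/collectives.hpp>

    // Sum each rank's local log-likelihood; every rank receives the total.
    double global_llh(const boost::mpi::communicator& world, double local_log_p) {
      double total = 0.0;
      // all_reduce combines the per-rank values with std::plus<double>
      boost::mpi::all_reduce(world, local_log_p, total, std::plus<double>());
      return total;
    }

boost::mpi::reduce(world, local_log_p, total, std::plus<double>(), 0) would do the same but deliver the sum only to rank 0, which suffices when only the root logs it.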
diff --git a/gi/pyp-topics/src/mpi-train-contexts.cc b/gi/pyp-topics/src/mpi-train-contexts.cc
index 6309fe93..956ce123 100644
--- a/gi/pyp-topics/src/mpi-train-contexts.cc
+++ b/gi/pyp-topics/src/mpi-train-contexts.cc
@@ -8,6 +8,8 @@
 #include <boost/program_options/parsers.hpp>
 #include <boost/program_options/variables_map.hpp>
 #include <boost/scoped_ptr.hpp>
+#include <boost/mpi/environment.hpp>
+#include <boost/mpi/communicator.hpp>
 
 // Local
 #include "mpi-pyp-topics.hh"
@@ -24,8 +26,12 @@ using namespace std;
 
 int main(int argc, char **argv)
 {
-  cout << "Pitman Yor topic models: Copyright 2010 Phil Blunsom\n";
-  cout << REVISION << '\n' << endl;
+  boost::mpi::environment env(argc, argv);
+  boost::mpi::communicator world;
+  if (world.rank() == 0) {
+    cout << "Pitman Yor topic models: Copyright 2010 Phil Blunsom\n";
+    cout << REVISION << '\n' << endl;
+  }
 
   ////////////////////////////////////////////////////////////////////////////////////////////
@@ -66,7 +72,6 @@ int main(int argc, char **argv)
       ("freq-cutoff-start", value<int>()->default_value(0), "initial frequency cutoff.")
       ("freq-cutoff-end", value<int>()->default_value(0), "final frequency cutoff.")
       ("freq-cutoff-interval", value<int>()->default_value(0), "number of iterations between frequency decrement.")
-      ("max-threads", value<int>()->default_value(1), "maximum number of simultaneous threads allowed")
       ("max-contexts-per-document", value<int>()->default_value(0), "Only sample the n most frequent contexts for a document.")
       ;
@@ -81,7 +86,7 @@ int main(int argc, char **argv)
 
   // seed the random number generator: 0 = automatic, specify value otherwise
   unsigned long seed = 0;
-  PYPTopics model(vm["topics"].as<int>(), vm.count("hierarchical-topics"), seed, vm["max-threads"].as<int>());
+  PYPTopics model(vm["topics"].as<int>(), vm.count("hierarchical-topics"), seed);
 
   // read the data
   BackoffGenerator* backoff_gen=0;
@@ -112,58 +117,60 @@ int main(int argc, char **argv)
                       vm["freq-cutoff-interval"].as<int>(),
                       vm["max-contexts-per-document"].as<int>());
 
-  if (vm.count("document-topics-out")) {
-    ogzstream documents_out(vm["document-topics-out"].as<string>().c_str());
-
-    int document_id=0;
-    map<int,int> all_terms;
-    for (Corpus::const_iterator corpusIt=contexts_corpus.begin();
-         corpusIt != contexts_corpus.end(); ++corpusIt, ++document_id) {
-      vector<int> unique_terms;
-      for (Document::const_iterator docIt=corpusIt->begin();
-           docIt != corpusIt->end(); ++docIt) {
-        if (unique_terms.empty() || *docIt != unique_terms.back())
-          unique_terms.push_back(*docIt);
-        // increment this terms frequency
-        pair<map<int,int>::iterator,bool> insert_result = all_terms.insert(make_pair(*docIt,1));
-        if (!insert_result.second)
-          all_terms[*docIt] = all_terms[*docIt] + 1; //insert_result.first++;
+  if (world.rank() == 0) {
+    if (vm.count("document-topics-out")) {
+      ogzstream documents_out(vm["document-topics-out"].as<string>().c_str());
+
+      int document_id=0;
+      map<int,int> all_terms;
+      for (Corpus::const_iterator corpusIt=contexts_corpus.begin();
+           corpusIt != contexts_corpus.end(); ++corpusIt, ++document_id) {
+        vector<int> unique_terms;
+        for (Document::const_iterator docIt=corpusIt->begin();
+             docIt != corpusIt->end(); ++docIt) {
+          if (unique_terms.empty() || *docIt != unique_terms.back())
+            unique_terms.push_back(*docIt);
+          // increment this terms frequency
+          pair<map<int,int>::iterator,bool> insert_result = all_terms.insert(make_pair(*docIt,1));
+          if (!insert_result.second)
+            all_terms[*docIt] = all_terms[*docIt] + 1; //insert_result.first++;
+        }
+        documents_out << contexts_corpus.key(document_id) << '\t';
+        documents_out << model.max(document_id) << " " << corpusIt->size() << " ||| ";
+        for (std::vector<int>::const_iterator termIt=unique_terms.begin();
+             termIt != unique_terms.end(); ++termIt) {
+          if (termIt != unique_terms.begin())
+            documents_out << " ||| ";
+          vector<string> strings = contexts_corpus.context2string(*termIt);
+          copy(strings.begin(), strings.end(),ostream_iterator<string>(documents_out, " "));
+          documents_out << "||| C=" << model.max(document_id, *termIt);
+
+        }
+        documents_out <<endl;
       }
-      documents_out << contexts_corpus.key(document_id) << '\t';
-      documents_out << model.max(document_id) << " " << corpusIt->size() << " ||| ";
-      for (std::vector<int>::const_iterator termIt=unique_terms.begin();
-           termIt != unique_terms.end(); ++termIt) {
-        if (termIt != unique_terms.begin())
-          documents_out << " ||| ";
-        vector<string> strings = contexts_corpus.context2string(*termIt);
-        copy(strings.begin(), strings.end(),ostream_iterator<string>(documents_out, " "));
-        documents_out << "||| C=" << model.max(document_id, *termIt);
-
-      }
-      documents_out <<endl;
-    }
-    documents_out.close();
+      documents_out.close();
+
+      if (vm.count("default-topics-out")) {
+        ofstream default_topics(vm["default-topics-out"].as<string>().c_str());
+        default_topics << model.max_topic() <<endl;
+        for (map<int,int>::const_iterator termIt=all_terms.begin(); termIt != all_terms.end(); ++termIt) {
+          vector<string> strings = contexts_corpus.context2string(termIt->first);
+          default_topics << model.max(-1, termIt->first) << " ||| " << termIt->second << " ||| ";
+          copy(strings.begin(), strings.end(),ostream_iterator<string>(default_topics, " "));
+          default_topics <<endl;
+        }
+      }
+    }
 
-    if (vm.count("default-topics-out")) {
-      ofstream default_topics(vm["default-topics-out"].as<string>().c_str());
-      default_topics << model.max_topic() <<endl;
-      for (map<int,int>::const_iterator termIt=all_terms.begin(); termIt != all_terms.end(); ++termIt) {
-        vector<string> strings = contexts_corpus.context2string(termIt->first);
-        default_topics << model.max(-1, termIt->first) << " ||| " << termIt->second << " ||| ";
-        copy(strings.begin(), strings.end(),ostream_iterator<string>(default_topics, " "));
-        default_topics <<endl;
-      }
-    }
-  }
+    if (vm.count("topic-words-out")) {
+      ogzstream topics_out(vm["topic-words-out"].as<string>().c_str());
+      model.print_topic_terms(topics_out);
+      topics_out.close();
+    }
 
-  if (vm.count("topic-words-out")) {
-    ogzstream topics_out(vm["topic-words-out"].as<string>().c_str());
-    model.print_topic_terms(topics_out);
-    topics_out.close();
+    cout <<endl;
   }
 
-  cout <<endl;
-
   return 0;
 }
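Note on seeding: PYPTopics still initialises its generator with rng(seed == 0 ? (unsigned long)this : seed), and main hard-codes seed = 0 ("automatic"). Under MPI each rank is a separate process, so the address-derived automatic seed may or may not differ across ranks (identical binaries often place a stack-allocated model at the same address), and any fixed nonzero seed would make every rank draw identical sequences. A conventional remedy is to offset an explicit seed by the rank; a sketch, not code from this patch:

    #include <boost/random/mersenne_twister.hpp>

    // Hypothetical helper: a distinct, reproducible stream per MPI rank.
    boost::mt19937 make_rank_rng(unsigned long seed, int rank) {
      return boost::mt19937(static_cast<boost::mt19937::result_type>(seed + rank));
    }

Distinct per-rank streams matter less while each rank only shuffles its own disjoint document range, but they become important as soon as ranks start making correlated sampling decisions.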