diff options
Diffstat (limited to 'gi/pyp-topics/src/mpi-pyp-topics.cc')
-rw-r--r-- | gi/pyp-topics/src/mpi-pyp-topics.cc | 148 |
1 files changed, 93 insertions, 55 deletions
diff --git a/gi/pyp-topics/src/mpi-pyp-topics.cc b/gi/pyp-topics/src/mpi-pyp-topics.cc index 2ad28278..4525302e 100644 --- a/gi/pyp-topics/src/mpi-pyp-topics.cc +++ b/gi/pyp-topics/src/mpi-pyp-topics.cc @@ -4,7 +4,7 @@ #include "mpi-pyp-topics.hh" //#include <boost/date_time/posix_time/posix_time_types.hpp> -void PYPTopics::sample_corpus(const Corpus& corpus, int samples, +void MPIPYPTopics::sample_corpus(const Corpus& corpus, int samples, int freq_cutoff_start, int freq_cutoff_end, int freq_cutoff_interval, int max_contexts_per_document) { @@ -23,33 +23,33 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples, } int local_documents = m_mpi_end - m_mpi_start; - if (!m_backoff.get()) { m_word_pyps.clear(); - m_word_pyps.push_back(PYPs()); + m_word_pyps.push_back(MPIPYPs()); } if (m_am_root) std::cerr << "\n Training with " << m_word_pyps.size()-1 << " backoff level" - << (m_word_pyps.size()==2 ? ":" : "s:") << std::endl; + << (m_word_pyps.size()>1 ? ":" : "s:") << std::endl; - for (int i=0; i<(int)m_word_pyps.size(); ++i) - { + for (int i=0; i<(int)m_word_pyps.size(); ++i) { m_word_pyps.at(i).reserve(m_num_topics); for (int j=0; j<m_num_topics; ++j) - m_word_pyps.at(i).push_back(new PYP<int>(0.5, 1.0)); + m_word_pyps.at(i).push_back(new MPIPYP<int>(0.5, 1.0)); } if (m_am_root) std::cerr << std::endl; - m_document_pyps.reserve(corpus.num_documents()); - for (int j=0; j<corpus.num_documents(); ++j) + m_document_pyps.reserve(local_documents); + //m_document_pyps.reserve(corpus.num_documents()); + //for (int j=0; j<corpus.num_documents(); ++j) + for (int j=0; j<local_documents; ++j) m_document_pyps.push_back(new PYP<int>(0.5, 1.0)); m_topic_p0 = 1.0/m_num_topics; m_term_p0 = 1.0/corpus.num_types(); m_backoff_p0 = 1.0/corpus.num_documents(); - if (m_am_root) std::cerr << " Documents: " << corpus.num_documents() << " Terms: " - << corpus.num_types() << std::endl; + if (m_am_root) std::cerr << " Documents: " << corpus.num_documents() << "(" + << local_documents << ")" << " Terms: " << corpus.num_types() << std::endl; int frequency_cutoff = freq_cutoff_start; if (m_am_root) std::cerr << " Context frequency cutoff set to " << frequency_cutoff << std::endl; @@ -57,13 +57,16 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples, timer.Reset(); // Initialisation pass int document_id=0, topic_counter=0; - for (Corpus::const_iterator corpusIt=corpus.begin(); - corpusIt != corpus.end(); ++corpusIt, ++document_id) { - m_corpus_topics.push_back(DocumentTopics(corpusIt->size(), 0)); + for (int i=0; i<local_documents; ++i) { + document_id = i+m_mpi_start; + + //for (Corpus::const_iterator corpusIt=corpus.begin(); + // corpusIt != corpus.end(); ++corpusIt, ++document_id) { + m_corpus_topics.push_back(DocumentTopics(corpus.at(document_id).size(), 0)); int term_index=0; - for (Document::const_iterator docIt=corpusIt->begin(); - docIt != corpusIt->end(); ++docIt, ++term_index) { + for (Document::const_iterator docIt=corpus.at(document_id).begin(); + docIt != corpus.at(document_id).end(); ++docIt, ++term_index) { topic_counter++; Term term = *docIt; @@ -80,21 +83,41 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples, if (m_use_topic_pyp) { F p0 = m_topic_pyp.prob(new_topic, m_topic_p0); - int table_delta = m_document_pyps[document_id].increment(new_topic, p0); + int table_delta = m_document_pyps.at(i).increment(new_topic, p0); if (table_delta) m_topic_pyp.increment(new_topic, m_topic_p0); } - else m_document_pyps[document_id].increment(new_topic, m_topic_p0); + else m_document_pyps.at(i).increment(new_topic, m_topic_p0); } - m_corpus_topics[document_id][term_index] = new_topic; + m_corpus_topics.at(i).at(term_index) = new_topic; + } + } + + // Synchronise the topic->word counds across the processes. + for (std::vector<MPIPYPs>::iterator levelIt=m_word_pyps.begin(); + levelIt != m_word_pyps.end(); ++levelIt) { + for (MPIPYPs::iterator pypIt=levelIt->begin(); + pypIt != levelIt->end(); ++pypIt) { + if (!m_am_root) boost::mpi::communicator().barrier(); + std::cerr << "Before Sync Process " << m_rank << ":"; + pypIt->debug_info(std::cerr); std::cerr << std::endl; + if (m_am_root) boost::mpi::communicator().barrier(); + + pypIt->synchronise(); + + if (!m_am_root) boost::mpi::communicator().barrier(); + std::cerr << "After Sync Process " << m_rank << ":"; + pypIt->debug_info(std::cerr); std::cerr << std::endl; + if (m_am_root) boost::mpi::communicator().barrier(); } } + if (m_am_root) std::cerr << " Initialized in " << timer.Elapsed() << " seconds\n"; int* randomDocIndices = new int[local_documents]; for (int i = 0; i < local_documents; ++i) - randomDocIndices[i] = i+m_mpi_start; + randomDocIndices[i] = i; // Sampling phase for (int curr_sample=0; curr_sample < samples; ++curr_sample) { @@ -110,8 +133,8 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples, // Randomize the corpus indexing array int tmp; int processed_terms=0; - for (int i = local_documents-1; i > 0; --i) { - //i+1 since j \in [0,i] but rnd() \in [0,1) + for (int i = (local_documents-1); i > 0; --i) { + //i+1 since j \in [0,i] but rnd() \in [0,1) int j = (int)(rnd() * (i+1)); assert(j >= 0 && j <= i); tmp = randomDocIndices[i]; @@ -120,15 +143,17 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples, } // for each document in the corpus - int document_id; - for (int i=0; i<local_documents; ++i) { - document_id = randomDocIndices[i]; + for (int rand_doc=0; rand_doc<local_documents; ++rand_doc) { + int doc_index = randomDocIndices[rand_doc]; + int document_id = doc_index + m_mpi_start; + const Document& doc = corpus.at(document_id); // for each term in the document int term_index=0; - Document::const_iterator docEnd = corpus.at(document_id).end(); - for (Document::const_iterator docIt=corpus.at(document_id).begin(); + Document::const_iterator docEnd = doc.end(); + for (Document::const_iterator docIt=doc.begin(); docIt != docEnd; ++docIt, ++term_index) { + if (max_contexts_per_document && term_index > max_contexts_per_document) break; @@ -140,36 +165,49 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples, processed_terms++; // remove the prevous topic from the PYPs - int current_topic = m_corpus_topics[document_id][term_index]; + int current_topic = m_corpus_topics.at(doc_index).at(term_index); // a negative label mean that term hasn't been sampled yet if (current_topic >= 0) { decrement(term, current_topic); - int table_delta = m_document_pyps[document_id].decrement(current_topic); + int table_delta = m_document_pyps.at(doc_index).decrement(current_topic); if (m_use_topic_pyp && table_delta < 0) m_topic_pyp.decrement(current_topic); } // sample a new_topic - int new_topic = sample(document_id, term); + int new_topic = sample(doc_index, term); // add the new topic to the PYPs - m_corpus_topics[document_id][term_index] = new_topic; + m_corpus_topics.at(doc_index).at(term_index) = new_topic; increment(term, new_topic); if (m_use_topic_pyp) { F p0 = m_topic_pyp.prob(new_topic, m_topic_p0); - int table_delta = m_document_pyps[document_id].increment(new_topic, p0); + int table_delta = m_document_pyps.at(doc_index).increment(new_topic, p0); if (table_delta) m_topic_pyp.increment(new_topic, m_topic_p0); } - else m_document_pyps[document_id].increment(new_topic, m_topic_p0); + else m_document_pyps.at(doc_index).increment(new_topic, m_topic_p0); } if (document_id && document_id % 10000 == 0) { if (m_am_root) std::cerr << "."; std::cerr.flush(); } } m_world.barrier(); + // Synchronise the topic->word counds across the processes. + for (std::vector<MPIPYPs>::iterator levelIt=m_word_pyps.begin(); + levelIt != m_word_pyps.end(); ++levelIt) { + for (MPIPYPs::iterator pypIt=levelIt->begin(); + pypIt != levelIt->end(); ++pypIt) { + std::cerr << "Before Sync Process " << m_rank << ":"; + pypIt->debug_info(std::cerr); std::cerr << std::endl; + pypIt->synchronise(); + std::cerr << "After Sync Process " << m_rank << ":"; + pypIt->debug_info(std::cerr); std::cerr << std::endl; + } + } + if (m_am_root) std::cerr << " ||| sampled " << processed_terms << " terms."; if (curr_sample != 0 && curr_sample % 10 == 0) { @@ -179,9 +217,9 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples, // resample the hyperparamters F log_p=0.0; - for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin(); + for (std::vector<MPIPYPs>::iterator levelIt=m_word_pyps.begin(); levelIt != m_word_pyps.end(); ++levelIt) { - for (PYPs::iterator pypIt=levelIt->begin(); + for (MPIPYPs::iterator pypIt=levelIt->begin(); pypIt != levelIt->end(); ++pypIt) { pypIt->resample_prior(); log_p += pypIt->log_restaurant_prob(); @@ -206,7 +244,7 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples, int k=0; if (m_am_root) std::cerr << "Topics distribution: "; std::cerr.precision(2); - for (PYPs::iterator pypIt=m_word_pyps.front().begin(); + for (MPIPYPs::iterator pypIt=m_word_pyps.front().begin(); pypIt != m_word_pyps.front().end(); ++pypIt, ++k) { if (m_am_root && k % 5 == 0) std::cerr << std::endl << '\t'; if (m_am_root) std::cerr << "<" << k << ":" << pypIt->num_customers() << "," @@ -220,8 +258,8 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples, } -void PYPTopics::decrement(const Term& term, int topic, int level) { - //std::cerr << "PYPTopics::decrement(" << term << "," << topic << "," << level << ")" << std::endl; +void MPIPYPTopics::decrement(const Term& term, int topic, int level) { + //std::cerr << "MPIPYPTopics::decrement(" << term << "," << topic << "," << level << ")" << std::endl; m_word_pyps.at(level).at(topic).decrement(term); if (m_backoff.get()) { Term backoff_term = (*m_backoff)[term]; @@ -230,8 +268,8 @@ void PYPTopics::decrement(const Term& term, int topic, int level) { } } -void PYPTopics::increment(const Term& term, int topic, int level) { - //std::cerr << "PYPTopics::increment(" << term << "," << topic << "," << level << ")" << std::endl; +void MPIPYPTopics::increment(const Term& term, int topic, int level) { + //std::cerr << "MPIPYPTopics::increment(" << term << "," << topic << "," << level << ")" << std::endl; m_word_pyps.at(level).at(topic).increment(term, word_pyps_p0(term, topic, level)); if (m_backoff.get()) { @@ -241,7 +279,7 @@ void PYPTopics::increment(const Term& term, int topic, int level) { } } -int PYPTopics::sample(const DocumentId& doc, const Term& term) { +int MPIPYPTopics::sample(const DocumentId& doc, const Term& term) { // First pass: collect probs F sum=0.0; std::vector<F> sums; @@ -252,7 +290,7 @@ int PYPTopics::sample(const DocumentId& doc, const Term& term) { if (m_use_topic_pyp) topic_prob = m_topic_pyp.prob(k, m_topic_p0); //F p_k_d = m_document_pyps[doc].prob(k, topic_prob); - F p_k_d = m_document_pyps[doc].unnormalised_prob(k, topic_prob); + F p_k_d = m_document_pyps.at(doc).unnormalised_prob(k, topic_prob); sum += (p_w_k*p_k_d); sums.push_back(sum); @@ -266,9 +304,9 @@ int PYPTopics::sample(const DocumentId& doc, const Term& term) { assert(false); } -PYPTopics::F PYPTopics::word_pyps_p0(const Term& term, int topic, int level) const { +MPIPYPTopics::F MPIPYPTopics::word_pyps_p0(const Term& term, int topic, int level) const { //for (int i=0; i<level+1; ++i) std::cerr << " "; - //std::cerr << "PYPTopics::word_pyps_p0(" << term << "," << topic << "," << level << ")" << std::endl; + //std::cerr << "MPIPYPTopics::word_pyps_p0(" << term << "," << topic << "," << level << ")" << std::endl; F p0 = m_term_p0; if (m_backoff.get()) { @@ -283,24 +321,24 @@ PYPTopics::F PYPTopics::word_pyps_p0(const Term& term, int topic, int level) con p0 = m_term_p0; } //for (int i=0; i<level+1; ++i) std::cerr << " "; - //std::cerr << "PYPTopics::word_pyps_p0(" << term << "," << topic << "," << level << ") = " << p0 << std::endl; + //std::cerr << "MPIPYPTopics::word_pyps_p0(" << term << "," << topic << "," << level << ") = " << p0 << std::endl; return p0; } -PYPTopics::F PYPTopics::prob(const Term& term, int topic, int level) const { +MPIPYPTopics::F MPIPYPTopics::prob(const Term& term, int topic, int level) const { //for (int i=0; i<level+1; ++i) std::cerr << " "; - //std::cerr << "PYPTopics::prob(" << term << "," << topic << "," << level << " " << factor << ")" << std::endl; + //std::cerr << "MPIPYPTopics::prob(" << term << "," << topic << "," << level << " " << factor << ")" << std::endl; F p0 = word_pyps_p0(term, topic, level); F p_w_k = m_word_pyps.at(level).at(topic).prob(term, p0); //for (int i=0; i<level+1; ++i) std::cerr << " "; - //std::cerr << "PYPTopics::prob(" << term << "," << topic << "," << level << ") = " << p_w_k << std::endl; + //std::cerr << "MPIPYPTopics::prob(" << term << "," << topic << "," << level << ") = " << p_w_k << std::endl; return p_w_k; } -int PYPTopics::max_topic() const { +int MPIPYPTopics::max_topic() const { if (!m_use_topic_pyp) return -1; @@ -317,8 +355,8 @@ int PYPTopics::max_topic() const { return current_topic; } -int PYPTopics::max(const DocumentId& doc) const { - //std::cerr << "PYPTopics::max(" << doc << "," << term << ")" << std::endl; +int MPIPYPTopics::max(const DocumentId& doc) const { + //std::cerr << "MPIPYPTopics::max(" << doc << "," << term << ")" << std::endl; // collect probs F current_max=0.0; int current_topic=-1; @@ -342,8 +380,8 @@ int PYPTopics::max(const DocumentId& doc) const { return current_topic; } -int PYPTopics::max(const DocumentId& doc, const Term& term) const { - //std::cerr << "PYPTopics::max(" << doc << "," << term << ")" << std::endl; +int MPIPYPTopics::max(const DocumentId& doc, const Term& term) const { + //std::cerr << "MPIPYPTopics::max(" << doc << "," << term << ")" << std::endl; // collect probs F current_max=0.0; int current_topic=-1; @@ -368,7 +406,7 @@ int PYPTopics::max(const DocumentId& doc, const Term& term) const { return current_topic; } -std::ostream& PYPTopics::print_document_topics(std::ostream& out) const { +std::ostream& MPIPYPTopics::print_document_topics(std::ostream& out) const { for (CorpusTopics::const_iterator corpusIt=m_corpus_topics.begin(); corpusIt != m_corpus_topics.end(); ++corpusIt) { int term_index=0; @@ -382,7 +420,7 @@ std::ostream& PYPTopics::print_document_topics(std::ostream& out) const { return out; } -std::ostream& PYPTopics::print_topic_terms(std::ostream& out) const { +std::ostream& MPIPYPTopics::print_topic_terms(std::ostream& out) const { for (PYPs::const_iterator pypsIt=m_word_pyps.front().begin(); pypsIt != m_word_pyps.front().end(); ++pypsIt) { int term_index=0; |