From c0eee37401d3731555346cb260330049b5dc99e7 Mon Sep 17 00:00:00 2001
From: philblunsom
Date: Thu, 15 Jul 2010 03:39:32 +0000
Subject: working on the mpi version

git-svn-id: https://ws10smt.googlecode.com/svn/trunk@257 ec762483-ff6d-05da-a07a-a48fb63a330f
---
 gi/pyp-topics/src/mpi-pyp-topics.cc | 122 +++++++++++++-----------------------
 1 file changed, 44 insertions(+), 78 deletions(-)

(limited to 'gi/pyp-topics/src/mpi-pyp-topics.cc')

diff --git a/gi/pyp-topics/src/mpi-pyp-topics.cc b/gi/pyp-topics/src/mpi-pyp-topics.cc
index d2daad4f..2ad28278 100644
--- a/gi/pyp-topics/src/mpi-pyp-topics.cc
+++ b/gi/pyp-topics/src/mpi-pyp-topics.cc
@@ -1,3 +1,5 @@
+#include 
+
 #include "timing.h"
 #include "mpi-pyp-topics.hh"
 
@@ -6,37 +8,51 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
                               int freq_cutoff_start, int freq_cutoff_end,
                               int freq_cutoff_interval,
                               int max_contexts_per_document) {
+  std::cout << "I am process " << m_rank << " of " << m_size << "." << std::endl;
   Timer timer;
+  std::cout << m_am_root << std::endl;
+
+  int documents = corpus.num_documents();
+  m_mpi_start = 0;
+  m_mpi_end = documents;
+  if (m_size != 1) {
+    assert(documents < std::numeric_limits<int>::max());
+    m_mpi_start = (documents / m_size) * m_rank;
+    if (m_rank == m_size-1) m_mpi_end = documents;
+    else m_mpi_end = (documents / m_size)*(m_rank+1);
+  }
+  int local_documents = m_mpi_end - m_mpi_start;
+
   if (!m_backoff.get()) {
     m_word_pyps.clear();
     m_word_pyps.push_back(PYPs());
   }
 
-  std::cerr << "\n Training with " << m_word_pyps.size()-1 << " backoff level"
+  if (m_am_root) std::cerr << "\n Training with " << m_word_pyps.size()-1 << " backoff level"
    << (m_word_pyps.size()==2 ? ":" : "s:") << std::endl;
 
  for (int i=0; i<(int)m_word_pyps.size(); ++i)
  {
    m_word_pyps.at(i).reserve(m_num_topics);
    for (int j=0; j<m_num_topics; ++j)
-      m_word_pyps.at(i).push_back(new PYP(0.5, 1.0, m_seed));
+      m_word_pyps.at(i).push_back(new PYP(0.5, 1.0));
  }
-  std::cerr << std::endl;
+  if (m_am_root) std::cerr << std::endl;
 
  m_document_pyps.reserve(corpus.num_documents());
  for (int j=0; j<corpus.num_documents(); ++j)
-    m_document_pyps.push_back(new PYP(0.5, 1.0, m_seed));
+    m_document_pyps.push_back(new PYP(0.5, 1.0));
 
  m_topic_p0 = 1.0/m_num_topics;
  m_term_p0 = 1.0/corpus.num_types();
  m_backoff_p0 = 1.0/corpus.num_documents();
 
-  std::cerr << " Documents: " << corpus.num_documents() << " Terms: "
+  if (m_am_root) std::cerr << " Documents: " << corpus.num_documents() << " Terms: "
    << corpus.num_types() << std::endl;
 
  int frequency_cutoff = freq_cutoff_start;
-  std::cerr << " Context frequency cutoff set to " << frequency_cutoff << std::endl;
+  if (m_am_root) std::cerr << " Context frequency cutoff set to " << frequency_cutoff << std::endl;
 
  timer.Reset();
  // Initialisation pass
@@ -74,11 +90,11 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
      m_corpus_topics[document_id][term_index] = new_topic;
    }
  }
-  std::cerr << " Initialized in " << timer.Elapsed() << " seconds\n";
+  if (m_am_root) std::cerr << " Initialized in " << timer.Elapsed() << " seconds\n";
 
-  int* randomDocIndices = new int[corpus.num_documents()];
-  for (int i = 0; i < corpus.num_documents(); ++i)
-    randomDocIndices[i] = i;
+  int* randomDocIndices = new int[local_documents];
+  for (int i = 0; i < local_documents; ++i)
+    randomDocIndices[i] = i+m_mpi_start;
 
  // Sampling phase
  for (int curr_sample=0; curr_sample < samples; ++curr_sample) {
@@ -86,16 +102,15 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
        && curr_sample % freq_cutoff_interval == 1
        && frequency_cutoff > freq_cutoff_end) {
      frequency_cutoff--;
-      std::cerr << "\n Context frequency cutoff set to " << frequency_cutoff << std::endl;
+      if (m_am_root) std::cerr << "\n Context frequency cutoff set to " << frequency_cutoff << std::endl;
    }
 
-    std::cerr << "\n -- Sample " << curr_sample << " "; std::cerr.flush();
+    if (m_am_root) std::cerr << "\n -- Sample " << curr_sample << " "; std::cerr.flush();
 
    // Randomize the corpus indexing array
    int tmp;
    int processed_terms=0;
-    for (int i = corpus.num_documents()-1; i > 0; --i)
-    {
+    for (int i = local_documents-1; i > 0; --i) {
      //i+1 since j \in [0,i] but rnd() \in [0,1)
      int j = (int)(rnd() * (i+1));
      assert(j >= 0 && j <= i);
@@ -106,7 +121,7 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
 
    // for each document in the corpus
    int document_id;
-    for (int i=0; i<corpus.num_documents(); ++i) {
+    for (int i=0; i<local_documents; ++i) {

-      getResult(); //blocks until worker done
-      log_p += wresult;
-      //std::cerr << ".. got " << wresult << std::endl; std::cerr.flush();
-
+      for (PYPs::iterator pypIt=m_document_pyps.begin();
+           pypIt != m_document_pyps.end(); ++pypIt) {
+        pypIt->resample_prior();
+        log_p += pypIt->log_restaurant_prob();
      }
 
      if (m_use_topic_pyp) {
@@ -195,64 +200,25 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
      }
 
      std::cerr.precision(10);
-      std::cerr << " ||| LLH=" << log_p << " ||| resampling time=" << timer.Elapsed() << " sec" << std::endl;
+      if (m_am_root) std::cerr << " ||| LLH=" << log_p << " ||| resampling time=" << timer.Elapsed() << " sec" << std::endl;
      timer.Reset();
 
      int k=0;
-      std::cerr << "Topics distribution: ";
+      if (m_am_root) std::cerr << "Topics distribution: ";
      std::cerr.precision(2);
      for (PYPs::iterator pypIt=m_word_pyps.front().begin();
           pypIt != m_word_pyps.front().end(); ++pypIt, ++k) {
-        if (k % 5 == 0) std::cerr << std::endl << '\t';
-        std::cerr << "<" << k << ":" << pypIt->num_customers() << ","
+        if (m_am_root && k % 5 == 0) std::cerr << std::endl << '\t';
+        if (m_am_root) std::cerr << "<" << k << ":" << pypIt->num_customers() << ","
          << pypIt->num_types() << "," << m_topic_pyp.prob(k, m_topic_p0) << "> ";
      }
      std::cerr.precision(4);
-      std::cerr << std::endl;
+      if (m_am_root) std::cerr << std::endl;
    }
  }
 
  delete [] randomDocIndices;
 }
 
-PYPTopics::F PYPTopics::hresample_docs(int num_threads, int thread_id)
-{
-  int resample_counter=0;
-  F log_p = 0.0;
-  PYPs::iterator pypIt = m_document_pyps.begin();
-  PYPs::iterator end = m_document_pyps.end();
-  pypIt += thread_id;
-//  std::cerr << thread_id << " started " << std::endl; std::cerr.flush();
-
-  while (pypIt < end)
-  {
-    pypIt->resample_prior();
-    log_p += pypIt->log_restaurant_prob();
-    if (resample_counter++ % 5000 == 0) {
-      std::cerr << "."; std::cerr.flush();
-    }
-    pypIt += num_threads;
-  }
-//  std::cerr << thread_id << " did " << resample_counter << " with answer " << log_p << std::endl; std::cerr.flush();
-
-  return log_p;
-}
-
-//PYPTopics::F PYPTopics::hresample_topics()
-//{
-//  F log_p = 0.0;
-//  for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin();
-//       levelIt != m_word_pyps.end(); ++levelIt) {
-//    for (PYPs::iterator pypIt=levelIt->begin();
-//         pypIt != levelIt->end(); ++pypIt) {
-//
-//      pypIt->resample_prior();
-//      log_p += pypIt->log_restaurant_prob();
-//    }
-//  }
-//  //std::cerr << "topicworker has answer " << log_p << std::endl; std::cerr.flush();
-//
-//  return log_p;
-//}
 
 void PYPTopics::decrement(const Term& term, int topic, int level) {
  //std::cerr << "PYPTopics::decrement(" << term << "," << topic << "," << level << ")" << std::endl;
-- 
cgit v1.2.3
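
The core of this change is the static block partition of the corpus across MPI processes (the new m_mpi_start/m_mpi_end computation, plus the switch from corpus-wide loops to local_documents). Below is a minimal standalone C++ sketch of just that arithmetic, for illustration only: the free function partition and the local rank/size/documents variables are hypothetical stand-ins for the patch's m_rank, m_size, m_mpi_start and m_mpi_end, and the sketch does not call MPI at all.

#include <cassert>
#include <iostream>
#include <limits>

// Compute the half-open range [start, end) of documents owned by `rank`,
// mirroring the partition arithmetic introduced in sample_corpus().
void partition(int documents, int rank, int size, int& start, int& end) {
  start = 0;
  end = documents;
  if (size != 1) {
    assert(documents < std::numeric_limits<int>::max());
    start = (documents / size) * rank;
    if (rank == size - 1) end = documents;              // last rank absorbs the remainder
    else                  end = (documents / size) * (rank + 1);
  }
}

int main() {
  const int documents = 10, size = 3;
  for (int rank = 0; rank < size; ++rank) {
    int start, end;
    partition(documents, rank, size, start, end);
    std::cout << "rank " << rank << " owns [" << start << ", " << end << ") -> "
              << (end - start) << " documents" << std::endl;
  }
  return 0;
}

With 10 documents on 3 processes this prints [0,3), [3,6) and [6,10): every rank gets documents/size documents and the last rank picks up the remainder, so each document is sampled by exactly one process.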