author | philblunsom <philblunsom@ec762483-ff6d-05da-a07a-a48fb63a330f> | 2010-07-15 03:39:32 +0000 |
---|---|---|
committer | philblunsom <philblunsom@ec762483-ff6d-05da-a07a-a48fb63a330f> | 2010-07-15 03:39:32 +0000 |
commit | c0eee37401d3731555346cb260330049b5dc99e7 (patch) | |
tree | a34bcd2309c23ca4d02ab3938f888a349aad12b2 | /gi/pyp-topics/src/mpi-pyp-topics.cc |
parent | a5b1162c68d6ff5bc52b646efb563a0077cd1d2a (diff) | |
working on the mpi version
git-svn-id: https://ws10smt.googlecode.com/svn/trunk@257 ec762483-ff6d-05da-a07a-a48fb63a330f
Diffstat (limited to 'gi/pyp-topics/src/mpi-pyp-topics.cc')
-rw-r--r-- | gi/pyp-topics/src/mpi-pyp-topics.cc | 122
1 file changed, 44 insertions, 78 deletions
diff --git a/gi/pyp-topics/src/mpi-pyp-topics.cc b/gi/pyp-topics/src/mpi-pyp-topics.cc
index d2daad4f..2ad28278 100644
--- a/gi/pyp-topics/src/mpi-pyp-topics.cc
+++ b/gi/pyp-topics/src/mpi-pyp-topics.cc
@@ -1,3 +1,5 @@
+#include <boost/mpi/communicator.hpp>
+
 #include "timing.h"
 #include "mpi-pyp-topics.hh"
 
@@ -6,37 +8,51 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
   int freq_cutoff_start, int freq_cutoff_end,
   int freq_cutoff_interval,
   int max_contexts_per_document) {
+  std::cout << "I am process " << m_rank << " of " << m_size << "." << std::endl;
   Timer timer;
+  std::cout << m_am_root << std::endl;
+
+  int documents = corpus.num_documents();
+  m_mpi_start = 0;
+  m_mpi_end = documents;
+  if (m_size != 1) {
+    assert(documents < std::numeric_limits<int>::max());
+    m_mpi_start = (documents / m_size) * m_rank;
+    if (m_rank == m_size-1) m_mpi_end = documents;
+    else m_mpi_end = (documents / m_size)*(m_rank+1);
+  }
+  int local_documents = m_mpi_end - m_mpi_start;
+
   if (!m_backoff.get()) {
     m_word_pyps.clear();
     m_word_pyps.push_back(PYPs());
   }
 
-  std::cerr << "\n Training with " << m_word_pyps.size()-1 << " backoff level"
+  if (m_am_root) std::cerr << "\n Training with " << m_word_pyps.size()-1 << " backoff level"
     << (m_word_pyps.size()==2 ? ":" : "s:") << std::endl;
 
   for (int i=0; i<(int)m_word_pyps.size(); ++i) {
     m_word_pyps.at(i).reserve(m_num_topics);
     for (int j=0; j<m_num_topics; ++j)
-      m_word_pyps.at(i).push_back(new PYP<int>(0.5, 1.0, m_seed));
+      m_word_pyps.at(i).push_back(new PYP<int>(0.5, 1.0));
   }
-  std::cerr << std::endl;
+  if (m_am_root) std::cerr << std::endl;
 
   m_document_pyps.reserve(corpus.num_documents());
   for (int j=0; j<corpus.num_documents(); ++j)
-    m_document_pyps.push_back(new PYP<int>(0.5, 1.0, m_seed));
+    m_document_pyps.push_back(new PYP<int>(0.5, 1.0));
 
   m_topic_p0 = 1.0/m_num_topics;
   m_term_p0 = 1.0/corpus.num_types();
   m_backoff_p0 = 1.0/corpus.num_documents();
 
-  std::cerr << " Documents: " << corpus.num_documents() << " Terms: "
+  if (m_am_root) std::cerr << " Documents: " << corpus.num_documents() << " Terms: "
     << corpus.num_types() << std::endl;
 
   int frequency_cutoff = freq_cutoff_start;
-  std::cerr << " Context frequency cutoff set to " << frequency_cutoff << std::endl;
+  if (m_am_root) std::cerr << " Context frequency cutoff set to " << frequency_cutoff << std::endl;
 
   timer.Reset();
   // Initialisation pass
@@ -74,11 +90,11 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
       m_corpus_topics[document_id][term_index] = new_topic;
     }
   }
-  std::cerr << " Initialized in " << timer.Elapsed() << " seconds\n";
+  if (m_am_root) std::cerr << " Initialized in " << timer.Elapsed() << " seconds\n";
 
-  int* randomDocIndices = new int[corpus.num_documents()];
-  for (int i = 0; i < corpus.num_documents(); ++i)
-    randomDocIndices[i] = i;
+  int* randomDocIndices = new int[local_documents];
+  for (int i = 0; i < local_documents; ++i)
+    randomDocIndices[i] = i+m_mpi_start;
 
   // Sampling phase
   for (int curr_sample=0; curr_sample < samples; ++curr_sample) {
@@ -86,16 +102,15 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
         && curr_sample % freq_cutoff_interval == 1
        && frequency_cutoff > freq_cutoff_end) {
      frequency_cutoff--;
-      std::cerr << "\n Context frequency cutoff set to " << frequency_cutoff << std::endl;
+      if (m_am_root) std::cerr << "\n Context frequency cutoff set to " << frequency_cutoff << std::endl;
     }
 
-    std::cerr << "\n -- Sample " << curr_sample << " "; std::cerr.flush();
+    if (m_am_root) std::cerr << "\n -- Sample " << curr_sample << " "; std::cerr.flush();
 
     // Randomize the corpus indexing array
     int tmp;
     int processed_terms=0;
-    for (int i = corpus.num_documents()-1; i > 0; --i)
-    {
+    for (int i = local_documents-1; i > 0; --i) {
       //i+1 since j \in [0,i] but rnd() \in [0,1)
       int j = (int)(rnd() * (i+1));
       assert(j >= 0 && j <= i);
@@ -106,7 +121,7 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
 
     // for each document in the corpus
     int document_id;
-    for (int i=0; i<corpus.num_documents(); ++i) {
+    for (int i=0; i<local_documents; ++i) {
       document_id = randomDocIndices[i];
 
       // for each term in the document
@@ -151,15 +166,16 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
         else m_document_pyps[document_id].increment(new_topic, m_topic_p0);
       }
       if (document_id && document_id % 10000 == 0) {
-        std::cerr << "."; std::cerr.flush();
+        if (m_am_root) std::cerr << "."; std::cerr.flush();
       }
     }
-    std::cerr << " ||| sampled " << processed_terms << " terms.";
+    m_world.barrier();
+    if (m_am_root) std::cerr << " ||| sampled " << processed_terms << " terms.";
 
     if (curr_sample != 0 && curr_sample % 10 == 0) {
-      std::cerr << " ||| time=" << (timer.Elapsed() / 10.0) << " sec/sample" << std::endl;
+      if (m_am_root) std::cerr << " ||| time=" << (timer.Elapsed() / 10.0) << " sec/sample" << std::endl;
       timer.Reset();
-      std::cerr << " ... Resampling hyperparameters (" << max_threads << " threads)"; std::cerr.flush();
+      if (m_am_root) std::cerr << " ... Resampling hyperparameters"; std::cerr.flush();
 
       // resample the hyperparamters
       F log_p=0.0;
@@ -172,21 +188,10 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
        }
      }
 
-      WorkerPtrVect workers;
-      for (int i = 0; i < max_threads; ++i)
-      {
-        JobReturnsF job = boost::bind(&PYPTopics::hresample_docs, this, max_threads, i);
-        workers.push_back(new SimpleResampleWorker(job));
-      }
-
-      WorkerPtrVect::iterator workerIt;
-      for (workerIt = workers.begin(); workerIt != workers.end(); ++workerIt)
-      {
-        //std::cerr << "Retrieving worker result.."; std::cerr.flush();
-        F wresult = workerIt->getResult(); //blocks until worker done
-        log_p += wresult;
-        //std::cerr << ".. got " << wresult << std::endl; std::cerr.flush();
-
+      for (PYPs::iterator pypIt=m_document_pyps.begin();
+           pypIt != m_document_pyps.end(); ++pypIt) {
+        pypIt->resample_prior();
+        log_p += pypIt->log_restaurant_prob();
       }
 
       if (m_use_topic_pyp) {
@@ -195,64 +200,25 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
       }
 
       std::cerr.precision(10);
-      std::cerr << " ||| LLH=" << log_p << " ||| resampling time=" << timer.Elapsed() << " sec" << std::endl;
+      if (m_am_root) std::cerr << " ||| LLH=" << log_p << " ||| resampling time=" << timer.Elapsed() << " sec" << std::endl;
       timer.Reset();
 
       int k=0;
-      std::cerr << "Topics distribution: ";
+      if (m_am_root) std::cerr << "Topics distribution: ";
       std::cerr.precision(2);
       for (PYPs::iterator pypIt=m_word_pyps.front().begin();
            pypIt != m_word_pyps.front().end(); ++pypIt, ++k) {
-        if (k % 5 == 0) std::cerr << std::endl << '\t';
-        std::cerr << "<" << k << ":" << pypIt->num_customers() << ","
+        if (m_am_root && k % 5 == 0) std::cerr << std::endl << '\t';
+        if (m_am_root) std::cerr << "<" << k << ":" << pypIt->num_customers() << ","
           << pypIt->num_types() << "," << m_topic_pyp.prob(k, m_topic_p0) << "> ";
       }
       std::cerr.precision(4);
-      std::cerr << std::endl;
+      if (m_am_root) std::cerr << std::endl;
     }
   }
   delete [] randomDocIndices;
 }
 
-PYPTopics::F PYPTopics::hresample_docs(int num_threads, int thread_id)
-{
-  int resample_counter=0;
-  F log_p = 0.0;
-  PYPs::iterator pypIt = m_document_pyps.begin();
-  PYPs::iterator end = m_document_pyps.end();
-  pypIt += thread_id;
-//  std::cerr << thread_id << " started " << std::endl; std::cerr.flush();
-
-  while (pypIt < end)
-  {
-    pypIt->resample_prior();
-    log_p += pypIt->log_restaurant_prob();
-    if (resample_counter++ % 5000 == 0) {
-      std::cerr << "."; std::cerr.flush();
-    }
-    pypIt += num_threads;
-  }
-//  std::cerr << thread_id << " did " << resample_counter << " with answer " << log_p << std::endl; std::cerr.flush();
-
-  return log_p;
-}
-
-//PYPTopics::F PYPTopics::hresample_topics()
-//{
-//  F log_p = 0.0;
-//  for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin();
-//       levelIt != m_word_pyps.end(); ++levelIt) {
-//    for (PYPs::iterator pypIt=levelIt->begin();
-//         pypIt != levelIt->end(); ++pypIt) {
-//
-//      pypIt->resample_prior();
-//      log_p += pypIt->log_restaurant_prob();
-//    }
-//  }
-//  //std::cerr << "topicworker has answer " << log_p << std::endl; std::cerr.flush();
-//
-//  return log_p;
-//}
 void PYPTopics::decrement(const Term& term, int topic, int level) {
   //std::cerr << "PYPTopics::decrement(" << term << "," << topic << "," << level << ")" << std::endl;