summary | refs | log | tree | commit | diff
path: root/gi/pyp-topics/src/mpi-pyp-topics.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gi/pyp-topics/src/mpi-pyp-topics.cc')
-rw-r--r-- gi/pyp-topics/src/mpi-pyp-topics.cc 122
1 files changed, 44 insertions, 78 deletions
diff --git a/gi/pyp-topics/src/mpi-pyp-topics.cc b/gi/pyp-topics/src/mpi-pyp-topics.cc
index d2daad4f..2ad28278 100644
--- a/gi/pyp-topics/src/mpi-pyp-topics.cc
+++ b/gi/pyp-topics/src/mpi-pyp-topics.cc
@@ -1,3 +1,5 @@
+#include <boost/mpi/communicator.hpp>
+
#include "timing.h"
#include "mpi-pyp-topics.hh"
@@ -6,37 +8,51 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
int freq_cutoff_start, int freq_cutoff_end,
int freq_cutoff_interval,
int max_contexts_per_document) {
+ std::cout << "I am process " << m_rank << " of " << m_size << "." << std::endl;
Timer timer;
+ std::cout << m_am_root << std::endl;
+
+ int documents = corpus.num_documents();
+ m_mpi_start = 0;
+ m_mpi_end = documents;
+ if (m_size != 1) {
+ assert(documents < std::numeric_limits<int>::max());
+ m_mpi_start = (documents / m_size) * m_rank;
+ if (m_rank == m_size-1) m_mpi_end = documents;
+ else m_mpi_end = (documents / m_size)*(m_rank+1);
+ }
+ int local_documents = m_mpi_end - m_mpi_start;
+
if (!m_backoff.get()) {
m_word_pyps.clear();
m_word_pyps.push_back(PYPs());
}
- std::cerr << "\n Training with " << m_word_pyps.size()-1 << " backoff level"
+ if (m_am_root) std::cerr << "\n Training with " << m_word_pyps.size()-1 << " backoff level"
<< (m_word_pyps.size()==2 ? ":" : "s:") << std::endl;
for (int i=0; i<(int)m_word_pyps.size(); ++i)
{
m_word_pyps.at(i).reserve(m_num_topics);
for (int j=0; j<m_num_topics; ++j)
- m_word_pyps.at(i).push_back(new PYP<int>(0.5, 1.0, m_seed));
+ m_word_pyps.at(i).push_back(new PYP<int>(0.5, 1.0));
}
- std::cerr << std::endl;
+ if (m_am_root) std::cerr << std::endl;
m_document_pyps.reserve(corpus.num_documents());
for (int j=0; j<corpus.num_documents(); ++j)
- m_document_pyps.push_back(new PYP<int>(0.5, 1.0, m_seed));
+ m_document_pyps.push_back(new PYP<int>(0.5, 1.0));
m_topic_p0 = 1.0/m_num_topics;
m_term_p0 = 1.0/corpus.num_types();
m_backoff_p0 = 1.0/corpus.num_documents();
- std::cerr << " Documents: " << corpus.num_documents() << " Terms: "
+ if (m_am_root) std::cerr << " Documents: " << corpus.num_documents() << " Terms: "
<< corpus.num_types() << std::endl;
int frequency_cutoff = freq_cutoff_start;
- std::cerr << " Context frequency cutoff set to " << frequency_cutoff << std::endl;
+ if (m_am_root) std::cerr << " Context frequency cutoff set to " << frequency_cutoff << std::endl;
timer.Reset();
// Initialisation pass
@@ -74,11 +90,11 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
m_corpus_topics[document_id][term_index] = new_topic;
}
}
- std::cerr << " Initialized in " << timer.Elapsed() << " seconds\n";
+ if (m_am_root) std::cerr << " Initialized in " << timer.Elapsed() << " seconds\n";
- int* randomDocIndices = new int[corpus.num_documents()];
- for (int i = 0; i < corpus.num_documents(); ++i)
- randomDocIndices[i] = i;
+ int* randomDocIndices = new int[local_documents];
+ for (int i = 0; i < local_documents; ++i)
+ randomDocIndices[i] = i+m_mpi_start;
// Sampling phase
for (int curr_sample=0; curr_sample < samples; ++curr_sample) {
@@ -86,16 +102,15 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
&& curr_sample % freq_cutoff_interval == 1
&& frequency_cutoff > freq_cutoff_end) {
frequency_cutoff--;
- std::cerr << "\n Context frequency cutoff set to " << frequency_cutoff << std::endl;
+ if (m_am_root) std::cerr << "\n Context frequency cutoff set to " << frequency_cutoff << std::endl;
}
- std::cerr << "\n -- Sample " << curr_sample << " "; std::cerr.flush();
+ if (m_am_root) std::cerr << "\n -- Sample " << curr_sample << " "; std::cerr.flush();
// Randomize the corpus indexing array
int tmp;
int processed_terms=0;
- for (int i = corpus.num_documents()-1; i > 0; --i)
- {
+ for (int i = local_documents-1; i > 0; --i) {
//i+1 since j \in [0,i] but rnd() \in [0,1)
int j = (int)(rnd() * (i+1));
assert(j >= 0 && j <= i);
@@ -106,7 +121,7 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
// for each document in the corpus
int document_id;
- for (int i=0; i<corpus.num_documents(); ++i) {
+ for (int i=0; i<local_documents; ++i) {
document_id = randomDocIndices[i];
// for each term in the document
@@ -151,15 +166,16 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
else m_document_pyps[document_id].increment(new_topic, m_topic_p0);
}
if (document_id && document_id % 10000 == 0) {
- std::cerr << "."; std::cerr.flush();
+ if (m_am_root) std::cerr << "."; std::cerr.flush();
}
}
- std::cerr << " ||| sampled " << processed_terms << " terms.";
+ m_world.barrier();
+ if (m_am_root) std::cerr << " ||| sampled " << processed_terms << " terms.";
if (curr_sample != 0 && curr_sample % 10 == 0) {
- std::cerr << " ||| time=" << (timer.Elapsed() / 10.0) << " sec/sample" << std::endl;
+ if (m_am_root) std::cerr << " ||| time=" << (timer.Elapsed() / 10.0) << " sec/sample" << std::endl;
timer.Reset();
- std::cerr << " ... Resampling hyperparameters (" << max_threads << " threads)"; std::cerr.flush();
+ if (m_am_root) std::cerr << " ... Resampling hyperparameters"; std::cerr.flush();
// resample the hyperparamters
F log_p=0.0;
@@ -172,21 +188,10 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
}
}
- WorkerPtrVect workers;
- for (int i = 0; i < max_threads; ++i)
- {
- JobReturnsF job = boost::bind(&PYPTopics::hresample_docs, this, max_threads, i);
- workers.push_back(new SimpleResampleWorker(job));
- }
-
- WorkerPtrVect::iterator workerIt;
- for (workerIt = workers.begin(); workerIt != workers.end(); ++workerIt)
- {
- //std::cerr << "Retrieving worker result.."; std::cerr.flush();
- F wresult = workerIt->getResult(); //blocks until worker done
- log_p += wresult;
- //std::cerr << ".. got " << wresult << std::endl; std::cerr.flush();
-
+ for (PYPs::iterator pypIt=m_document_pyps.begin();
+ pypIt != m_document_pyps.end(); ++pypIt) {
+ pypIt->resample_prior();
+ log_p += pypIt->log_restaurant_prob();
}
if (m_use_topic_pyp) {
@@ -195,64 +200,25 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
}
std::cerr.precision(10);
- std::cerr << " ||| LLH=" << log_p << " ||| resampling time=" << timer.Elapsed() << " sec" << std::endl;
+ if (m_am_root) std::cerr << " ||| LLH=" << log_p << " ||| resampling time=" << timer.Elapsed() << " sec" << std::endl;
timer.Reset();
int k=0;
- std::cerr << "Topics distribution: ";
+ if (m_am_root) std::cerr << "Topics distribution: ";
std::cerr.precision(2);
for (PYPs::iterator pypIt=m_word_pyps.front().begin();
pypIt != m_word_pyps.front().end(); ++pypIt, ++k) {
- if (k % 5 == 0) std::cerr << std::endl << '\t';
- std::cerr << "<" << k << ":" << pypIt->num_customers() << ","
+ if (m_am_root && k % 5 == 0) std::cerr << std::endl << '\t';
+ if (m_am_root) std::cerr << "<" << k << ":" << pypIt->num_customers() << ","
<< pypIt->num_types() << "," << m_topic_pyp.prob(k, m_topic_p0) << "> ";
}
std::cerr.precision(4);
- std::cerr << std::endl;
+ if (m_am_root) std::cerr << std::endl;
}
}
delete [] randomDocIndices;
}
-PYPTopics::F PYPTopics::hresample_docs(int num_threads, int thread_id)
-{
- int resample_counter=0;
- F log_p = 0.0;
- PYPs::iterator pypIt = m_document_pyps.begin();
- PYPs::iterator end = m_document_pyps.end();
- pypIt += thread_id;
-// std::cerr << thread_id << " started " << std::endl; std::cerr.flush();
-
- while (pypIt < end)
- {
- pypIt->resample_prior();
- log_p += pypIt->log_restaurant_prob();
- if (resample_counter++ % 5000 == 0) {
- std::cerr << "."; std::cerr.flush();
- }
- pypIt += num_threads;
- }
-// std::cerr << thread_id << " did " << resample_counter << " with answer " << log_p << std::endl; std::cerr.flush();
-
- return log_p;
-}
-
-//PYPTopics::F PYPTopics::hresample_topics()
-//{
-// F log_p = 0.0;
-// for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin();
-// levelIt != m_word_pyps.end(); ++levelIt) {
-// for (PYPs::iterator pypIt=levelIt->begin();
-// pypIt != levelIt->end(); ++pypIt) {
-//
-// pypIt->resample_prior();
-// log_p += pypIt->log_restaurant_prob();
-// }
-// }
-// //std::cerr << "topicworker has answer " << log_p << std::endl; std::cerr.flush();
-//
-// return log_p;
-//}
void PYPTopics::decrement(const Term& term, int topic, int level) {
//std::cerr << "PYPTopics::decrement(" << term << "," << topic << "," << level << ")" << std::endl;