summaryrefslogtreecommitdiff
path: root/gi/pyp-topics/src/mpi-pyp-topics.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gi/pyp-topics/src/mpi-pyp-topics.cc')
-rw-r--r--gi/pyp-topics/src/mpi-pyp-topics.cc148
1 files changed, 93 insertions, 55 deletions
diff --git a/gi/pyp-topics/src/mpi-pyp-topics.cc b/gi/pyp-topics/src/mpi-pyp-topics.cc
index 2ad28278..4525302e 100644
--- a/gi/pyp-topics/src/mpi-pyp-topics.cc
+++ b/gi/pyp-topics/src/mpi-pyp-topics.cc
@@ -4,7 +4,7 @@
#include "mpi-pyp-topics.hh"
//#include <boost/date_time/posix_time/posix_time_types.hpp>
-void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
+void MPIPYPTopics::sample_corpus(const Corpus& corpus, int samples,
int freq_cutoff_start, int freq_cutoff_end,
int freq_cutoff_interval,
int max_contexts_per_document) {
@@ -23,33 +23,33 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
}
int local_documents = m_mpi_end - m_mpi_start;
-
if (!m_backoff.get()) {
m_word_pyps.clear();
- m_word_pyps.push_back(PYPs());
+ m_word_pyps.push_back(MPIPYPs());
}
if (m_am_root) std::cerr << "\n Training with " << m_word_pyps.size()-1 << " backoff level"
- << (m_word_pyps.size()==2 ? ":" : "s:") << std::endl;
+ << (m_word_pyps.size()>1 ? ":" : "s:") << std::endl;
- for (int i=0; i<(int)m_word_pyps.size(); ++i)
- {
+ for (int i=0; i<(int)m_word_pyps.size(); ++i) {
m_word_pyps.at(i).reserve(m_num_topics);
for (int j=0; j<m_num_topics; ++j)
- m_word_pyps.at(i).push_back(new PYP<int>(0.5, 1.0));
+ m_word_pyps.at(i).push_back(new MPIPYP<int>(0.5, 1.0));
}
if (m_am_root) std::cerr << std::endl;
- m_document_pyps.reserve(corpus.num_documents());
- for (int j=0; j<corpus.num_documents(); ++j)
+ m_document_pyps.reserve(local_documents);
+ //m_document_pyps.reserve(corpus.num_documents());
+ //for (int j=0; j<corpus.num_documents(); ++j)
+ for (int j=0; j<local_documents; ++j)
m_document_pyps.push_back(new PYP<int>(0.5, 1.0));
m_topic_p0 = 1.0/m_num_topics;
m_term_p0 = 1.0/corpus.num_types();
m_backoff_p0 = 1.0/corpus.num_documents();
- if (m_am_root) std::cerr << " Documents: " << corpus.num_documents() << " Terms: "
- << corpus.num_types() << std::endl;
+ if (m_am_root) std::cerr << " Documents: " << corpus.num_documents() << "("
+ << local_documents << ")" << " Terms: " << corpus.num_types() << std::endl;
int frequency_cutoff = freq_cutoff_start;
if (m_am_root) std::cerr << " Context frequency cutoff set to " << frequency_cutoff << std::endl;
@@ -57,13 +57,16 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
timer.Reset();
// Initialisation pass
int document_id=0, topic_counter=0;
- for (Corpus::const_iterator corpusIt=corpus.begin();
- corpusIt != corpus.end(); ++corpusIt, ++document_id) {
- m_corpus_topics.push_back(DocumentTopics(corpusIt->size(), 0));
+ for (int i=0; i<local_documents; ++i) {
+ document_id = i+m_mpi_start;
+
+ //for (Corpus::const_iterator corpusIt=corpus.begin();
+ // corpusIt != corpus.end(); ++corpusIt, ++document_id) {
+ m_corpus_topics.push_back(DocumentTopics(corpus.at(document_id).size(), 0));
int term_index=0;
- for (Document::const_iterator docIt=corpusIt->begin();
- docIt != corpusIt->end(); ++docIt, ++term_index) {
+ for (Document::const_iterator docIt=corpus.at(document_id).begin();
+ docIt != corpus.at(document_id).end(); ++docIt, ++term_index) {
topic_counter++;
Term term = *docIt;
@@ -80,21 +83,41 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
if (m_use_topic_pyp) {
F p0 = m_topic_pyp.prob(new_topic, m_topic_p0);
- int table_delta = m_document_pyps[document_id].increment(new_topic, p0);
+ int table_delta = m_document_pyps.at(i).increment(new_topic, p0);
if (table_delta)
m_topic_pyp.increment(new_topic, m_topic_p0);
}
- else m_document_pyps[document_id].increment(new_topic, m_topic_p0);
+ else m_document_pyps.at(i).increment(new_topic, m_topic_p0);
}
- m_corpus_topics[document_id][term_index] = new_topic;
+ m_corpus_topics.at(i).at(term_index) = new_topic;
+ }
+ }
+
+ // Synchronise the topic->word counts across the processes.
+ for (std::vector<MPIPYPs>::iterator levelIt=m_word_pyps.begin();
+ levelIt != m_word_pyps.end(); ++levelIt) {
+ for (MPIPYPs::iterator pypIt=levelIt->begin();
+ pypIt != levelIt->end(); ++pypIt) {
+ if (!m_am_root) boost::mpi::communicator().barrier();
+ std::cerr << "Before Sync Process " << m_rank << ":";
+ pypIt->debug_info(std::cerr); std::cerr << std::endl;
+ if (m_am_root) boost::mpi::communicator().barrier();
+
+ pypIt->synchronise();
+
+ if (!m_am_root) boost::mpi::communicator().barrier();
+ std::cerr << "After Sync Process " << m_rank << ":";
+ pypIt->debug_info(std::cerr); std::cerr << std::endl;
+ if (m_am_root) boost::mpi::communicator().barrier();
}
}
+
if (m_am_root) std::cerr << " Initialized in " << timer.Elapsed() << " seconds\n";
int* randomDocIndices = new int[local_documents];
for (int i = 0; i < local_documents; ++i)
- randomDocIndices[i] = i+m_mpi_start;
+ randomDocIndices[i] = i;
// Sampling phase
for (int curr_sample=0; curr_sample < samples; ++curr_sample) {
@@ -110,8 +133,8 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
// Randomize the corpus indexing array
int tmp;
int processed_terms=0;
- for (int i = local_documents-1; i > 0; --i) {
- //i+1 since j \in [0,i] but rnd() \in [0,1)
+ for (int i = (local_documents-1); i > 0; --i) {
+ //i+1 since j \in [0,i] but rnd() \in [0,1)
int j = (int)(rnd() * (i+1));
assert(j >= 0 && j <= i);
tmp = randomDocIndices[i];
@@ -120,15 +143,17 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
}
// for each document in the corpus
- int document_id;
- for (int i=0; i<local_documents; ++i) {
- document_id = randomDocIndices[i];
+ for (int rand_doc=0; rand_doc<local_documents; ++rand_doc) {
+ int doc_index = randomDocIndices[rand_doc];
+ int document_id = doc_index + m_mpi_start;
+ const Document& doc = corpus.at(document_id);
// for each term in the document
int term_index=0;
- Document::const_iterator docEnd = corpus.at(document_id).end();
- for (Document::const_iterator docIt=corpus.at(document_id).begin();
+ Document::const_iterator docEnd = doc.end();
+ for (Document::const_iterator docIt=doc.begin();
docIt != docEnd; ++docIt, ++term_index) {
+
if (max_contexts_per_document && term_index > max_contexts_per_document)
break;
@@ -140,36 +165,49 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
processed_terms++;
// remove the prevous topic from the PYPs
- int current_topic = m_corpus_topics[document_id][term_index];
+ int current_topic = m_corpus_topics.at(doc_index).at(term_index);
// a negative label mean that term hasn't been sampled yet
if (current_topic >= 0) {
decrement(term, current_topic);
- int table_delta = m_document_pyps[document_id].decrement(current_topic);
+ int table_delta = m_document_pyps.at(doc_index).decrement(current_topic);
if (m_use_topic_pyp && table_delta < 0)
m_topic_pyp.decrement(current_topic);
}
// sample a new_topic
- int new_topic = sample(document_id, term);
+ int new_topic = sample(doc_index, term);
// add the new topic to the PYPs
- m_corpus_topics[document_id][term_index] = new_topic;
+ m_corpus_topics.at(doc_index).at(term_index) = new_topic;
increment(term, new_topic);
if (m_use_topic_pyp) {
F p0 = m_topic_pyp.prob(new_topic, m_topic_p0);
- int table_delta = m_document_pyps[document_id].increment(new_topic, p0);
+ int table_delta = m_document_pyps.at(doc_index).increment(new_topic, p0);
if (table_delta)
m_topic_pyp.increment(new_topic, m_topic_p0);
}
- else m_document_pyps[document_id].increment(new_topic, m_topic_p0);
+ else m_document_pyps.at(doc_index).increment(new_topic, m_topic_p0);
}
if (document_id && document_id % 10000 == 0) {
if (m_am_root) std::cerr << "."; std::cerr.flush();
}
}
m_world.barrier();
+ // Synchronise the topic->word counts across the processes.
+ for (std::vector<MPIPYPs>::iterator levelIt=m_word_pyps.begin();
+ levelIt != m_word_pyps.end(); ++levelIt) {
+ for (MPIPYPs::iterator pypIt=levelIt->begin();
+ pypIt != levelIt->end(); ++pypIt) {
+ std::cerr << "Before Sync Process " << m_rank << ":";
+ pypIt->debug_info(std::cerr); std::cerr << std::endl;
+ pypIt->synchronise();
+ std::cerr << "After Sync Process " << m_rank << ":";
+ pypIt->debug_info(std::cerr); std::cerr << std::endl;
+ }
+ }
+
if (m_am_root) std::cerr << " ||| sampled " << processed_terms << " terms.";
if (curr_sample != 0 && curr_sample % 10 == 0) {
@@ -179,9 +217,9 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
// resample the hyperparamters
F log_p=0.0;
- for (std::vector<PYPs>::iterator levelIt=m_word_pyps.begin();
+ for (std::vector<MPIPYPs>::iterator levelIt=m_word_pyps.begin();
levelIt != m_word_pyps.end(); ++levelIt) {
- for (PYPs::iterator pypIt=levelIt->begin();
+ for (MPIPYPs::iterator pypIt=levelIt->begin();
pypIt != levelIt->end(); ++pypIt) {
pypIt->resample_prior();
log_p += pypIt->log_restaurant_prob();
@@ -206,7 +244,7 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
int k=0;
if (m_am_root) std::cerr << "Topics distribution: ";
std::cerr.precision(2);
- for (PYPs::iterator pypIt=m_word_pyps.front().begin();
+ for (MPIPYPs::iterator pypIt=m_word_pyps.front().begin();
pypIt != m_word_pyps.front().end(); ++pypIt, ++k) {
if (m_am_root && k % 5 == 0) std::cerr << std::endl << '\t';
if (m_am_root) std::cerr << "<" << k << ":" << pypIt->num_customers() << ","
@@ -220,8 +258,8 @@ void PYPTopics::sample_corpus(const Corpus& corpus, int samples,
}
-void PYPTopics::decrement(const Term& term, int topic, int level) {
- //std::cerr << "PYPTopics::decrement(" << term << "," << topic << "," << level << ")" << std::endl;
+void MPIPYPTopics::decrement(const Term& term, int topic, int level) {
+ //std::cerr << "MPIPYPTopics::decrement(" << term << "," << topic << "," << level << ")" << std::endl;
m_word_pyps.at(level).at(topic).decrement(term);
if (m_backoff.get()) {
Term backoff_term = (*m_backoff)[term];
@@ -230,8 +268,8 @@ void PYPTopics::decrement(const Term& term, int topic, int level) {
}
}
-void PYPTopics::increment(const Term& term, int topic, int level) {
- //std::cerr << "PYPTopics::increment(" << term << "," << topic << "," << level << ")" << std::endl;
+void MPIPYPTopics::increment(const Term& term, int topic, int level) {
+ //std::cerr << "MPIPYPTopics::increment(" << term << "," << topic << "," << level << ")" << std::endl;
m_word_pyps.at(level).at(topic).increment(term, word_pyps_p0(term, topic, level));
if (m_backoff.get()) {
@@ -241,7 +279,7 @@ void PYPTopics::increment(const Term& term, int topic, int level) {
}
}
-int PYPTopics::sample(const DocumentId& doc, const Term& term) {
+int MPIPYPTopics::sample(const DocumentId& doc, const Term& term) {
// First pass: collect probs
F sum=0.0;
std::vector<F> sums;
@@ -252,7 +290,7 @@ int PYPTopics::sample(const DocumentId& doc, const Term& term) {
if (m_use_topic_pyp) topic_prob = m_topic_pyp.prob(k, m_topic_p0);
//F p_k_d = m_document_pyps[doc].prob(k, topic_prob);
- F p_k_d = m_document_pyps[doc].unnormalised_prob(k, topic_prob);
+ F p_k_d = m_document_pyps.at(doc).unnormalised_prob(k, topic_prob);
sum += (p_w_k*p_k_d);
sums.push_back(sum);
@@ -266,9 +304,9 @@ int PYPTopics::sample(const DocumentId& doc, const Term& term) {
assert(false);
}
-PYPTopics::F PYPTopics::word_pyps_p0(const Term& term, int topic, int level) const {
+MPIPYPTopics::F MPIPYPTopics::word_pyps_p0(const Term& term, int topic, int level) const {
//for (int i=0; i<level+1; ++i) std::cerr << " ";
- //std::cerr << "PYPTopics::word_pyps_p0(" << term << "," << topic << "," << level << ")" << std::endl;
+ //std::cerr << "MPIPYPTopics::word_pyps_p0(" << term << "," << topic << "," << level << ")" << std::endl;
F p0 = m_term_p0;
if (m_backoff.get()) {
@@ -283,24 +321,24 @@ PYPTopics::F PYPTopics::word_pyps_p0(const Term& term, int topic, int level) con
p0 = m_term_p0;
}
//for (int i=0; i<level+1; ++i) std::cerr << " ";
- //std::cerr << "PYPTopics::word_pyps_p0(" << term << "," << topic << "," << level << ") = " << p0 << std::endl;
+ //std::cerr << "MPIPYPTopics::word_pyps_p0(" << term << "," << topic << "," << level << ") = " << p0 << std::endl;
return p0;
}
-PYPTopics::F PYPTopics::prob(const Term& term, int topic, int level) const {
+MPIPYPTopics::F MPIPYPTopics::prob(const Term& term, int topic, int level) const {
//for (int i=0; i<level+1; ++i) std::cerr << " ";
- //std::cerr << "PYPTopics::prob(" << term << "," << topic << "," << level << " " << factor << ")" << std::endl;
+ //std::cerr << "MPIPYPTopics::prob(" << term << "," << topic << "," << level << " " << factor << ")" << std::endl;
F p0 = word_pyps_p0(term, topic, level);
F p_w_k = m_word_pyps.at(level).at(topic).prob(term, p0);
//for (int i=0; i<level+1; ++i) std::cerr << " ";
- //std::cerr << "PYPTopics::prob(" << term << "," << topic << "," << level << ") = " << p_w_k << std::endl;
+ //std::cerr << "MPIPYPTopics::prob(" << term << "," << topic << "," << level << ") = " << p_w_k << std::endl;
return p_w_k;
}
-int PYPTopics::max_topic() const {
+int MPIPYPTopics::max_topic() const {
if (!m_use_topic_pyp)
return -1;
@@ -317,8 +355,8 @@ int PYPTopics::max_topic() const {
return current_topic;
}
-int PYPTopics::max(const DocumentId& doc) const {
- //std::cerr << "PYPTopics::max(" << doc << "," << term << ")" << std::endl;
+int MPIPYPTopics::max(const DocumentId& doc) const {
+ //std::cerr << "MPIPYPTopics::max(" << doc << "," << term << ")" << std::endl;
// collect probs
F current_max=0.0;
int current_topic=-1;
@@ -342,8 +380,8 @@ int PYPTopics::max(const DocumentId& doc) const {
return current_topic;
}
-int PYPTopics::max(const DocumentId& doc, const Term& term) const {
- //std::cerr << "PYPTopics::max(" << doc << "," << term << ")" << std::endl;
+int MPIPYPTopics::max(const DocumentId& doc, const Term& term) const {
+ //std::cerr << "MPIPYPTopics::max(" << doc << "," << term << ")" << std::endl;
// collect probs
F current_max=0.0;
int current_topic=-1;
@@ -368,7 +406,7 @@ int PYPTopics::max(const DocumentId& doc, const Term& term) const {
return current_topic;
}
-std::ostream& PYPTopics::print_document_topics(std::ostream& out) const {
+std::ostream& MPIPYPTopics::print_document_topics(std::ostream& out) const {
for (CorpusTopics::const_iterator corpusIt=m_corpus_topics.begin();
corpusIt != m_corpus_topics.end(); ++corpusIt) {
int term_index=0;
@@ -382,7 +420,7 @@ std::ostream& PYPTopics::print_document_topics(std::ostream& out) const {
return out;
}
-std::ostream& PYPTopics::print_topic_terms(std::ostream& out) const {
+std::ostream& MPIPYPTopics::print_topic_terms(std::ostream& out) const {
for (PYPs::const_iterator pypsIt=m_word_pyps.front().begin();
pypsIt != m_word_pyps.front().end(); ++pypsIt) {
int term_index=0;